ensembl-hive-python3  2.3
Process.py
Go to the documentation of this file.
1 
2 import eHive.Params
3 
4 import os
5 import sys
6 import json
7 import numbers
8 import warnings
9 import traceback
10 
# Version string announced to the parent process in the 'VERSION' message
# sent at the start of the process life-cycle.
__version__ = "0.1"

__doc__ = """
This module mainly implements python's counterpart of GuestProcess. Read
the latter for more information about the JSON protocol used to communicate.
"""
17 
class Job:
    """Plain attribute holder for the information attached to a job.

    It defines nothing itself: attributes are set on its instances by the
    code that creates them.
    """
21 
class CompleteEarlyException(Exception):
    """Signals an early but successful end of a job.

    Meant to be raised by classes derived from BaseRunnable.
    """
class JobFailedException(Exception):
    """Signals an early and unsuccessful end of a job.

    Meant to be raised by classes derived from BaseRunnable.
    """
class HiveJSONMessageException(Exception):
    """Signals that a JSON message received from GuestProcess could not be parsed."""
class LostHiveConnectionException(Exception):
    """Signals that the communication pipe with the Perl side has been lost."""
34 
35 
class BaseRunnable(object):
    """This is the counterpart of GuestProcess. Note that most of the methods
    are private to be hidden in the derived classes.

    This class can be used as a base-class for people to redefine fetch_input(),
    run() and/or write_output() (and/or pre_cleanup(), post_cleanup()).
    A job can raise CompleteEarlyException to skip its remaining steps while
    still being reported as complete. Any other exception (for instance
    JobFailedException) makes the job be reported as not complete.
    """

    # Private BaseRunnable interface

    def __init__(self, read_fileno, write_fileno):
        """Wrap the two file descriptors used to talk to the parent process.

        read_fileno: descriptor the parent's messages are read from.
        write_fileno: descriptor the messages to the parent are written to.
        """
        # We need the binary mode to disable the buffering
        self.__read_pipe = os.fdopen(read_fileno, mode='rb', buffering=0)
        self.__write_pipe = os.fdopen(write_fileno, mode='wb', buffering=0)
        self.__pid = os.getpid()
        self.debug = 0
        # Filled in by the first call to worker_temp_directory()
        self.__created_worker_temp_directory = None
        # NOTE(review): nothing visible in this class calls
        # __process_life_cycle(); presumably it is started from here or by the
        # code that instantiates the runnable -- confirm against the launcher.

    def __print_debug(self, *args):
        """Print the arguments on stderr, prefixed by the PID, when self.debug is greater than 1"""
        if self.debug > 1:
            print("PYTHON {0}".format(self.__pid), *args, file=sys.stderr)

    def __write_line(self, text):
        """Write "text" followed by a newline to the parent process, encoded in UTF-8.

        Raises LostHiveConnectionException if the pipe is broken.
        """
        # UTF8 encoding has never been tested. Just hope it works :)
        try:
            self.__write_pipe.write((text + "\n").encode('utf-8'))
        except BrokenPipeError:
            raise LostHiveConnectionException("__write_pipe") from None

    def __params_payload(self):
        """Return the substituted and unsubstituted parameter hashes in the structure sent to the parent"""
        return {
            'substituted': self.__params.param_hash,
            'unsubstituted': self.__params.unsubstituted_param_hash,
        }

    def __send_message(self, event, content):
        """Serializes the message in JSON and sends it to the parent process"""
        def default_json_encoder(o):
            # Objects that JSON cannot represent are replaced by a placeholder
            # instead of aborting the whole message
            self.__print_debug("Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
            return 'UNSERIALIZABLE OBJECT'
        j = json.dumps({'event': event, 'content': content}, indent=None, default=default_json_encoder)
        self.__print_debug('__send_message:', j)
        self.__write_line(j)

    def __send_response(self, response):
        """Sends a response message to the parent process"""
        self.__print_debug('__send_response:', response)
        # Let json do the quoting so that a response containing quotes or
        # backslashes still produces a valid JSON document
        self.__write_line(json.dumps({'response': str(response)}))

    def __read_message(self):
        """Read a message from the parent and parse it.

        Raises LostHiveConnectionException if the pipe is broken or has been
        closed by the parent, and HiveJSONMessageException if the line read
        is not valid JSON.
        """
        self.__print_debug("__read_message ...")
        try:
            line = self.__read_pipe.readline()
        except BrokenPipeError:
            raise LostHiveConnectionException("__read_pipe") from None
        if not line:
            # readline() returns an empty value at end-of-file, i.e. there is
            # nobody left to talk to
            raise LostHiveConnectionException("__read_pipe")
        try:
            text = line.decode()
            self.__print_debug(" ... -> ", text.rstrip("\n"))
            return json.loads(text)
        except ValueError as e:
            # HiveJSONMessageException is a more meaningful name than ValueError
            raise HiveJSONMessageException from e

    def __send_message_and_wait_for_OK(self, event, content):
        """Send a message and expects a response to be 'OK'.

        Raises HiveJSONMessageException for any other reply.
        """
        self.__send_message(event, content)
        response = self.__read_message()
        if not isinstance(response, dict) or response.get('response') != 'OK':
            raise HiveJSONMessageException("Received '{0}' instead of OK".format(response))

    def __process_life_cycle(self):
        """Simple loop: wait for job parameters, do the job's life-cycle"""
        self.__send_message_and_wait_for_OK('VERSION', __version__)
        self.__send_message_and_wait_for_OK('PARAM_DEFAULTS', self.param_defaults())
        while True:
            self.__print_debug("waiting for instructions")
            config = self.__read_message()
            if 'input_job' not in config:
                self.__print_debug("no params, this is the end of the wrapper")
                return
            self.__job_life_cycle(config)

    def __job_life_cycle(self, config):
        """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent.

        The steps are fetch_input and run, preceded by pre_cleanup when the
        job has already been retried and followed by write_output when
        config['execute_writes'] is true. post_cleanup is attempted in all
        cases, and a JOB_END message closes the job.
        """
        self.__print_debug("__life_cycle")
        job_config = config['input_job']

        # Params
        self.__params = eHive.Params.ParamContainer(job_config['parameters'])

        # Job attributes
        self.input_job = Job()
        for attr in ('dbID', 'input_id', 'retry_count'):
            setattr(self.input_job, attr, job_config[attr])
        self.input_job.autoflow = True
        self.input_job.lethal_for_worker = False
        self.input_job.transient_error = True

        # Worker attributes
        self.debug = config['debug']

        # Which methods should be run
        steps = ['fetch_input', 'run']
        if self.input_job.retry_count > 0:
            steps.insert(0, 'pre_cleanup')
        if config['execute_writes']:
            steps.append('write_output')
        self.__print_debug("steps to run:", steps)
        self.__send_response('OK')

        # The actual life-cycle
        died_somewhere = False
        try:
            for s in steps:
                self.__run_method_if_exists(s)
        except CompleteEarlyException as e:
            self.warning(e.args[0] if e.args else repr(e), False)
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            # Deliberately catches everything so that any kind of failure in
            # the runnable is reported to the parent.
            # __traceback(2) drops the frames of this method and of
            # __run_method_if_exists
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        try:
            self.__run_method_if_exists('post_cleanup')
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        job_end_structure = {
            'complete': not died_somewhere,
            'job': {x: getattr(self.input_job, x) for x in ('autoflow', 'lethal_for_worker', 'transient_error')},
            'params': self.__params_payload(),
        }
        self.__send_message_and_wait_for_OK('JOB_END', job_end_structure)

    def __run_method_if_exists(self, method):
        """method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
        We only call the method if it exists to save a trip to the database."""
        if hasattr(self, method):
            self.__send_message_and_wait_for_OK('JOB_STATUS_UPDATE', method)
            getattr(self, method)()

    def __traceback(self, skipped_traces):
        """Format the exception being handled, without the first "skipped_traces" entries of its stack trace (the eHive part)"""
        (etype, value, tb) = sys.exc_info()
        s1 = traceback.format_exception_only(etype, value)
        l = traceback.extract_tb(tb)[skipped_traces:]
        s2 = traceback.format_list(l)
        return "".join(s1 + s2)

    # Public BaseRunnable interface

    def warning(self, message, is_error = False):
        """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not"""
        self.__send_message_and_wait_for_OK('WARNING', {'message': message, 'is_error': is_error})

    def dataflow(self, output_ids, branch_name_or_code = 1):
        """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns"""
        if branch_name_or_code == 1:
            # The flag lives on the job: that is where the JOB_END message
            # reads it from
            self.input_job.autoflow = False
        self.__send_message('DATAFLOW', {
            'output_ids': output_ids,
            'branch_name_or_code': branch_name_or_code,
            'params': self.__params_payload(),
        })
        return self.__read_message()

    def worker_temp_directory(self):
        """Returns the full path of the temporary directory created by the worker.
        Runnables can implement "worker_temp_directory_name()" to return the name
        they would like to use.
        The parent process is only asked once: later calls return the same path.
        """
        if self.__created_worker_temp_directory is None:
            template_name = self.worker_temp_directory_name() if hasattr(self, 'worker_temp_directory_name') else None
            self.__send_message('WORKER_TEMP_DIRECTORY', template_name)
            self.__created_worker_temp_directory = self.__read_message()['response']
        return self.__created_worker_temp_directory

    # Param interface

    def param_defaults(self):
        """Returns the defaults parameters for this runnable"""
        return {}

    def param_required(self, param_name):
        """Returns the value of the parameter "param_name" or raises an exception
        if anything wrong happens. The exception is marked as non-transient."""
        t = self.input_job.transient_error
        # Stays False if get_param() raises: the previous value is only
        # restored on success
        self.input_job.transient_error = False
        v = self.__params.get_param(param_name)
        self.input_job.transient_error = t
        return v

    def param(self, param_name, *args):
        """When called as a setter: sets the value of the parameter "param_name".
        When called as a getter: returns the value of the parameter "param_name".
        It does not raise an exception if the parameter (or another one in the
        substitution stack) is undefined"""
        # As a setter
        if args:
            return self.__params.set_param(param_name, args[0])

        # As a getter
        try:
            return self.__params.get_param(param_name)
        except KeyError as e:
            warnings.warn("parameter '{0}' cannot be initialized because {1} is not defined !".format(param_name, e), eHive.Params.ParamWarning, 2)
            return None

    def param_exists(self, param_name):
        """Returns True or False, whether the parameter exists (it doesn't mean it can be successfully substituted)"""
        return self.__params.has_param(param_name)

    def param_is_defined(self, param_name):
        """Returns True or False, whether the parameter exists, can be successfully substituted, and is not None"""
        if not self.param_exists(param_name):
            return False
        try:
            return self.__params.get_param(param_name) is not None
        except KeyError:
            return False
258 
def __read_message(self)
Read a message from the parent and parse it.
Definition: Process.py:95
def param_exists(self, param_name)
Returns True or False, whether the parameter exists (it doesn't mean it can be successfully substitut...
Definition: Process.py:268
def __print_debug(self, args)
Definition: Process.py:63
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens...
Definition: Process.py:242
def __send_message(self, event, content)
Serializes the message in JSON and sends it to the parent process
Definition: Process.py:71
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
Definition: Process.py:188
def param_defaults(self)
Returns the defaults parameters for this runnable.
Definition: Process.py:236
def __send_message_and_wait_for_OK(self, event, content)
Send a message and expects a response to be 'OK'.
Definition: Process.py:109
Raised when the process has lost the communication pipe with the Perl side.
Definition: Process.py:37
Equivalent of eHive's Param module.
Definition: Params.py:44
This is the counterpart of GuestProcess.
Definition: Process.py:50
Raised when we could not parse the JSON message coming from GuestProcess.
Definition: Process.py:33
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an ...
Definition: Process.py:208
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination...
Definition: Process.py:29
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
Definition: Process.py:117
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Definition: Process.py:224
def param(self, param_name, args)
When called as a setter: sets the value of the parameter "param_name".
Definition: Process.py:254
def __traceback(self, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
Definition: Process.py:195
def __job_life_cycle(self, config)
Job's life-cycle.
Definition: Process.py:131
Used by Process.BaseRunnable.
Definition: Params.py:17
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
Definition: Process.py:213
def param_is_defined(self, param_name)
Returns True or False, whether the parameter exists, can be successfully substituted, and is not None.
Definition: Process.py:273
Dummy class to hold job-related information.
Definition: Process.py:20
def __send_response(self, response)
Sends a response message to the parent process.
Definition: Process.py:85