ensembl-hive-python3  2.4
Process.py
Go to the documentation of this file.
1 
2 import eHive.Params
3 
4 import os
5 import sys
6 import json
7 import numbers
8 import unittest
9 import warnings
10 import traceback
11 
# Version of the JSON protocol spoken with GuestProcess (sent as the VERSION event)
__version__ = "0.2"

__doc__ = """
This module mainly implements Python's counterpart of GuestProcess. Read
the latter for more information about the JSON protocol used to communicate.
"""
18 
class Job:
    """Plain attribute holder for the information about the job being run.

    It declares no attributes itself; they are assigned on each instance.
    """
22 
class CompleteEarlyException(Exception):
    """Raised by a subclass of BaseRunnable to stop its job early while
    still reporting a successful termination."""
class JobFailedException(Exception):
    """Raised by a subclass of BaseRunnable to stop its job early and
    report an unsuccessful termination."""
class HiveJSONMessageException(Exception):
    """Signals that a message received from GuestProcess could not be
    parsed as JSON (or was not the reply that was expected)."""
class LostHiveConnectionException(Exception):
    """Signals that a communication pipe with the Perl side (GuestProcess)
    has been lost."""
35 
36 
class BaseRunnable(object):
    """This is the counterpart of GuestProcess. Note that most of the methods
    are private (name-mangled) to be hidden in the derived classes.

    This class can be used as a base-class for people to redefine fetch_input(),
    run() and/or write_output() (and/or pre_cleanup(), post_cleanup(),
    post_healthcheck()). Only the methods that exist are called.
    Jobs are supposed to raise CompleteEarlyException in case they complete
    before reaching the end of their life-cycle. They can also raise
    JobFailedException to indicate a general failure.

    Messages are exchanged with the parent process as one JSON document per
    line over two pipes; see GuestProcess for the protocol.
    """

    # Private BaseRunnable interface
    ################################

    def __init__(self, read_fileno, write_fileno):
        """Wrap the two pipe file descriptors and run the job loop.

        read_fileno: file descriptor the parent's messages are read from.
        write_fileno: file descriptor the messages to the parent are written to.
        The constructor only returns once the parent ends the loop (see
        __process_life_cycle).
        """
        # We need the binary mode to disable the buffering
        self.__read_pipe = os.fdopen(read_fileno, mode='rb', buffering=0)
        self.__write_pipe = os.fdopen(write_fileno, mode='wb', buffering=0)
        self.__pid = os.getpid()
        self.debug = 0
        # __process_life_cycle is name-mangled and nothing in this module
        # calls it: constructing the runnable is what starts the job loop.
        self.__process_life_cycle()

    def __print_debug(self, *args):
        """Print args on stderr, prefixed with the PID, when debug > 1."""
        if self.debug > 1:
            print("PYTHON {0}".format(self.__pid), *args, file=sys.stderr)

    def __write_line(self, text):
        """Send text, followed by a newline and encoded in UTF-8, to the parent.

        Raises LostHiveConnectionException if the pipe is broken.
        """
        # json.dumps escapes non-ASCII characters by default, so in practice
        # only ASCII goes through here.
        try:
            self.__write_pipe.write((text + "\n").encode('utf-8'))
        except BrokenPipeError:
            raise LostHiveConnectionException("__write_pipe") from None

    def __send_message(self, event, content):
        """Serialize {event, content} in JSON and send it to the parent process.

        Objects that cannot be serialized are replaced with the string
        'UNSERIALIZABLE OBJECT' (reported via __print_debug).
        """
        def default_json_encoder(o):
            self.__print_debug("Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
            return 'UNSERIALIZABLE OBJECT'
        j = json.dumps({'event': event, 'content': content}, indent=None, default=default_json_encoder)
        self.__print_debug('__send_message:', j)
        self.__write_line(j)

    def __send_response(self, response):
        """Send {"response": str(response)} to the parent process."""
        self.__print_debug('__send_response:', response)
        # Built with json.dumps so that quotes or backslashes in the response
        # cannot produce invalid JSON.
        self.__write_line(json.dumps({'response': str(response)}))

    def __read_message(self):
        """Read one line from the parent and return it parsed as JSON.

        Raises LostHiveConnectionException if the pipe is broken or has been
        closed (end-of-file), HiveJSONMessageException if the line cannot be
        decoded / parsed.
        """
        self.__print_debug("__read_message ...")
        try:
            line = self.__read_pipe.readline()
        except BrokenPipeError:
            raise LostHiveConnectionException("__read_pipe") from None
        if not line:
            # readline() returns b'' at end-of-file: the parent closed its end
            raise LostHiveConnectionException("__read_pipe")
        try:
            text = line.decode()
            self.__print_debug(" ... -> ", text.rstrip("\n"))
            return json.loads(text)
        except ValueError as e:
            # HiveJSONMessageException is a more meaningful name than ValueError
            # (UnicodeDecodeError is a ValueError too)
            raise HiveJSONMessageException from e

    def __send_message_and_wait_for_OK(self, event, content):
        """Send a message and expect the response to be 'OK'.

        Raises HiveJSONMessageException for any other reply.
        """
        self.__send_message(event, content)
        response = self.__read_message()
        if not isinstance(response, dict) or response.get('response') != 'OK':
            raise HiveJSONMessageException("Received '{0}' instead of OK".format(response))

    def __process_life_cycle(self):
        """Simple loop: wait for job parameters, do the job's life-cycle.

        Starts by sending the protocol version and the parameter defaults,
        then runs one job per message, and returns as soon as a message
        without 'input_job' is received.
        """
        self.__send_message_and_wait_for_OK('VERSION', __version__)
        self.__send_message_and_wait_for_OK('PARAM_DEFAULTS', self.param_defaults())

        while True:
            self.__print_debug("waiting for instructions")
            config = self.__read_message()
            if 'input_job' not in config:
                self.__print_debug("no params, this is the end of the wrapper")
                return
            self.__job_life_cycle(config)

    def __job_life_cycle(self, config):
        """Job's life-cycle. See GuestProcess for a description of the protocol
        to communicate with the parent.

        config is the decoded message from the parent. This method reads its
        'debug', 'execute_writes' and 'input_job' entries ('input_job' holding
        'parameters', 'dbID', 'input_id' and 'retry_count'), runs the steps
        and finally sends JOB_END.
        """
        # Worker attributes. Set first so that the debug output and the
        # ParamContainer below use this job's debug level, not the previous one.
        self.debug = config['debug']
        self.__print_debug("__life_cycle")

        # Params
        self.__params = eHive.Params.ParamContainer(config['input_job']['parameters'], self.debug > 1)

        # Job attributes
        self.input_job = Job()
        for x in ['dbID', 'input_id', 'retry_count']:
            setattr(self.input_job, x, config['input_job'][x])
        self.input_job.autoflow = True
        self.input_job.lethal_for_worker = False
        self.input_job.transient_error = True

        # Which methods should be run
        steps = ['fetch_input', 'run']
        if self.input_job.retry_count > 0:
            steps.insert(0, 'pre_cleanup')
        if config['execute_writes']:
            # Both steps only run when the worker is allowed to write
            steps.append('write_output')
            steps.append('post_healthcheck')
        self.__print_debug("steps to run:", steps)
        self.__send_response('OK')

        # The actual life-cycle
        died_somewhere = False
        try:
            for s in steps:
                self.__run_method_if_exists(s)
        except CompleteEarlyException as e:
            self.warning(e.args[0] if e.args else repr(e), False)
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            # Deliberately catch everything so that the failure is logged and
            # JOB_END is still sent. The 2 skipped traces are this method and
            # __run_method_if_exists.
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        try:
            self.__run_method_if_exists('post_cleanup')
        except LostHiveConnectionException:
            # Nothing we can do, let's just exit
            raise
        except BaseException:
            died_somewhere = True
            self.warning(self.__traceback(2), True)

        job_end_structure = {
            'complete': not died_somewhere,
            'job': {x: getattr(self.input_job, x) for x in ['autoflow', 'lethal_for_worker', 'transient_error']},
            'params': {
                'substituted': self.__params.param_hash,
                'unsubstituted': self.__params.unsubstituted_param_hash,
            },
        }
        self.__send_message_and_wait_for_OK('JOB_END', job_end_structure)

    def __run_method_if_exists(self, method):
        """method is one of "pre_cleanup", "fetch_input", "run", "write_output",
        "post_healthcheck", "post_cleanup".
        We only call the method if it exists, to save a trip to the database."""
        if hasattr(self, method):
            self.__send_message_and_wait_for_OK('JOB_STATUS_UPDATE', method)
            getattr(self, method)()

    def __traceback(self, skipped_traces):
        """Format the exception currently being handled, removing the first
        "skipped_traces" entries of its stack trace (the eHive part)."""
        (etype, value, tb) = sys.exc_info()
        header = traceback.format_exception_only(etype, value)
        frames = traceback.extract_tb(tb)[skipped_traces:]
        return "".join(header + traceback.format_list(frames))

    # Public BaseRunnable interface
    ###############################

    def warning(self, message, is_error = False):
        """Store a message in the log_message table with is_error indicating
        whether the warning is actually an error or not"""
        self.__send_message_and_wait_for_OK('WARNING', {'message': message, 'is_error': is_error})

    def dataflow(self, output_ids, branch_name_or_code = 1):
        """Dataflows the output_id(s) on a given branch (default 1). Returns
        whatever the Perl side returns.

        Flowing on branch 1 disables the job's autoflow (input_job.autoflow),
        which is reported to the parent at JOB_END.
        """
        if branch_name_or_code == 1:
            self.input_job.autoflow = False
        self.__send_message('DATAFLOW', {
            'output_ids': output_ids,
            'branch_name_or_code': branch_name_or_code,
            'params': {
                'substituted': self.__params.param_hash,
                'unsubstituted': self.__params.unsubstituted_param_hash,
            },
        })
        return self.__read_message()['response']

    def worker_temp_directory(self):
        """Returns the full path of the temporary directory created by the worker.
        Runnables can implement "worker_temp_directory_name()" to return the name
        they would like to use.

        The path is requested from the parent on the first call only and
        cached on the instance afterwards.
        """
        try:
            return self.__created_worker_temp_directory
        except AttributeError:
            pass
        template_name = self.worker_temp_directory_name() if hasattr(self, 'worker_temp_directory_name') else None
        self.__send_message('WORKER_TEMP_DIRECTORY', template_name)
        self.__created_worker_temp_directory = self.__read_message()['response']
        return self.__created_worker_temp_directory

    # Param interface
    #################

    def param_defaults(self):
        """Returns the defaults parameters for this runnable"""
        return {}

    def param_required(self, param_name):
        """Returns the value of the parameter "param_name" or raises an exception
        if anything wrong happens or the value is None. The exception is
        marked as non-transient."""
        t = self.input_job.transient_error
        # Any exception raised below leaves the job marked as non-transient
        self.input_job.transient_error = False
        v = self.__params.get_param(param_name)
        if v is None:
            raise eHive.Params.NullParamException(param_name)
        self.input_job.transient_error = t
        return v

    def param(self, param_name, *args):
        """When called as a setter: sets the value of the parameter "param_name".
        When called as a getter: returns the value of the parameter "param_name".
        It does not raise an exception if the parameter (or another one in the
        substitution stack) is undefined: it warns and returns None instead."""
        # As a setter
        if args:
            return self.__params.set_param(param_name, args[0])

        # As a getter
        try:
            return self.__params.get_param(param_name)
        except KeyError as e:
            warnings.warn("parameter '{0}' cannot be initialized because {1} is missing !".format(param_name, e), eHive.Params.ParamWarning, 2)
            return None

    def param_exists(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted, None if the substitution fails, False if it is missing"""
        if not self.__params.has_param(param_name):
            return False
        try:
            self.__params.get_param(param_name)
            return True
        except KeyError:
            return None

    def param_is_defined(self, param_name):
        """Returns True if the parameter exists and can be successfully
        substituted to a defined value, None if the substitution fails,
        False if it is missing or evaluates as None"""
        e = self.param_exists(param_name)
        if not e:
            # False or None
            return e
        try:
            return self.__params.get_param(param_name) is not None
        except KeyError:
            return False
class RunnableTest(unittest.TestCase):
    """Unit tests of the parameter accessors of BaseRunnable."""

    def test_job_param(self):
        class FakeRunnableWithParams(BaseRunnable):
            """BaseRunnable that only holds parameters and a job."""
            def __init__(self, d):
                # Deliberately skips BaseRunnable.__init__, which opens pipes
                self._BaseRunnable__params = eHive.Params.ParamContainer(d)
                self.input_job = Job()
                self.input_job.transient_error = True
        j = FakeRunnableWithParams({
            'a': 3,
            'b': None,
            'c': '#other#',
            'e': '#e#'
        })

        # param_exists
        self.assertIs( j.param_exists('a'), True, '"a" exists' )
        self.assertIs( j.param_exists('b'), True, '"b" exists' )
        self.assertIs( j.param_exists('c'), None, '"c"\'s existence is unclear' )
        self.assertIs( j.param_exists('d'), False, '"d" doesn\'t exist' )
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_exists('e')

        # param_is_defined
        self.assertIs( j.param_is_defined('a'), True, '"a" is defined' )
        self.assertIs( j.param_is_defined('b'), False, '"b" is not defined' )
        self.assertIs( j.param_is_defined('c'), None, '"c"\'s defined-ness is unclear' )
        self.assertIs( j.param_is_defined('d'), False, '"d" is not defined (it doesn\'t exist)' )
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_is_defined('e')

        # param
        # assertEqual, not assertIs: ints must be compared by value
        self.assertEqual( j.param('a'), 3, '"a" is 3' )
        self.assertIs( j.param('b'), None, '"b" is None' )
        with self.assertWarns(eHive.Params.ParamWarning):
            self.assertIs( j.param('c'), None, '"c"\'s value is unclear' )
        with self.assertWarns(eHive.Params.ParamWarning):
            self.assertIs( j.param('d'), None, '"d" is not defined (it doesn\'t exist)' )
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param('e')

        # param_required
        self.assertEqual( j.param_required('a'), 3, '"a" is 3' )
        with self.assertRaises(eHive.Params.NullParamException):
            j.param_required('b')
        self.assertIs( j.input_job.transient_error, False, 'a failed param_required() is not transient' )
        with self.assertRaises(KeyError):
            j.param_required('c')
        with self.assertRaises(KeyError):
            j.param_required('d')
        with self.assertRaises(eHive.Params.ParamInfiniteLoopException):
            j.param_required('e')
325 
def __read_message(self)
Read a message from the parent and parse it.
Definition: Process.py:96
def param_exists(self, param_name)
Returns True if the parameter exists and can be successfully substituted, None if the substitution fails, False if it is missing.
Definition: Process.py:274
def __print_debug(self, args)
Definition: Process.py:64
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens or the value is None. The exception is marked as non-transient.
Definition: Process.py:245
def __send_message(self, event, content)
Serializes the message in JSON and sends it to the parent process.
Definition: Process.py:72
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
Definition: Process.py:190
def param_defaults(self)
Returns the defaults parameters for this runnable.
Definition: Process.py:238
def __send_message_and_wait_for_OK(self, event, content)
Send a message and expects a response to be 'OK'.
Definition: Process.py:110
Raised when the process has lost the communication pipe with the Perl side.
Definition: Process.py:38
Equivalent of eHive's Param module.
Definition: Params.py:49
This is the counterpart of GuestProcess.
Definition: Process.py:51
Raised when we could not parse the JSON message coming from GuestProcess.
Definition: Process.py:34
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an error or not.
Definition: Process.py:210
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination.
Definition: Process.py:30
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
Definition: Process.py:118
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Definition: Process.py:226
Raised when a parameter cannot be required because it is null (None)
Definition: Params.py:42
def param(self, param_name, args)
When called as a setter: sets the value of the parameter "param_name".
Definition: Process.py:259
def __traceback(self, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
Definition: Process.py:197
def __job_life_cycle(self, config)
Job's life-cycle.
Definition: Process.py:132
Used by Process.BaseRunnable.
Definition: Params.py:17
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
Definition: Process.py:215
def __init__(self, read_fileno, write_fileno)
Definition: Process.py:56
def param_is_defined(self, param_name)
Returns True if the parameter exists and can be successfully substituted to a defined value, None if the substitution fails, False if it is missing or evaluates as None.
Definition: Process.py:287
Dummy class to hold job-related information.
Definition: Process.py:21
Raised when parameters depend on each other, forming a loop.
Definition: Params.py:37
def __send_response(self, response)
Sends a response message to the parent process.
Definition: Process.py:86