15 This module mainly implements python's counterpart of GuestProcess. Read 16 the latter for more information about the JSON protocol used to communicate. 20 """Dummy class to hold job-related information""" 23 class CompleteEarlyException(Exception):
24 """Can be raised by a derived class of BaseRunnable to indicate an early successful termination""" 27 """Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination""" 29 class HiveJSONMessageException(Exception):
30 """Raised when we could not parse the JSON message coming from GuestProcess""" 33 """Raised when the process has lost the communication pipe with the Perl side""" 38 """This is the counterpart of GuestProcess. Note that most of the methods 39 are private to be hidden in the derived classes. 41 This class can be used as a base-class for people to redefine fetch_input(), 42 run() and/or write_output() (and/or pre_cleanup(), post_cleanup()). 43 Jobs are supposed to raise CompleteEarlyException in case they complete before 44 reaching. They can also raise JobFailedException to indicate a general failure 50 def __init__(self, read_fileno, write_fileno):
52 self.
__read_pipe = os.fdopen(read_fileno, mode=
'rb', buffering=0)
53 self.
__write_pipe = os.fdopen(write_fileno, mode=
'wb', buffering=0)
54 self.
__pid = os.getpid()
60 print(
"PYTHON {0}".format(self.
__pid), *args, file=sys.stderr)
65 """Serializes the message in JSON and sends it to the parent process""" 66 def default_json_encoder(o):
67 self.
__print_debug(
"Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
68 return 'UNSERIALIZABLE OBJECT' 69 j = json.dumps({
'event': event,
'content': content}, indent=
None, default=default_json_encoder)
74 except BrokenPipeError
as e:
78 """Sends a response message to the parent process""" 82 self.
__write_pipe.write(bytes(
'{"response": "' + str(response) +
'"}\n',
'utf-8'))
83 except BrokenPipeError
as e:
87 """Read a message from the parent and parse it""" 92 return json.loads(l.decode())
93 except BrokenPipeError
as e:
95 except ValueError
as e:
97 raise HiveJSONMessageException
from e
100 """Sends a message and expects the response to be 'OK'""" 103 if response[
'response'] !=
'OK':
107 """Simple loop: wait for job parameters, do the job's life-cycle""" 114 if 'input_job' not in config:
115 self.
__print_debug(
"no params, this is the end of the wrapper")
120 """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent""" 128 for x
in [
'dbID',
'input_id',
'retry_count']:
129 setattr(self.
input_job, x, config[
'input_job'][x])
135 self.
debug = config[
'debug']
138 steps = [
'fetch_input',
'run' ]
140 steps.insert(0,
'pre_cleanup')
141 if config[
'execute_writes']:
142 steps.append(
'write_output')
143 steps.append(
'post_healthcheck')
148 died_somewhere =
False 152 except CompleteEarlyException
as e:
153 self.
warning(e.args[0]
if len(e.args)
else repr(e),
False)
154 except LostHiveConnectionException
as e:
158 died_somewhere =
True 163 except LostHiveConnectionException
as e:
167 died_somewhere =
True 170 job_end_structure = {
'complete' :
not died_somewhere,
'job': {},
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}}
171 for x
in [
'autoflow',
'lethal_for_worker',
'transient_error' ]:
172 job_end_structure[
'job'][x] = getattr(self.
input_job, x)
176 """method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup". 177 We only call the method if it exists to save a trip to the database.""" 178 if hasattr(self, method):
180 getattr(self, method)()
183 """Remove "skipped_traces" lines from the stack trace (the eHive part)""" 184 (etype, value, tb) = sys.exc_info()
185 s1 = traceback.format_exception_only(etype, value)
186 l = traceback.extract_tb(tb)[skipped_traces:]
187 s2 = traceback.format_list(l)
188 return "".join(s1+s2)
194 def warning(self, message, is_error = False):
195 """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not""" 198 def dataflow(self, output_ids, branch_name_or_code = 1):
199 """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns""" 200 if branch_name_or_code == 1:
202 self.
__send_message(
'DATAFLOW', {
'output_ids': output_ids,
'branch_name_or_code': branch_name_or_code,
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}})
206 """Returns the full path of the temporary directory created by the worker. 207 Runnables can implement "worker_temp_directory_name()" to return the name 208 they would like to use 211 template_name = self.worker_temp_directory_name()
if hasattr(self,
'worker_temp_directory_name')
else None 220 """Returns the default parameters for this runnable""" 224 """Returns the value of the parameter "param_name" or raises an exception 225 if anything goes wrong or the value is None. The exception is 226 marked as non-transient.""" 229 v = self.
__params.get_param(param_name)
235 def param(self, param_name, *args):
236 """When called as a setter: sets the value of the parameter "param_name". 237 When called as a getter: returns the value of the parameter "param_name". 238 It does not raise an exception if the parameter (or another one in the 239 substitution stack) is undefined""" 242 return self.
__params.set_param(param_name, args[0])
246 return self.
__params.get_param(param_name)
247 except KeyError
as e:
248 warnings.warn(
"parameter '{0}' cannot be initialized because {1} is missing !".format(param_name, e),
eHive.Params.ParamWarning, 2)
252 """Returns True if the parameter exists and can be successfully 253 substituted, None if the substitution fails, False if it is missing""" 254 if not self.
__params.has_param(param_name):
263 """Returns True if the parameter exists and can be successfully 264 substituted to a defined value, None if the substitution fails, 265 False if it is missing or evaluates as None""" 271 return self.
__params.get_param(param_name)
is not None 276 def test_job_param(self):
282 j = FakeRunnableWithParams({
290 self.assertIs( j.param_exists(
'a'),
True,
'"a" exists' )
291 self.assertIs( j.param_exists(
'b'),
True,
'"b" exists' )
292 self.assertIs( j.param_exists(
'c'),
None,
'"c"\'s existence is unclear' )
293 self.assertIs( j.param_exists(
'd'),
False,
'"d" doesn\'t exist' )
298 self.assertIs( j.param_is_defined(
'a'),
True,
'"a" is defined' )
299 self.assertIs( j.param_is_defined(
'b'),
False,
'"b" is not defined' )
300 self.assertIs( j.param_is_defined(
'c'),
None,
'"c"\'s defined-ness is unclear' )
301 self.assertIs( j.param_is_defined(
'd'),
False,
'"d" is not defined (it doesn\'t exist)' )
303 j.param_is_defined(
'e')
306 self.assertIs( j.param(
'a'), 3,
'"a" is 3' )
307 self.assertIs( j.param(
'b'),
None,
'"b" is None' )
309 self.assertIs( j.param(
'c'),
None,
'"c"\'s value is unclear' )
311 self.assertIs( j.param(
'd'),
None,
'"d" is not defined (it doesn\'t exist)' )
316 self.assertIs( j.param_required(
'a'), 3,
'"a" is 3' )
318 j.param_required(
'b')
319 with self.assertRaises(KeyError):
320 j.param_required(
'c')
321 with self.assertRaises(KeyError):
322 j.param_required(
'd')
324 j.param_required(
'e')
def __read_message(self)
Read a message from the parent and parse it.
def param_exists(self, param_name)
Returns True if the parameter exists and can be successfully substituted, None if the substitution fa...
__created_worker_temp_directory
def __print_debug(self, args)
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens or t...
def __send_message(self, event, content)
Serializes the message in JSON and sends it to the parent process
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
def param_defaults(self)
Returns the default parameters for this runnable.
def __send_message_and_wait_for_OK(self, event, content)
Sends a message and expects the response to be 'OK'.
Raised when the process has lost the communication pipe with the Perl side.
Equivalent of eHive's Param module.
This is the counterpart of GuestProcess.
Raised when we could not parse the JSON message coming from GuestProcess.
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an ...
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination...
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
Raised when a parameter cannot be required because it is null (None)
def param(self, param_name, args)
When called as a setter: sets the value of the parameter "param_name".
def __traceback(self, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
def __job_life_cycle(self, config)
Job's life-cycle.
Used by Process.BaseRunnable.
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
def __init__(self, read_fileno, write_fileno)
def param_is_defined(self, param_name)
Returns True if the parameter exists and can be successfully substituted to a defined value...
Dummy class to hold job-related information.
Raised when parameters depend on each other, forming a loop.
def __send_response(self, response)
Sends a response message to the parent process.