14 This module mainly implements python's counterpart of GuestProcess. Read 15 the latter for more information about the JSON protocol used to communicate. 19 """Dummy class to hold job-related information""" 22 class CompleteEarlyException(Exception):
23 """Can be raised by a derived class of BaseRunnable to indicate an early successful termination""" 26 """Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination""" 28 class HiveJSONMessageException(Exception):
29 """Raised when we could not parse the JSON message coming from GuestProcess""" 32 """Raised when the process has lost the communication pipe with the Perl side""" 37 """This is the counterpart of GuestProcess. Note that most of the methods 38 are private to be hidden in the derived classes. 40 This class can be used as a base-class for people to redefine fetch_input(), 41 run() and/or write_output() (and/or pre_cleanup(), post_cleanup()). 42 Jobs are supposed to raise CompleteEarlyException in case they complete before 43 reaching. They can also raise JobFailedException to indicate a general failure 49 def __init__(self, read_fileno, write_fileno):
51 self.
__read_pipe = os.fdopen(read_fileno, mode=
'rb', buffering=0)
52 self.
__write_pipe = os.fdopen(write_fileno, mode=
'wb', buffering=0)
53 self.
__pid = os.getpid()
59 print(
"PYTHON {0}".format(self.
__pid), *args, file=sys.stderr)
64 """seralizes the message in JSON and send it to the parent process""" 65 def default_json_encoder(o):
66 self.
__print_debug(
"Cannot serialize {0} (type {1}) in JSON".format(o, type(o)))
67 return 'UNSERIALIZABLE OBJECT' 68 j = json.dumps({
'event': event,
'content': content}, indent=
None, default=default_json_encoder)
73 except BrokenPipeError
as e:
77 """Sends a response message to the parent process""" 81 self.
__write_pipe.write(bytes(
'{"response": "' + str(response) +
'"}\n',
'utf-8'))
82 except BrokenPipeError
as e:
86 """Read a message from the parent and parse it""" 91 return json.loads(l.decode())
92 except BrokenPipeError
as e:
94 except ValueError
as e:
96 raise HiveJSONMessageException
from e
99 """Send a message and expects a response to be 'OK'""" 102 if response[
'response'] !=
'OK':
106 """Simple loop: wait for job parameters, do the job's life-cycle""" 113 if 'input_job' not in config:
114 self.
__print_debug(
"no params, this is the end of the wrapper")
119 """Job's life-cycle. See GuestProcess for a description of the protocol to communicate with the parent""" 127 for x
in [
'dbID',
'input_id',
'retry_count']:
128 setattr(self.
input_job, x, config[
'input_job'][x])
134 self.
debug = config[
'debug']
137 steps = [
'fetch_input',
'run' ]
139 steps.insert(0,
'pre_cleanup')
140 if config[
'execute_writes']:
141 steps.append(
'write_output')
146 died_somewhere =
False 150 except CompleteEarlyException
as e:
151 self.
warning(e.args[0]
if len(e.args)
else repr(e),
False)
152 except LostHiveConnectionException
as e:
156 died_somewhere =
True 161 except LostHiveConnectionException
as e:
165 died_somewhere =
True 168 job_end_structure = {
'complete' :
not died_somewhere,
'job': {},
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}}
169 for x
in [
'autoflow',
'lethal_for_worker',
'transient_error' ]:
170 job_end_structure[
'job'][x] = getattr(self.
input_job, x)
174 """method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup". 175 We only the call the method if it exists to save a trip to the database.""" 176 if hasattr(self, method):
178 getattr(self, method)()
181 """Remove "skipped_traces" lines from the stack trace (the eHive part)""" 182 (etype, value, tb) = sys.exc_info()
183 s1 = traceback.format_exception_only(etype, value)
184 l = traceback.extract_tb(tb)[skipped_traces:]
185 s2 = traceback.format_list(l)
186 return "".join(s1+s2)
192 def warning(self, message, is_error = False):
193 """Store a message in the log_message table with is_error indicating whether the warning is actually an error or not""" 196 def dataflow(self, output_ids, branch_name_or_code = 1):
197 """Dataflows the output_id(s) on a given branch (default 1). Returns whatever the Perl side returns""" 198 if branch_name_or_code == 1:
200 self.
__send_message(
'DATAFLOW', {
'output_ids': output_ids,
'branch_name_or_code': branch_name_or_code,
'params': {
'substituted': self.
__params.param_hash,
'unsubstituted': self.
__params.unsubstituted_param_hash}})
204 """Returns the full path of the temporary directory created by the worker. 205 Runnables can implement "worker_temp_directory_name()" to return the name 206 they would like to use 209 template_name = self.worker_temp_directory_name()
if hasattr(self,
'worker_temp_directory_name')
else None 218 """Returns the defaults parameters for this runnable""" 222 """Returns the value of the parameter "param_name" or raises an exception 223 if anything wrong happens. The exception is marked as non-transient.""" 226 v = self.
__params.get_param(param_name)
230 def param(self, param_name, *args):
231 """When called as a setter: sets the value of the parameter "param_name". 232 When called as a getter: returns the value of the parameter "param_name". 233 It does not raise an exception if the parameter (or another one in the 234 substitution stack) is undefined""" 237 return self.
__params.set_param(param_name, args[0])
241 return self.
__params.get_param(param_name)
242 except KeyError
as e:
243 warnings.warn(
"parameter '{0}' cannot be initialized because {1} is not defined !".format(param_name, e),
eHive.Params.ParamWarning, 2)
247 """Returns True or False, whether the parameter exists (it doesn't mean it can be successfully substituted)""" 248 return self.
__params.has_param(param_name)
251 """Returns True or False, whether the parameter exists, can be successfully substituted, and is not None""" 255 return self.
__params.get_param(param_name)
is not None def __read_message(self)
Read a message from the parent and parse it.
def param_exists(self, param_name)
Returns True or False, whether the parameter exists (it doesn't mean it can be successfully substitut...
__created_worker_temp_directory
def __print_debug(self, args)
def param_required(self, param_name)
Returns the value of the parameter "param_name" or raises an exception if anything wrong happens...
def __send_message(self, event, content)
Serializes the message in JSON and sends it to the parent process
def __run_method_if_exists(self, method)
method is one of "pre_cleanup", "fetch_input", "run", "write_output", "post_cleanup".
def param_defaults(self)
Returns the default parameters for this runnable.
def __send_message_and_wait_for_OK(self, event, content)
Send a message and expects a response to be 'OK'.
Raised when the process has lost the communication pipe with the Perl side.
Equivalent of eHive's Param module.
This is the counterpart of GuestProcess.
Raised when we could not parse the JSON message coming from GuestProcess.
def warning(self, message, is_error=False)
Store a message in the log_message table with is_error indicating whether the warning is actually an ...
Can be raised by a derived class of BaseRunnable to indicate an early unsuccessful termination...
def __process_life_cycle(self)
Simple loop: wait for job parameters, do the job's life-cycle.
def worker_temp_directory(self)
Returns the full path of the temporary directory created by the worker.
def param(self, param_name, args)
When called as a setter: sets the value of the parameter "param_name".
def __traceback(self, skipped_traces)
Remove "skipped_traces" lines from the stack trace (the eHive part)
def __job_life_cycle(self, config)
Job's life-cycle.
Used by Process.BaseRunnable.
def dataflow(self, output_ids, branch_name_or_code=1)
Dataflows the output_id(s) on a given branch (default 1).
def param_is_defined(self, param_name)
Returns True or False, whether the parameter exists, can be successfully substituted, and is not None.
Dummy class to hold job-related information.
def __send_response(self, response)
Sends a response message to the parent process.