my ($self, $specialization_arghash) = @_;
# NOTE(review): this is the body of a worker lifecycle method; the enclosing
# "sub" header is outside this chunk. The lexicals $batches_stopwatch and
# $min_batch_time used below are also declared outside the visible span --
# confirm they are defined (per lifespan-loop iteration) in the full file.

# If a per-worker log directory is configured, redirect STDOUT/STDERR into
# worker.out / worker.err for the duration of the run (popped at the very end).
if( my $worker_log_dir = $self->log_dir ) {
$self->get_stdout_redirector->push( $worker_log_dir.'/worker.out' );
$self->get_stderr_redirector->push( $worker_log_dir.'/worker.err' );
}
my $job_adaptor = $self->adaptor->db->get_AnalysisJobAdaptor;
print "\n"; # to clear beekeeper's prompt in case output is not logged
$self->worker_say( $self->toString() );
# Pick an analysis to specialize into and compile its Runnable before entering the loop.
$self->specialize_and_compile_wrapper( $specialization_arghash );
while (!$self->cause_of_death) { # Worker's lifespan loop (ends only when the worker dies for any reason)
my $jobs_done_by_batches_loop = 0; # by all iterations of internal loop
# Per-lifespan-iteration accumulator of time spent in each job phase
# (FETCH_INPUT / RUN / WRITE_OUTPUT); flushed to the stats table below.
$self->{'_interval_partial_timing'} = {};
if( my $special_batch = $self->special_batch() ) {
# "Special batch" mode: a pre-assigned job list is run exactly once;
# the worker then dies either cleanly or as CONTAMINATED.
my $special_batch_length = scalar(@$special_batch); # has to be recorded because the list is gradually destroyed
$jobs_done_by_batches_loop += $self->run_one_batch( $special_batch );
# If not every job in the special batch completed, something went wrong mid-batch.
$self->cause_of_death( $jobs_done_by_batches_loop == $special_batch_length ? 'JOB_LIMIT' : 'CONTAMINATED');
} else { # a proper "BATCHES" loop
# Keep claiming and running batches until death is requested or enough
# wall-clock time has elapsed to justify a (DB-expensive) stats update.
while (!$self->cause_of_death and $batches_stopwatch->get_elapsed < $min_batch_time) {
my $current_role = $self->current_role;
# Sanity check: any incomplete jobs still attached to this role at this
# point mean a Runnable escaped its job loop (e.g. a stray 'next').
if( scalar(@{ $job_adaptor->fetch_all_incomplete_jobs_by_role_id( $current_role->dbID ) }) ) {
my $msg = "Lost control. Check your Runnable for loose 'next' statements that are not part of a loop";
$self->worker_say( $msg );
$self->cause_of_death('CONTAMINATED');
$job_adaptor->release_undone_jobs_from_role($current_role, $msg);
} elsif( $self->job_limiter->reached()) {
$self->worker_say( "job_limit reached (".$self->work_done." jobs completed)" );
$self->cause_of_death('JOB_LIMIT');
} elsif ( my $alive_for_secs = $self->life_span_limit_reached()) {
$self->worker_say( "life_span limit reached (alive for $alive_for_secs secs)" );
$self->cause_of_death('LIFESPAN');
} else {
# Normal path: estimate a batch size, cap it by the remaining job
# allowance, claim that many jobs, and run them.
my $desired_batch_size = $current_role->analysis->stats->get_or_estimate_batch_size();
my $hit_the_limit; # dummy at the moment
($desired_batch_size, $hit_the_limit) = $self->job_limiter->preliminary_offer( $desired_batch_size );
my $actual_batch = $job_adaptor->grab_jobs_for_role( $current_role, $desired_batch_size );
if(scalar(@$actual_batch)) {
# Record the claim in the stats table before running the batch.
$self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_claim($self->current_role->analysis->dbID, scalar(@$actual_batch));
my $jobs_done_by_this_batch = $self->run_one_batch( $actual_batch );
$jobs_done_by_batches_loop += $jobs_done_by_this_batch;
# Tell the limiter how many jobs were actually consumed.
$self->job_limiter->final_decision( $jobs_done_by_this_batch );
} else {
# Nothing left to claim for this analysis.
$self->cause_of_death('NO_WORK');
}
}
}
}
# The following two database-updating operations are resource-expensive (all workers hammering the same database+tables),
# so they are not allowed to happen too frequently (not before $min_batch_time of work has been done)
#
if($jobs_done_by_batches_loop) {
$self->adaptor->db->get_AnalysisStatsAdaptor->interval_update_work_done(
$self->current_role->analysis->dbID,
$jobs_done_by_batches_loop,
$batches_stopwatch->get_elapsed,
$self->{'_interval_partial_timing'}{'FETCH_INPUT'} || 0,
$self->{'_interval_partial_timing'}{'RUN'} || 0,
$self->{'_interval_partial_timing'}{'WRITE_OUTPUT'} || 0,
);
}
# A mechanism whereby workers can be caused to exit even if they were doing fine:
if (!$self->cause_of_death) {
my $analysis = $self->current_role->analysis;
my $stats = $analysis->stats; # make sure it is fresh from the DB
# NOTE(review): this condition deliberately mixes high-precedence '&&' with
# low-precedence 'or' so that the 'or' acts as the top-level split between
# the two overload tests (hive-wide load vs per-analysis capacity).
# A negative capacity value means "unlimited", hence the (0 <= ...) guards.
if( defined($stats->hive_capacity) && (0 <= $stats->hive_capacity) && ($self->adaptor->db->get_RoleAdaptor->get_hive_current_load >= 1.1)
or defined($analysis->analysis_capacity) && (0 <= $analysis->analysis_capacity) && ($analysis->analysis_capacity < $stats->num_running_workers)
) {
$self->cause_of_death('HIVE_OVERLOAD');
}
}
my $cod = $self->cause_of_death() || '';
if( $cod eq 'NO_WORK') {
# Mark the analysis so other workers do not keep probing an empty queue.
$self->adaptor->db->get_AnalysisStatsAdaptor->update_status( $self->current_role->analysis_id, 'ALL_CLAIMED' );
}
# Respecialization: instead of dying on NO_WORK/HIVE_OVERLOAD, a worker that is
# allowed to respecialize (and was not pinned to a single exact analysis name by
# -analyses_pattern) finalizes its current role and tries to adopt a new one.
if( $cod =~ /^(NO_WORK|HIVE_OVERLOAD)$/ and $self->can_respecialize and (!$specialization_arghash->{'-analyses_pattern'} or $specialization_arghash->{'-analyses_pattern'}!~/^\w+$/) ) {
my $old_role = $self->current_role;
$self->adaptor->db->get_RoleAdaptor->finalize_role( $old_role, 1 );
$self->current_role( undef );
$self->cause_of_death(undef);
# Passing the old analysis lets the specializer avoid re-picking it.
$self->specialize_and_compile_wrapper( $specialization_arghash, $old_role->analysis );
}
} # /Worker's lifespan loop
# have runnable clean up any global/process files/data it may have created
if($self->perform_cleanup) {
if(my $runnable_object = $self->runnable_object()) { # the temp_directory is actually kept in the Process object:
$runnable_object->cleanup_worker_temp_directory();
}
}
# The second argument ("self_burial") controls whether we need to
# release the current (unfinished) batch: a CONTAMINATED worker's jobs
# have already been released above, so burial is skipped in that case.
$self->adaptor->register_worker_death($self, ($self->cause_of_death eq 'CONTAMINATED' ? 0 : 1));
if($self->debug) {
$self->worker_say( 'AnalysisStats : '.$self->current_role->analysis->stats->toString ) if( $self->current_role );
$self->worker_say( 'dbc '.$self->adaptor->db->dbc->disconnect_count. ' disconnect cycles' );
}
$self->worker_say( "Having completed ".$self->work_done." jobs the Worker exits : ".$self->cause_of_death );
# Undo the log redirection set up at the top, restoring the original streams.
if( $self->log_dir ) {
$self->get_stdout_redirector->pop();
$self->get_stderr_redirector->pop();
}
}