# Garbage-collection pass over the Workers that have not checked in recently.
#
# Steps performed:
#   1. Ask every Meadow reachable from $valley for the status of our Workers.
#   2. Fetch the Workers that were not seen active during the last few seconds
#      and classify each of them using the Meadow's answer:
#        - a non-empty status (RUN|PEND|xSUSP...) refreshes worker.when_seen,
#        - no status means the Worker is LOST,
#        - an unreachable Meadow is only counted as UNREACHABLE.
#   3. For the LOST Workers, ask the Meadow for report entries, record
#      when_died/cause_of_death, register the death and store resource usage.
#   4. Optionally finalize orphan roles and release the jobs of roles that
#      were finished while their jobs were not.
#
# Arguments (taken from @_):
#   $self                   - object providing meadow_type_2_name_2_users_of_running_workers(),
#                             fetch_overdue_workers(), prepare(), register_worker_death(),
#                             store_resource_usage() and db()
#   $valley                 - object providing available_meadow_hash() and
#                             find_available_meadow_responsible_for_worker()
#   $check_buried_in_haste  - if true, also run the orphan roles / orphan jobs checks
#   $bury_unkwn_workers     - if true, Workers reported as 'UNKWN' are killed (when the Meadow
#                             can kill_worker() and the Worker's meadow_user equals $ENV{'USER'})
#                             and are then treated as LOST
my ($self, $valley, $check_buried_in_haste, $bury_unkwn_workers) = @_;

my $last_few_seconds = 5;   # FIXME: It is probably a good idea to expose this parameter for easier tuning.

warn "GarbageCollector:\tChecking for lost Workers...\n";

# Collect, per reachable "meadow_type/meadow_name" signature, the process_id => status map
# as reported by the Meadow itself.
my $meadow_type_2_name_2_users = $self->meadow_type_2_name_2_users_of_running_workers();
my %signature_and_pid_to_worker_status = ();

while(my ($meadow_type, $level2) = each %$meadow_type_2_name_2_users) {
    if(my $meadow = $valley->available_meadow_hash->{$meadow_type}) {   # if this Valley supports $meadow_type at all...
        while(my ($meadow_name, $level3) = each %$level2) {
            if($meadow->cached_name eq $meadow_name) {                  # and we can reach the same $meadow_name from this Valley...
                my $meadow_users_of_interest    = [ keys %$level3 ];
                my $meadow_signature            = $meadow_type.'/'.$meadow_name;

                # query each Meadow only once:
                $signature_and_pid_to_worker_status{$meadow_signature} ||= $meadow->status_of_all_our_workers( $meadow_users_of_interest );
            }
        }
    }
}

my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds );  # check the workers we have not seen active during the $last_few_seconds
warn "GarbageCollector:\t[Queen:] out of ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n";

my $update_when_seen_sql = "UPDATE worker SET when_seen=CURRENT_TIMESTAMP WHERE worker_id=?";
my $update_when_seen_sth;

my %meadow_status_counts        = ();   # signature => { status => count }, only used for the summary report
my %mt_and_pid_to_lost_worker   = ();   # meadow_type => { process_id => Worker }

foreach my $worker (@$queen_overdue_workers) {

    my $meadow_signature = $worker->meadow_type.'/'.$worker->meadow_name;

    if(my $pid_to_worker_status = $signature_and_pid_to_worker_status{$meadow_signature}) { # the whole Meadow subhash is either present or the Meadow is unreachable
        my $meadow_type = $worker->meadow_type;
        my $process_id  = $worker->process_id;
        my $status      = $pid_to_worker_status->{$process_id};    # undefined if the Meadow does not know this process_id

        # The defined() guard avoids an "uninitialized value" warning for Workers the Meadow has no status for.
        if($bury_unkwn_workers and defined($status) and ($status eq 'UNKWN')) {
            if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $worker ) ) {
                if($meadow->can('kill_worker')) {
                    if($worker->meadow_user eq $ENV{'USER'}) {      # if I'm actually allowed to kill the worker...
                        warn "GarbageCollector:\tKilling/forgetting the UNKWN worker by process_id $process_id";

                        $meadow->kill_worker($worker, 1);
                        $status = '';   # make it look like LOST
                    }
                }
            }
        }

        if($status) {   # can be RUN|PEND|xSUSP
            $meadow_status_counts{$meadow_signature}{$status}++;

            # only prepare once at most:
            $update_when_seen_sth ||= $self->prepare( $update_when_seen_sql );
            $update_when_seen_sth->execute( $worker->dbID );
        } else {
            $meadow_status_counts{$meadow_signature}{'LOST'}++;

            $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;
        }
    } else {
        $meadow_status_counts{$meadow_signature}{'UNREACHABLE'}++;  # Worker is unreachable from this Valley
    }
}

$update_when_seen_sth->finish() if $update_when_seen_sth;

# print a quick summary report:
while(my ($meadow_signature, $status_count) = each %meadow_status_counts) {
    warn "GarbageCollector:\t[$meadow_signature Meadow:]\t".join(', ', map { "$_:$status_count->{$_}" } keys %$status_count )."\n\n";
}

while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
    my $this_meadow = $valley->available_meadow_hash->{$meadow_type};

    if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
        warn "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n";

        # Start from an empty hash so that "no report available" stays distinguishable
        # from "report available" all the way down to the resource usage check.
        my $report_entries = {};

        if($this_meadow->can('find_out_causes')) {
            die "Your Meadow::$meadow_type driver now has to support get_report_entries_for_process_ids() method instead of find_out_causes(). Please update it.\n";
        } else {
            if (my $fetched_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) {
                $report_entries = $fetched_entries;

                my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
                warn "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n";
            }
        }

        warn "GarbageCollector:\tReleasing the jobs\n";
        while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
            # Single-level lookup with a fallback: a nested dereference here would autovivify
            # an empty entry in $report_entries for every Worker that has no report,
            # which would make the check below always true.
            my $report_entry = $report_entries->{$process_id} || {};

            $worker->when_died(         $report_entry->{'when_died'} );
            $worker->cause_of_death(    $report_entry->{'cause_of_death'} );
            $self->register_worker_death( $worker );
        }

        if( %$report_entries ) {    # use the opportunity to also store resource usage of the buried workers:
            my $processid_2_workerid = { map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
            $self->store_resource_usage( $report_entries, $processid_2_workerid );
        }
    }
}

# the following bit is completely Meadow-agnostic and only restores database integrity:
if($check_buried_in_haste) {
    warn "GarbageCollector:\tChecking for orphan roles...\n";
    my $orphan_roles = $self->db->get_RoleAdaptor->fetch_all_unfinished_roles_of_dead_workers();
    if(my $orphan_role_number = scalar @$orphan_roles) {
        warn "GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
        foreach my $orphan_role (@$orphan_roles) {
            $self->db->get_RoleAdaptor->finalize_role( $orphan_role );
        }
    }

    warn "GarbageCollector:\tChecking for orphan jobs...\n";
    my $buried_in_haste_roles = $self->db->get_RoleAdaptor->fetch_all_finished_roles_with_unfinished_jobs();
    if(my $bih_number = scalar @$buried_in_haste_roles) {
        warn "GarbageCollector:\tfound $bih_number buried roles with orphan jobs, reclaiming.\n\n";
        my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
        foreach my $role (@$buried_in_haste_roles) {
            $job_adaptor->release_undone_jobs_from_role( $role );
        }
    } else {
        warn "GarbageCollector:\tfound none\n";
    }
}
}