# Garbage-collection pass over the Workers that have not checked in recently.
#
# Steps, in order:
#   1. fetch the overdue Workers and, for every reachable Meadow, look up each Worker's process status;
#   2. print a per-Meadow summary of the statuses seen (including LOST and UNREACHABLE);
#   3. for every Meadow with LOST Workers: ask the Meadow for report entries, record the death of
#      each lost Worker and store the resource usage found in the reports;
#   4. if $check_buried_in_haste is true: finalize orphan roles and release the unfinished jobs
#      of already-finished roles.
#
# Arguments:
#   $self                  - object providing fetch_overdue_workers(), register_worker_death(),
#                            store_resource_usage() and db()
#   $valley                - object providing find_available_meadow_responsible_for_worker()
#                            and available_meadow_hash()
#   $check_buried_in_haste - boolean flag enabling step 4
#
# Dies if a Meadow driver still implements the obsolete find_out_causes() method.
my ($self, $valley, $check_buried_in_haste) = @_;

warn "GarbageCollector:\tChecking for lost Workers...\n";

my $last_few_seconds = 5; # FIXME: It is probably a good idea to expose this parameter for easier tuning.
my $queen_overdue_workers = $self->fetch_overdue_workers( $last_few_seconds ); # check the workers we have not seen active during the $last_few_seconds

my %mt_and_pid_to_worker_status = ();   # meadow_type => { process_id => status }, filled from each Meadow's own report
my %worker_status_counts        = ();   # meadow_type => { status => number of overdue Workers in that status }
my %mt_and_pid_to_lost_worker   = ();   # meadow_type => { process_id => Worker object } for Workers the Meadow does not know

warn "GarbageCollector:\t[Queen:] out of ".scalar(@$queen_overdue_workers)." Workers that haven't checked in during the last $last_few_seconds seconds...\n";

foreach my $worker (@$queen_overdue_workers) {
    my $meadow_type = $worker->meadow_type;

    if(my $meadow = $valley->find_available_meadow_responsible_for_worker($worker)) {
        $mt_and_pid_to_worker_status{$meadow_type} ||= $meadow->status_of_all_our_workers; # only run this once per reachable Meadow

        my $process_id = $worker->process_id;

        if(my $status = $mt_and_pid_to_worker_status{$meadow_type}{$process_id}) { # can be RUN|PEND|xSUSP
            $worker_status_counts{$meadow_type}{$status}++;
        } else {
            # the Meadow has no (true) status for this process id: count the Worker as lost and remember it
            $worker_status_counts{$meadow_type}{'LOST'}++;
            $mt_and_pid_to_lost_worker{$meadow_type}{$process_id} = $worker;
        }
    } else {
        $worker_status_counts{$meadow_type}{'UNREACHABLE'}++; # Worker is unreachable from this Valley
    }
}

# print a quick summary report:
foreach my $meadow_type (keys %worker_status_counts) {
    warn "GarbageCollector:\t[$meadow_type Meadow:]\t".join(', ', map { "$_:$worker_status_counts{$meadow_type}{$_}" } keys %{$worker_status_counts{$meadow_type}})."\n\n";
}

while(my ($meadow_type, $pid_to_lost_worker) = each %mt_and_pid_to_lost_worker) {
    my $this_meadow = $valley->available_meadow_hash->{$meadow_type};

    if(my $lost_this_meadow = scalar(keys %$pid_to_lost_worker) ) {
        warn "GarbageCollector:\tDiscovered $lost_this_meadow lost $meadow_type Workers\n";

        my $report_entries;

        if($this_meadow->can('find_out_causes')) {
            # refuse to work with drivers that still expose the old interface
            die "Your Meadow::$meadow_type driver now has to support get_report_entries_for_process_ids() method instead of find_out_causes(). Please update it.\n";
        } else {
            if ($report_entries = $this_meadow->get_report_entries_for_process_ids( keys %$pid_to_lost_worker )) {
                my $lost_with_known_cod = scalar( grep { $_->{'cause_of_death'} } values %$report_entries);
                warn "GarbageCollector:\tFound why $lost_with_known_cod of $meadow_type Workers died\n";
            }
        }

        warn "GarbageCollector:\tReleasing the jobs\n";
        while(my ($process_id, $worker) = each %$pid_to_lost_worker) {
            # both values are undef when the Meadow returned no report entry for this process id
            $worker->died( $report_entries->{$process_id}{'died'} );
            $worker->cause_of_death( $report_entries->{$process_id}{'cause_of_death'} );
            $self->register_worker_death( $worker );
        }

        # NOTE(review): the two-level lookups in the loop above autovivify $report_entries and
        # $report_entries->{$process_id}, so by this point the hash holds an entry (possibly empty)
        # for every lost Worker and the test below is always true -- confirm whether
        # store_resource_usage() is meant to receive those empty entries.
        if( %$report_entries ) { # use the opportunity to also store resource usage of the buried workers:
            my $processid_2_workerid = { map { $_ => $pid_to_lost_worker->{$_}->dbID } keys %$pid_to_lost_worker };
            $self->store_resource_usage( $report_entries, $processid_2_workerid );
        }
    }
}

# the following bit is completely Meadow-agnostic and only restores database integrity:
if($check_buried_in_haste) {
    warn "GarbageCollector:\tChecking for orphan roles...\n";
    my $orphan_roles = $self->db->get_RoleAdaptor->fetch_all_unfinished_roles_of_dead_workers();
    if(my $orphan_role_number = scalar @$orphan_roles) {
        warn "GarbageCollector:\tfound $orphan_role_number orphan roles, finalizing...\n\n";
        foreach my $orphan_role (@$orphan_roles) {
            $self->db->get_RoleAdaptor->finalize_role( $orphan_role );
        }
    }

    warn "GarbageCollector:\tChecking for orphan jobs...\n";
    my $buried_in_haste_roles = $self->db->get_RoleAdaptor->fetch_all_finished_roles_with_unfinished_jobs();
    if(my $bih_number = scalar @$buried_in_haste_roles) {
        warn "GarbageCollector:\tfound $bih_number buried roles with orphan jobs, reclaiming.\n\n";
        my $job_adaptor = $self->db->get_AnalysisJobAdaptor;
        foreach my $role (@$buried_in_haste_roles) {
            $job_adaptor->release_undone_jobs_from_role( $role );
        }
    } else {
        warn "GarbageCollector:\tfound none\n";
    }
}
}