sub main {
    $| = 1;     # autoflush STDOUT so progress output appears immediately

    # ok this is a hack, but I'm going to pretend I've got an object here
    # by creating a hash ref and passing it around like an object
    # this is to avoid using global variables in functions, and to consolidate
    # the globals into a nice '$self' package
    my $self = {};

    # Command-line flags that stay local to main() (not part of the $self "object"):
    my $help                            = 0;
    my $report_versions                 = 0;
    my $loopit                          = 0;
    my $sync                            = 0;
    my $local                           = 0;
    my $show_failed_jobs                = 0;
    my $default_meadow_type             = undef;
    my $submit_workers_max              = undef;
    my $total_running_workers_max       = undef;
    my $submission_options              = undef;
    my $run                             = 0;
    my $max_loops                       = 0;        # not running by default
    my $run_job_id                      = undef;
    my $force                           = undef;
    my $keep_alive                      = 0;        # ==1 means run even when there is nothing to do
    my $check_for_dead                  = 0;
    my $all_dead                        = 0;
    my $balance_semaphores              = 0;
    my $job_id_for_output               = 0;
    my $show_worker_stats               = 0;
    my $kill_worker_id                  = 0;
    my $reset_job_id                    = 0;
    my $reset_all_jobs_for_analysis    = 0;
    my $reset_failed_jobs_for_analysis = 0;

    # Options that other routines will need travel inside $self:
    $self->{'url'}                  = undef;
    $self->{'reg_conf'}             = undef;
    $self->{'reg_type'}             = undef;
    $self->{'reg_alias'}            = undef;
    $self->{'nosqlvc'}              = undef;
    $self->{'config_files'}         = [];       # array ref target: Getopt::Long appends one entry per -config_file
    $self->{'sleep_minutes'}        = 1;
    $self->{'retry_throwing_jobs'}  = undef;
    $self->{'can_respecialize'}     = undef;
    $self->{'hive_log_dir'}         = undef;
    $self->{'submit_log_dir'}       = undef;

    GetOptions(
                    # connection parameters
               'url=s'              => \$self->{'url'},
               'reg_conf|regfile=s' => \$self->{'reg_conf'},
               'reg_type=s'         => \$self->{'reg_type'},
               'reg_alias|regname=s'=> \$self->{'reg_alias'},
               'nosqlvc=i'          => \$self->{'nosqlvc'},      # can't use the binary "!" as it is a propagated option

                    # json config files
               'config_file=s'      => $self->{'config_files'},

                    # loop control
               'run'                => \$run,
               'loop'               => \$loopit,
               'max_loops=i'        => \$max_loops,
               'keep_alive'         => \$keep_alive,
               'job_id|run_job_id=i'=> \$run_job_id,
               'force=i'            => \$force,
               'sleep=f'            => \$self->{'sleep_minutes'},

                    # meadow control
               'local!'             => \$local,
               'meadow_type=s'      => \$default_meadow_type,
               'total_running_workers_max=i' => \$total_running_workers_max,
               'submit_workers_max=i'        => \$submit_workers_max,
               'submission_options=s'        => \$submission_options,

                    # worker control
               'job_limit=i'            => \$self->{'job_limit'},
               'life_span|lifespan=i'   => \$self->{'life_span'},
               'logic_name=s'           => \$self->{'logic_name'},
               'hive_log_dir|hive_output_dir=s' => \$self->{'hive_log_dir'},
               'retry_throwing_jobs=i'  => \$self->{'retry_throwing_jobs'},
               'can_respecialize=i'     => \$self->{'can_respecialize'},
               'debug=i'                => \$self->{'debug'},
               'submit_log_dir=s'       => \$self->{'submit_log_dir'},

                    # other commands/options
               'h|help!'            => \$help,
               'v|versions!'        => \$report_versions,
               'sync!'              => \$sync,
               'dead!'              => \$check_for_dead,
               'killworker=i'       => \$kill_worker_id,
               'alldead!'           => \$all_dead,
               'balance_semaphores' => \$balance_semaphores,
               'no_analysis_stats'  => \$self->{'no_analysis_stats'},
               'worker_stats'       => \$show_worker_stats,
               'failed_jobs'        => \$show_failed_jobs,
               'reset_job_id=i'     => \$reset_job_id,
               'reset_failed|reset_failed_jobs_for_analysis=s' => \$reset_failed_jobs_for_analysis,
               'reset_all|reset_all_jobs_for_analysis=s'       => \$reset_all_jobs_for_analysis,
               'job_output=i'       => \$job_id_for_output,
    ) or script_usage(1);   # GetOptions returns false on unknown/malformed options; bail out rather than run half-configured

    if ($help) { script_usage(0); }

    if($report_versions) {
        report_versions();
        exit(0);
    }

    my $config = Bio::EnsEMBL::Hive::Utils::Config->new(@{$self->{'config_files'}});

    # Decide how many beekeeper loops to perform:
    #   -run / -job_id  => exactly one loop
    #   -loop / -keep_alive => unlimited (unless -max_loops caps it)
    if($run or $run_job_id) {
        $max_loops = 1;
    } elsif ($loopit or $keep_alive) {
        unless($max_loops) {
            $max_loops = -1;    # unlimited
        }
    }

    # A database connection is mandatory: either a url or a registry lookup.
    if($self->{'url'} or $self->{'reg_alias'}) {
        $self->{'dba'} = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new(
                -url                            => $self->{'url'},
                -reg_conf                       => $self->{'reg_conf'},
                -reg_type                       => $self->{'reg_type'},
                -reg_alias                      => $self->{'reg_alias'},
                -no_sql_schema_version_check    => $self->{'nosqlvc'},
        );
    } else {
        print "\nERROR : Connection parameters (url or reg_conf+reg_alias) need to be specified\n\n";
        script_usage(1);
    }

    # A password-masked url, safe to pass to workers / show in logs:
    $self->{'safe_url'} = $self->{'dba'}->dbc->url('WORKER_PASSWORD');

    my $queen = $self->{'dba'}->get_Queen;

    my $pipeline_name = $self->{'dba'}->get_MetaAdaptor->get_value_by_key( 'hive_pipeline_name' );

    if($pipeline_name) {
        warn "Pipeline name: $pipeline_name\n";
    } else {
        print STDERR "+---------------------------------------------------------------------+\n";
        print STDERR "!                                                                     !\n";
        print STDERR "!                  WARNING:                                           !\n";
        print STDERR "!                                                                     !\n";
        print STDERR "! At the moment your pipeline doesn't have 'pipeline_name' defined.   !\n";
        print STDERR "! This may seriously impair your beekeeping experience unless you are !\n";
        print STDERR "! the only farm user. The name should be set in your PipeConfig file, !\n";
        print STDERR "! or if you are running an old pipeline you can just set it by hand   !\n";
        print STDERR "! in the 'meta' table.                                                !\n";
        print STDERR "!                                                                     !\n";
        print STDERR "+---------------------------------------------------------------------+\n";
    }

    if($run_job_id) {
        $submit_workers_max = 1;    # running a single specific job only ever needs one worker
    }

    $default_meadow_type = 'LOCAL' if($local);
    my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $default_meadow_type, $pipeline_name );

    # Refuse to run if beekeeper itself has been farmed out to a compute node:
    my ($beekeeper_meadow_type, $beekeeper_meadow_name) = $valley->whereami();
    unless($beekeeper_meadow_type eq 'LOCAL') {
        die "beekeeper.pl detected it has been itself submitted to '$beekeeper_meadow_type/$beekeeper_meadow_name', but this mode of operation is not supported.\n"
           ."Please just run beekeeper.pl on a farm head node, preferably from under a 'screen' session.\n";
    }

    # Command-line overrides of the config take effect only when explicitly given:
    $valley->config_set('SubmitWorkersMax', $submit_workers_max) if(defined $submit_workers_max);

    my $default_meadow = $valley->get_default_meadow();
    warn "Default meadow: ".$default_meadow->signature."\n\n";

    $default_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if(defined $total_running_workers_max);
    $default_meadow->config_set('SubmissionOptions', $submission_options) if(defined $submission_options);

    # One-shot maintenance commands, performed (in this order) before any looping:

    if($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }

    if($job_id_for_output) {
        printf("===== job output\n");
        my $job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID($job_id_for_output)
            || die "Could not fetch job with dbID=$job_id_for_output";    # guard against calling toString on undef
        print $job->toString. "\n";
    }

    if(my $reset_logic_name = $reset_all_jobs_for_analysis || $reset_failed_jobs_for_analysis) {

        my $reset_analysis = $self->{'dba'}->get_AnalysisAdaptor->fetch_by_logic_name($reset_logic_name)
              || die( "Cannot AnalysisAdaptor->fetch_by_logic_name($reset_logic_name)");

        # second argument acts as the "all jobs (not just failed)" flag:
        $self->{'dba'}->get_AnalysisJobAdaptor->reset_jobs_for_analysis_id($reset_analysis->dbID, $reset_all_jobs_for_analysis);
        $self->{'dba'}->get_Queen->synchronize_AnalysisStats($reset_analysis->stats);
    }

    if ($kill_worker_id) {
        my $kill_worker = $queen->fetch_by_dbID($kill_worker_id);

        # Only kill a worker that (a) the Queen believes is alive, (b) lives in a
        # meadow we can reach, and (c) the meadow confirms is alive and ours:
        unless( $kill_worker->cause_of_death() ) {
            if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $kill_worker ) ) {

                if( $meadow->check_worker_is_alive_and_mine ) {
                    printf("Killing worker: %10d %35s %15s %20s(%d) : ",
                            $kill_worker->dbID, $kill_worker->host, $kill_worker->process_id,
                            $kill_worker->analysis->logic_name, $kill_worker->analysis_id);

                    $meadow->kill_worker($kill_worker);
                    $kill_worker->cause_of_death('KILLED_BY_USER');
                    $queen->register_worker_death($kill_worker);
                         # what about clean-up? Should we do it here or not?
                } else {
                    die "According to the Meadow, the Worker (dbID=$kill_worker_id) is not running, so cannot kill";
                }
            } else {
                die "Cannot access the Meadow responsible for the Worker (dbID=$kill_worker_id), so cannot kill";
            }
        } else {
            die "According to the Queen, the Worker (dbID=$kill_worker_id) is not running, so cannot kill";
        }
    }

    # Optionally focus on one analysis: either the one owning -job_id, or the one named by -logic_name:
    my $analysis = $run_job_id
        ? $self->{'dba'}->get_AnalysisAdaptor->fetch_by_dbID( $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id )->analysis_id )
        : ( $self->{'logic_name'} && $self->{'dba'}->get_AnalysisAdaptor->fetch_by_logic_name($self->{'logic_name'}) );

    if($all_dead)           { $queen->register_all_workers_dead(); }
    if($check_for_dead)     { $queen->check_for_dead_workers($valley, 1); }
    if($balance_semaphores) { $self->{'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $analysis && $analysis->dbID ); }

    if ($max_loops) { # positive $max_loop means limited, negative means unlimited

        run_autonomously($self, $max_loops, $keep_alive, $queen, $valley, $analysis, $run_job_id, $force);

    } else {
            # the output of several methods will look differently depending on $analysis being [un]defined

        if($sync) {
            $queen->synchronize_hive($analysis);
        }
        $queen->print_analysis_status($analysis) unless($self->{'no_analysis_stats'});

        if($show_worker_stats) {
            print "\n===== List of live Workers according to the Queen: ======\n";
            foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
                print $worker->toString(1)."\n";
            }
        }
        $self->{'dba'}->get_RoleAdaptor->print_active_role_counts;

        Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $analysis);   # show what would be submitted, but do not actually submit

        # show progress both for the focused analysis (if any) and for the whole hive:
        $queen->get_remaining_jobs_show_hive_progress( $analysis ) if ($analysis);
        $queen->get_remaining_jobs_show_hive_progress();

        if($show_failed_jobs) {
            print("===== failed jobs\n");
            my $failed_job_list = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_all_by_analysis_id_status($analysis && $analysis->dbID, 'FAILED');

            foreach my $job (@{$failed_job_list}) {
                print $job->toString. "\n";
            }
        }
    }

    exit(0);
}