sub main {
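# enable autoflush on STDOUT so that progress messages appear immediately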
$| = 1;
# This is admittedly a hack: we create a hash ref and pass it around as if it
# were an object. The point is to avoid global variables in functions and to
# consolidate the shared state into a single '$self' structure.
my $self = {};
my $help = 0;
my $report_versions = 0;
my $loopit = 0;
my $sync = 0;
my $local = 0;
my $show_failed_jobs = 0;
my $default_meadow_type = undef;
my $submit_workers_max = undef;
my $total_running_workers_max = undef;
my $submission_options = undef;
my $run = 0;
my $max_loops = 0; # not running by default
my $run_job_id = undef;
my $force = undef;
my $keep_alive = 0; # when true, keep looping even when there is nothing left to do
my $check_for_dead = 0;
my $all_dead = 0;
my $balance_semaphores = 0;
my $job_id_for_output = 0;
my $show_worker_stats = 0;
my $kill_worker_id = 0;
my $reset_job_id = 0;
my $reset_all_jobs_for_analysis = 0; # DEPRECATED
my $reset_failed_jobs_for_analysis = 0; # DEPRECATED
my $reset_all_jobs = 0;
my $reset_failed_jobs = 0;
$self->{'url'} = undef;
$self->{'reg_conf'} = undef;
$self->{'reg_type'} = undef;
$self->{'reg_alias'} = undef;
$self->{'nosqlvc'} = undef;
$self->{'config_files'} = [];
$self->{'sleep_minutes'} = 1;
$self->{'retry_throwing_jobs'} = undef;
$self->{'can_respecialize'} = undef;
$self->{'hive_log_dir'} = undef;
$self->{'submit_log_dir'} = undef;
GetOptions(
# connection parameters
'url=s' => \$self->{'url'},
'reg_conf|regfile=s' => \$self->{'reg_conf'},
'reg_type=s' => \$self->{'reg_type'},
'reg_alias|regname=s'=> \$self->{'reg_alias'},
'nosqlvc=i' => \$self->{'nosqlvc'}, # cannot use the boolean "!" form because this option is propagated to Workers verbatim
# json config files
'config_file=s@' => $self->{'config_files'}, # already an array ref, so no backslash needed
# loop control
'run' => \$run,
'loop' => \$loopit,
'max_loops=i' => \$max_loops,
'keep_alive' => \$keep_alive,
'job_id|run_job_id=i'=> \$run_job_id,
'force=i' => \$force,
'sleep=f' => \$self->{'sleep_minutes'},
# meadow control
'local!' => \$local,
'meadow_type=s' => \$default_meadow_type,
'total_running_workers_max=i' => \$total_running_workers_max,
'submit_workers_max=i' => \$submit_workers_max,
'submission_options=s' => \$submission_options,
# worker control
'job_limit=i' => \$self->{'job_limit'},
'life_span|lifespan=i' => \$self->{'life_span'},
'logic_name=s' => \$self->{'logic_name'},
'analyses_pattern=s' => \$self->{'analyses_pattern'},
'hive_log_dir|hive_output_dir=s' => \$self->{'hive_log_dir'},
'retry_throwing_jobs=i' => \$self->{'retry_throwing_jobs'},
'can_respecialize=i' => \$self->{'can_respecialize'},
'debug=i' => \$self->{'debug'},
'submit_log_dir=s' => \$self->{'submit_log_dir'},
# other commands/options
'h|help!' => \$help,
'v|versions!' => \$report_versions,
'sync!' => \$sync,
'dead!' => \$check_for_dead,
'killworker=i' => \$kill_worker_id,
'alldead!' => \$all_dead,
'balance_semaphores'=> \$balance_semaphores,
'no_analysis_stats' => \$self->{'no_analysis_stats'},
'worker_stats' => \$show_worker_stats,
'failed_jobs' => \$show_failed_jobs,
'reset_job_id=i' => \$reset_job_id,
'reset_failed_jobs_for_analysis=s' => \$reset_failed_jobs_for_analysis,
'reset_all_jobs_for_analysis=s' => \$reset_all_jobs_for_analysis,
'reset_failed_jobs' => \$reset_failed_jobs,
'reset_all_jobs' => \$reset_all_jobs,
'job_output=i' => \$job_id_for_output,
);
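# Example invocations (the url below is a placeholder; adjust to your own setup):
#   beekeeper.pl -url mysql://user:pass@host:3306/my_hive_db -sync
#   beekeeper.pl -url mysql://user:pass@host:3306/my_hive_db -loop -sleep 2
#   beekeeper.pl -url mysql://user:pass@host:3306/my_hive_db -job_id 1234
# informational modes print and exit without touching the pipeline: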
if ($help) { script_usage(0); }
if($report_versions) {
report_versions();
exit(0);
}
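# build the Config object, layering any JSON config files given via -config_file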
my $config = Bio::EnsEMBL::Hive::Utils::Config->new(@{$self->{'config_files'}});
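# decide how many scheduling loops to perform: -run or -job_id mean exactly one;
# -loop or -keep_alive mean looping until -max_loops is reached (or indefinitely if no limit was given)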
if($run or $run_job_id) {
$max_loops = 1;
} elsif ($loopit or $keep_alive) {
unless($max_loops) {
$max_loops = -1; # unlimited
}
}
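# connect to the eHive pipeline database; either -url or -reg_conf/-reg_alias must identify it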
if($self->{'url'} or $self->{'reg_alias'}) {
$self->{'dba'} = Bio::EnsEMBL::Hive::DBSQL::DBAdaptor->new(
-url => $self->{'url'},
-reg_conf => $self->{'reg_conf'},
-reg_type => $self->{'reg_type'},
-reg_alias => $self->{'reg_alias'},
-no_sql_schema_version_check => $self->{'nosqlvc'},
);
} else {
print "\nERROR : Connection parameters (url or reg_conf+reg_alias) need to be specified\n\n";
script_usage(1);
}
if( $self->{'url'} ) { # protect the URL that we pass to Workers by hiding the password in %ENV:
$self->{'url'} = $self->{'dba'}->dbc->url('EHIVE_PASS');
}
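# the Queen is the adaptor through which Worker records are created, tracked and retired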
my $queen = $self->{'dba'}->get_Queen;
my $pipeline_name = $self->{'dba'}->get_MetaAdaptor->get_value_by_key( 'hive_pipeline_name' );
if($pipeline_name) {
warn "Pipeline name: $pipeline_name\n";
} else {
print STDERR "+---------------------------------------------------------------------+\n";
print STDERR "! !\n";
print STDERR "! WARNING: !\n";
print STDERR "! !\n";
print STDERR "! At the moment your pipeline doesn't have 'pipeline_name' defined. !\n";
print STDERR "! This may seriously impair your beekeeping experience unless you are !\n";
print STDERR "! the only farm user. The name should be set in your PipeConfig file, !\n";
print STDERR "! or if you are running an old pipeline you can just set it by hand !\n";
print STDERR "! in the 'meta' table. !\n";
print STDERR "! !\n";
print STDERR "+---------------------------------------------------------------------+\n";
}
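# a single specific job needs at most one Worker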
if($run_job_id) {
$submit_workers_max = 1;
}
$default_meadow_type = 'LOCAL' if($local);
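# a Valley represents the collection of Meadows (compute interfaces, e.g. LOCAL or a batch system) available to this beekeeper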
my $valley = Bio::EnsEMBL::Hive::Valley->new( $config, $default_meadow_type, $pipeline_name );
my ($beekeeper_meadow_type, $beekeeper_meadow_name) = $valley->whereami();
unless($beekeeper_meadow_type eq 'LOCAL') {
die "beekeeper.pl detected it has been itself submitted to '$beekeeper_meadow_type/$beekeeper_meadow_name', but this mode of operation is not supported.\n"
."Please just run beekeeper.pl on a farm head node, preferably from under a 'screen' session.\n";
}
$valley->config_set('SubmitWorkersMax', $submit_workers_max) if(defined $submit_workers_max);
my $default_meadow = $valley->get_default_meadow();
warn "Default meadow: ".$default_meadow->signature."\n\n";
$default_meadow->config_set('TotalRunningWorkersMax', $total_running_workers_max) if(defined $total_running_workers_max);
$default_meadow->config_set('SubmissionOptions', $submission_options) if(defined $submission_options);
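# one-off maintenance actions requested on the command line: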
if($reset_job_id) { $queen->reset_job_by_dbID_and_sync($reset_job_id); }
if($job_id_for_output) {
printf("===== job output\n");
my $job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID($job_id_for_output);
print $job->toString. "\n";
}
if($reset_all_jobs_for_analysis) {
die "Deprecated option -reset_all_jobs_for_analysis. Please use -reset_all_jobs in combination with -analyses_pattern <pattern>";
}
if($reset_failed_jobs_for_analysis) {
die "Deprecated option -reset_failed_jobs_for_analysis. Please use -reset_failed_jobs in combination with -analyses_pattern <pattern>";
}
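# to kill a Worker, check with both the Queen (database view) and the Meadow (process view) that it is still alive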
if ($kill_worker_id) {
my $kill_worker = $queen->fetch_by_dbID($kill_worker_id)
or die "Could not fetch worker with dbID='$kill_worker_id' to kill";
unless( $kill_worker->cause_of_death() ) {
if( my $meadow = $valley->find_available_meadow_responsible_for_worker( $kill_worker ) ) {
if( $meadow->check_worker_is_alive_and_mine($kill_worker) ) {
printf("Killing worker: %10d %35s %15s : ",
$kill_worker->dbID, $kill_worker->host, $kill_worker->process_id);
$meadow->kill_worker($kill_worker);
$kill_worker->cause_of_death('KILLED_BY_USER');
$queen->register_worker_death($kill_worker);
# what about clean-up? Should we do it here or not?
} else {
die "According to the Meadow, the Worker (dbID=$kill_worker_id) is not running, so cannot kill";
}
} else {
die "Cannot access the Meadow responsible for the Worker (dbID=$kill_worker_id), so cannot kill";
}
} else {
die "According to the Queen, the Worker (dbID=$kill_worker_id) is not running, so cannot kill";
}
}
if( $self->{'logic_name'} ) { # FIXME: for now, -logic_name quietly overrides -analyses_pattern
warn "-logic_name is deprecated, please use -analyses_pattern, which extends the functionality of -logic_name.\n";
$self->{'analyses_pattern'} = $self->{'logic_name'};
}
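# when running a single job, fetch it now so that scheduling can be restricted to its Analysis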
my $run_job;
if($run_job_id) {
$run_job = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_by_dbID( $run_job_id )
or die "Could not fetch Job with dbID=$run_job_id.\n";
}
my $list_of_analyses = $run_job
? [ $run_job->analysis ]
: $self->{'dba'}->get_AnalysisAdaptor->fetch_all_by_pattern( $self->{'analyses_pattern'} );
if( $self->{'analyses_pattern'} ) {
if( @$list_of_analyses ) {
print "Beekeeper : the following Analyses matched your -analyses_pattern '".$self->{'analyses_pattern'}."' : "
.join(', ', map { $_->logic_name.'('.$_->dbID.')' } @$list_of_analyses)."\n\n";
} else {
die "Beekeeper : the -analyses_pattern '".$self->{'analyses_pattern'}."' did not match any Analyses.\n"
}
}
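# reset either all jobs or just the FAILED ones of the matched analyses, then re-sync the counters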
if($reset_all_jobs || $reset_failed_jobs) {
$self->{'dba'}->get_AnalysisJobAdaptor->reset_jobs_for_analysis_id( $list_of_analyses, $reset_all_jobs );
$self->{'dba'}->get_Queen->synchronize_hive( $list_of_analyses );
}
if($all_dead) { $queen->register_all_workers_dead(); }
if($check_for_dead) { $queen->check_for_dead_workers($valley, 1); }
if($balance_semaphores) { $self->{'dba'}->get_AnalysisJobAdaptor->balance_semaphores( $list_of_analyses ); }
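# either enter the autonomous scheduling loop, or produce a one-shot status report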
if ($max_loops) { # positive $max_loops means limited looping, negative means unlimited
run_autonomously($self, $max_loops, $keep_alive, $queen, $valley, $list_of_analyses, $self->{'analyses_pattern'}, $run_job_id, $force);
} else {
# the output of several methods below looks different depending on whether an analyses filter was supplied
if($sync) {
$queen->synchronize_hive( $list_of_analyses );
}
print $queen->print_status_and_return_reasons_to_exit( $list_of_analyses, !$self->{'no_analysis_stats'} );
if($show_worker_stats) {
print "\n===== List of live Workers according to the Queen: ======\n";
foreach my $worker (@{ $queen->fetch_overdue_workers(0) }) {
print $worker->toString(1)."\n";
}
}
$self->{'dba'}->get_RoleAdaptor->print_active_role_counts;
Bio::EnsEMBL::Hive::Scheduler::schedule_workers_resync_if_necessary($queen, $valley, $list_of_analyses); # show what would be submitted, but do not actually submit
if($show_failed_jobs) {
print("===== failed jobs\n");
# restrict to the matched analyses only when a pattern was supplied, otherwise show all FAILED jobs;
# '&&' (not 'and') is needed so the expression binds tighter than the argument-list comma
my $failed_job_list = $self->{'dba'}->get_AnalysisJobAdaptor->fetch_all_by_analysis_id_status( $self->{'analyses_pattern'} && $list_of_analyses , 'FAILED');
foreach my $job (@{$failed_job_list}) {
print $job->toString. "\n";
}
}
}
exit(0);
}