my ($self, $output_ids, $branch_name_or_code, $create_job_options) = @_;
my $input_id = $self->input_id();
my $param_id_stack = $self->param_id_stack();
my $accu_id_stack = $self->accu_id_stack();
my $job_adaptor = $self->adaptor() || 'Bio::EnsEMBL::Hive::DBSQL::AnalysisJobAdaptor';
my $hive_use_param_stack = ref($job_adaptor) && $job_adaptor->db->hive_use_param_stack();
if($hive_use_param_stack) {
if($input_id and ($input_id ne
'{}')) { # add the parent to the param_id_stack if it had non-trivial extra parameters
$param_id_stack = ($param_id_stack ? $param_id_stack.',' : '').$self->dbID();
}
if(scalar(keys %{$self->accu_hash()})) { # add the parent to the accu_id_stack if it had "own" accumulator
$accu_id_stack = ($accu_id_stack ? $accu_id_stack.',' : '').$self->dbID();
}
}
$output_ids ||= [ $hive_use_param_stack ? {} : $input_id ]; # by default replicate the parameters of the parent in the child
$output_ids = [ $output_ids ] unless(ref($output_ids) eq 'ARRAY'); # force previously used single values into an arrayref
if($create_job_options) {
die "Please consider configuring semaphored dataflow from PipeConfig rather than setting it up manually";
}
# map branch names to numbers:
# if branch_code is set to 1 (explicitly or implicitly), turn off automatic dataflow:
$self->autoflow(0) if($branch_code == 1);
my @output_job_ids = ();
# sort rules to make sure the fan rules come before funnel rules for the same branch_code:
foreach my $rule (sort {($b->funnel_dataflow_rule
# parameter substitution into input_id_template is rule-specific
my $output_ids_for_this_rule;
if(my $template_string = $rule->input_id_template()) {
my $template_hash = destringify($template_string);
$output_ids_for_this_rule = [ map { $self->param_substitute($template_hash, $_) } @$output_ids ];
} else {
$output_ids_for_this_rule = $output_ids;
}
my $target_analysis_or_table = $rule->to_analysis();
if($target_analysis_or_table->can('dataflow')) {
$target_analysis_or_table->dataflow( $output_ids_for_this_rule, $self );
} else {
my @common_params = (
'prev_job' => $self,
'analysis' => $target_analysis_or_table, # expecting an Analysis
'param_id_stack' => $param_id_stack,
'accu_id_stack' => $accu_id_stack,
);
if( my $funnel_dataflow_rule = $rule->funnel_dataflow_rule ) { # members of a semaphored fan will have to wait in cache until the funnel is created:
my $fan_cache_this_branch = $self->fan_cache->{"$funnel_dataflow_rule"} ||= [];
@common_params,
'input_id' => $_,
) } @$output_ids_for_this_rule;
} else { # either a semaphored funnel or a non-semaphored dataflow:
my $fan_jobs = delete $self->fan_cache->{"$rule"}; # clear the cache at the same time
if( $fan_jobs && @$fan_jobs ) { # a semaphored funnel
if( (my $funnel_job_count = scalar(@$output_ids_for_this_rule)) !=1 ) {
$self->transient_error(0);
die "Asked to dataflow into $funnel_job_count funnel jobs instead of 1";
} else {
@common_params,
'input_id' => $output_ids_for_this_rule->[0],
'semaphore_count' => scalar(@$fan_jobs), # "pre-increase" the semaphore count before creating the dependent jobs
'semaphored_job_id' => $self->semaphored_job_id(), # propagate parent's semaphore if any
);
my ($funnel_job_id) = @{ $job_adaptor->store_jobs_and_adjust_counters( [ $funnel_job ], 0) };
unless($funnel_job_id) { # apparently it has been created previously, trying to leech to it:
if( $funnel_job = $job_adaptor->fetch_by_analysis_id_and_input_id( $funnel_job->analysis->dbID, $funnel_job->input_id) ) {
$funnel_job_id = $funnel_job->dbID;
if( $funnel_job->status eq 'SEMAPHORED' ) {
$job_adaptor->increase_semaphore_count_for_jobid( $funnel_job_id, scalar(@$fan_jobs) ); # "pre-increase" the semaphore count before creating the dependent jobs
$job_adaptor->db->get_LogMessageAdaptor->store_job_message($self->dbID, "Discovered and using an existing funnel ".$funnel_job->toString, 0);
} else {
die "The funnel job (id=$funnel_job_id) fetched from the database was not in SEMAPHORED status";
}
} else {
die "The funnel job could neither be stored nor fetched";
}
}
foreach my $fan_job (@$fan_jobs) { # set the funnel in every fan's job:
$fan_job->semaphored_job_id( $funnel_job_id );
}
push @output_job_ids, $funnel_job_id, @{ $job_adaptor->store_jobs_and_adjust_counters( $fan_jobs, 1) };
}
} else { # non-semaphored dataflow (but potentially propagating any existing semaphores)
@common_params,
'input_id' => $_,
'semaphored_job_id' => $self->semaphored_job_id(), # propagate parent's semaphore if any
) } @$output_ids_for_this_rule;
push @output_job_ids, @{ $job_adaptor->store_jobs_and_adjust_counters( \@non_semaphored_jobs, 0) };
}
} # /if funnel
} # /if (table or analysis)
} # /foreach my $rule
return \@output_job_ids;
}