Running eHive pipelines


A quick overview

Each eHive pipeline is a potentially complex computational process.

Whether it runs locally, on the farm, or on multiple compute resources, this process is centred around a "blackboard" (a MySQL, SQLite or PostgreSQL database) where individual jobs of the pipeline are created, claimed by independent Workers and later recorded as done or failed.

Running the pipeline involves the steps described in the sections below.


Initialization of the pipeline database

Every eHive pipeline is centered around a "blackboard", which is usually a MySQL/SQLite/PostgreSQL database. This database contains both static information (general definition of analyses, associated runnables, parameters and resources, dependency rules, etc) and runtime information about states of single jobs running on the farm or locally.

By initialization we mean the act of moulding one such new pipeline database from a PipeConfig file. This is done by feeding the PipeConfig file to the ensembl-hive/scripts/init_pipeline.pl script.
A typical example:

		init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMult_conf -pipeline_url mysql://user:password@host:port/long_mult
    

This will create a MySQL pipeline database called 'long_mult' with the given connection parameters. With newer PipeConfig files these may be the only parameters needed, as the rest can be set at a later stage via "seeding" (see below).

If heavy concurrent traffic to the database is not expected, we may choose to keep the blackboard in a local SQLite file:

		init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMult_conf -pipeline_url sqlite:///long_mult
    

In the latter case no other connection parameters except for the filename are necessary, so they are skipped.

A couple of more complicated examples:

		init_pipeline.pl Bio::EnsEMBL::Compara::PipeConfig::ProteinTrees_conf -user "my_db_username" -password "my_db_password" -mlss_id 12345
    

This sets the 'user', 'password' and 'mlss_id' parameters via command-line options. At this stage you can also override any of the other options mentioned in the default_options section of the PipeConfig file.

If you need to modify second-level values of a "hash option" (such as the '-host' or '-port' of the 'pipeline_db' option), the syntax is the following (it uses the extended syntax of Getopt::Long):

		init_pipeline.pl Bio::EnsEMBL::Compara::PipeConfig::ProteinTrees_conf -pipeline_db -host=myhost -pipeline_db -port=5306
    

PLEASE NOTE: Although many older PipeConfig files make extensive use of command-line options such as -password and -mlss_id above (so-called o() syntax), it is no longer the only or recommended way of pre-configuring pipelines. There are better ways to configure pipelines, so if you find yourself struggling to make sense of an existing PipeConfig's o() syntax, please talk to eHive developers or power-users who are usually happy to help.
Some pipelines may have other dependencies beyond eHive (e.g. the Ensembl Core API, BioPerl, etc). Make sure you have installed them and configured your environment (PATH and PERL5LIB). init_pipeline.pl will try to compile all the analysis modules, which ensures that most of the dependencies are installed, but some others can only be found at runtime.

Normally, one run of init_pipeline.pl should create a pipeline database for you.
If anything goes wrong and the process does not complete successfully, you will need to drop the partially created database in order to try again. You can either drop the database manually, or use the "-hive_force_init 1" option, which will automatically drop the database before trying to create it.
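
For example, reusing the LongMult initialization command from above:

        init_pipeline.pl Bio::EnsEMBL::Hive::PipeConfig::LongMult_conf -pipeline_url sqlite:///long_mult -hive_force_init 1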

If init_pipeline.pl completes successfully, it will print a legend of commands that can be run next.

Please remember that these command lines are for use only with this particular pipeline database, and are likely to be different the next time you run the pipeline. Moreover, they will contain a sensitive password, so be careful where you record them.


Generating a pipeline's flow diagram

As soon as the pipeline database is ready you can store its visual flow diagram in an image file. This diagram is a much better tool for understanding what is going on in the pipeline. Run the following command to produce it:

        generate_graph.pl -url sqlite:///my_pipeline_database -out my_diagram.png
    

You only have to choose the format (gif, jpg, png, svg, etc) by setting the output file extension.
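
For instance, to produce an SVG version of the same diagram, just change the extension:

        generate_graph.pl -url sqlite:///my_pipeline_database -out my_diagram.svg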

[Example pipeline flow diagram]

LEGEND:

  • The rounded nodes on the flow diagram represent Analyses (classes of jobs).
  • The white rectangular nodes represent Tables that hold user data.
  • The blue solid arrows are called "dataflow rules". They either generate new jobs (if they point to an Analysis node) or store data (if they point at a Table node).
  • The red solid arrows with T-heads are "analysis control rules". They block the pointed-at Analysis until all the jobs of the pointing Analysis are done.
  • Light-blue shadows behind some analyses stand for "semaphore rules". Together with red and green dashed lines they represent our main job control mechanism that will be described elsewhere.

Each flow diagram thus generated is a momentary snapshot of the pipeline state, and these snapshots will be changing as the pipeline runs. One of the things changing will be the colour of the Analysis nodes. The default colour legend is as follows:

  •  [ EMPTY ]  : the Analysis never had any jobs to do. Since pipelines are dynamic it may be ok for some Analyses to stay EMPTY until the very end.
  •  [ DONE ]  : all jobs of the Analysis are DONE. Since pipelines are dynamic, it may be a temporary state, until new jobs are added.
  •  [ READY ]  : some jobs are READY to be run, but nothing is running at the moment.
  •  [ IN PROGRESS ]  : some jobs of the Analysis are being processed at the moment of the snapshot.
  •  [ BLOCKED ]  : none of the jobs of this Analysis can be run at the moment because of job dependency rules.
  •  [ FAILED ]  : the number of FAILED jobs in this Analysis has gone over a threshold (which is 0 by default). By default beekeeper.pl will exit if it encounters a FAILED analysis.

Another thing that will be changing from snapshot to snapshot is the job "breakout" formula displayed under the name of the Analysis. It shows how many jobs are in which state and the total number of jobs. Separate parts of this formula are similarly colour-coded:

  • grey :  s  (SEMAPHORED) - individually blocked jobs
  • green :  r  (READY) - jobs that are ready to be claimed by Workers
  • yellow :  i  (IN PROGRESS) - jobs that are currently being processed by Workers
  • skyblue :  d  (DONE) - successfully completed jobs
  • red :  f  (FAILED) - unsuccessfully completed jobs

Actually, you don't even need to generate a pipeline database to see its diagram, as the diagram can be generated directly from the PipeConfig file:

        generate_graph.pl -pipeconfig Bio::EnsEMBL::Hive::PipeConfig::LongMult_conf -out my_diagram2.png
    

Such a "standalone" diagram may look slightly different (analysis_ids will be missing).

PLEASE NOTE: A very friendly guiHive web interface can periodically regenerate the pipeline flow diagram for you, so you can now monitor (and to a certain extent control) your pipeline from a web browser.


Seeding jobs into the pipeline database

The pipeline database contains a dynamic collection of jobs (tasks) to be done. Jobs can be added to the "blackboard" either by the user (we call this process "seeding") or dynamically, by already running jobs. When a database is created using init_pipeline.pl it may or may not be already seeded, depending on the PipeConfig file (you can always check whether it has been automatically seeded by looking at the flow diagram). If the pipeline needs seeding, this is done by running the seed_pipeline.pl script, providing both the Analysis to be seeded and the parameters of the job being created:

		seed_pipeline.pl -url sqlite:///my_pipeline_database -logic_name "analysis_name" -input_id '{ "paramX" => "valueX", "paramY" => "valueY" }'
    

It only makes sense to seed certain analyses: typically the ones that do not have any incoming dataflow on the flow diagram.
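
For instance, for the LongMult example pipeline initialized above, the entry-point analysis could be seeded roughly like this (a sketch only: the analysis is called 'take_b_apart' in recent versions of the example and takes 'a_multiplier' and 'b_multiplier' parameters; check your own flow diagram for the actual names):

        seed_pipeline.pl -url sqlite:///long_mult -logic_name 'take_b_apart' -input_id '{ "a_multiplier" => "9650156169", "b_multiplier" => "327358788" }'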


Synchronizing ("sync"-ing) the pipeline database

In order to function properly (to monitor the progress, block and unblock analyses, and send the correct number of workers to the farm) the eHive system needs to maintain a certain number of job counters. These counters and associated analysis states are updated in the process of "synchronization" (or "sync"). This has to be done once before running the pipeline, and normally the pipeline will take care of synchronization by itself and trigger the 'sync' process automatically. However, sometimes things go out of sync, especially when people try to outsmart the scheduler by manually stopping and running jobs :) This is when you might want to re-sync the database. It is done by running the ensembl-hive/scripts/beekeeper.pl script in "sync" mode:

		beekeeper.pl -url sqlite:///my_pipeline_database -sync
    

Running the pipeline in automatic mode

As mentioned previously, the usual life-cycle of an eHive pipeline revolves around the pipeline database. There are several "Worker" processes that run on the farm. The Workers pick suitable tasks from the database, run them, and report back to the database. There is also one "Beekeeper" process that normally loops on a head node of the farm (it is very light-weight). It monitors the progress of Workers and, whenever needed, submits more Workers to the farm (since Workers die from time to time for natural and not-so-natural reasons, the Beekeeper maintains the correct load).
The Beekeeper reports back the completion percentage of the pipeline and how many jobs are left to do, have failed or are done for each individual analysis. As analyses progress they usually create new jobs for analyses further down the pipeline. Once the pipeline is complete (or breaks) the Beekeeper exits.

So to "run the pipeline" all you have to do is to run the Beekeeper:

		beekeeper.pl -url sqlite:///my_pipeline_database -loop
    

You can also restrict running to a subset of Analyses (either by analysis_id or by name pattern):

		beekeeper.pl -url sqlite:///my_pipeline_database -analyses_pattern 'alignment_%' -loop       # all analyses whose name starts with 'alignment_'
    
or
		beekeeper.pl -url sqlite:///my_pipeline_database -analyses_pattern '1..5,fasta_check' -loop  # only analyses with analysis_id between 1 and 5 and 'fasta_check'
    

In order to make sure the beekeeper.pl process doesn't die when you disconnect your ssh session from the farm, it is normally run in a "screen" session. If your Beekeeper process gets killed for some reason, don't worry - you can re-sync the database and start another Beekeeper process. It will pick up from where the previous Beekeeper left off.
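
For example, a common pattern (assuming GNU Screen is available on the head node; the session name is just a placeholder) is:

        screen -S my_pipeline_beekeeper                          # start a named screen session
        beekeeper.pl -url sqlite:///my_pipeline_database -loop   # run the Beekeeper inside it
        # detach with Ctrl-a d; re-attach later with: screen -r my_pipeline_beekeeper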

At each loop, Beekeeper will print out the status of each analysis and the completion level of the pipeline, and then go to sleep for 1 minute. Here is an example of the output from the beekeeper.pl script running the Ensembl Compara LastZ pipeline:

======= beekeeper loop ** 19 **==========
GarbageCollector: Checking for lost Workers...
GarbageCollector: [Queen:] out of 32 Workers that haven't checked in during the last 5 seconds...
GarbageCollector: [LSF Meadow:]   RUN:32

get_species_list           ( 1)        DONE jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:1, Fail:0)=1 Ave_msec:23, workers(Running:0, Reqired:0)   h.cap:-  a.cap:-  (sync'd 552 sec ago)
populate_new_database      ( 2)        DONE jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:1, Fail:0)=1 Ave_msec:50810, workers(Running:0, Reqired:0)   h.cap:-  a.cap:-  (sync'd 552sec ago)
parse_pair_aligner_conf    ( 3)        DONE jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:1, Fail:0)=1 Ave_msec:563, workers(Running:0, Reqired:0)   h.cap:-  a.cap:-  (sync'd 552 sec ago)
chunk_and_group_dna        ( 4)        DONE jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:2, Fail:0)=2 Ave_msec:9044, workers(Running:0, Reqired:0)   h.cap:-  a.cap:-  (sync'd 552 sec ago)
store_sequence             ( 5)        DONE jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:853, Fail:0)=853 Ave_msec:20692, workers(Running:0, Reqired:0)   h.cap:50  a.cap:-  (sync'd 518 sec ago)
store_sequence_again       ( 6)       EMPTY jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:0, Fail:0)=0 Ave_msec:0, workers(Running:0, Reqired:0)   h.cap:100  a.cap:-  (sync'd 552 sec ago)
dump_dna_factory           ( 7)       EMPTY jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:0, Fail:0)=0 Ave_msec:0, workers(Running:0, Reqired:0)   h.cap:-  a.cap:-  (sync'd 492 sec ago)
dump_dna                   ( 8)       EMPTY jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:0, Fail:0)=0 Ave_msec:0, workers(Running:0, Reqired:0)   h.cap:10  a.cap:-  (sync'd 552 sec ago)
create_pair_aligner_jobs   ( 9)        DONE jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:1, Fail:0)=1 Ave_msec:394188, workers(Running:0, Reqired:0)   h.cap:10  a.cap:-  (sync'd 93 sec ago)
LastZ                      (10)     WORKING jobs(Sem:0, Rdy:162524, InProg:192, Done+Pass:3573, Fail:0)=166289 Ave_msec:9572, workers(Running:100, Reqired:54437)   h.cap:100  a.cap:-  (sync'd 93 sec ago)
.
.
.
.
.
pairaligner_stats          (37)     BLOCKED jobs(Sem:0, Rdy:1, InProg:0, Done+Pass:0, Fail:0)=1 Ave_msec:0, workers(Running:0, Reqired:1)   h.cap:-  a.cap:-  (sync'd 61 sec ago)
coding_exon_stats          (38)       EMPTY jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:0, Fail:0)=0 Ave_msec:0, workers(Running:0, Reqired:0)   h.cap:10  a.cap:-  (sync'd 552 sec ago)
coding_exon_stats_summary  (39)     BLOCKED jobs(Sem:0, Rdy:0, InProg:0, Done+Pass:0, Fail:0)=0 Ave_msec:0, workers(Running:0, Reqired:0)   h.cap:-  a.cap:-  (sync'd 61 sec ago)

===== Stats of active Roles as recorded in the pipeline database: ======
                         LastZ : 100 active Roles
         ======= TOTAL ======= : 100 active Roles

Not submitting any workers this iteration
                          hive 2.651% complete (< 432.133 CPU_hrs) (162726 todo + 4432 done + 0 failed = 167158 total)
sleep 1.00 minutes. Next loop at Mon Sep  8 15:27:48 2014

NB: colours are normally not used in beekeeper's output. They have been inserted here to refer to the above-mentioned legend

A last note about the Beekeeper: you can see it as a pump. Its task is to add new Workers to maintain the job flow. If you kill the Beekeeper, you stop the pump, but the water keeps flowing, i.e. the Workers are not killed and are still running. To actually kill the Workers, you have to use the specific commands of your grid engine (e.g. bkill for Platform LSF).


Monitoring the progress via a direct database session

In addition to monitoring the visual flow diagram (which can be generated manually using generate_graph.pl or via the guiHive web interface) you can also connect to the pipeline database directly and issue SQL commands. To avoid typing in all the connection details (the syntax differs depending on the particular database engine used) you can use the bespoke db_cmd.pl script, which takes the eHive database URL and performs the connection for you:

		db_cmd.pl -url sqlite:///my_pipeline_database
    
or
		db_cmd.pl -url mysql://user:password@host:port/long_mult
    
or
		db_cmd.pl -url pgsql://user:password@host:port/long_mult
    

Once connected, you can list the tables and views (with SHOW TABLES; in MySQL, .tables in SQLite, or \d in psql for PostgreSQL). The default set of tables should look something like:

+----------------------------+
| Tables_in_hive_pipeline_db |
+----------------------------+
| accu                       |
| analysis_base              |
| analysis_ctrl_rule         |
| analysis_data              |
| analysis_stats             |
| analysis_stats_monitor     |
| dataflow_rule              |
| hive_meta                  |
| job                        |
| job_file                   |
| log_message                |
| msg                        |
| pipeline_wide_parameters   |
| progress                   |
| resource_class             |
| resource_description       |
| resource_usage_stats       |
| role                       |
| worker                     |
| worker_resource_usage      |
+----------------------------+
 

Some of these tables, such as analysis_base, job and resource_class, may already be populated with entries depending on what is in your configuration file. At the very least you should expect to find your analyses in analysis_base. Some tables, such as log_message, will only get populated while the pipeline is running (for example, log_message will get an entry when a job exceeds the memory limit and dies).

Please refer to the eHive schema (see eHive schema diagram and eHive schema description) to find out how those tables are related.

In addition to the tables, there is a "progress" view from which you can select and see how your jobs are doing:

		SELECT * from progress;
    

If you see jobs in 'FAILED' state or jobs with retry_count>0 (which means they have failed at least once and had to be retried), you may need to look at the "msg" view in order to find out the reason for the failures:

		SELECT * FROM msg WHERE job_id=1234;	# a specific job
    
or
		SELECT * FROM msg WHERE analysis_id=15;	# jobs of a specific analysis
    
or
		SELECT * FROM msg;	# show me all messages
    

Some of the messages indicate temporary errors (such as a temporary lack of connectivity with a database or file), but others may be critical (e.g. a wrong path to a binary) and will eventually make all jobs of an analysis fail. If the "is_error" flag of a message is false, it may be just a diagnostic message, which is not critical.
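
For example, to see only the messages flagged as genuine errors (assuming the usual convention of storing the flag as 0/1):

        SELECT * FROM msg WHERE is_error=1;	# only error messages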


Monitoring the progress via guiHive

guiHive is a web interface to an eHive database that allows you to monitor the state of the pipeline. It displays flow diagrams of all the steps in the pipeline and their relationships to one another. In addition it colours analyses based on completion, and each analysis has a progress circle which indicates the number of complete, running and failed jobs. guiHive also offers the ability to directly modify analyses; for example, you can change the resource class used by an analysis directly through guiHive.

guiHive is already installed at the Sanger and at the EBI (both for internal use only), but can also be installed locally. Instructions for this are on GitHub.


Testing the pipeline and debugging failed jobs

During the course of running a pipeline, issues may arise from things like incorrect resource allocation, problems with the input data, or problems in the configuration itself or in the modules that are run for each analysis. Alternatively, you might be building a pipeline from scratch and want to test a single job at a time rather than run the entire pipeline. eHive provides a script that is useful in both cases: runWorker.pl.

Before debugging, the first thing you should do is look in the database or in guiHive, and query the job table for jobs that have failed (which can be found from the status column). Then you can search for these job_ids in the log_message table and see whether there are any log messages that would explain why the jobs are failing. guiHive also directly reports the last error message in the job table. If the jobs are failing because of insufficient resources, you should be able to tell that from the message itself. If this turns out to be the case, the best thing to do is either to make a new entry in the resource_class table with sufficient resources and point the analysis in analysis_base to that resource_class_id, or to directly modify the existing resource class entry. Again, the same can be achieved entirely in guiHive by editing the "Resources" tab, and then the analysis itself (by clicking on it).
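
For instance, pointing an analysis at a different resource class can be done with a single SQL statement (a sketch; the logic_name and resource_class_id values are placeholders you would substitute with your own):

        UPDATE analysis_base SET resource_class_id=2 WHERE logic_name='my_analysis_name';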

If there is no obvious reason why the jobs are failing then it is probably time to use runWorker.pl. Here is an example of how to run it:

        runWorker.pl -url mysql://user:pass@host:port/my_pipeline_db -job_id 1 -debug 1

This would run job_id 1 directly. There are a few things to note about this. Firstly, if the job is considered 'DONE' but you know it has failed, you will need to force runWorker.pl to run it using the -force flag. Alternatively you can reset the job state to READY through beekeeper.pl using -reset_job_id <num>, and then just run runWorker.pl with the job_id. The second thing to note is that by default runWorker.pl will try to run the job entirely, which may write things into files or into the database itself. If you are afraid that this may happen, and if the module is nicely implemented, you can use the -no_write flag to disable the write_output() method in whatever module the analysis uses. Note that some modules may misbehave and still write their output outside of write_output()! Using -no_write will also stop any resulting jobs that are created from being written to the pipeline database. This is useful both when testing and when debugging.
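
For example (a sketch using the flags described above; job_id 1234 is a placeholder):

        # force-run a job that is already marked DONE, without writing any output
        runWorker.pl -url mysql://user:pass@host:port/my_pipeline_db -job_id 1234 -force -no_write

        # or reset its state back to READY first, then run it normally
        beekeeper.pl -url mysql://user:pass@host:port/my_pipeline_db -reset_job_id 1234
        runWorker.pl -url mysql://user:pass@host:port/my_pipeline_db -job_id 1234 -debug 1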

When trying to track down a problem via runWorker.pl, the difficulty varies from analysis to analysis and from problem to problem. If the code throws an exception then it is usually easy to see where to start debugging. On the other hand, sometimes the output from runWorker.pl will simply say the job died. In these cases it is useful to consider the general structure of analysis modules. There should be the following subroutines: fetch_input(), run(), and write_output(). A failing job should report in which of these stages the error occurred. Then you can go for more fine-grained debugging.

As Perl objects are essentially just hashes, using the Data::Dumper module is a good way of checking the contents of variables. You can do this by putting use Data::Dumper; at the top of the module, and then, when you want to check a variable, use:

	$self->warning("Dumped var ".Dumper($myvar));

With enough warning statements any debugging problem can be overcome. Of course, you can also use the Perl debugger.

A final note on testing/debugging is that if you want to restart an entire analysis you can run the following command:

        beekeeper.pl -url mysql://user:pass@host:port/my_pipeline_db -analyses_pattern MyAnalysisName -reset_all_jobs

If you do this, it is important to note that it is your responsibility to clean up all the output from any jobs that had already completed. So if you were writing objects to a database, you would have to delete them yourself, or delete any flat files that were generated, as the pipeline is not designed to do this for you.