The vast majority of problems tackled by RMINC as of 2016 are considered “embarrassingly parallel”: the problem can be broken into smaller pieces that can be solved completely independently. Take, for example, the standard massively-univariate approach to analyzing neuroimages. Each image is composed of voxels, and a separate model is computed at each voxel. These models are computed without any dependency on the models being computed at other voxels. Since there is no interdependency, it is easy to split the voxels up between multiple cores on a computer, or even between different computers, and combine the results afterwards.
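As a toy illustration of the idea in base R (the data here are simulated, not RMINC output): each “voxel” model depends only on its own row, so the rows can be farmed out to any core and the coefficients recombined afterwards.

library(parallel)

## 1000 simulated "voxels" measured in 20 subjects
voxels <- matrix(rnorm(1000 * 20), nrow = 1000)
x <- rnorm(20)  ## a per-subject covariate

## each voxel's model uses only its own row, so rows can be fit on any core
fits <- mclapply(seq_len(nrow(voxels))
                 , function(i) coef(lm(voxels[i,] ~ x))
                 , mc.cores = 4)

## recombine the independent results afterwards
slopes <- vapply(fits, `[[`, numeric(1), "x")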

A Basic Example

Imagine for example there are some simple images that are 10 x 10 x 10 voxels.

library(RMINC)
suppressPackageStartupMessages(library(dplyr))

## Pull in some example data
getRMINCTestData("./")

## Read a csv containing the paths to the test files and some covariates
image_frame <- read.csv("rminctestdata/test_data_set.csv", stringsAsFactors = FALSE)

## Take a look at a few sample slices through the first volume
image_frame$jacobians_fixed_2[1] %>%
  mincGetVolume %>%
  mincArray %>%
  mincPlotSliceSeries(mfrow = c(2,2), begin = 2, end = -2)

Now, to see if the values at each voxel depend on some variable, body weight for example, we can fit a linear model. Since the models are independent we can divide the problem up. I’ll use four cores on my computer; each one will fit 250 of the 1000 models, and then the results can be stitched back together.

model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame, parallel = c("local", 4))
## Range: 4.000000 1.000000
image_frame$jacobians_fixed_2[1] %>%
  mincGetVolume %>%
  mincArray %>%
  mincPlotSliceSeries(statistic = mincArray(model, "tvalue-Body.Weight")
                      , symmetric = TRUE
                      , low = 2, high = 5
                      , begin = 2, end = -2
                      , legend = "t-value", locator = FALSE
                      , mfrow = c(2,2))

Because the job is divided between 4 cores, the computation goes much faster. There is some overhead to splitting and recombining the job, so you won’t get a 4x speedup, but it will be faster. The argument parallel = c("local", 4) says to run the jobs on 4 cores of the local machine.

This system is even more powerful if you have a cluster at your disposal. You can divide the problem into potentially hundreds of pieces that can be solved independently.

Parallel Functions

As of Oct. 20th, 2016, the following functions support the parallel option as used above:

  1. mincLm
  2. pMincApply
  3. mincLmer
  4. anatGetAll
  5. anatLmer
  6. vertexApply
  7. vertexLmer

Each of these can run sequentially, locally (multicore), or on a cluster.
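For example, the same mincLm fit can be dispatched all three ways just by changing the parallel argument (reusing the model from above; the "pbs" form requires the cluster configuration described in the next section):

## sequential: one core, no splitting
model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame)

## local: split between 4 cores of this machine
model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame, parallel = c("local", 4))

## queued: split into 6 jobs on the cluster
model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame, parallel = c("pbs", 6))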

RMINC on a Cluster

Using RMINC on a cluster is now relatively simple. Thanks to the wonderful “batchtools” package, RMINC can be configured to run on Torque, SGE, Slurm, ad-hoc SSH clusters, and more. We and our collaborators typically use Torque and SGE, but using the other systems is possible with a little configuration effort.

Quick Start

To set up RMINC to run on your cluster, you will typically set a default configuration file in a bash session before opening R:

export RMINC_BATCH_CONF=/path/to/my/config.R

This file will likely look something like:
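Here is a sketch reconstructed from the description below (the exact cluster functions and resource names are assumptions and may differ from the file shipped with your RMINC installation):

## Run jobs through Torque/PBS using batchtools' cluster functions
cluster.functions <- batchtools::makeClusterFunctionsTORQUE()

## Default resources for each job: under an hour of walltime,
## less than 8 gigabytes of memory, a single node
default.resources <-
  list(walltime = 3600
       , memory = "8gb"
       , nodes = 1)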

In fact, this is the default configuration that comes with RMINC: it assumes you are using a Torque cluster and that each job will take under an hour, use less than 8 gigabytes of memory, and run on a single node.

Then to run a job you can do:

suppressMessages(  
  qmodel <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame, parallel = c("pbs", 6))
)
## Range: 6.000000 1.000000
##  Check the results look the same as the local version
image_frame$jacobians_fixed_2[1] %>%
  mincGetVolume %>%
  mincArray %>%
  mincPlotSliceSeries(statistic = mincArray(qmodel, "tvalue-Body.Weight")
                      , symmetric = TRUE
                      , low = 2, high = 5
                      , begin = 2, end = -2
                      , legend = "t-value", locator = FALSE
                      , mfrow = c(2,2))

As of RMINC 1.4.3, no configuration is necessary within parallel function calls other than specifying the number of batches. The argument parallel = c("pbs", 6) splits the job into 6 pieces. The "pbs" portion is checked only to see whether it is “local” or “snowfall”; any other value is ignored, and batchtools configuration controls execution instead. This contrasts with earlier versions of RMINC.

Site Configuration

So that you do not need to configure batchtools each time you open an R session, you can add a line like the following to your bashrc:

export RMINC_BATCH_CONF=/path/to/my/conf.R

The file pointed to by RMINC_BATCH_CONF is R code that sets the cluster.functions and default.resources for batchtools. The function makeClusterFunctionsTORQUE finds a template file that it fills in whenever a new job is created. The example PBS template looks like:

#!/bin/bash
<%
## Check some resources and set sane defaults
resources$walltime = resources$walltime
resources$memory = resources$memory
resources$ncpus = if (is.null(resources$ncpus)) 1L else asInt(resources$ncpus, lower = 1L)
resources$modules = if (is.null(resources$modules)) character(0L) else assertCharacter(resources$modules, any.missing = FALSE)
resources$omp.threads = if (is.null(resources$omp.threads)) 1L else asInt(resources$omp.threads, lower = 1L)
resources$blas.threads = if (is.null(resources$blas.threads)) 1L else asInt(resources$blas.threads, lower = 1L)


## first string of queue, selected by walltime
#walltimes = 3600L * c(1L, 8L, 48L, 672L)
#queue = c("short", "med", "long", "ultralong")[wf(resources$walltime <= walltimes)]

## very ugly hack because we cannot log to data (nobackup) filesystem on lido,
## only home fs is available
## unfortunately there seems to be no generic solution
## does log path start with /data/?
log.file = log.file
if (length(grep("^/data/", log.file)) > 0L) {
  ## strip that
  log.file = substr(log.file, 7L, nchar(log.file))
  ## find next forward slash
  i = regexpr("/", log.file)
  if (i != -1) {
    ## this must be "user": e.g. /data/bischl/...
    user = substr(log.file, 1L, i-1L)
    ## put together
    log.file = sprintf("/home/%s/nobackup%s", user, substr(log.file, i, nchar(log.file)))
  }
}
-%>

#PBS -N <%= job.hash %>
#PBS -o <%= log.file %>
#PBS -l walltime=<%= resources$walltime %>,nodes=<%=resources$nodes%>:ppn=1,vmem=<%= resources$memory %>
#PBS -j oe
<%= if (array.jobs) sprintf("#PBS -t 1-%i", nrow(jobs)) else "" %>
#PBS -V  

## export value of DEBUGME environment variable to slave
export DEBUGME=<%= Sys.getenv("DEBUGME") %>

## run R
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'

Batchtools fills in the job.hash, log.file, and uri portions of the template. Our default.resources configuration provides the nodes, walltime, and memory.

If you do not use PBS or SGE, you can find example configurations on batchtools’ GitHub.

If in the middle of a session you need to change your configuration, you can pass a resources argument explicitly to most parallel functions, or pass in an alternative configuration file with the conf_file argument.
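For example (a sketch; the resource names must match those used by your template, and the walltime and memory values here are illustrative):

## Ask for more walltime and memory for this call only
qmodel <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame
                 , parallel = c("pbs", 6)
                 , resources = list(walltime = 4 * 3600, memory = "16gb"))

## Or point this call at a different batchtools configuration
qmodel <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame
                 , parallel = c("pbs", 6)
                 , conf_file = "/path/to/other/config.R")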

Internals For Intrepid Developers

The basic design of the parallel system is to dispatch one of three forms of each command: the sequential form, the local (multicore) form, and the queued form. Within the parallel forms, the first task is to divide the problem into pieces.

We’ll look at vertexApply as a simple case study.

vertexApply <- function(filenames, fun, ..., mask = NULL, parallel = NULL) 
{
  # Load the data
  vertexData <- vertexTable(filenames)
  
  results <- matrixApply(vertexData, fun, ..., mask = mask, parallel = parallel)
  attr(results, "likeFile") <- filenames[1]
  
  results
}

First the filenames are converted into a matrix of values, with each column representing a subject and each row representing a vertex. This is shipped off to matrixApply, an internal alternative to apply that can parallelize both locally and on a queue.
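To make the orientation concrete (the dimensions here are hypothetical):

vertexData <- vertexTable(filenames)
dim(vertexData)  ## e.g. 40962 x 20: one row per vertex, one column per subject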

matrixApply <- function(mat, fun, ..., mask = NULL, parallel = NULL
                        , collate = simplify2minc
                        , conf_file = getOption("RMINC_BATCH_CONF")){
  
  if(!is.null(mask)){
    if(length(mask) == 1 && is.character(mask))
      mask <- as.numeric(readLines(mask))
    
    mask_lgl <- mask > .5
    mat <- mat[mask_lgl,]
  }
  
  fun <- match.fun(fun)
  
  apply_fun <- function(sub_matrix){
    if(!is.matrix(sub_matrix))
      sub_matrix <- matrix(sub_matrix, nrow = 1)
    
    lapply(seq_len(nrow(sub_matrix))
           , function(i) fun(sub_matrix[i,], ...))
  }
  
  if(is.null(parallel)){
    results <- apply_fun(mat)
  } else {
    n_groups <- as.numeric(parallel[2])
    groups <- split(seq_len(nrow(mat)), groupingVector(nrow(mat), n_groups))
    
    if(parallel[1] == "local") {
      results <- 
        quiet_mclapply(groups, function(group){
          apply_fun(mat[group,])
        }, mc.cores = n_groups) %>%
        Reduce(cbind, ., NULL)
    } else {
      reg <- qMincRegistry(new_file("matrixApply_registry"), conf_file = conf_file)
      on.exit( tenacious_remove_registry(reg) )
      
      ids <- batchMap(reg = reg, function(group){
        apply_fun(mat[group,])
      }, group = groups)
      
      submitJobs(ids, reg = reg)
      waitForJobs(reg = reg)
      
      results <-
        reduceResults(c, init = NULL, reg = reg)
    }
  }
  
  # Transpose to match the output of mincApply
  results <- t(results)
  
  # Flesh out the object if a mask was used
  if(!is.null(mask)){
    mask_val <- getOption("RMINC_MASKED_VALUE")
    results_expanded <- replicate(length(mask_lgl), mask_val, simplify = FALSE)
    results_expanded[mask_lgl] <- results
    results <- results_expanded
  }
  
  collate_fun <- match.fun(collate)
  results <- collate_fun(results)
  
  results
}

We’ll break it down section by section:

The first block handles numeric and file-name masks; a file mask is read from disk and coerced to numeric. The mask is used to subset the rows to save computation time:

  if(!is.null(mask)){
    if(length(mask) == 1 && is.character(mask))
      mask <- as.numeric(readLines(mask))
    
    mask_lgl <- mask > .5
    mat <- mat[mask_lgl,]
  }
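A minimal illustration of the subsetting (toy numbers, not real imaging data):

mask <- c(0, 1, 1, 0, 1, 0)   ## 6 "voxels", 3 inside the mask
mask_lgl <- mask > .5
mat <- matrix(1:12, nrow = 6) ## 6 voxels by 2 subjects
mat[mask_lgl, ]               ## a 3 x 2 matrix: only the masked-in rows remain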

match.fun ensures that whatever was passed in, be it a function or the name of one, resolves to a properly bound function.

fun <- match.fun(fun)
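For example:

match.fun("mean")  ## the character string "mean" resolves to the base function
match.fun(mean)    ## an actual function passes through unchanged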

The next block is the engine of matrixApply; it applies the user-supplied function to each row:

  apply_fun <- function(sub_matrix){
    if(!is.matrix(sub_matrix))
      sub_matrix <- matrix(sub_matrix, nrow = 1)
    
    lapply(seq_len(nrow(sub_matrix))
           , function(i) fun(sub_matrix[i,], ...))
  }

Now we handle dispatch:

if(is.null(parallel)){
    results <- apply_fun(mat)
  } else {
    n_groups <- as.numeric(parallel[2])
    groups <- split(seq_len(nrow(mat)), groupingVector(nrow(mat), n_groups))
    
    if(parallel[1] == "local") {
      results <- 
        quiet_mclapply(groups, function(group){
          apply_fun(mat[group,])
        }, mc.cores = n_groups) %>%
        Reduce(cbind, ., NULL)
    } else {
      reg <- qMincRegistry(new_file("matrixApply_registry"), conf_file = conf_file)
      on.exit( tenacious_remove_registry(reg) )
      
      ids <- batchMap(reg = reg, function(group){
        apply_fun(mat[group,])
      }, group = groups)
      
      submitJobs(ids, reg = reg)
      waitForJobs(reg = reg)
      
      results <-
        reduceResults(c, init = NULL, reg = reg)
    }
  }

If parallel is NULL, we just pass the whole matrix into apply_fun; otherwise we handle the two parallel cases. First the number of parallel jobs the user wants is retrieved. Then the row indices are split into groups using split and RMINC’s internal groupingVector function. The split returns a list of index vectors, one per group.
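For instance, with a base-R stand-in for the internal groupingVector (the grouping expression is a hypothetical equivalent, but it produces the same shape of output):

## split 10 row indices into 4 roughly equal groups
grouping <- sort(rep_len(seq_len(4), 10))
split(seq_len(10), grouping)
## $`1`: 1 2 3    $`2`: 4 5 6    $`3`: 7 8    $`4`: 9 10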

Then, if the first element of parallel is “local”, a muffled version of parallel::mclapply is called to split the job up amongst the cores. This creates a list of results, which are then merged via Reduce and cbind.

The batchtools case follows the same logic, although a few more infrastructure functions are required. A registry is created with qMincRegistry (RMINC’s wrapper around batchtools registry creation) and registered for removal when the function ends; tenacious_remove_registry works hard to ensure all evidence of a failed run is scrubbed from the system. The triplet of batchMap, submitJobs, and waitForJobs instructs the queue to start running the jobs and waits patiently for them to finish. The results are then loaded with reduceResults and reduced as above.
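Outside of RMINC, the same triplet looks like this with plain batchtools (a toy example with a throwaway registry, assuming a configured backend):

library(batchtools)

## a throwaway registry to track the jobs
reg <- makeRegistry(file.dir = tempfile("toy_registry"))

## one job per input value
ids <- batchMap(function(x) x^2, x = 1:4, reg = reg)

submitJobs(ids, reg = reg) ## hand the jobs to the configured scheduler
waitForJobs(reg = reg)     ## block until they all finish

reduceResults(c, init = NULL, reg = reg) ## 1 4 9 16
removeRegistry(reg = reg)  ## clean up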

After this there is a little result coercion:

  # The apply part (transpose to match output of mincApply)
  results <- t(results)
  
  if(!is.null(mask)){
    mask_val <- getOption("RMINC_MASKED_VALUE")
    results_expanded <- replicate(length(mask_lgl), mask_val, simplify = FALSE)
    results_expanded[mask_lgl] <- results
    results <- results_expanded
  }
  
  collate_fun <- match.fun(collate)
  results <- collate_fun(results)

  results

The results are transposed to resemble the output of mincApply. If a mask was used, the results are inflated back to their original dimensions, and the collate function coerces them to their final form; a single-column result, for example, is returned as a vector.

Then the results are returned. This bubbles back up to vertexApply, which tacks on a likeFile attribute and returns.

Implementing additional parallel functions will follow this general framework (see the sketch after this list):

  1. Create an engine function that does the work
  2. Check if parallel is NULL, if so dispatch the engine function as is
  3. Determine how to split the job up into pieces that the engine can run on; create a list or vector with the appropriate information
  4. If parallel[1] is “local”, run mclapply or its muffled variant on the engine function and the grouping object
  5. Otherwise run the batchtools commands above
  6. Reduce the results to an appropriate format.
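Here is a hypothetical end-to-end skeleton following those six steps (myParallelApply and its ad-hoc grouping are illustrative only, not RMINC API, and the batchtools calls assume a configured backend):

myParallelApply <- function(x, fun, ..., parallel = NULL) {
  ## 1. An engine function that does the work on one piece
  engine <- function(piece) lapply(piece, fun, ...)
  
  ## 2. parallel is NULL: dispatch the engine as is
  if(is.null(parallel)) return(engine(x))
  
  ## 3. Split the job into pieces the engine can run on
  n_groups <- as.numeric(parallel[2])
  groups <- split(x, sort(rep_len(seq_len(n_groups), length(x))))
  
  if(parallel[1] == "local") {
    ## 4. Run mclapply on the engine and the grouping object
    pieces <- parallel::mclapply(groups, engine, mc.cores = n_groups)
  } else {
    ## 5. Otherwise run the batchtools commands
    reg <- batchtools::makeRegistry(file.dir = tempfile("myParallelApply_reg"))
    on.exit(batchtools::removeRegistry(reg = reg))
    ids <- batchtools::batchMap(engine, piece = groups, reg = reg)
    batchtools::submitJobs(ids, reg = reg)
    batchtools::waitForJobs(reg = reg)
    pieces <- batchtools::reduceResults(function(aggr, res) c(aggr, list(res))
                                        , init = list(), reg = reg)
  }
  
  ## 6. Reduce the results to an appropriate format
  Reduce(c, pieces)
}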

It is possible that matrixApply will work for your problem (potentially with a little transposition). If not, mincLm is a little harder to read, but it has some examples of how to use a mask to parallelize jobs.