The vast majority of problems tackled by RMINC as of 2016 are considered “embarrassingly parallel”: the problem can be broken into smaller pieces that can be solved completely independently. Take, for example, the standard massively-univariate approach to analyzing neuroimages. Each image is composed of voxels, and a separate model is computed at each voxel. These models are computed without any dependency on the models being computed at other voxels. Since there is no interdependency, it is easy to split the voxels up between multiple cores on a computer, or even between different computers, and combine the results afterward.
Imagine, for example, some simple images that are 10 x 10 x 10 voxels.
library(RMINC)
suppressPackageStartupMessages(library(dplyr))
## Pull in some example data
getRMINCTestData("./")
## Read a csv containing the paths to the test files and some covariates
image_frame <- read.csv("rminctestdata/test_data_set.csv", stringsAsFactors = FALSE)
## Take a look at a few sample slices through the first volume
image_frame$jacobians_fixed_2[1] %>%
mincGetVolume %>%
mincArray %>%
mincPlotSliceSeries(mfrow = c(2,2), begin = 2, end = -2)
Now, to see whether the values at each voxel depend on some variable, body weight for example, we can fit a linear model at each voxel. Since the models are independent, we can divide the problem up. I’ll use four cores on my computer; each one will fit 250 of the 1000 models, and then the results are stuck back together.
model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame, parallel = c("local", 4))
## Range: 4.000000 1.000000
image_frame$jacobians_fixed_2[1] %>%
mincGetVolume %>%
mincArray %>%
mincPlotSliceSeries(statistic = mincArray(model, "tvalue-Body.Weight")
, symmetric = TRUE
, low = 2, high = 5
, begin = 2, end = -2
, legend = "t-value", locator = FALSE
, mfrow = c(2,2))
Because the job is divided between 4 cores, the computation goes much faster. There is some overhead to splitting and recombining the job, so you won’t get a full 4x speedup, but it will be faster. The argument parallel = c("local", 4) says to run the jobs on 4 cores of the local machine.
This system is even more powerful if you have a cluster at your disposal. You can divide the problem into potentially hundreds of pieces that can be solved independently.
As of Oct. 20th, 2016, a number of RMINC functions, including mincLm above and vertexApply discussed below, support the parallel option demonstrated above. Each of these can run sequentially, locally (multicore), or on a cluster.
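Concretely, the same model fit can be dispatched in each of the three modes through the parallel argument (the queued call here is illustrative; the "pbs" backend is covered below):

model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame)  # sequential
model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame,
                parallel = c("local", 4))  # 4 local cores
model <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame,
                parallel = c("pbs", 50))   # 50 queued jobs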
Using RMINC on a cluster is now relatively simple. Thanks to the wonderful “batchtools” package, RMINC can be configured to run on Torque, SGE, Slurm, ad-hoc SSH clusters, and more. We and our collaborators typically use Torque and SGE, but using the other systems is possible with a little configuration effort.
To set up RMINC to run on your cluster, you will typically set a default configuration file in a bash session before opening R:
export RMINC_BATCH_CONF=/path/to/my/config.R
This file will likely look something like the sketch below (a reconstruction; the resource names match the PBS template shown later):
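## Sketch of a batchtools configuration file for a Torque/PBS cluster
cluster.functions <- batchtools::makeClusterFunctionsTORQUE()

## Defaults substituted into the job template: a single node,
## under an hour of walltime, and less than 8 gigabytes of memory
default.resources <-
  list(nodes    = 1,
       walltime = "1:00:00",
       memory   = "8gb")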
In fact, this mirrors the default configuration that comes with RMINC: it assumes you are using a Torque cluster and that each job will take under an hour, use less than 8 gigabytes of memory, and run on a single node.
Then to run a job you can do:
suppressMessages(
qmodel <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame, parallel = c("pbs", 6))
)
## Range: 6.000000 1.000000
## Check the results look the same as the local version
image_frame$jacobians_fixed_2[1] %>%
mincGetVolume %>%
mincArray %>%
mincPlotSliceSeries(statistic = mincArray(qmodel, "tvalue-Body.Weight")
, symmetric = TRUE
, low = 2, high = 5
, begin = 2, end = -2
, legend = "t-value", locator = FALSE
, mfrow = c(2,2))
As of RMINC 1.4.3, no configuration is necessary within parallel function calls beyond specifying the number of batches. The argument parallel = c("pbs", 6) splits the job into 6 pieces. The "pbs" portion is checked to see whether it is “local” or “snowfall”; otherwise it is ignored, and the batchtools configuration controls execution instead. This contrasts with earlier versions of RMINC.
To make it so you do not need to configure batchtools each time you open an R session, you can add a line that looks like:
export RMINC_BATCH_CONF=/path/to/my/conf.R
to your .bashrc. The file pointed to by RMINC_BATCH_CONF is R code that sets the cluster.functions and default.resources for batchtools. The function makeClusterFunctionsTORQUE finds a template file that it fills in whenever a new job is created. The example PBS template looks like:
#!/bin/bash
<%
## Check some resources and set sane defaults
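## (walltime and memory are taken as supplied, e.g. from default.resources)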
resources$walltime = resources$walltime
resources$memory = resources$memory
resources$ncpus = if (is.null(resources$ncpus)) 1L else asInt(resources$ncpus, lower = 1L)
resources$modules = if (is.null(resources$modules)) character(0L) else assertCharacter(resources$modules, any.missing = FALSE)
resources$omp.threads = if (is.null(resources$omp.threads)) 1L else asInt(resources$omp.threads, lower = 1L)
resources$blas.threads = if (is.null(resources$blas.threads)) 1L else asInt(resources$blas.threads, lower = 1L)
## first string of queue, selected by walltime
#walltimes = 3600L * c(1L, 8L, 48L, 672L)
#queue = c("short", "med", "long", "ultralong")[wf(resources$walltime <= walltimes)]
## very ugly hack because we cannot log to data (nobackup) filesystem on lido,
## only home fs is available
## unfortunately there seems to be no generic solution
## does log path start with /data/?
log.file = log.file
if (length(grep("^/data/", log.file)) > 0L) {
## strip that
log.file = substr(log.file, 7L, nchar(log.file))
## find next forward slash
i = regexpr("/", log.file)
if (i != -1) {
## this must be "user": e.g. /data/bischl/...
user = substr(log.file, 1L, i-1L)
## put together
log.file = sprintf("/home/%s/nobackup%s", user, substr(log.file, i, nchar(log.file)))
}
}
-%>
#PBS -N <%= job.hash %>
#PBS -o <%= log.file %>
#PBS -l walltime=<%= resources$walltime %>,nodes=<%=resources$nodes%>:ppn=1,vmem=<%= resources$memory %>
#PBS -j oe
<%= if (array.jobs) sprintf("#PBS -t 1-%i", nrow(jobs)) else "" %>
#PBS -V
## export value of DEBUGME environment var to slave
export DEBUGME=<%= Sys.getenv("DEBUGME") %>
## run R
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
Batchtools handles the job.hash, log.file, and Rscript portions of the template. Our default.resources configuration provides the nodes, walltime, and memory.
If you do not use PBS or SGE, you can find example configurations on batchtools’ GitHub.
If in the middle of a session you need to change your configuration, you can pass a resources argument explicitly to most parallel functions, or pass in an alternative configuration file with the conf_file argument.
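For example (hypothetical calls; the resource names match the template above):

## Request more memory and walltime for one heavy job
qmodel <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame,
                 parallel = c("pbs", 6),
                 resources = list(nodes = 1, walltime = "4:00:00", memory = "16gb"))

## Or point a single call at an alternative batchtools configuration
qmodel <- mincLm(jacobians_fixed_2 ~ Body.Weight, data = image_frame,
                 parallel = c("pbs", 6),
                 conf_file = "/path/to/other/config.R")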
The basic shape of the parallel system is to dispatch one of three forms: the sequential form, the local (multicore) form, and the queued form. Within the parallel forms, the first task is to divide the problem into pieces.
We’ll look at vertexApply as a simple case study:
vertexApply <- function(filenames, fun, ..., mask = NULL, parallel = NULL)
{
  # Load the data
  vertexData <- vertexTable(filenames)
  results <- matrixApply(vertexData, fun, ..., mask = mask, parallel = parallel)
  attr(results, "likeFile") <- filenames[1]
  results
}
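For orientation, a call might look like this (the vertex_files column is hypothetical; any vector of per-subject vertex data files will do):

## Hypothetical usage: the mean value at each vertex across subjects,
## computed on 4 local cores
vertex_files <- image_frame$vertex_files
mean_vertex <- vertexApply(vertex_files, mean, parallel = c("local", 4))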
First the filenames are converted into a matrix of values, with each column representing a subject and each row representing a vertex. This is shipped off to matrixApply, an internal alternative to apply that can parallelize both locally and on a queue.
matrixApply <- function(mat, fun, ..., mask = NULL, parallel = NULL,
                        collate = simplify2minc,
                        conf_file = getOption("RMINC_BATCH_CONF")){
  if(!is.null(mask)){
    if(length(mask) == 1 && is.character(mask))
      mask <- as.numeric(readLines(mask))
    mask_lgl <- mask > .5
    mat <- mat[mask_lgl,]
  }

  fun <- match.fun(fun)

  apply_fun <- function(sub_matrix){
    if(!is.matrix(sub_matrix))
      sub_matrix <- matrix(sub_matrix, nrow = 1)
    lapply(seq_len(nrow(sub_matrix))
         , function(i) fun(sub_matrix[i,], ...))
  }

  if(is.null(parallel)){
    results <- apply_fun(mat)
  } else {
    n_groups <- as.numeric(parallel[2])
    groups <- split(seq_len(nrow(mat)), groupingVector(nrow(mat), n_groups))
    if(parallel[1] == "local") {
      results <-
        quiet_mclapply(groups, function(group){
          apply_fun(mat[group,])
        }, mc.cores = n_groups) %>%
        Reduce(cbind, ., NULL)
    } else {
      reg <- qMincRegistry(new_file("matrixApply_registry"), conf_file = conf_file)
      on.exit( tenacious_remove_registry(reg) )
      ids <- batchMap(reg = reg, function(group){
        apply_fun(mat[group,])
      }, group = groups)
      submitJobs(ids, reg = reg)
      waitForJobs(reg = reg)
      results <-
        reduceResults(c, init = NULL, reg = reg)
    }
  }

  # The apply part (transpose to match output of mincApply)
  results <- t(results)

  # Flesh out the object if a mask was used
  if(!is.null(mask)){
    mask_val <- getOption("RMINC_MASKED_VALUE")
    results_expanded <- replicate(length(mask_lgl), mask_val, simplify = FALSE)
    results_expanded[mask_lgl] <- results
    results <- results_expanded
  }

  collate_fun <- match.fun(collate)
  results <- collate_fun(results)
  results
}
We’ll break it down section by section. The first block handles numeric and file-name masks; it is used to subset the rows to save computation time:
if(!is.null(mask)){
  if(length(mask) == 1 && is.character(mask))
    mask <- as.numeric(readLines(mask))
  mask_lgl <- mask > .5
  mat <- mat[mask_lgl,]
}
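The effect of this block in miniature (a toy example, not RMINC data):

## A 6-row matrix masked down to the rows where the mask exceeds 0.5
mask <- c(0, 1, 1, 0, 1, 0)
mask_lgl <- mask > .5
mat <- matrix(seq_len(12), nrow = 6)
mat[mask_lgl, ]  # only rows 2, 3, and 5 remain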
match.fun ensures that the function passed in is a properly bound function; it also resolves a function supplied by name as a string.
fun <- match.fun(fun)
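This also lets callers pass a function by name:

f <- match.fun("mean")  # the string "mean" resolves to the base function
f(c(1, 2, 3))           # returns 2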
The next block is the engine of matrixApply; it’s a function that applies the user-supplied function to each row:
apply_fun <- function(sub_matrix){
  if(!is.matrix(sub_matrix))
    sub_matrix <- matrix(sub_matrix, nrow = 1)
  lapply(seq_len(nrow(sub_matrix))
       , function(i) fun(sub_matrix[i,], ...))
}
Now we handle dispatch:
if(is.null(parallel)){
  results <- apply_fun(mat)
} else {
  n_groups <- as.numeric(parallel[2])
  groups <- split(seq_len(nrow(mat)), groupingVector(nrow(mat), n_groups))
  if(parallel[1] == "local") {
    results <-
      quiet_mclapply(groups, function(group){
        apply_fun(mat[group,])
      }, mc.cores = n_groups) %>%
      Reduce(cbind, ., NULL)
  } else {
    reg <- qMincRegistry(new_file("matrixApply_registry"), conf_file = conf_file)
    on.exit( tenacious_remove_registry(reg) )
    ids <- batchMap(reg = reg, function(group){
      apply_fun(mat[group,])
    }, group = groups)
    submitJobs(ids, reg = reg)
    waitForJobs(reg = reg)
    results <-
      reduceResults(c, init = NULL, reg = reg)
  }
}
If parallel is NULL, we just pass the whole matrix into apply_fun; otherwise we handle the two parallel cases. First the number of parallel jobs the user wants is retrieved. Then the row indices are split into groups using split and RMINC’s internal groupingVector function. The split returns a list of indices, one per group.
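groupingVector is internal to RMINC; a minimal sketch of the idea (not necessarily RMINC’s exact implementation):

## A sketch: assign each of n row indices to one of n_groups
## roughly equal, contiguous bins
groupingVector <- function(n, n_groups)
  cut(seq_len(n), breaks = n_groups, labels = FALSE)

split(seq_len(10), groupingVector(10, 4))
## $`1`: 1 2 3   $`2`: 4 5   $`3`: 6 7   $`4`: 8 9 10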
Then, if the first portion of parallel is “local”, a muffled version of parallel::mclapply is called to split the job up amongst the cores. This creates a list of results which are then merged via Reduce and cbind.
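The merge step in miniature:

## Per-group result chunks merged column-wise into a single matrix
chunks <- list(matrix(1:2, ncol = 1), matrix(3:4, ncol = 1), matrix(5:6, ncol = 1))
Reduce(cbind, chunks, NULL)  # a 2 x 3 matrix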
The batchtools case follows the same logic, although a few more infrastructure functions are required. A registry is created with qMincRegistry (RMINC’s wrapper for creating a batchtools registry) and registered for removal when the function ends; tenacious_remove_registry works hard to ensure all evidence of a failed run is scrubbed from the system. The triplet of batchMap, submitJobs, and waitForJobs instructs the queue to start running the jobs and to wait patiently for them to finish. The results are then loaded with reduceResults and reduced as above.
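If you haven’t used batchtools before, here is a minimal standalone sketch of that triplet; with no configuration file present, batchtools falls back to its interactive cluster functions, so this runs sequentially in the current session:

library(batchtools)
reg <- makeRegistry(file.dir = tempfile("registry"), make.default = FALSE)
ids <- batchMap(function(x) x^2, x = 1:4, reg = reg)  # one job per element of x
submitJobs(ids, reg = reg)
waitForJobs(reg = reg)
reduceResults(c, init = NULL, reg = reg)  # 1 4 9 16
removeRegistry(reg = reg)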
After this there is a little result coercion:
# The apply part (transpose to match output of mincApply)
results <- t(results)

# Flesh out the object if a mask was used
if(!is.null(mask)){
  mask_val <- getOption("RMINC_MASKED_VALUE")
  results_expanded <- replicate(length(mask_lgl), mask_val, simplify = FALSE)
  results_expanded[mask_lgl] <- results
  results <- results_expanded
}

collate_fun <- match.fun(collate)
results <- collate_fun(results)
results
The results are transposed to resemble the output of mincApply; if a mask was used, the results are inflated back to their original dimensions; and collate simplifies the result, so a result with only a single column is returned in vector form. Then the results are returned. This bubbles back up to vertexApply, which tacks on a likeFile attribute and returns.
Implementing additional parallel functions will follow this general framework:

1. Check whether parallel is NULL; if so, dispatch the engine function as is.
2. If parallel[1] is “local”, run mclapply (or its muffled variant) on the engine function and the grouping object.
3. Otherwise, create a registry, batchMap the engine function over the groups, submit and wait for the jobs, then reduce the results.

It is possible matrixApply will work for your problem as-is (potentially with a little transposition). If not, mincLm is a little harder to read, but it has some examples of how to use a mask to parallelize jobs.