Basics of Parallel Processing in R, Python, Matlab, and C

Threaded linear algebra and parallel for loops in a shared memory (single machine) context

Chris Paciorek, Department of Statistics, UC Berkeley

0) This Tutorial

This tutorial covers basic strategies for using parallel processing in R, Python, Matlab, and C on a single machine in which multiple processors (cores) share memory.

You should be able to replicate much of what is covered here provided you have R, Python, or Matlab installed on your computer, but some of the parallelization approaches may not work on Windows.

This tutorial was developed using a virtual machine developed here at Berkeley, the Berkeley Common Environment (BCE). BCE is a virtual Linux machine - basically a Linux computer that you can run within your own computer, regardless of whether you are using Windows, Mac, or Linux. This provides a common environment so that things behave the same for all of us. You're welcome to try using BCE with this tutorial, but I should note that we haven't updated BCE recently and the VirtualBox software on which BCE runs can be a pain.

This tutorial assumes you have a working knowledge of either R, Python, Matlab, or C.

Materials for this tutorial, including the R Markdown file and associated code files used to create this document, are available on GitHub at https://github.com/berkeley-scf/tutorial-parallel-basics. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:

git clone https://github.com/berkeley-scf/tutorial-parallel-basics

To create this HTML document, simply compile the corresponding R Markdown file in R as follows (the following will work from within BCE after cloning the repository as above).

Rscript -e "library(knitr); knit2html('parallel-basics.Rmd')"

This tutorial by Christopher Paciorek is licensed under a Creative Commons Attribution 3.0 Unported License.

1) Types of parallel processing

There are two basic flavors of parallel processing (leaving aside GPUs): distributed memory and shared memory. With shared memory, multiple processors (which I'll call cores) share the same memory. With distributed memory, you have multiple nodes, each with their own memory. You can think of each node as a separate computer connected by a fast network.

1.1) Some useful terminology:

1.2) Shared memory

For shared memory parallelism, each core is accessing the same memory so there is no need to pass information (in the form of messages) between different machines. But in some programming contexts one needs to be careful that activity on different cores doesn't mistakenly overwrite places in memory that are used by other cores.

We'll cover two types of shared memory parallelism:

Threading

Threads are multiple paths of execution within a single process. If you are monitoring CPU usage (such as with top in Linux or Mac) and watching a job that is executing threaded code, you'll see the process using more than 100% of CPU. When this occurs, the process is using multiple cores, although it appears as a single process rather than as multiple processes.

Note that this is a different notion than a processor that is hyperthreaded. With hyperthreading a single core appears as two cores to the operating system.

1.3) Distributed memory

Parallel programming for distributed memory parallelism requires passing messages between the different nodes. The standard protocol for doing this is MPI, of which there are various versions, including openMPI.

The R package Rmpi implements MPI in R. The pbd packages for R also implement MPI as well as distributed linear algebra (linear algebra calculations across nodes). In addition, there are various ways to do simple parallelization of multiple computational tasks (across multiple nodes) that use MPI and other tools on the back-end without users needing to understand them.

Python has a package mpi4py that allows use of MPI within Python. And the IPython parallel tools allow one to do simple parallelization of multiple computational tasks across multiple nodes (as do other Python packages as well).

MATLAB has its own system for distributed computation, called the Distributed Computing Server (DCS), requiring additional licensing above the standard MATLAB installation.

This tutorial will not cover distributed memory parallelization; please see our tutorial on parallel processing in a distributed context.

1.4) Other types of parallel processing

We won't cover either of these in this material.

GPUs

GPUs (Graphics Processing Units) are processing units originally designed for rendering graphics on a computer quickly. This is done by having a large number of simple processing units for massively parallel calculation. The idea of general purpose GPU (GPGPU) computing is to exploit this capability for general computation. In spring 2016, I gave a workshop on using GPUs.

Most researchers don't program for a GPU directly but rather use software (often machine learning software such as Tensorflow or Caffe) that has been programmed to take advantage of a GPU if one is available.

Spark and Hadoop

Spark and Hadoop are systems for implementing computations in a distributed memory environment, using the MapReduce approach.

2) Threading, particularly for linear algebra

2.1) What is the BLAS?

The BLAS is the library of basic linear algebra operations (written in Fortran or C). A fast BLAS can greatly speed up linear algebra relative to the default BLAS on a machine. Some fast BLAS libraries are Intel's MKL, OpenBLAS, AMD's ACML, and Apple's vecLib (part of the Accelerate framework), all of which come up later in this tutorial.

In addition to being fast when used on a single core, all of these BLAS libraries are threaded - if your computer has multiple cores and there are free resources, your linear algebra will use multiple cores, provided your program is linked against the threaded BLAS installed on your machine and provided OMP_NUM_THREADS is not set to one. (Macs make use of VECLIB_MAXIMUM_THREADS rather than OMP_NUM_THREADS.)

On BCE, both R and (to some degree) Python are linked against OpenBLAS as of BCE-fall-2015.

2.2) Using threading

2.2.1) R

Threading in R is limited to linear algebra, provided R is linked against a threaded BLAS.

Here's some code that illustrates the speed of using a threaded BLAS:

# install.packages('RhpcBLASctl') 
library(RhpcBLASctl)
x <- matrix(rnorm(5000^2), 5000)

blas_set_num_threads(4)
system.time({
   x <- crossprod(x)
   U <- chol(x)
})

#   user  system elapsed 
# 14.104   5.403   6.752 

blas_set_num_threads(1)
system.time({
   x <- crossprod(x)
   U <- chol(x)
})

#   user  system elapsed 
# 12.393   0.055  12.344 

Here the elapsed time indicates that using four threads gave us roughly a two-fold (2x) speedup in terms of real time, while the user time indicates that the threaded calculation took a bit more total processing time (summed across all the threads) because of the overhead of using multiple threads.

Note that the code also illustrates use of an R package that can control the number of threads from within R.

2.2.2) Python

As with R, threading in Python is limited to linear algebra, provided Python is linked against a threaded BLAS. Python has something called the Global Interpreter Lock (GIL) that interferes with threading in pure Python code (but not in threaded linear algebra libraries called from Python).

Here's some linear algebra in Python that will use threading if numpy is linked against a threaded BLAS, though I don't compare the timing for different numbers of threads here.

import numpy as np
n = 5000
x = np.random.normal(0, 1, size=(n, n))
x = x.T.dot(x)
U = np.linalg.cholesky(x)

2.2.3) MATLAB

Many MATLAB functions are automatically threaded (not just linear algebra), so you don't need to do anything special in your code to take advantage of this. So if you're running MATLAB and monitoring CPU usage (e.g., using top on Linux or OS X), you may see a process using more than 100% of CPU. However, worker tasks within a parfor (see Section 3) use only a single thread. MATLAB uses MKL for its linear algebra.

2.3) Fixing the number of threads (cores used)

In general, threaded code will detect the number of cores available on a machine and make use of them. However, you can also explicitly control the number of threads available to a process.

2.3.1) R, Python, C, etc.

For most threaded code (that is, code based on the OpenMP protocol), the number of threads can be set via the OMP_NUM_THREADS environment variable (VECLIB_MAXIMUM_THREADS on a Mac). E.g., to set it to four threads in bash:

export OMP_NUM_THREADS=4

Do this before starting your R or Python session or before running your compiled executable.

Alternatively, you can set OMP_NUM_THREADS as you invoke your job, e.g., here with R:

OMP_NUM_THREADS=4 R CMD BATCH --no-save job.R job.out

2.3.2) MATLAB

MATLAB is an exception. Threading in MATLAB can be controlled in two ways. From within your MATLAB code you can set the number of threads, e.g., to four in this case:

feature('numThreads', 4)

To use only a single thread, you can use 1 instead of 4 above, or you can start MATLAB with the -singleCompThread flag:

matlab -singleCompThread ...

2.4) Important warnings about use of threaded BLAS

2.4.1) Speed and threaded BLAS

In many cases, using multiple threads for linear algebra operations will outperform using a single thread, but there is no guarantee that this will be the case, in particular for operations with small matrices and vectors. Testing with openBLAS suggests that a job may sometimes take more time when using multiple threads; this seems to be less likely with ACML. This presumably occurs because openBLAS does not do a good job of detecting when the overhead of threading outweighs the gains from distributing the computations. You can compare speeds by setting OMP_NUM_THREADS to different values. In cases where threaded linear algebra is slower than unthreaded, you would want to set OMP_NUM_THREADS to 1.

More generally, if you are using the parallel tools in Section 3 to simultaneously carry out many independent calculations (tasks), it is likely to be more effective to split the tasks across the fixed number of cores available on your machine, one task per core, rather than relying on the threaded BLAS (i.e., restrict each process to a single thread), as sketched below.
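
Here's a minimal sketch of that restriction, using the cluster tools covered in Section 3 and the RhpcBLASctl package used earlier (assuming RhpcBLASctl is installed and available to the workers):

library(parallel)
cl <- makeCluster(4)         # one worker per core
clusterEvalQ(cl, {
    library(RhpcBLASctl)
    blas_set_num_threads(1)  # unthreaded linear algebra within each worker
})
# each of the 20 tasks now uses a single core, so the workers don't compete
res <- parSapply(cl, 1:20, function(i) {
    x <- crossprod(matrix(rnorm(1000^2), 1000))
    sum(chol(x))
})
stopCluster(cl)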

2.4.2) Conflict between openBLAS and some parallel functionality in R

There are conflicts between forking in R and threaded BLAS that in some cases have affected the fork-based parallelization tools discussed in Section 3 (e.g., foreach with the doParallel backend and mclapply).

The result is that if linear algebra is used within your parallel code, R hangs. This has affected both openBLAS and ACML in the past, though it may not affect current versions of this software.

If you find your R session hanging, you can set OMP_NUM_THREADS to 1 before running an R job that does linear algebra within parallel code, thereby preventing the BLAS from doing threaded calculations. Alternatively, you can use MPI as the parallel backend (via doMPI in place of doParallel). You may also be able to convert your code to use par{L,S}apply with the default PSOCK type and avoid foreach entirely.

2.4.3) Conflict between threaded BLAS and R profiling

There is also a conflict between threaded BLAS and R profiling, so if you are using Rprof, you may need to set OMP_NUM_THREADS to one. This has definitely occurred with openBLAS; I'm not sure about other threaded BLAS libraries.
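
Here's a minimal sketch of that workaround; since OMP_NUM_THREADS generally needs to be set before R starts, this version instead uses the RhpcBLASctl package from earlier to turn off BLAS threading within the session:

library(RhpcBLASctl)
blas_set_num_threads(1)   # avoid the conflict between threaded BLAS and Rprof
Rprof('profile.out')
x <- crossprod(matrix(rnorm(2000^2), 2000))
U <- chol(x)
Rprof(NULL)               # stop profiling
summaryRprof('profile.out')$by.self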

Caution: Note that I don't pay any attention to the possible danger of generating the same random numbers in separate processes in the examples that follow. More on this issue in Section 5.

2.5) Using an optimized BLAS on your own machine(s)

To use an optimized BLAS with R, talk to your systems administrator, see Section A.3 of the R Installation and Administration Manual, or see these instructions to use vecLib BLAS from Apple's Accelerate framework on your own Mac.

It's also possible to use an optimized BLAS with Python's numpy and scipy packages, on either Linux or using the Mac's vecLib BLAS. Details will depend on how you install Python, numpy, and scipy.

3) Basic parallelized loops/maps/apply

All of the functionality discussed here applies only if the iterations/loops of your calculations can be done completely separately and do not depend on one another. This scenario is called an embarrassingly parallel computation. So coding up the evolution of a time series or a Markov chain is not possible using these tools. However, bootstrapping, random forests, simulation studies, cross-validation and many other statistical methods can be handled in this way.

3.1) Parallel loops and apply functions in R

3.1.1) Parallel for loops with foreach

A simple way to exploit parallelism in R is to use the foreach package to do a for loop in parallel.

The foreach package provides a foreach command that allows you to do this easily. foreach can use a variety of parallel "back-ends". For our purposes, the main back-end is the parallel package, which makes use of the cores on a shared memory machine. When using parallel as the back-end, you should see multiple processes (as many as you registered; ideally each at 100%) when you monitor CPU usage. The multiple processes are created by forking or using sockets. foreach can also use Rmpi or SNOW to access cores in a distributed memory setting; please see the tutorial on distributed parallel processing mentioned above.

Here we'll parallelize leave-one-out cross-validation for a random forest model. An iteration involves holding out a data point, fitting the model with all the other data points, and then predicting the held-out point.

library(doParallel)  # uses parallel package, a core R package
## Loading required package: foreach
## Loading required package: iterators
## Loading required package: parallel
source('rf.R')  # loads in data and looFit()
## randomForest 4.6-12
## Type rfNews() to see new features/changes/bug fixes.
looFit
## function (i, Y, X, loadLib = FALSE) 
## {
##     if (loadLib) 
##         library(randomForest)
##     out <- randomForest(y = Y[-i], x = X[-i, ], xtest = X[i, 
##         ])
##     return(out$test$predicted)
## }
nCores <- 4  # to set manually
registerDoParallel(nCores) 

nSub <- 30  # do only first 30 for illustration

result <- foreach(i = 1:nSub) %dopar% {
    cat('Starting ', i, 'th job.\n', sep = '')
    output <- looFit(i, Y, X)
    cat('Finishing ', i, 'th job.\n', sep = '')
    output # this will become part of the result object
}
print(result[1:5])
## [[1]]
##        1 
## 1.524218 
## 
## [[2]]
##         2 
## 0.2154531 
## 
## [[3]]
##        3 
## 1.056725 
## 
## [[4]]
##          4 
## -0.2916539 
## 
## [[5]]
##          5 
## -0.2747813

(Note that the printed statements from cat are not showing up in the creation of this document but should show if you run the code.)

Note that foreach also provides functionality for collecting and managing the results, avoiding some of the bookkeeping you would need to do if writing your own for loop. The result of foreach will generally be a list, unless we ask for the results to be combined in a different way via the .combine argument, as sketched below.
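
For example, here's a minimal sketch (reusing looFit, Y, X, and nSub from above) that returns a numeric vector rather than a list:

resVec <- foreach(i = 1:nSub, .combine = c) %dopar% {
    looFit(i, Y, X)
}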

You can debug by running serially using %do% rather than %dopar%. Note that you may need to load packages within the foreach construct to ensure a package is available to all of the calculations.

It is possible to use foreach to parallelize over nested loops. Suppose that the outer loop has too few tasks to parallelize over effectively and you want to parallelize over the inner loop as well. Provided the calculations in each task (defined by the pair of indices from the two loops) are independent of the other tasks, you can define two foreach loops, with the outer foreach using the %:% operator and the inner foreach using the usual %dopar% operator. More details can be found in this vignette.
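
Here's a minimal sketch of that nesting pattern; the arithmetic in the body is just a stand-in for a real computation indexed by the (i, j) pair:

library(doParallel)
registerDoParallel(4)
result <- foreach(i = 1:3, .combine = rbind) %:%
    foreach(j = 1:4, .combine = c) %dopar% {
        i * j  # placeholder for the real task for the (i, j) pair
    }
result  # a 3 x 4 matrix, one row per value of i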

3.1.2) Parallel apply functionality

The parallel package has the ability to parallelize the various apply functions (apply, lapply, sapply, etc.). It's a bit hard to find the vignette for the parallel package because parallel is not listed as one of the contributed packages on CRAN (it gets installed with R by default).

We'll consider parallel lapply and sapply. These rely on having started a cluster of workers using makeCluster, which by default uses the PSOCK mechanism as in the SNOW package - starting new R worker processes via Rscript and communicating with them via a technology called sockets.

library(parallel)
nCores <- 4  # to set manually 
cl <- makeCluster(nCores) 

nSub <- 30
input <- seq_len(nSub) # same as 1:nSub but more robust

# clusterExport(cl, c('x', 'y')) # if the processes need objects
# from master's workspace (not needed here as no global vars used)

# need to load randomForest package within function
# when using par{L,S}apply
system.time(
    res <- parSapply(cl, input, looFit, Y, X, TRUE)
)
##    user  system elapsed 
##   0.008   0.004  23.350
system.time(
    res2 <- sapply(input, looFit, Y, X)
)
##    user  system elapsed 
##  84.296   0.036  84.321
res <- parLapply(cl, input, looFit, Y, X, TRUE)

Here the minuscule user time is probably because the time spent in the worker processes is not counted at the level of the overall master process that dispatches the workers.

For help with these functions and additional related parallelization functions (including parApply), see help(clusterApply).

mclapply is an alternative that uses forking to start up the worker processes.

system.time(
    res <- mclapply(input, looFit, Y, X, mc.cores = nCores) 
)
##    user  system elapsed 
##  84.060   0.160  22.516

Note that some R packages can directly interact with the parallelization packages to work with multiple cores. E.g., the boot package can make use of the parallel package directly.
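
For example, here's a minimal sketch (with a purely illustrative statistic) of having boot resample in parallel via its parallel and ncpus arguments:

library(boot)
y <- rnorm(5000)
meanStat <- function(dat, indices) mean(dat[indices])
# 'multicore' forks worker processes (not available on Windows)
out <- boot(y, meanStat, R = 10000, parallel = "multicore", ncpus = 4)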

3.1.3) Loading packages and accessing global variables within your parallel tasks

Whether you need to explicitly load packages and export global variables from the master process to the parallelized worker processes depends on the details of how you are doing the parallelization.

With foreach using the doParallel backend, with parallel apply statements when the cluster is started via makeForkCluster (instead of the usual makeCluster), and with mclapply, packages and global variables in the main R process are automatically available to the worker tasks without any work on your part. This is because all of these approaches fork the original R process, thereby creating worker processes with the same state as the original R process. Interestingly, this means that global variables in the forked worker processes are just references to the objects in memory in the original R process. So the additional processes do not use additional memory for those objects (despite what is shown in top) and there is no time involved in making copies. However, if you modify objects in the worker processes then copies are made.

In contrast, with parallel apply statements when starting the cluster using the standard makeCluster (which sets up a so-called PSOCK cluster, starting the R worker processes via Rscript), one needs to load packages within the code that is executed in parallel. In addition, one needs to use clusterExport to tell R which objects in the global environment should be available to the worker processes. This involves making as many copies of the objects as there are worker processes, so one can easily exceed the physical memory (RAM) on the machine if one has large objects, and the copying of large objects will take time.
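
Here's a minimal sketch of that pattern, reusing looFit, Y, and X from the random forest example above:

library(parallel)
cl <- makeCluster(4)                       # a PSOCK cluster
clusterEvalQ(cl, library(randomForest))    # load the package on each worker
clusterExport(cl, c('Y', 'X', 'looFit'))   # copy these objects to each worker
res <- parSapply(cl, 1:30, function(i) looFit(i, Y, X))
stopCluster(cl)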

3.2) Parallel looping in Python

I'll cover IPython parallel functionality, which allows one to parallelize on a single machine (discussed here) or across multiple machines (see the tutorial on distributed memory parallelization). There are a variety of other approaches one could use, of which I discuss two (the pp and multiprocessing packages) in the Appendix.

First we need to start our worker engines.

ipcluster start -n 4 &
sleep 45

Here we'll do the same random forest cross-validation as before.

import numpy as np
np.random.seed(0)
n = 500
p = 50
X = np.random.normal(0, 1, size = (n, p))
Y = X[: , 0] + pow(abs(X[:,1] * X[:,2]), 0.5) + X[:,1] - X[:,2] + np.random.normal(0, 1, n)
def looFit(index, Ylocal, Xlocal):
    rf = rfr(n_estimators=100)
    fitted = rf.fit(np.delete(Xlocal, index, axis = 0), np.delete(Ylocal, index))
    pred = rf.predict(np.array([Xlocal[index, :]]))
    return(pred[0])
from ipyparallel import Client
c = Client()
c.ids
dview = c[:]
dview.block = True
dview.apply(lambda : "Hello, World")
lview = c.load_balanced_view()
lview.block = True
dview.execute('from sklearn.ensemble import RandomForestRegressor as rfr')
dview.execute('import numpy as np')
mydict = dict(X = X, Y = Y, looFit = looFit)
dview.push(mydict)
nSub = 50  # for illustration only do a subset
# need a wrapper function because map() only operates on one argument
def wrapper(i):
    return(looFit(i, Y, X))
import time
start = time.time()
pred = lview.map(wrapper, range(nSub))
print(time.time() - start)  # elapsed time for the parallel map
print(pred[0:10])
# import pylab
# import matplotlib.pyplot as plt
# plt.plot(Y, pred, '.')
# pylab.show()

Finally we stop the worker engines:

ipcluster stop

3.3) Basic shared memory parallel programming in MATLAB

To run a loop in parallel in MATLAB, you can use the parfor construct. Before running the parfor, you need to start up a set of workers using parpool. MATLAB will use only one thread per worker. Here is some demo code:

nslots = 4; % to set manually
mypool = parpool(nslots) 
% mypool = parpool('local', nslots)  % alternative

n = 3000;
nIts = 500;
c = zeros(n, nIts);
parfor i = 1:nIts
     c(:,i) = eig(rand(n)); 
end

delete(mypool)

% delete(gcp) works if you forget to name your pool by assigning the output of parpool to a variable

MATLAB has a default limit on the number of workers in a pool, but you can modify your MATLAB settings as follows to increase that limit (in this case to allow up to 32 workers). It should work to run the following code once in a MATLAB session, which will modify the settings for future MATLAB sessions.

cl = parcluster('local');
cl.NumWorkers = 32;
saveProfile(cl);

By default MATLAB uses a single thread (core) per worker. However, you can also use multiple threads. Here is how you can set that up:

cl = parcluster('local');
cl.NumThreads = 2;  % 2 threads per worker
cl.parpool(4);      % 4 workers

3.4) Using OpenMP threads for basic shared memory programming in C

It's straightforward to write threaded code in C and C++ (as well as Fortran) to exploit multiple cores. The basic approach is to use the OpenMP protocol. Here's how one would parallelize a loop in C/C++ using an OpenMP compiler directive. In this case we are parallelizing the outer loop; the iterations of the outer loop are done in parallel, while the iterations of the inner loop are done serially within a thread. As with foreach in R, you only want to do this if the iterations do not depend on each other. The code is available as a C++ program (but the core of the code is just C code) in testOpenMP.cpp.

// see testOpenMP.cpp
#include <iostream>
using namespace std;

// compile with:  g++ -fopenmp -L/usr/local/lib  
//                  testOpenMP.cpp -o testOpenMP 

int main(){
  int nReps = 20;
  double x[nReps];
  #pragma omp parallel for
  for (int i=0; i<nReps; i++){
    x[i] = 0.0;
    for ( int j=0; j<1000000000; j++){
      x[i] = x[i] + 1.0;
    }
    cout << x[i] << endl;
  }
  return 0;
}

We would compile this program as follows

$ g++ -fopenmp testOpenMP.cpp -o testOpenMP

The main thing to be aware of in using OpenMP is not having different threads overwrite variables used by other threads. In the example above, variables declared within the parallel for block are private to each thread. In fact, you could declare int i before the compiler directive and things would still be fine, because OpenMP is smart enough to treat the primary looping variable properly. But big problems would ensue if you had instead written the following code:

int main(){
  int nReps = 20;
  int j;  // DON'T DO THIS !!!!!!!!!!!!!
  double x[nReps];
  #pragma omp parallel for
  for (int i=0; i<nReps; i++){
    x[i] = 0.0;
    for (j=0; j<1000000000; j++){
      x[i] = x[i] + 1.0;
    }
    cout << x[i] << endl;
  }
  return 0;
}

Note that we do want x declared before the compiler directive because we want all the threads to write to a common x (but, importantly, to different components of x). That's the point!

We can also be explicit about what is shared and what is private to each thread:

int main(){
  int nReps = 20;
  int i, j;
  double x[nReps];
  #pragma omp parallel for private(i,j) shared(x, nReps)
  for (i=0; i<nReps; i++){
    x[i] = 0.0;
    for (j=0; j<1000000000; j++){
      x[i] = x[i] + 1.0;
    }
    cout << x[i] << endl;
  }
  return 0;
}

3.5) Calling OpenMP-based C code from R

The easiest path here is to use the Rcpp package. In this case, you can write your C++ code with OpenMP pragma statements as in the previous subsection. You'll need to make sure that the PKG_CXXFLAGS and PKG_LIBS environment variables are set to include -fopenmp so the compilation is done correctly. More details and examples are linked from this Stack Overflow post.
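
Here's a minimal sketch of that workflow (the C++ function here is purely illustrative); it sets the compilation flags and then compiles and calls an OpenMP-enabled function via Rcpp::sourceCpp:

Sys.setenv(PKG_CXXFLAGS = "-fopenmp", PKG_LIBS = "-fopenmp")
Rcpp::sourceCpp(code = '
#include <Rcpp.h>
#include <cmath>

// [[Rcpp::export]]
Rcpp::NumericVector parSqrt(Rcpp::NumericVector x) {
    int n = x.size();
    Rcpp::NumericVector out(n);
    #pragma omp parallel for
    for (int i = 0; i < n; i++) {
        out[i] = std::sqrt(x[i]);  // each thread writes only its own elements
    }
    return out;
}')
parSqrt(c(1, 4, 9, 16))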

4) Parallelization strategies

The following are some basic principles/suggestions for how to parallelize your computation.

Should I use one machine/node or many machines/nodes?

What level or dimension should I parallelize over?

How do I balance communication overhead with keeping my cores busy?

Should multiple tasks be pre-assigned to a process (i.e., a worker) (sometimes called prescheduling) or should tasks be assigned dynamically as previous tasks finish?

5) Random number generation (RNG) in parallel

The key thing when thinking about random numbers in a parallel context is that you want to avoid having the same 'random' numbers occur on multiple processes. On a computer, random numbers are not actually random but are generated as a sequence of pseudo-random numbers designed to mimic true random numbers. The sequence is finite (but very long) and eventually repeats itself. When one sets a seed, one is choosing a position in that sequence to start from. Subsequent random numbers are based on that subsequence. All random numbers can be generated from one or more random uniform numbers, so we can just think about a sequence of values between 0 and 1.

The worst thing that could happen is that one sets things up in such a way that every process is using the same sequence of random numbers. This could happen if you mistakenly set the same seed in each process, e.g., using set.seed(mySeed) in R on every process.

The naive approach is to use a different seed for each process. E.g., if your processes are numbered id = 1, 2, ..., p, with a variable id that is unique to each process, you could set the seed to be the value of id on each process. This is likely not to cause problems, but it raises the danger that two (or more) sequences might overlap. For an algorithm with dependence on the full sequence, such as an MCMC, this probably won't cause big problems (though you likely wouldn't know if it did), but for something like a simple simulation study, some of your 'independent' samples could be exact replicates of a sample on another process. Given the period length of the default generators in R, MATLAB and Python, this is actually quite unlikely, but it is a bit sloppy.

One approach to avoid the problem is to do all your RNG on one process and distribute the random deviates, but this can be infeasible with many random numbers.

More generally to avoid this problem, the key is to use an algorithm that ensures sequences that do not overlap.

5.1) Ensuring separate sequences in R

In R, the rlecuyer package deals with this. The L'Ecuyer algorithm has a period of \(2^{191}\), which it divides into subsequences of length \(2^{127}\).

5.1.1) With the parallel package

Here's how you initialize independent sequences on different processes when using the parallel package's parallel apply functionality (illustrated here with parSapply).

library(parallel)
library(rlecuyer)
nSims <- 250
taskFun <- function(i){
    val <- runif(1)
    return(val)
}

nCores <- 4
RNGkind()
## [1] "Mersenne-Twister" "Inversion"
cl <- makeCluster(nCores)
iseed <- 0
clusterSetRNGStream(cl = cl, iseed = iseed)
RNGkind() # clusterSetRNGStream sets RNGkind as L'Ecuyer-CMRG
## [1] "Mersenne-Twister" "Inversion"
# but it doesn't show up here on the master
res <- parSapply(cl, 1:nSims, taskFun)
# now redo with same master seed to see results are the same
clusterSetRNGStream(cl = cl, iseed = iseed)
res2 <- parSapply(cl, 1:nSims, taskFun)
identical(res,res2)
## [1] TRUE
stopCluster(cl)

If you want to explicitly move from stream to stream, you can use nextRNGStream. For example:

library(parallel)
RNGkind("L'Ecuyer-CMRG")
seed <- 0
set.seed(seed)
M <- 4  # number of workers to start
s <- .Random.seed
for (i in 1:M) {
    s <- nextRNGStream(s)
    # send s to worker i to use as its .Random.seed
}

When using mclapply, you can use the mc.set.seed argument as follows (note that mc.set.seed is TRUE by default, so you should get different seeds for the different processes by default), but one needs to invoke RNGkind("L'Ecuyer-CMRG") to get independent streams via the L'Ecuyer algorithm.

library(parallel)
library(rlecuyer)
RNGkind("L'Ecuyer-CMRG")
res <- mclapply(seq_len(nSims), taskFun, mc.cores = nCores, 
    mc.set.seed = TRUE) 
# this also seems to reset the seed when it is run
res2 <- mclapply(seq_len(nSims), taskFun, mc.cores = nCores, 
    mc.set.seed = TRUE) 
identical(res,res2)
## [1] TRUE

The documentation for mcparallel gives more information about reproducibility based on mc.set.seed.

5.1.2) With foreach

Getting independent streams

One question is whether foreach deals with RNG correctly. This is not documented, but the developers (Revolution Analytics) are well aware of RNG issues. Digging into the underlying code reveals that the doParallel backend invokes mclapply and sets mc.set.seed to TRUE by default. This suggests that the discussion above regarding mclapply holds for foreach as well, so you should call RNGkind("L'Ecuyer-CMRG") before your foreach call.

Ensuring reproducibility

While using foreach as just described should ensure that the streams on each worker are distinct, it does not ensure reproducibility, because task chunks may be assigned to workers differently in different runs and the substreams are specific to workers, not to tasks.

For backends other than doMPI, such as doParallel, there is a package called doRNG that ensures that foreach loops are reproducible. (For doMPI you simply pass .options.mpi = list(seed = your_seed_value_here) as an additional argument to foreach.)

Here's how you do it:

library(doRNG)
## Loading required package: rngtools
## Loading required package: methods
## Loading required package: pkgmaker
## Loading required package: registry
## 
## Attaching package: 'pkgmaker'
## The following object is masked from 'package:base':
## 
##     isNamespaceLoaded
library(doParallel)
registerDoParallel(nCores)
registerDoRNG(seed = 0) 
result <- foreach(i = 1:20) %dopar% { 
    out <- mean(rnorm(1000)) 
}
registerDoRNG(seed = 0) 
result2 <- foreach(i = 1:20) %dopar% { 
    out <- mean(rnorm(1000)) 
}
identical(result,result2)
## [1] TRUE

You can ignore the warnings about closing unused connections printed out above.

5.2) RNG in Python

Python uses the Mersenne-Twister generator. If you're using the RNG in numpy/scipy, you can set the seed using numpy.random.seed or scipy.random.seed. The advice I'm seeing online in various Python forums is to just set separate seeds, so it appears that Python is a bit behind R and MATLAB here. There is a function random.jumpahead that allows you to move the seed ahead as if a given number of random numbers had been generated, but this function is not available in Python 3.x, so I won't suggest using it.

5.3) RNG in MATLAB

MATLAB also uses the Mersenne-Twister. We can set the seed as: rng(seed), with seed being a non-negative integer.

Happily, like R, MATLAB can set up independent streams, using either the Combined Multiple Recursive ('mrg32k3a') or the Multiplicative Lagged Fibonacci ('mlfg6331_64') generator. Here's an example, where we create the second of 5 streams, as if we were using this code in the second of our parallel processes. The 'Seed', 0 part is not actually needed, as that is the default.

thisStream = 2;
totalNumStreams = 5;
seed = 0;
cmrg1 = RandStream.create('mrg32k3a', 'NumStreams', totalNumStreams, ...
   'StreamIndices', thisStream, 'Seed', seed);
RandStream.setGlobalStream(cmrg1);
randn(5, 1)

Appendix: Other shared memory parallelization functionality in R and MATLAB

Other parallel functionality in R

One can use mcparallel in the parallel package to send different chunks of code to different processes. Here we would need to manage the number of tasks so that we don't have more tasks than available cores.

library(parallel)
n <- 10000000
system.time({
    p <- mcparallel(mean(rnorm(n)))
    q <- mcparallel(mean(rgamma(n, shape = 1)))
    res <- mccollect(list(p,q))
})
##    user  system elapsed 
##    1.60    0.12    3.61
system.time({
    p <- mean(rnorm(n))
    q <- mean(rgamma(n, shape = 1))
})
##    user  system elapsed 
##   4.756   0.228   4.985

Note that mcparallel also allows the use of the mc.set.seed argument as with mclapply.

Note that when running on a cluster under a scheduler, one should create only as many parallel blocks of code as the number of cores requested when submitting the job.

Now let's consider parallel evaluation of a vectorized function. This will often only be worthwhile on very long vectors and for computationally intensive calculations. (The Matern call here is more time-consuming than exp).

library(parallel)
nCores <- 8
cl <- makeCluster(nCores) 
library(fields)
## Loading required package: spam
## Loading required package: grid
## Spam version 1.4-0 (2016-08-29) is loaded.
## Type 'help( Spam)' or 'demo( spam)' for a short introduction 
## and overview of this package.
## Help for individual functions is also obtained by adding the
## suffix '.spam' to the function name, e.g. 'help( chol.spam)'.
## 
## Attaching package: 'spam'
## The following objects are masked from 'package:base':
## 
##     backsolve, forwardsolve
## Loading required package: maps
ds <- runif(6000000, .1, 10)
ds_exp <- pvec(ds, exp, mc.cores = nCores)
# here's a more computationally intensive function
system.time(
    corVals <- pvec(ds, Matern, .1, 2, mc.cores = nCores)
)
##    user  system elapsed 
##   5.028   0.948   1.335
system.time(
    corVals <- Matern(ds, .1, 2)
)
##    user  system elapsed 
##   5.128   0.104   5.236

Manually parallelizing individual tasks in MATLAB

In addition to using parfor in MATLAB, you can also explicitly program parallelization, managing the individual parallelized tasks. Here is some template code for doing this. We'll submit our jobs to a pool of workers so that we have control over how many jobs are running at once. Note that here I submit 6 jobs that call the same function, but the different jobs could call different functions and have varying inputs and outputs. MATLAB will run as many jobs as there are available workers in the pool and will queue the remainder, starting them as workers become available. Here is the demo code:

feature('numThreads', 1); 
ncores = 4;
pool = parpool(ncores); 
% assume you have test.m with a function, test, taking two inputs 
% (n and seed) and returning 1 output
n = 10000000;
job = cell(1,6); 
job{1} = parfeval(pool, @test, 1, n, 1);  
job{2} = parfeval(pool, @test, 1, n, 2);  
job{3} = parfeval(pool, @test, 1, n, 3);  
job{4} = parfeval(pool, @test, 1, n, 4);  
job{5} = parfeval(pool, @test, 1, n, 5);  
job{6} = parfeval(pool, @test, 1, n, 6);  

% wait for outputs, in order
output = cell(1, 6);
for idx = 1:6
  output{idx} = fetchOutputs(job{idx});
end 

% alternative way to loop over jobs:
for idx = 1:6
  jobs(idx) = parfeval(pool, @test, 1, n, idx); 
end 

% wait for outputs as they finish
output = cell(1, 6);
for idx = 1:6
  [completedIdx, value] = fetchNext(jobs);
  output{completedIdx} = value;
end 

delete(pool);

And if you want to run threaded code in a given job, you can do that by setting the number of threads within the function called by parfeval. See the testThread.m file for the testThread function.

nCores = 8;
n = 5000;
nJobs = 4;
pool = parpool(nJobs);
% pass number of threads as number of slots divided by number of jobs
% testThread function should then do: 
% feature('numThreads', nThreads);
% where nThreads is the name of the relevant argument to testThread
jobt1 = parfeval(pool, @testThread, 1, n, 1, nCores/nJobs);
jobt2 = parfeval(pool, @testThread, 1, n, 2, nCores/nJobs);
jobt3 = parfeval(pool, @testThread, 1, n, 3, nCores/nJobs);
jobt4 = parfeval(pool, @testThread, 1, n, 4, nCores/nJobs);

output1 = fetchOutputs(jobt1);
output2 = fetchOutputs(jobt2);
output3 = fetchOutputs(jobt3);
output4 = fetchOutputs(jobt4);

delete(pool);

Other approaches to parallelization in Python

pp package

Here we create a server object and submit jobs to the server object, which manages the farming out of the tasks. Note that this will run interactively in IPython or as a script from UNIX, but will not run interactively in the base Python interpreter (for reasons that are unclear to me). Also note that while we are illustrating this as basically another parallelized for loop, the individual jobs can be whatever calculations you want, so the taskFun function could change from job to job.

import numpy
import pp
nCores = 4
job_server = pp.Server(ncpus = nCores, secret = 'mysecretphrase')
# set 'secret' to some passphrase (you need to set it but 
#   what it is should not be crucial)
job_server.get_ncpus()
nSmp = 10000000
m = 40
def taskFun(i, n):
    numpy.random.seed(i)
    return (i, numpy.mean(numpy.random.normal(0, 1, n)))
# create list of tuples to iterate over
inputs = [(i, nSmp) for i in xrange(m)]
# submit and run jobs using list comprehension
jobs = [job_server.submit(taskFun, invalue, modules = ('numpy',)) for invalue in inputs]
# collect results (will have to wait for longer tasks to finish)
results = [job() for job in jobs]
print(results)
job_server.destroy()
[(0, 0.00030280243091597474), (1, -8.5722825540149767e-05), (2, 0.00013566614947237407), (3, 0.00061310818505479474), (4, -0.0004702706491795272), (5, 0.00024515486966970537), (6, -0.00017472300458822845), (7, -0.00025050095623507584), (8, -0.00033399772183492841), (9, -0.00049137138871004158), (10, 0.00029251318047107422), (11, 1.1956375483643322e-05), (12, -0.00010810414999124078), (13, 0.00015533121727716678), (14, -0.00092143784872822018), (15, -7.4020047531168942e-05), (16, -0.00027179897723462343), (17, -0.00020500359099047446), (18, 5.0102720605584639e-05), (19, -0.00031948846032527046), (20, -5.4961570167677311e-05), (21, -0.00057477384497516828), (22, 0.00035571929916218195), (23, 0.0003172760600221845), (24, -3.9757431343687736e-05), (25, 0.00037903275195759294), (26, -0.00010435497860874407), (27, 0.0001701626336006962), (28, -0.00069358450543517865), (29, 0.00067886194920371693), (30, -0.00051310981441539557), (31, -3.0022955955111069e-05), (32, -0.00063590672702952002), (33, -0.00031966078315322541), (34, -0.00015649509164027703), (35, 0.00028376009875884391), (36, 0.00018534703816611961), (37, -0.00021821998172858934), (38, 8.0842394421238762e-05), (39, -0.00014637870897851111)]

multiprocessing package

Here we'll use the Pool.map method to iterate in a parallelized fashion, as the Python analog to foreach or parfor. Pool.map only supports passing a single argument to the function being used, so we'll create a list of tuples and pass each tuple as that argument.

import multiprocessing as mp
import numpy as np
nCores = 4 # to set manually
nSmp = 10000000
m = 40
def taskFun(input):
    np.random.seed(input[0])
    return np.mean(np.random.normal(0, 1, input[1]))
# create list of tuples to iterate over, since
# Pool.map() does not support multiple arguments
inputs = [(i, nSmp) for i in range(m)]
inputs[0:2]
pool = mp.Pool(processes = nCores)
results = pool.map(taskFun, inputs)
print(results)

More advanced use of OpenMP in C

The goal here is just to give you a sense of what is possible with OpenMP.

The OpenMP API provides three components: compiler directives that parallelize your code (such as #pragma omp parallel for), library functions (such as omp_get_thread_num()), and environment variables (such as OMP_NUM_THREADS).

OpenMP constructs apply to structured blocks of code. Blocks may be executed in parallel or sequentially, depending on how one uses the OpenMP pragma statements. One can also force execution of a block to wait until particular preceding blocks have finished, using a barrier.

Here's a basic "Hello, world" example that illustrates how it works (the full program is in helloWorldOpenMP.cpp):

// see helloWorldOpenMP.cpp
#include <stdio.h>
#include <omp.h> // needed when using any openMP functions 
//                               like omp_get_thread_num()

void myFun(double *in, int id){
// this is the function that would presumably do the heavy computational stuff
}

int main()
{
   int nthreads, myID;
   double* input;
   /* make the values of nthreads and myID private to each thread */
   #pragma omp parallel private (nthreads, myID)
   { // beginning of block
      myID = omp_get_thread_num();
      printf("Hello, I am thread %d\n", myID);
      myFun(input, myID);  // do some computation on each thread
      /* only the master thread prints the number of threads */
      if (myID == 0)
      {
         nthreads = omp_get_num_threads();
         printf("I'm the boss and control %i threads. How come they're in front of me?\n", nthreads);
      }
   } // end of block
   return 0;
} 

The parallel directive starts a team of threads, including the master, which is a member of the team and has thread number 0. The number of threads is determined in the following ways - here the first two options specify four threads:

  1. #pragma omp parallel num_threads(4) // set 4 threads for this parallel block

  2. omp_set_num_threads(4) // set four threads in general

  3. the value of the OMP_NUM_THREADS environment variable

  4. a default - usually the number of cores on the compute node

Note that in #pragma omp parallel for, there are actually two directives: parallel starts a team of threads, and for farms out the iterations to the team. In our parallel for invocation, we could have done it more explicitly as:

#pragma omp parallel
#pragma omp for

We can also explicitly distribute different chunks of code amongst different threads as seen here and in the full program in sectionsOpenMP.cpp.

// see sectionsOpenMP.cpp
#pragma omp parallel // starts a new team of threads
{
   Work0(); // this function would be run by all threads. 
   #pragma omp sections // divides the team into sections 
   { 
      // everything herein is run only once. 
      #pragma omp section 
      { Work1(); } 
      #pragma omp section 
      { 
         Work2(); 
         Work3(); 
      } 
      #pragma omp section 
      { Work4(); } 
   }
} // implied barrier

Here Work1, {Work2 + Work3} and Work4 are done in parallel, but Work2 and Work3 are done in sequence (on a single thread).

If one wants to make sure that all of a parallelized calculation is complete before any further code is executed, you can insert #pragma omp barrier.

Note that a #pragma omp for statement includes an implicit barrier, as does the end of any block specified with #pragma omp parallel.

You can use the nowait clause if you explicitly want to prevent threads from waiting at the implicit barrier at the end of a work-sharing construct: e.g., #pragma omp for nowait or #pragma omp sections nowait (used within a parallel block).

One should be careful about multiple threads writing to the same variable at the same time (this is an example of a race condition). In the example below, if one doesn't have the #pragma omp critical directive, two threads could read the current value of result at the same time and then sequentially write to result after incrementing their local copy, which would result in one of the increments being lost. A way to avoid this is with the critical directive (for single lines of code you can also use atomic instead of critical), as seen here and in the full program in criticalOpenMP.cpp:

// see criticalOpenMP.cpp
double result = 0.0;
double tmp;
#pragma omp parallel for private (tmp) shared (result)
for (int i=0; i<n; i++){
   tmp = myFun(i);
   #pragma omp critical
   result += tmp;
}

You should also be able to use syntax like the following for the parallel for declaration (in which case you shouldn't need the #pragma omp critical):

#pragma omp parallel for reduction(+:result)

I believe that doing this sort of calculation where multiple threads write to the same variable may be rather inefficient given time lost in waiting to have access to result, but presumably this would depend on how much time is spent in myFun() relative to the reduction operation.