Parallel Processing for Distributed Computing in R, Python, Matlab, and C

Parallelization tools in a distributed memory (multiple machine) context

Chris Paciorek, Department of Statistics, UC Berkeley

0) This Tutorial

This tutorial covers strategies for using parallel processing in R, Python, Matlab (briefly), and C on multiple machines, in which the various processes must interact across a network linking the machines.

This tutorial assumes you have access to two or more servers on which to parallelize your computation, potentially via a Linux cluster managed via scheduling software such as SLURM, and that MPI, R, and Python are installed on the machines.

Alternatively, you may be able to start a virtual cluster on Amazon Web Services using CfnCluster. If using CfnCluster, we recommend using a virtual machine developed here at Berkeley, the Berkeley Common Environment (BCE). BCE is a virtual Linux machine - basically it is a Linux computer that you can run within your own computer, regardless of whether you are using Windows, Mac, or Linux. This provides a common environment so that things behave the same for all of us. Please follow the instructions related to CfnCluster at the BCE install page.

This tutorial assumes you have a working knowledge of either R, Python, or C.

Materials for this tutorial, including the R Markdown file and associated code files used to create this document, are available on GitHub at https://github.com/berkeley-scf/tutorial-parallel-distributed. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:

git clone https://github.com/berkeley-scf/tutorial-parallel-distributed

To create this HTML document, simply compile the corresponding R Markdown file in R as follows (the following will work from within BCE after cloning the repository as above).

Rscript -e "library(knitr); knit2html('parallel-dist.Rmd')"

This tutorial by Christopher Paciorek is licensed under a Creative Commons Attribution 3.0 Unported License.

1) Types of parallel processing

There are two basic flavors of parallel processing (leaving aside GPUs): distributed memory and shared memory. With shared memory, multiple processors (which I'll call cores) share the same memory. With distributed memory, you have multiple nodes, each with their own memory. You can think of each node as a separate computer connected by a fast network.

1.1) Some useful terminology:

1.2) Distributed memory and an overview of the topics in this tutorial

Parallel programming for distributed memory parallelism requires passing messages between the different nodes. The standard protocol for doing this is MPI, of which there are various implementations, including openMPI, which we'll use here.

The R package Rmpi implements MPI in R. The pbdR packages for R also implement MPI as well as distributed linear algebra.

Python has a package mpi4py that allows use of MPI within Python.

In both R and Python, there are also easy ways to do embarrassingly parallel calculations (such as simple parallel for loops) across multiple machines, with MPI and similar tools used behind the scenes to manage the worker processes.

Matlab has its own system for distributed computation, called the Distributed Computing Server (DCS), requiring additional licensing above the standard Matlab installation.

This tutorial will cover:

- basic parallelization of embarrassingly parallel computations across multiple nodes in R, Python, and (briefly) Matlab (Section 3),
- distributed linear algebra in R using pbdR (Section 4), and
- using MPI directly from C, from R (via Rmpi and pbdMPI), and from Python (via mpi4py) (Section 5).

1.3) Other types of parallel processing

We won't cover any of these in this material.

Shared memory parallelization

For shared memory parallelism, each core is accessing the same memory so there is no need to pass information (in the form of messages) between different machines. But in some programming contexts one needs to be careful that activity on different cores doesn't mistakenly overwrite places in memory that are used by other cores. Threading is a form of shared memory parallelism.

This tutorial will not cover shared memory parallelization, as it is covered in a separate tutorial.

For information about working with random numbers in a parallel computation, please see that same tutorial, as the discussion applies to both shared and distributed memory.

GPUs

GPUs (Graphics Processing Units) are processing units originally designed for rendering graphics on a computer quickly. This is done by having a large number of simple processing units for massively parallel calculation. The idea of general purpose GPU (GPGPU) computing is to exploit this capability for general computation. In spring 2016, I gave a workshop on using GPUs.

Most researchers don't program for a GPU directly but rather use software (often machine learning software such as Tensorflow or Caffe) that has been programmed to take advantage of a GPU if one is available.

Spark and Hadoop

Spark and Hadoop are systems for implementing computations in a distributed memory environment, using the MapReduce approach.

2) Starting MPI-based jobs

Code that explicitly uses MPI, as well as code using MPI under the hood, such as foreach with doMPI in R and pbdR, requires that you start your process(es) in a special way via the mpirun command. Note that mpirun, mpiexec and orterun are synonyms under openMPI.

The basic requirements for starting such a job are that you specify the number of processes you want to run and that you indicate what machines those processes should run on. Those machines should be networked together such that MPI can ssh to the various machines without any password required.

2.1) Running an MPI job under SLURM

There are two ways to tell mpirun the machines on which to run the worker processes.

First, we can pass the machine names directly, replicating the name if we want multiple processes on a single machine. In the example here, these are machines accessible to me, and you would need to replace those names with the names of machines you have access to. You'll need to set up SSH keys so that you can access the machines without a password.

mpirun --host smeagol,radagast,arwen,arwen -np 4 hostname
## smeagol
## radagast
## arwen
## arwen

Alternatively, we can create a file with the relevant information.

echo 'smeagol slots=1' > .hosts
echo 'radagast slots=1' >> .hosts
echo 'arwen slots=2' >> .hosts
mpirun -machinefile .hosts -np 4 hostname
## smeagol
## radagast
## arwen
## arwen

If you are running your code as part of a job submitted to SLURM, you generally won't need to pass the machinefile or np arguments as MPI will get that information from SLURM. So you can simply do:

mpirun hostname

Note that on a CfnCluster-based EC2 VM, you could run your job through SLURM, or you can directly use the node names, which can be seen by invoking sinfo and looking at the NODELIST column.

To limit the number of threads for each process, we can tell mpirun to export the value of OMP_NUM_THREADS to the processes. E.g., calling a C program, quad_mpi:

export OMP_NUM_THREADS=2
mpirun -machinefile .hosts -np 4 -x OMP_NUM_THREADS quad_mpi

In the examples above, I illustrated with a simple bash command (hostname) and with a compiled C program, but one would similarly use the -machinefile flag when starting R or Python or a C program via mpirun.

There are additional details involved in carefully controlling how processes are allocated to nodes, but the default arguments for mpirun should do a reasonable job in many situations.

Also, I've had inconsistent results in terms of having the correct number of workers start up on each of the machines specified, depending on whether I specify the number of workers implicitly via the hosts information (without specifying -np), explicitly via -np or both. You may want to check that the right number of workers is running on each host.

3) Basic parallelization across nodes

Here we'll see the use of high-level packages in R, Python, and Matlab that hide the details of communication between nodes.

3.1) R

3.1.1) foreach with the doMPI and doSNOW backends

Just as we used foreach in a shared memory context, we can use it in a distributed memory context as well, and R will handle everything behind the scenes for you.

doMPI

Start R through the mpirun command as discussed above, either as a batch job or for interactive use. We'll only ask for 1 process because the worker processes will be started automatically from within R (but using the machine names information passed to mpirun).

mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
mpirun -machinefile .hosts -np 1 R --no-save

Here's R code for using Rmpi as the back-end to foreach. If you call startMPIcluster with no arguments, it will start one fewer worker process than the total number of slots given to mpirun (across all hosts), so your R code will be more portable.

## you should have invoked R as:
## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save doMPI.R doMPI.out
## unless running within a SLURM job, in which case you should do:
## mpirun R CMD BATCH --no-save file.R file.out

library(Rmpi)
library(doMPI)

cl = startMPIcluster()  # by default will start one fewer slave
# than elements in .hosts

registerDoMPI(cl)
clusterSize(cl) # just to check

results <- foreach(i = 1:200) %dopar% {
  out = mean(rnorm(1e6))
}

closeCluster(cl)

mpi.quit()
mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
cat doMPI.out
## > ## @knitr doMPI
## > 
## > ## you should have invoked R as:
## > ## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save doMPI.R doMPI.out
## > ## unless running within a SLURM job, in which case you should do:
## > ## mpirun R CMD BATCH --no-save file.R file.out
## > 
## > library(Rmpi)
## > library(doMPI)
## Loading required package: foreach
## Loading required package: iterators
## > 
## > cl = startMPIcluster()  # by default will start one fewer slave
##  3 slaves are spawned successfully. 0 failed.
## > # than elements in .hosts
## >                                         
## > registerDoMPI(cl)
## > clusterSize(cl) # just to check
## [1] 3
## > 
## > results <- foreach(i = 1:200) %dopar% {
## +   out = mean(rnorm(1e6))
## + }
## > 
## > closeCluster(cl)
## > 
## > mpi.quit()

A caution concerning Rmpi/doMPI: when you invoke startMPIcluster(), all the slave R processes become 100% active and stay active until the cluster is closed. In addition, when foreach is actually running, the master process also becomes 100% active. So using this functionality involves some inefficiency in CPU usage. This inefficiency is not seen with a sockets cluster (Section 3.1.4) nor when using other Rmpi functionality - i.e., starting slaves with mpi.spawn.Rslaves and then issuing commands to the slaves.

If you specified -np with more than one process then as with the C-based MPI job above, you can control the threading via OMP_NUM_THREADS and the -x flag to mpirun. Note that this only works when the R processes are directly started by mpirun, which they are not if you set -np 1. The maxcores argument to startMPIcluster() does not seem to function (perhaps it does on other systems).

Sidenote: You can use doMPI on a single node, which might be useful for avoiding some of the conflicts between R's forking functionality and openBLAS that can cause R to hang when using foreach with doParallel.
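Here's a minimal sketch of that single-node usage (assuming MPI, Rmpi, and doMPI are installed on the machine); start R via mpirun -np 1 R --no-save and then:

library(Rmpi)
library(doMPI)

cl = startMPIcluster(count = 4)  # start 4 workers on the local machine
registerDoMPI(cl)

results <- foreach(i = 1:20) %dopar% {
  mean(rnorm(1e6))
}

closeCluster(cl)
mpi.quit()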

doSNOW

The doSNOW backend has the advantage that it doesn't need to have MPI installed on the system. MPI can be tricky to install and keep working, so this is an easy approach to using foreach across multiple machines.

Simply start R as you usually would.

Here's R code for using doSNOW as the back-end to foreach. Make sure to use the type = "SOCK" argument or doSNOW will actually use MPI behind the scenes.

library(doSNOW)
machines = c(rep("beren.berkeley.edu", 1),
    rep("gandalf.berkeley.edu", 1),
    rep("arwen.berkeley.edu", 2))

cl = makeCluster(machines, type = "SOCK")
cl

registerDoSNOW(cl)

fun = function(i, n = 1e6)
  out = mean(rnorm(n))

nTasks <- 120

print(system.time(out <- foreach(i = 1:nTasks) %dopar% {
    outSub <- fun(i)
    outSub # this will become part of the out object
}))

stopCluster(cl)  # good practice, but not strictly necessary

Loading packages and accessing variables within your parallel tasks

When using foreach with multiple machines, you need to use the .packages argument (or load the package in the code being run in parallel) to load any packages needed in the code. You do not need to explicitly export variables from the master process to the workers. Rather, foreach determines which variables in the global environment of the master process are used in the code being run in parallel and makes copies of those in each worker process. Note that these variables are read-only on the workers and cannot be modified (if you try to do so, you'll notice that foreach actually did not make copies of the variables that your code tries to modify).
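Here's a minimal sketch of that behavior (the host name and choice of package are just placeholders):

library(doSNOW)
cl = makeCluster(rep("arwen.berkeley.edu", 2), type = "SOCK")
registerDoSNOW(cl)

n = 100   # 'n' lives in the master's global environment; foreach copies it to the workers

results <- foreach(i = 1:20, .packages = "MASS") %dopar% {
    # mvrnorm() is available on the workers because of the .packages argument;
    # 'n' is a read-only copy on each worker
    mean(mvrnorm(n, mu = 0, Sigma = matrix(1)))
}

stopCluster(cl)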

3.1.2) Using pbdR

There is a project to enhance R's capability for distributed memory processing called pbdR. For an extensive tutorial, see the pbdDEMO vignette. pbdR is designed for SPMD processing in batch mode, which means that you start up multiple processes in a non-interactive fashion using mpirun. The same code runs in each R process so you need to have the code behavior depend on the process ID.

pbdR provides the following capabilities:

- the ability to do parallel apply-style computations across multiple nodes (this section),
- the ability to do distributed linear algebra on matrices spread across multiple nodes (Section 4), and
- an alternative interface to MPI from R (Section 5).

Personally, I think the second of the three is the most exciting as it's a functionality not readily available in R or even more generally in other readily-accessible software.

Let's see parallel-apply style computations in pbdR.

Here's some basic syntax for doing a distributed apply() on a matrix that is on one of the workers. So in this case, the matrix is not initially distributed to the workers – that is done as part of the pbdApply computation. (One can also use pbdApply on matrices that are already distributed, and this is of course recommended for large matrices – see Section 4.)

As mentioned above, pbdR code is always run in batch mode, with the same code running on all of the processes. This means that you often need to explicitly build in logic about which process should execute a given piece of code, including print statements. Here the check for comm.rank() == 0 allows us to only create the matrix and call some print statements on the master node (rank 0).

## you should have invoked R as:
## mpirun -machinefile .hosts -np 4 R CMD BATCH --no-save pbd-apply.R pbd-apply.out
## unless running within a SLURM job, in which case you should do:
## mpirun R CMD BATCH --no-save pbd-apply.R pbd-apply.out

library(pbdMPI, quiet = TRUE )
init()

nrows <- 1e6

if(comm.rank()==0) {
    x <- matrix(rnorm(nrows*50), nrow = nrows)
}

sm <- comm.timer(out <- pbdApply(x, 1, mean, pbd.mode = 'mw', rank.source = 0))
if(comm.rank()==0) {
    print(out[1:5])
    print(sm)
}

finalize()
mpirun -machinefile .hosts -np 4 Rscript pbd-apply.R > pbd-apply.out
cat pbd-apply.out
## [1]  0.17686351 -0.12216986  0.04345966 -0.06581673  0.07439472
##      min     mean      max 
##  7.81000 12.99175 14.74900

In this case it's a fair amount slower to parallelize the calculation than just to do it serially in R using rowMeans(), because of the overhead of communication with the workers (including passing the data).
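For reference, here's a sketch of the serial equivalent on a single machine; rowMeans() computes the same row-wise means without any communication overhead:

nrows <- 1e6
x <- matrix(rnorm(nrows*50), nrow = nrows)
system.time(out <- rowMeans(x))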

3.1.3) Using parallel apply functionality in Rmpi

Rmpi is a package that provides MPI capabilities from R, including low-level MPI type calls (see Section 5). It also provides high-level wrapper functions that use MPI behind the scenes, including parallel apply functionality for operating on lists (and vectors) with functions such as mpi.parSapply.

The documentation (see help(mpi.parSapply)) describes a number of confusingly-named functions. They appear to be basically multi-node versions of the analogous parSapply and related functions.

## you should have invoked R as:
## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out
## unless running within a SLURM job, in which case you should do:
## mpirun R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out

library(Rmpi)
## on my system, this fails unless explicitly
## ask for one fewer slave than total number of slots across hosts
mpi.spawn.Rslaves(nslaves = mpi.universe.size()-1)

myfun <- function(i) {
      set.seed(i)
      mean(rnorm(1e7))
}

x <- seq_len(25)
# parallel sapply-type calculations on a vector 
system.time(out <- mpi.parSapply(x, myfun))
system.time(out <- mpi.applyLB(x, myfun))

nrows <- 10000
x <- matrix(rnorm(nrows*50), nrow = nrows)
# parallel apply on a matrix
out <- mpi.parApply(x, 1, mean)

mpi.close.Rslaves()
mpi.quit()
mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save mpi.parSapply.R mpi.parSapply.out
cat mpi.parSapply.out
## > ## @knitr mpi.parSapply
## > 
## > ## you should have invoked R as:
## > ## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out
## > ## unless running within a SLURM job, in which case you should do:
## > ## mpirun R CMD BATCH --no-save mpi.parSapply.R mpi.parSapply.out
## > 
## > library(Rmpi)
## > ## on my system, this fails unless explicitly
## > ## ask for one fewer slave than total number of slots across hosts
## > mpi.spawn.Rslaves(nslaves = mpi.universe.size()-1)
##  3 slaves are spawned successfully. 0 failed.
## master (rank 0, comm 1) of size 4 is running on: smeagol 
## slave1 (rank 1, comm 1) of size 4 is running on: radagast 
## slave2 (rank 2, comm 1) of size 4 is running on: arwen 
## slave3 (rank 3, comm 1) of size 4 is running on: arwen 
## > 
## > myfun <- function(i) {
## +       set.seed(i)
## +       mean(rnorm(1e7))
## + }
## > 
## > x <- seq_len(25)
## > # parallel sapply-type calculations on a vector 
## > system.time(out <- mpi.parSapply(x, myfun))
##    user  system elapsed 
##   5.612   8.032  13.644 
## > system.time(out <- mpi.applyLB(x, myfun))
##    user  system elapsed 
##   5.400   7.044  12.441 
## > 
## > nrows <- 10000
## > x <- matrix(rnorm(nrows*50), nrow = nrows)
## > # parallel apply on a matrix
## > out <- mpi.parApply(x, 1, mean)
## > 
## > mpi.close.Rslaves()
## [1] 1
## > mpi.quit()

In some cases, it may be useful to specify job.num when the number of tasks is bigger than the number of worker processes to ensure load-balancing.
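For example, here's a sketch (reusing x and myfun from the code above; the particular value of job.num is just illustrative):

# split the 25 tasks into 12 jobs to improve load-balancing
out <- mpi.parSapply(x, myfun, job.num = 12)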

3.1.4) Using sockets

One can also set up a cluster with the worker processes communicating via sockets. You just need to specify a character vector with the machine names as the input to makeCluster(). A nice thing about this is that it avoids the complications of needing MPI installed.

library(parallel)

machines = c(rep("beren.berkeley.edu", 1),
    rep("gandalf.berkeley.edu", 1),
    rep("arwen.berkeley.edu", 2))
cl = makeCluster(machines)
cl
## socket cluster with 4 nodes on hosts 'beren.berkeley.edu', 'gandalf.berkeley.edu', 'arwen.berkeley.edu'
n = 1e7
clusterExport(cl, c('n'))

fun = function(i)
  out = mean(rnorm(n))

result <- parSapply(cl, 1:20, fun)

result[1:5]
## [1]  1.431600e-04  6.146156e-04 -8.718859e-05  8.976951e-05  1.152365e-04
stopCluster(cl) # not strictly necessary

Note the use of clusterExport, needed to make variables in the master process available to the workers; this involves making a copy of each variable for each worker process. You'd also need to load any packages used in the code being run in parallel in that code.
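For example, one could load a package on all of the workers before running the parallel code (MASS here is just an illustration):

# load a package on each worker of the sockets cluster
clusterEvalQ(cl, library(MASS))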

3.1.5) The partools package

partools is a somewhat new package developed by Norm Matloff at UC-Davis. He has the perspective that Spark/Hadoop are not the right tools in many cases when doing statistics-related work and has developed some simple tools for parallelizing computation across multiple nodes, also referred to as Snowdoop. The tools make use of the key idea in Hadoop of a distributed file system and distributed data objects but avoid the complications of trying to ensure fault tolerance, which is critical only on very large clusters of machines.

I won't go into details, but partools allows you to split up your data across multiple nodes and then read the data into R in parallel across R sessions running on those nodes, all controlled from a single master R session. You can then do operations on the subsets and gather results back to the master session as needed. One point that confused me in the partools vignette is that it shows how to split up a dataset that you can read into your R session, but it's not clear what one does if the dataset is too big to read into a single R session.

3.2) Python

3.2.1) IPython parallel

One can use IPython's parallelization tools in a context with multiple nodes, though the setup to get the worker processes is a bit more involved when you have multiple nodes. For details on using IPython parallel on a single node, see the parallel basics tutorial appendix.

If we are using the SLURM scheduling software, here's how we start up the worker processes:

ipcontroller --ip='*' &
sleep 25
# next line will start as many ipengines as we have SLURM tasks 
#   because srun is a SLURM command
srun ipengine &  
sleep 45  # wait until all engines have successfully started

We can then run IPython to split up our computational tasks across the engines.

import numpy as np
np.random.seed(0)
n = 500
p = 50
X = np.random.normal(0, 1, size = (n, p))
Y = X[: , 0] + pow(abs(X[:,1] * X[:,2]), 0.5) + X[:,1] - X[:,2] + np.random.normal(0, 1, n)

def looFit(index, Ylocal, Xlocal):
    rf = rfr(n_estimators=100)
    fitted = rf.fit(np.delete(Xlocal, index, axis = 0), np.delete(Ylocal, index))
    pred = rf.predict(np.array([Xlocal[index, :]]))
    return(pred[0])

from ipyparallel import Client
c = Client()
c.ids

dview = c[:]
dview.block = True
dview.apply(lambda : "Hello, World")

lview = c.load_balanced_view()
lview.block = True

dview.execute('from sklearn.ensemble import RandomForestRegressor as rfr')
dview.execute('import numpy as np')
mydict = dict(X = X, Y = Y, looFit = looFit)
dview.push(mydict)

nSub = 50  # for illustration only do a subset

# need a wrapper function because map() only operates on one argument
def wrapper(i):
    return(looFit(i, Y, X))

import time
time.time()
pred = lview.map(wrapper, range(nSub))
time.time()

print(pred[0:10])

# import pylab
# import matplotlib.pyplot as plt
# plt.plot(Y, pred, '.')
# pylab.show()

To finish up, we need to shut down the cluster of workers:

ipcluster stop

To start the engines in a context outside of using SLURM (provided all machines share a filesystem), you should be able to ssh to each machine and run ipengine & for as many worker processes as you want to start, as follows. In some, but not all, cases (depending on how the network is set up) you may not need the --location flag.

ipcontroller --ip='*' --location=URL_OF_THIS_MACHINE &
sleep 25
nengines=8
ssh other_host "for (( i = 0; i < ${nengines}; i++ )); do ipengine & done"
sleep 45  # wait until all engines have successfully started

3.2.2) pp package

Another way to parallelize across multiple nodes that involves more manual setup and doesn't integrate as well with scheduling software like SLURM is to use the pp package (also useful for parallelizing on a single machine, as discussed in the parallel basics tutorial appendix).

Assuming that the pp package is installed on each node (e.g., sudo apt-get install python-pp on an Ubuntu machine), you need to start up a ppserver process on each node. E.g., if $nodes is a UNIX environment variable containing the names of the worker nodes and you want to start 2 workers per node:

nodes='smeagol radagast beren arwen'
for node in $nodes; do
# cd /tmp is because of issue with starting ppserver in home directory
# -w says how many workers to start on the node
    ssh $node "cd /tmp && ppserver -s mysecretphrase -t 120 -w 2 &" & 
done

Now in our Python code we create a server object and submit jobs to the server object, which manages the farming out of the tasks. Note that this will run interactively in IPython or as a script from UNIX, but there have been times when I was not able to run it interactively in the base Python interpreter. Also note that while we are illustrating this as basically another parallelized for loop, the individual jobs can be whatever calculations you want, so the function (in this case it's always pi_code.sample) could change from job to job.

import numpy.random
import pp
import time
import pi_code # provided in pi_code.py

samples_per_slice = 10000000
num_slices = 24*20

# remember to start ppserver on worker nodes

# assume 'hosts' contains the names of the nodes on which you 
# started ppserver
nprocsPerNode = 2
hosts = ['smeagol', 'radagast', 'beren', 'arwen']
ppservers = hosts * nprocsPerNode

print ppservers
# put ncpus=0 here or it will start workers locally too
job_server = pp.Server(ncpus = 0, ppservers = tuple(ppservers), secret = 'mysecretphrase')

inputs = [(i, samples_per_slice) for i in xrange(num_slices)]

t0 = time.time()
jobs = [job_server.submit(pi_code.sample, invalue, modules = ('numpy.random',)) for invalue in inputs]
results = [job() for job in jobs]
t1 = time.time()

print "Pi is roughly %f" % (4.0 * sum(results) / (num_slices*samples_per_slice))
print "Time elapsed: ", t1 - t0
python python-pp.py > python-pp.out
cat python-pp.out
['smeagol', 'radagast', 'beren', 'arwen', 'smeagol', 'radagast', 'beren', 'arwen']
Pi is roughly 3.141567
Time elapsed:  32.0389587879

The -t flag used when starting ppserver should ensure that the server processes are removed, but if you need to do it manually, this should work:

for node in $nodes; do
    killall ppserver
done

3.3) Matlab

To use Matlab across multiple nodes, you need to have the Matlab Distributed Computing Server (DCS). If it is installed, one can set up Matlab so that parfor will distribute its work across multiple nodes. Details may vary depending on how DCS is installed on your system.

4) Distributed linear algebra in R using pbdR

4.1) Distributed linear algebra example

And here's how you would set up a distributed matrix and do linear algebra on it. Note that when working with large matrices, you would generally want to construct the matrices (or read from disk) in a parallel fashion rather than creating the full matrix on one worker. For simplicity in the example, I construct the matrix, x, on the master and then create the distributed version of the matrix, dx, with as.ddmatrix.

Here's the code in pbd-linalg.R.

library(pbdDMAT, quiet = TRUE )

n <- 4096*2

# if you are putting multiple processes on node
# you may want to prevent threading of the linear algebra:
# library(RhpcBLASctl)
# blas_set_num_threads(1)
# (or do by passing OMP_NUM_THREADS to mpirun)

init.grid()

if(comm.rank()==0) print(date())

# pbd allows for parallel I/O, but here
# we keep things simple and distribute
# an object from one process
if(comm.rank() == 0) {
    x <- rnorm(n^2)
    dim(x) <- c(n, n)
} else x <- NULL
dx <- as.ddmatrix(x)

timing <- comm.timer(sigma <- crossprod(dx))

if(comm.rank()==0) {
    print(date())
    print(timing)
}

timing <- comm.timer(out <- chol(sigma))

if(comm.rank()==0) {
    print(date())
    print(timing)
}

finalize()

As before we run the job in batch mode via mpirun:

export OMP_NUM_THREADS=1
mpirun -machinefile .hosts -np 4 -x OMP_NUM_THREADS Rscript pbd-linalg.R > pbd-linalg.out
cat pbd-linalg.out
## Using 2x2 for the default grid size
## 
## [1] "Sat Oct 17 12:05:38 2015"
## [1] "Sat Oct 17 12:06:54 2015"
##    min   mean    max 
## 48.086 50.806 52.585 
## [1] "Sat Oct 17 12:08:10 2015"
##      min     mean      max 
## 76.47000 76.51125 76.53300

You may want to set the bldim argument to as.ddmatrix. That determines the size of the submatrices (aka 'blocks') into which the overall matrix is split. Generally, multiple submatrices are owned by an individual worker process. For example, to use 100x100 blocks, you'd have

dx <- as.ddmatrix(x, bldim = c(100, 100))

In general, you don't want the blocks too big as the work may not be well load-balanced, or too small as that may have a higher computational cost in terms of latency and communication. My experiments suggest that it's worth exploring block sizes of 10x10 through 1000x1000 (if you have square matrices).
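Here's a sketch of how one might compare a few block sizes within the batch-mode code above; the particular sizes are just illustrative:

# time the crossproduct for several block sizes
for(b in c(10, 100, 1000)) {
    dxb <- as.ddmatrix(x, bldim = c(b, b))
    timing <- comm.timer(sigma <- crossprod(dxb))
    if(comm.rank() == 0) print(c(blockSize = b, timing))
}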

As a quick, completely non-definitive point of comparison, doing the crossproduct and Cholesky for the 8192x8192 matrix on 3 EC2 nodes (2 cores per node) with -np 6 took 39 seconds for each operation, while doing the computations with two threads on the master node took 64 seconds (crossproduct) and 23 seconds (Cholesky). While that is a single test, some other experiments I've done also haven't shown much speedup in using multiple nodes with pbdR compared to simply using a threaded BLAS on one machine. So you may need fairly big matrices that won't fit in memory on a single machine before it's worthwhile to do the computation in distributed fashion using pbdR.

4.2) Constructing a distributed matrix in parallel

pbdR has functionality for reading in parallel from a parallel file system such as Lustre (available on Berkeley's Savio cluster). Things are a bit more complicated if that's not the case. Here's some code that illustrates how to construct a distributed matrix from constituent column blocks. First, create a distributed version of the matrix as standard R matrices, with each process owning a block of columns (I haven't yet gotten the syntax to work for blocks of rows). Then create a pbd version of that distributed matrix, and finally convert it to a standard pbd block structure on which the linear algebra can be done efficiently.

library(pbdDMAT, quiet = TRUE)
init.grid()

nprocs <- comm.size()

nrows <- 10000
ncolsPerBlock <- nrows/nprocs

# each process has a block of columns as an R matrix
subdata <- matrix(rnorm(nrows * ncolsPerBlock), ncol = ncolsPerBlock)

# now construct the distributed matrix object
tmp <- ddmatrix(data = subdata, nrow = nrows, ncol = nrows,
               bldim = c(nrows, ncolsPerBlock), ICTXT = 1)
# now rearrange the blocks for better linear algebra performance
dx <- redistribute(tmp, bldim = c(100, 100), ICTXT = 0)

finalize ()

The code above creates the submatrices within the R sessions, but one could also read in from separate files, one per process.
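For instance, here's a sketch in which each process reads its own column block from its own file (the file names are hypothetical) before forming the distributed matrix as above:

# each process reads its own block of columns from a separate (hypothetical) file
fileName <- paste0("data_block_", comm.rank(), ".csv")
subdata <- as.matrix(read.csv(fileName, header = FALSE))

nrows <- nrow(subdata)
ncolsPerBlock <- ncol(subdata)
ncolsTotal <- ncolsPerBlock * comm.size()

tmp <- ddmatrix(data = subdata, nrow = nrows, ncol = ncolsTotal,
               bldim = c(nrows, ncolsPerBlock), ICTXT = 1)
dx <- redistribute(tmp, bldim = c(100, 100), ICTXT = 0)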

The code in redistribute-test.R demonstrates that constructing the full matrix from column-wise blocks with this syntax works correctly.

5) MPI

5.1) MPI Overview

There are multiple MPI implementations, of which openMPI and mpich are the most common; we'll use openMPI here.

In MPI programming, the same code runs on all the machines. This is called SPMD (single program, multiple data). As we saw a bit with the pbdR code, one invokes the same code (same program) multiple times, but the behavior of the code can be different based on querying the rank (ID) of the process. Since MPI operates in a distributed fashion, any transfer of information between processes must be done explicitly via send and receive calls (e.g., MPI_Send, MPI_Recv, MPI_Isend, and MPI_Irecv). (The "MPI_" prefix is for C code; C++ just has Send, Recv, etc.)

The latter two of these functions (MPI_Isend and MPI_Irecv) are so-called non-blocking calls. One important concept to understand is the difference between blocking and non-blocking calls. Blocking calls wait until the call finishes, while non-blocking calls return and allow the code to continue. Non-blocking calls can be more efficient, but can lead to problems with synchronization between processes.

In addition to send and receive calls to transfer to and from specific processes, there are calls that send out data to all processes (MPI_Scatter), gather data back (MPI_Gather) and perform reduction operations (MPI_Reduce).
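As a point of reference for R users, here's a minimal sketch of analogous collective operations using pbdMPI (covered in Section 5.3.2), run via mpirun as usual; the values are just illustrative:

library(pbdMPI, quiet = TRUE)
init()

x <- comm.rank() + 1               # each process holds its own value
total <- reduce(x, op = "sum")     # like MPI_Reduce: combine values onto rank 0
vals <- gather(x)                  # like MPI_Gather: collect all values onto rank 0
comm.print(total)                  # prints from rank 0 by default
comm.print(vals)

finalize()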

Debugging MPI code can be tricky because communication can hang, error messages from the workers may not be seen or readily accessible, and it can be difficult to assess the state of the worker processes.

5.2) Basic syntax for MPI in C

Here's a basic hello world example. The code is also in mpiHello.c.

// see mpiHello.c
#include <stdio.h> 
#include <math.h> 
#include <mpi.h>

int main(int argc, char* argv[]) {     
    int myrank, nprocs, namelen;     
    char process_name[MPI_MAX_PROCESSOR_NAME];
    MPI_Init(&argc, &argv);     
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);   
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);          
    MPI_Get_processor_name(process_name, &namelen);            
    printf("Hello from process %d of %d on %s\n", 
        myrank, nprocs, process_name);
    MPI_Finalize();     
    return 0; 
} 

There are C (mpicc) and C++ (mpic++) compilers for MPI programs (mpicxx and mpiCC are synonyms). I'll use the MPI C++ compiler even though the code is all plain C code.

mpicxx mpiHello.c -o mpiHello
cat .hosts # what hosts do I expect it to run on?
mpirun -machinefile .hosts -np 4 mpiHello
## smeagol slots=1
## radagast slots=1
## arwen slots=2
## Hello from processor 0 of 4 on smeagol
## Hello from processor 1 of 4 on radagast
## Hello from processor 2 of 4 on arwen
## Hello from processor 3 of 4 on arwen

To actually write real MPI code, you'll need to go learn some of the MPI syntax. See quad_mpi.c and quad_mpi.cpp, which are example C and C++ programs (for approximating an integral via quadrature) that show some of the basic MPI functions. Compilation and running are as above:

mpicxx quad_mpi.cpp -o quad_mpi
mpirun -machinefile .hosts -np 4 quad_mpi
## 27 September 2017 06:52:43 PM
## 
## QUAD_MPI
##   C++/MPI version
##   Estimate an integral of f(x) from A to B.
##   f(x) = 50 / (pi * ( 2500 * x * x + 1 ) )
## 
##   A = 0
##   B = 10
##   N = 999999999
##   EXACT =       0.4993633810764567
## 
##   Use MPI to divide the computation among 4 total processes,
##   of which one is the master and does not do core computations.
##   Process 1 contributed MY_TOTAL = 0.49809
##   Process 3 contributed MY_TOTAL = 0.000318308
## 
##   Estimate =       0.4993634591634721
##   Error = 7.808701535383378e-08
##   Time = 10.03146505355835
##   Process 2 contributed MY_TOTAL = 0.00095491
## 
## QUAD_MPI:
##   Normal end of execution.
## 
## 27 September 2017 06:52:53 PM

5.3) Using MPI from R via Rmpi or pbdR

5.3.1) Rmpi

R users can use Rmpi to interface with MPI.

Here's some example code that uses actual Rmpi syntax (as opposed to foreach with Rmpi as the back-end, where the use of Rmpi was hidden from us). The syntax is very similar to the MPI C syntax we've already seen. This code runs in a master-slave paradigm where the master starts the slaves and invokes commands on them. It may be possible to run Rmpi in a context where each process runs the same code (SPMD style), but I haven't investigated this further.

# example syntax of standard MPI functions

library(Rmpi)
## by default this should start one fewer workers than processes
## saving one for the master
## but on my system, this fails unless explicitly
## ask for one fewer slave than total number of slots across hosts
mpi.spawn.Rslaves(nslaves = mpi.universe.size()-1)

n = 5
mpi.bcast.Robj2slave(n)
mpi.bcast.cmd(id <- mpi.comm.rank())
mpi.bcast.cmd(x <- rnorm(id))

mpi.remote.exec(ls(.GlobalEnv))

mpi.bcast.cmd(y <- 2 * x)
mpi.remote.exec(print(y))

objs <- as.list(c('x', 'n'))
# next command sends value of objs on _master_ as argument to rm
mpi.remote.exec(do.call, rm, objs)

# verify that 'n' is gone:
mpi.remote.exec(print(n))

# collect results back via send/recv
mpi.remote.exec(mpi.send.Robj(y, dest = 0, tag = 1))
results = list()
for(i in 1:(mpi.comm.size()-1)){
  results[[i]] = mpi.recv.Robj(source = i, tag = 1)
}

print(results)

mpi.close.Rslaves()
mpi.quit()

mpi.bcast.cmd and mpi.remote.exec are quite similar: they execute a function on the workers and can also use arguments on the master as inputs to the function evaluated on the workers (see the ... argument). mpi.remote.exec can return the results of the execution to the master.

As before, we would start R via mpirun, requesting one process, since the workers are started within R via mpi.spawn.Rslaves.

mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save Rmpi.R Rmpi.out
cat Rmpi.out
## > ## @knitr Rmpi
## > 
## > # example syntax of standard MPI functions
## > 
## > library(Rmpi)
## > ## by default this should start one fewer workers than processes
## > ## saving one for the master
## > ## but on my system, this fails unless explicitly
## > ## ask for one fewer slave than total number of slots across hosts
## > mpi.spawn.Rslaves(nslaves = mpi.universe.size()-1)
##  3 slaves are spawned successfully. 0 failed.
## master (rank 0, comm 1) of size 4 is running on: smeagol 
## slave1 (rank 1, comm 1) of size 4 is running on: radagast 
## slave2 (rank 2, comm 1) of size 4 is running on: arwen 
## slave3 (rank 3, comm 1) of size 4 is running on: arwen 
## > 
## > n = 5
## > mpi.bcast.Robj2slave(n)
## > mpi.bcast.cmd(id <- mpi.comm.rank())
## > mpi.bcast.cmd(x <- rnorm(id))
## > 
## > mpi.remote.exec(ls(.GlobalEnv))
## $slave1
## [1] "id" "n"  "x" 
## 
## $slave2
## [1] "id" "n"  "x" 
## 
## $slave3
## [1] "id" "n"  "x" 
## 
## > 
## > mpi.bcast.cmd(y <- 2 * x)
## > mpi.remote.exec(print(y))
## $slave1
## [1] -1.358116
## 
## $slave2
## [1] -0.8357533 -0.3354547
## 
## $slave3
## [1] -2.750742  1.510654 -1.412469
## 
## > 
## > objs <- as.list(c('x', 'n'))
## > # next command sends value of objs on _master_ as argument to rm
## > mpi.remote.exec(do.call, rm, objs)
## $slave3
## [1] 0
## 
## > 
## > # verify that 'n' is gone:
## > mpi.remote.exec(print(n))
## $slave1
## [1] "Error in print(n) : object 'n' not found\n"
## attr(,"class")
## [1] "try-error"
## attr(,"condition")
## <simpleError in print(n): object 'n' not found>
## 
## $slave2
## [1] "Error in print(n) : object 'n' not found\n"
## attr(,"class")
## [1] "try-error"
## attr(,"condition")
## <simpleError in print(n): object 'n' not found>
## 
## $slave3
## [1] "Error in print(n) : object 'n' not found\n"
## attr(,"class")
## [1] "try-error"
## attr(,"condition")
## <simpleError in print(n): object 'n' not found>
## 
## > 
## > # collect results back via send/recv
## > mpi.remote.exec(mpi.send.Robj(y, dest = 0, tag = 1))
## $slave1
##          used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 327615 17.5     592000 31.7   550748 29.5
## Vcells 600412  4.6    1308461 10.0   786430  6.0
## 
## $slave2
##          used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 327615 17.5     592000 31.7   550748 29.5
## Vcells 600412  4.6    1308461 10.0   786430  6.0
## 
## $slave3
##          used (Mb) gc trigger (Mb) max used (Mb)
## Ncells 327615 17.5     592000 31.7   550710 29.5
## Vcells 600414  4.6    1308461 10.0   786430  6.0
## 
## > results = list()
## > for(i in 1:(mpi.comm.size()-1)){
## +   results[[i]] = mpi.recv.Robj(source = i, tag = 1)
## + }
## >   
## > print(results)
## [[1]]
## [1] -1.358116
## 
## [[2]]
## [1] -0.8357533 -0.3354547
## 
## [[3]]
## [1] -2.750742  1.510654 -1.412469
## 
## > 
## > mpi.close.Rslaves()
## [1] 1
## > mpi.quit()

Note that if you do this in interactive mode, some of the usual functionality of command line R (tab completion, scrolling for history) is not enabled and errors will cause R to quit. This occurs because passing things through mpirun causes R to think it is not running interactively.

Note: in some cases a cluster/supercomputer will be set up so that Rmpi is loaded and the worker processes are already started when you start R. In this case you wouldn't need to load Rmpi or use mpi.spawn.Rslaves. You can always run mpi.comm.size() to see how many workers are running.

5.3.2) pbdMPI in pbdR

Here's an example of distributing an embarrassingly parallel calculation (estimating an integral via Monte Carlo - in this case estimating the value of pi).

library(pbdMPI, quiet = TRUE )
init()

myRank <- comm.rank() # comm index starts at 0 , not 1
comm.print(myRank , all.rank=TRUE)
node <- system("cat /etc/hostname", intern = TRUE) # Sys.getenv("HOSTNAME")
if(myRank == 0) {
    comm.print(paste0("hello, world from ", myRank, " ", node), all.rank=TRUE)
} else comm.print(paste0("goodbye from ", myRank, " ", node), all.rank=TRUE)

if(comm.rank() == 0) print(date())
set.seed(myRank)  # see parallel basics tutorial for more on parallel random number generation
N.gbd <- 1e7
X.gbd <- matrix(runif(N.gbd * 2), ncol = 2)
r.gbd <- sum(rowSums(X.gbd^2) <= 1)
ret <- allreduce(c(N.gbd,r.gbd), op = "sum")
PI <- 4 * ret [2] / ret [1]
comm.print(paste0("Pi is roughly: ", PI))
if(comm.rank() == 0) print(date())

finalize()
mpirun -machinefile .hosts -np 4 Rscript pbd-mpi.R > pbd-mpi.out
cat pbd-mpi.out
## COMM.RANK = 2
## [1] 2
## COMM.RANK = 3
## [1] 3
## COMM.RANK = 1
## [1] 1
## COMM.RANK = 0
## [1] 0
## COMM.RANK = 0
## [1] "hello, world from 0 scf-sm10"
## [1] "Sat Oct 17 12:21:28 2015"
## COMM.RANK = 3
## [1] "goodbye from 3 scf-sm11"
## COMM.RANK = 1
## [1] "goodbye from 1 scf-sm10"
## COMM.RANK = 2
## [1] "goodbye from 2 scf-sm11"
## COMM.RANK = 0
## [1] "Pi is roughly: 3.1421032"
## [1] "Sat Oct 17 12:21:31 2015"

5.4) Using MPI from Python via mpi4py

Here's some basic use of MPI within Python.

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD

# simple print out Rank & Size
id = comm.Get_rank()
print("Of ", comm.Get_size() , " workers, I am number " , id, ".")

def f(id, n):
    np.random.seed(id)
    return(np.mean(np.random.normal(0, 1, n)))

n = 1000000
result = f(id, n)


output = comm.gather(result, root = 0)

if id == 0:
    print(output)

To run the code, we start Python through the mpirun command as done previously.

mpirun -machinefile .hosts -np 4 python example-mpi.py 
## Of  4  workers, I am number  0 .
## Of  4  workers, I am number  3 .
## Of  4  workers, I am number  2 .
## Of  4  workers, I am number  1 .
## [0.0015121465155362318, 0.00065180430801923422, -0.000977212317921356, 0.001958404534987673]

More generally, you can send, receive, broadcast, gather, etc. as with MPI itself.

mpi4py generally does not work interactively.

6) Parallelization strategies

The following are some basic principles/suggestions for how to parallelize your computation.

Should I use one machine/node or many machines/nodes?

What level or dimension should I parallelize over?

How do I balance communication overhead with keeping my cores busy?

Should multiple tasks be pre-assigned to a process (i.e., a worker) (sometimes called prescheduling) or should tasks be assigned dynamically as previous tasks finish?