Parallel Processing Workshop, October 2015

Parallel processing resources and tools in Statistics, Biostatistics, and Economics

Chris Paciorek, Department of Statistics, UC Berkeley

0) This Workshop

This tutorial covers basic strategies for using parallel processing in R, Python, Matlab, and C on single machines and multiple machines.

This tutorial assumes you have a working knowledge of either R, Python, Matlab, or C.

Materials for this tutorial, including the R markdown file and associated code files that were used to create this document are available on Github at https://github.com/berkeley-scf/parallel-scf-2015. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:

git clone https://github.com/berkeley-scf/parallel-scf-2015

To create this HTML document, simply compile the corresponding R Markdown file in R as follows.

Rscript -e "library(knitr); knit2html('parallel.Rmd')"

This workshop material by Christopher Paciorek is licensed under a Creative Commons Attribution 3.0 Unported License.

1) Resources and links

This workshop will draw heavily on already-prepared SCF material. We'll move back and forth between this material and the following tutorials and instructions:

Additional information on:

1) Overview of parallel processing paradigms

First, let's see some terms in Section 1.1 of the shared memory tutorial.

1.1) Shared memory

Threaded linear algebra

A fast BLAS (Basic Linear Algebra Subroutines) package can make a huge difference in terms of computational time for linear algebra involving large matrices/vectors. More information can be found in Section 2 of the shared memory tutorial

1.2) GPUs

1.3) Distributed memory

2) Parallel hardware resources

3) Using the SCF (and EML) clusters

3.1) Job submission overview

Key ingredients:

As a demonstration, we'll run and monitor an R job that uses threaded linear algebra. We'll

# cat job.sh
# cat sim.R
# submit:
qsub -pe smp 8 job.sh
# monitor:
qstat
# check on CPU/memory use:
qrsh -q interactive.q -l hostname=scf-smXX
top

3.2) Submitting non-threaded one-core jobs

Here are simple job script examples. To use these, you'd simply put the appropriate line of code for your particular language in a file, say job.sh.

# R
R CMD BATCH --no-save sim.R sim.Rout
# Python 
python sim.py > sim.out
# Matlab
matlab -nodisplay -nodesktop -singleCompThread < sim.m > sim.out

To submit the job:

# regular queue
qsub job.sh
# high-priority queue
qsub -q high.q job.sh
# regular queue, longer than 3 days
qsub -l h_rt=672:00:00 job.sh
# high queue, longer than 3 days
qsub -l h_rt=168:00:00 job.sh

3.3) Submitting threaded jobs

Have your script (say job.sh) use the appropriate lines from among the following examples.

### R
export OMP_NUM_THREADS=${NSLOTS}
R CMD BATCH --no-save sim.R sim.Rout

### Python 
export OMP_NUM_THREADS=${NSLOTS}
python sim.py > sim.out

### Matlab
matlab -nodisplay -nodesktop < sim.m > sim.out
# include this line at the top of your MATLAB code:
# feature('numThreads', str2num(getenv('NSLOTS')))

### C/C++ with OpenMP
# compile with: g++ -fopenmp test.cpp -o test 
export OMP_NUM_THREADS=${NSLOTS}
./test > test.out

To submit the job, decide on the number of cores you want to use (8 in this case) and add -pe smp 8 to the qsub commands shown above. For example,

# regular queue
qsub -pe smp 8 job.sh

3.4) Submitting multi-core jobs

Have your script (say job.sh) use the appropriate lines from among the following examples.

# R
R CMD BATCH --no-save sim.R sim.Rout
# Python 
python sim.py > sim.out
# Matlab
matlab -nodisplay -nodesktop -singleCompThread < sim.m > sim.out

Your R/Python/Matlab code should use the NSLOTS environment variable when determining the number of parallel processes to run. For example

### R
nCores <- as.numeric(Sys.getenv('NSLOTS'))

### Python
nCores = int(os.environ['NSLOTS'])

### Matlab
pl = parpool(str2num(getenv('NSLOTS')));
# see cluster webpage if you want to use more than 12 processes

Then for R and Python use nCores in the code that sets up the parallel processing.

To submit the job, decide on the number of cores you want to use (8 in this case) and add -pe smp 8 to the qsub command. For example,

# regular queue
qsub -pe smp 8 job.sh

3.5) Submitting jobs for multiple nodes

Here's an example job script, assuming that example-mpi.py uses the mpi4py package. You can see the Python code in the distributed memory tutorial. The file ${TMPDIR}/machines is created by the queueing software for your particular job submission and will contain the list of nodes allocated for the job.

mpirun -machinefile ${TMPDIR}/machines -np ${NSLOTS} python example-mpi.py 

Here's how you would submit the job, requesting 36 cores:

qsub -pe mpi 36 job.sh

Please see the SCF (or EML) cluster instructions for more details on submitting such jobs using the mpi or dcs parallel environment.

3.6) Interactive jobs

You can work interactively by simply submitting an interactive job request as qrsh -q interactive.q

For threaded, multi-core, and multi-node jobs, you still need to request multiple cores/nodes via the -pe syntax.

Just remember to exit when you are done computing so you don't prevent others from using the cores you requested.

3.6) Tips and tricks

3.6.1) Reserving cores when the cluster is busy

When the cluster is busy, multi-core jobs may wait in the queue with jobs that use fewer cores slipping ahead of them. To partially alleviate this, you can use the -R flag:

# regular queue
qsub -R y -pe smp 8 job.sh

3.6.2) Doing input/output to the local disk of the cluster node

For jobs that do a lot of I/O, it's best to read/write directly from the disk of the node(s) rather than from your home directory on the shared filesystem. Here are the steps in the form of an example job script, assuming the input file is input.csv and the output file that your code creates is output.dat.

cp ~/input.csv /tmp/.
python sim.py > sim.out # your code should read/write from /tmp
cp /tmp/output.dat ~/.

4) Strategies and suggestions for how to parallelize your computation.

Let's talk through some of the issues, following the material in the Section 4 of the shared memory tutorial.

5) Some examples of parallel processing in R, Python, and C

You can find a more extensive set of examples of parallel functionality in the shared and distributed memory tutorials. Here we'll just see a small number of examples. We'll demonstrate them without using the cluster, simply using one or more of the stand-alone Linux servers.

5.1) Threaded linear algebra in Python

Here's some linear algebra in Python that should use the default BLAS on the system, which on the SCF is OpenBLAS, which is threaded.

# in bash before starting Python to use 4 threads: export OMP_NUM_THREADS=4
import numpy as np
n = 8000
x = np.random.normal(0, 1, size=(n, n))
x = x.T.dot(x)
U = np.linalg.cholesky(x)

Be careful as not all of the linear algebra calls from numpy and scipy may by default be set up to use a threaded BLAS, even if it is installed on your system.

5.2) Parallel for loops in R on one or more nodes

Here's some basic R code using foreach with the doMPI backend to allow us to parallelize across multiple nodes:

## you should have invoked R as:
## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save doMPI.R doMPI.Rout
## or for interactive use:
## mpirun -machinefile .hosts -np 1 R --no-save

library(Rmpi)
library(doMPI)

cl <- startMPIcluster()  # by default will start one fewer slave
# than elements in .hosts

registerDoMPI(cl)
clusterSize(cl) # just to check

results <- foreach(i = 1:200) %dopar% {
  out <- mean(rnorm(1e6))
}

print(results[1:5])

closeCluster(cl)

mpi.quit()

Here's the job script, assuming we are NOT using the cluster:

mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out

To do a parallel for loop on a single machine, one modifies the setup as seen in Section 3.1.1 of the shared memory tutorial and simply starts R as normal without using mpirun. But one can use doMPI on a single machine if one likes.

5.3) Parallelizing tasks in Python

Here's code that uses the pp package to parallelize tasks on a single node. Note that this may not work in an interactive session in base Python but should work interactively in IPython and in batch mode.

import numpy
import pp

nCores = 4
job_server = pp.Server(ncpus = nCores, secret = 'mysecretphrase')
# set 'secret' to some passphrase (you need to set it but 
#   what it is should not be crucial)
job_server.get_ncpus()

nSmp = 10000000
m = 40
def taskFun(i, n):
    numpy.random.seed(i)
    return (i, numpy.mean(numpy.random.normal(0, 1, n)))

# create list of tuples to iterate over
inputs = [(i, nSmp) for i in xrange(m)]
# submit and run jobs using list comprehension
jobs = [job_server.submit(taskFun, invalue, modules = ('numpy',)) for invalue in inputs]
# collect results (will have to wait for longer tasks to finish)
results = [job() for job in jobs]
print(results)
job_server.destroy()

You can do the same operations across multiple nodes using the pp package as well. However, you need to do a bit of work to get the worker nodes ready. See Section 3.2 of the distributed memory tutorial for more details.

5.4) Parallel linear algebra using ScaLapack via pbdR in R

Section 3.1.2 of the distributed memory tutorial gives an overview of pbdR, a relatively new effort to enhance R's capability for distributed memory processing.

One of pbdR's capabilities is to provide an R interface to ScaLapack, the parallel version of Lapack.

Here's an example of doing some distributed linear algebra.

library(pbdDMAT, quiet = TRUE )

n <- 4096*2

# if you are putting multiple processes on node
# you may want to prevent threading of the linear algebra:
# library(RhpcBLASctl)
# blas_set_num_threads(1)
# (or do by passing OMP_NUM_THREADS to mpirun)

init.grid()

if(comm.rank()==0) print(date())

# pbd allows for parallel I/O, but here
# we keep things simple and distribute
# an object from one process
if(comm.rank() == 0) {
    x <- rnorm(n^2)
    dim(x) <- c(n, n)
} else x <- NULL
dx <- as.ddmatrix(x)

timing <- comm.timer(sigma <- crossprod(dx))

if(comm.rank()==0) {
    print(date())
    print(timing)
}

timing <- comm.timer(out <- chol(sigma))

if(comm.rank()==0) {
    print(date())
    print(timing)
}

finalize()

We run pbdR code via mpirun, starting all of the R processes through mpirun.

export OMP_NUM_THREADS=1
mpirun -machinefile .hosts -x OMP_NUM_THREADS Rscript pbd-linalg.R > pbd-linalg.out

6) Random number generation for parallelized jobs

Let's talk through some of the issues, following the material in Section 5 of the shared memory tutorial.