Chris Paciorek, Department of Statistics, UC Berkeley
This tutorial covers basic strategies for using parallel processing in R, Python, Matlab, and C on single machines and multiple machines.
This tutorial assumes you have a working knowledge of R, Python, Matlab, or C.
Materials for this tutorial, including the R Markdown file and associated code files used to create this document, are available on GitHub at https://github.com/berkeley-scf/parallel-scf-2015. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:
git clone https://github.com/berkeley-scf/parallel-scf-2015
To create this HTML document, simply compile the corresponding R Markdown file in R as follows.
Rscript -e "library(knitr); knit2html('parallel.Rmd')"
This workshop material by Christopher Paciorek is licensed under a Creative Commons Attribution 3.0 Unported License.
This workshop will draw heavily on already-prepared SCF material. We'll move back and forth between this material and the following tutorials and instructions:
Additional information on:
First, let's see some terms in Section 1.1 of the shared memory tutorial.
- threaded computations
- multi-core computations using multiple processes
A fast BLAS (Basic Linear Algebra Subroutines) package can make a huge difference in computational time for linear algebra involving large matrices/vectors. More information can be found in Section 2 of the shared memory tutorial.
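As a quick illustration (a minimal sketch; the matrix size here is arbitrary), you can time a large matrix cross-product in R. With a threaded BLAS such as OpenBLAS, the elapsed time should drop noticeably when more threads are allowed (e.g., by setting OMP_NUM_THREADS before starting R).
# time a large cross-product; with a threaded BLAS this will use multiple cores
# compare runs with OMP_NUM_THREADS=1 versus OMP_NUM_THREADS=4
n <- 4000
x <- matrix(rnorm(n^2), n)
system.time(crossprod(x))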
- high-level tools that hide the inter-machine communication
- direct MPI coding
- distributed computations with a distributed filesystem (HDFS)
- SCF
- EML
- Biostatistics
- Savio (SLURM queueing)
- Amazon EC2 and other cloud providers
Key ingredients:
- qsub, run from any of the stand-alone SCF (or EML) Linux servers (e.g., arwen, beren, gandalf)

As a demonstration, we'll run and monitor an R job that uses threaded linear algebra. We'll do the following:
# cat job.sh
# cat sim.R
# submit:
qsub -pe smp 8 job.sh
# monitor:
qstat
# check on CPU/memory use:
qrsh -q interactive.q -l hostname=scf-smXX
top
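For reference, here's a rough sketch of the kind of threaded linear algebra code that sim.R might contain (a hypothetical stand-in, not the actual file from the repository); the BLAS/LAPACK calls below are the ones that get threaded.
# hypothetical sim.R
n <- 8000
x <- crossprod(matrix(rnorm(n^2), n))  # large positive definite matrix
U <- chol(x)                           # Cholesky factorization via threaded BLAS/LAPACK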
Here are simple job script examples. To use these, you'd simply put the appropriate line of code for your particular language in a file, say job.sh.
# R
R CMD BATCH --no-save sim.R sim.Rout
# Python
python sim.py > sim.out
# Matlab
matlab -nodisplay -nodesktop -singleCompThread < sim.m > sim.out
To submit the job:
# regular queue
qsub job.sh
# high-priority queue
qsub -q high.q job.sh
# regular queue, longer than 3 days
qsub -l h_rt=672:00:00 job.sh
# high-priority queue, longer than 3 days
qsub -q high.q -l h_rt=168:00:00 job.sh
Have your script (say job.sh) use the appropriate lines from among the following examples.
### R
export OMP_NUM_THREADS=${NSLOTS}
R CMD BATCH --no-save sim.R sim.Rout
### Python
export OMP_NUM_THREADS=${NSLOTS}
python sim.py > sim.out
### Matlab
matlab -nodisplay -nodesktop < sim.m > sim.out
# include this line at the top of your MATLAB code:
# feature('numThreads', str2num(getenv('NSLOTS')))
### C/C++ with OpenMP
# compile with: g++ -fopenmp test.cpp -o test
export OMP_NUM_THREADS=${NSLOTS}
./test > test.out
To submit the job, decide on the number of cores you want to use (8 in this case) and add -pe smp 8 to the qsub commands shown above. For example:
# regular queue
qsub -pe smp 8 job.sh
Have your script (say job.sh) use the appropriate lines from among the following examples.
# R
R CMD BATCH --no-save sim.R sim.Rout
# Python
python sim.py > sim.out
# Matlab
matlab -nodisplay -nodesktop -singleCompThread < sim.m > sim.out
Your R/Python/Matlab code should use the NSLOTS environment variable when determining the number of parallel processes to run. For example:
### R
nCores <- as.numeric(Sys.getenv('NSLOTS'))
### Python
import os
nCores = int(os.environ['NSLOTS'])
### Matlab
pl = parpool(str2num(getenv('NSLOTS')));
# see cluster webpage if you want to use more than 12 processes
Then, for R and Python, use nCores in the code that sets up the parallel processing.
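For instance, in R a minimal sketch using the parallel package (just one of several ways to set this up) might look like:
library(parallel)
nCores <- as.numeric(Sys.getenv('NSLOTS'))
cl <- makeCluster(nCores)  # one worker process per core granted by the queue
results <- parLapply(cl, 1:200, function(i) mean(rnorm(1e6)))
stopCluster(cl)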
To submit the job, decide on the number of cores you want to use (8 in this case) and add -pe smp 8 to the qsub command. For example:
# regular queue
qsub -pe smp 8 job.sh
Here's an example job script, assuming that example-mpi.py uses the mpi4py package. You can see the Python code in the distributed memory tutorial. The file ${TMPDIR}/machines is created by the queueing software for your particular job submission and will contain the list of nodes allocated for the job.
mpirun -machinefile ${TMPDIR}/machines -np ${NSLOTS} python example-mpi.py
Here's how you would submit the job, requesting 36 cores:
qsub -pe mpi 36 job.sh
Please see the SCF (or EML) cluster instructions for more details on submitting such jobs using the mpi or dcs parallel environment.
You can work interactively by simply submitting an interactive job request: qrsh -q interactive.q. For threaded, multi-core, and multi-node jobs, you still need to request multiple cores/nodes via the -pe syntax (e.g., qrsh -q interactive.q -pe smp 4).
Just remember to exit when you are done computing so you don't prevent others from using the cores you requested.
When the cluster is busy, multi-core jobs may wait in the queue with jobs that use fewer cores slipping ahead of them. To partially alleviate this, you can use the -R flag:
# regular queue
qsub -R y -pe smp 8 job.sh
For jobs that do a lot of I/O, it's best to read/write directly from the local disk of the node(s) (e.g., /tmp) rather than from your home directory on the shared filesystem. Here are the steps in the form of an example job script, assuming the input file is input.csv and the output file that your code creates is output.dat.
# stage the input file onto the node's local disk
cp ~/input.csv /tmp/.
python sim.py > sim.out # your code should read/write from /tmp
# copy the results back to the shared filesystem
cp /tmp/output.dat ~/.
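If your code is in R, a hypothetical sketch of the corresponding read/write pattern (the file names are just placeholders matching the job script above) would be:
# read the staged input from the node's local disk
dat <- read.csv('/tmp/input.csv')
# ... computations on dat ...
# write results to the local disk; the job script copies them back afterwards
write.table(dat, '/tmp/output.dat')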
Let's talk through some of the issues, following the material in Section 4 of the shared memory tutorial.
You can find a more extensive set of examples of parallel functionality in the shared and distributed memory tutorials. Here we'll just see a small number of examples. We'll demonstrate them without using the cluster, simply using one or more of the stand-alone Linux servers.
Here's some linear algebra in Python that should use the system's default BLAS, which on the SCF is OpenBLAS, a threaded BLAS.
# in bash before starting Python to use 4 threads: export OMP_NUM_THREADS=4
import numpy as np
n = 8000
x = np.random.normal(0, 1, size=(n, n))
x = x.T.dot(x)
U = np.linalg.cholesky(x)
Be careful: not all of the linear algebra calls in numpy and scipy are necessarily set up to use a threaded BLAS by default, even if one is installed on your system.
Here's some basic R code using foreach with the doMPI backend to allow us to parallelize across multiple nodes:
## you should have invoked R as:
## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save doMPI.R doMPI.Rout
## or for interactive use:
## mpirun -machinefile .hosts -np 1 R --no-save
library(Rmpi)
library(doMPI)
cl <- startMPIcluster() # by default will start one fewer slave
# than elements in .hosts
registerDoMPI(cl)
clusterSize(cl) # just to check
results <- foreach(i = 1:200) %dopar% {
  out <- mean(rnorm(1e6))
}
print(results[1:5])
closeCluster(cl)
mpi.quit()
Here's the job script, assuming we are NOT using the cluster:
mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
To do a parallel for loop on a single machine, one modifies the setup as seen in Section 3.1.1 of the shared memory tutorial and simply starts R as normal without using mpirun. But one can use doMPI on a single machine if one likes.
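For instance, a minimal sketch using the doParallel backend (rather than doMPI) on a single machine might look like this:
library(doParallel)
nCores <- 4  # or as.numeric(Sys.getenv('NSLOTS')) when run via the cluster
registerDoParallel(nCores)
results <- foreach(i = 1:200) %dopar% {
  mean(rnorm(1e6))
}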
Here's code that uses the pp package to parallelize tasks on a single node. Note that this may not work in an interactive session in base Python but should work interactively in IPython and in batch mode.
import numpy
import pp
nCores = 4
job_server = pp.Server(ncpus = nCores, secret = 'mysecretphrase')
# set 'secret' to some passphrase (you need to set it but
# what it is should not be crucial)
job_server.get_ncpus()
nSmp = 10000000
m = 40
def taskFun(i, n):
    numpy.random.seed(i)
    return (i, numpy.mean(numpy.random.normal(0, 1, n)))
# create list of tuples to iterate over
inputs = [(i, nSmp) for i in xrange(m)]
# submit and run jobs using list comprehension
jobs = [job_server.submit(taskFun, invalue, modules = ('numpy',)) for invalue in inputs]
# collect results (will have to wait for longer tasks to finish)
results = [job() for job in jobs]
print(results)
job_server.destroy()
You can do the same operations across multiple nodes using the pp package as well. However, you need to do a bit of work to get the worker nodes ready. See Section 3.2 of the distributed memory tutorial for more details.
Section 3.1.2 of the distributed memory tutorial gives an overview of pbdR, a relatively new effort to enhance R's capability for distributed memory processing.
One of pbdR's capabilities is to provide an R interface to ScaLAPACK, the parallel version of LAPACK.
Here's an example of doing some distributed linear algebra.
library(pbdDMAT, quiet = TRUE)
n <- 4096*2
# if you are putting multiple processes on a node
# you may want to prevent threading of the linear algebra:
# library(RhpcBLASctl)
# blas_set_num_threads(1)
# (or do by passing OMP_NUM_THREADS to mpirun)
init.grid()
if(comm.rank()==0) print(date())
# pbd allows for parallel I/O, but here
# we keep things simple and distribute
# an object from one process
if(comm.rank() == 0) {
  x <- rnorm(n^2)
  dim(x) <- c(n, n)
} else x <- NULL
dx <- as.ddmatrix(x)
timing <- comm.timer(sigma <- crossprod(dx))
if(comm.rank()==0) {
  print(date())
  print(timing)
}
timing <- comm.timer(out <- chol(sigma))
if(comm.rank()==0) {
  print(date())
  print(timing)
}
finalize()
We run pbdR code via mpirun, which starts all of the R processes for us.
export OMP_NUM_THREADS=1
mpirun -machinefile .hosts -x OMP_NUM_THREADS Rscript pbd-linalg.R > pbd-linalg.out
Let's talk through some of the issues, following the material in Section 5 of the shared memory tutorial.