# Parallel Processing Workshop, October 2015

## Parallel processing resources and tools in Statistics, Biostatistics, and Economics

Chris Paciorek, Department of Statistics, UC Berkeley

# 0) This Workshop

This tutorial covers basic strategies for using parallel processing in R, Python, Matlab, and C on single machines and multiple machines.

This tutorial assumes you have a working knowledge of either R, Python, Matlab, or C.

Materials for this tutorial, including the R markdown file and associated code files that were used to create this document are available on Github at https://github.com/berkeley-scf/parallel-scf-2015. You can download the files by doing a git clone from a terminal window on a UNIX-like machine, as follows:

``````git clone https://github.com/berkeley-scf/parallel-scf-2015
``````

To create this HTML document, simply compile the corresponding R Markdown file in R as follows.

``````Rscript -e "library(knitr); knit2html('parallel.Rmd')"
``````

This workshop will draw heavily on already-prepared SCF material. We'll move back and forth between this material and the following tutorials and instructions:

# 1) Overview of parallel processing paradigms

First, let's see some terms in Section 1.1 of the shared memory tutorial.

## 1.1) Shared memory

• threaded linear algebra (OpenBLAS, MKL, ACML) called from R, Python, Matlab, C/C++
• Matlab functions that are natively threaded
• OpenMP C/C++ code
• multi-core computations using multiple processes

• foreach, parallel apply/sapply/lapply in R
• multiprocessing, pp, and other packages in Python
• Matlab parfor loops
• (Open MP C/C++ code fits here too)

A fast BLAS (Basic Linear Algebra Subroutines) package can make a huge difference in terms of computational time for linear algebra involving large matrices/vectors. More information can be found in Section 2 of the shared memory tutorial

## 1.2) GPUs

• CUDA and openCL code in C/C++
• CPU+GPU linear algebra via Magma in C/C++
• calling CUDA/openCL code from R
• calling CUDA/openCL from Python

## 1.3) Distributed memory

• high-level tools that hide the inter-machine communication

• foreach and parallel apply/lapply/sapply with Rmpi backends in R
• parallel linear algebra using pbdR
• pp package and IPython parallelization in Python
• Matlab parfor loops with DCS (Economics or Savio have licenses)
• direct MPI coding

• MPI code in C/C++
• mpi4py Python package
• Rmpi and pbdR MPI interfaces
• distributed computations with a distributed filesystem (HDFS)

• PySpark (Python)
• Spark (Scala or Java)
• SparkR ( R )

# 2) Parallel hardware resources

• SCF

• main cluster: 8 nodes x 32 cores/node; 256 Gb RAM per node; SGE queueing
• newer cluster: 4 nodes by 12 (24) cores/node: 128 Gb RAM per node, SLURM queueing
• one GPU
• EML

• main cluster: 8 nodes x 32 cores/node; 256 Gb RAM per node; SGE queueing
• Biostatistics

• new cluster: 8 nodes x 24 cores/node; 64 Gb RAM per node; SGE queueing (SLURM/Torque?)
• Savio (SLURM queueing)

• Department-owned nodes
• SCF nodes: 2 nodes x 24 cores/node; 64 Gb RAM per node
• EML nodes: 2 nodes; details to be determined
• Biostat (Mark/Alan) nodes: 8 nodes
• Faculty Compute Allowance
• ~200,000 core-hours per year free per faculty member; can be delegated to grads/postdocs
• 512 Gb RAM nodes available
• nodes with 2 GPUs each available
• Spark available on department nodes or FCA but likely want more than 2 nodes for a job
• Amazon EC2 and other cloud providers

• ability to start virtual machines and virtual clusters

# 3) Using the SCF (and EML) clusters

## 3.1) Job submission overview

Key ingredients:

• 1) R/Python/Matlab/C code that may contain code that operates in parallel
• 2) a job script as a bash script that calls your R/Python/Matlab/C code
• 3) start your job via `qsub` from any of the stand-alone SCF (or EML) Linux servers (e.g., arwen, beren, gandalf).
• Optionally, you might have a wrapper script that submits one or more jobs, e.g., by using a loop in bash shell syntax. See the cluster webpage for example syntax for automating submission of multiple jobs.

As a demonstration, we'll run and monitor an R job that uses threaded linear algebra. We'll

``````# cat job.sh
# cat sim.R
# submit:
qsub -pe smp 8 job.sh
# monitor:
qstat
# check on CPU/memory use:
qrsh -q interactive.q -l hostname=scf-smXX
top
``````

## 3.2) Submitting non-threaded one-core jobs

Here are simple job script examples. To use these, you'd simply put the appropriate line of code for your particular language in a file, say job.sh.

``````# R
R CMD BATCH --no-save sim.R sim.Rout
# Python
python sim.py > sim.out
# Matlab
matlab -nodisplay -nodesktop -singleCompThread < sim.m > sim.out
``````

To submit the job:

``````# regular queue
qsub job.sh
# high-priority queue
qsub -q high.q job.sh
# regular queue, longer than 3 days
qsub -l h_rt=672:00:00 job.sh
# high queue, longer than 3 days
qsub -l h_rt=168:00:00 job.sh
``````

Have your script (say job.sh) use the appropriate lines from among the following examples.

``````### R
R CMD BATCH --no-save sim.R sim.Rout

### Python
python sim.py > sim.out

### Matlab
matlab -nodisplay -nodesktop < sim.m > sim.out
# include this line at the top of your MATLAB code:

### C/C++ with OpenMP
# compile with: g++ -fopenmp test.cpp -o test
./test > test.out
``````

To submit the job, decide on the number of cores you want to use (8 in this case) and add `-pe smp 8` to the qsub commands shown above. For example,

``````# regular queue
qsub -pe smp 8 job.sh
``````

## 3.4) Submitting multi-core jobs

Have your script (say job.sh) use the appropriate lines from among the following examples.

``````# R
R CMD BATCH --no-save sim.R sim.Rout
# Python
python sim.py > sim.out
# Matlab
matlab -nodisplay -nodesktop -singleCompThread < sim.m > sim.out
``````

Your R/Python/Matlab code should use the NSLOTS environment variable when determining the number of parallel processes to run. For example

``````### R
nCores <- as.numeric(Sys.getenv('NSLOTS'))

### Python
nCores = int(os.environ['NSLOTS'])

### Matlab
pl = parpool(str2num(getenv('NSLOTS')));
# see cluster webpage if you want to use more than 12 processes
``````

Then for R and Python use `nCores` in the code that sets up the parallel processing.

To submit the job, decide on the number of cores you want to use (8 in this case) and add `-pe smp 8` to the qsub command. For example,

``````# regular queue
qsub -pe smp 8 job.sh
``````

## 3.5) Submitting jobs for multiple nodes

Here's an example job script, assuming that example-mpi.py uses the mpi4py package. You can see the Python code in the distributed memory tutorial. The file \${TMPDIR}/machines is created by the queueing software for your particular job submission and will contain the list of nodes allocated for the job.

``````mpirun -machinefile \${TMPDIR}/machines -np \${NSLOTS} python example-mpi.py
``````

Here's how you would submit the job, requesting 36 cores:

``````qsub -pe mpi 36 job.sh
``````

Please see the SCF (or EML) cluster instructions for more details on submitting such jobs using the mpi or dcs parallel environment.

## 3.6) Interactive jobs

You can work interactively by simply submitting an interactive job request as `qrsh -q interactive.q`

For threaded, multi-core, and multi-node jobs, you still need to request multiple cores/nodes via the -pe syntax.

Just remember to exit when you are done computing so you don't prevent others from using the cores you requested.

## 3.6) Tips and tricks

### 3.6.1) Reserving cores when the cluster is busy

When the cluster is busy, multi-core jobs may wait in the queue with jobs that use fewer cores slipping ahead of them. To partially alleviate this, you can use the -R flag:

``````# regular queue
qsub -R y -pe smp 8 job.sh
``````

### 3.6.2) Doing input/output to the local disk of the cluster node

For jobs that do a lot of I/O, it's best to read/write directly from the disk of the node(s) rather than from your home directory on the shared filesystem. Here are the steps in the form of an example job script, assuming the input file is input.csv and the output file that your code creates is output.dat.

``````cp ~/input.csv /tmp/.
cp /tmp/output.dat ~/.
``````

# 4) Strategies and suggestions for how to parallelize your computation.

Let's talk through some of the issues, following the material in the Section 4 of the shared memory tutorial.

# 5) Some examples of parallel processing in R, Python, and C

You can find a more extensive set of examples of parallel functionality in the shared and distributed memory tutorials. Here we'll just see a small number of examples. We'll demonstrate them without using the cluster, simply using one or more of the stand-alone Linux servers.

## 5.1) Threaded linear algebra in Python

Here's some linear algebra in Python that should use the default BLAS on the system, which on the SCF is OpenBLAS, which is threaded.

``````# in bash before starting Python to use 4 threads: export OMP_NUM_THREADS=4
import numpy as np
n = 8000
x = np.random.normal(0, 1, size=(n, n))
x = x.T.dot(x)
U = np.linalg.cholesky(x)
``````

Be careful as not all of the linear algebra calls from numpy and scipy may by default be set up to use a threaded BLAS, even if it is installed on your system.

## 5.2) Parallel for loops in R on one or more nodes

Here's some basic R code using foreach with the doMPI backend to allow us to parallelize across multiple nodes:

``````## you should have invoked R as:
## mpirun -machinefile .hosts -np 1 R CMD BATCH --no-save doMPI.R doMPI.Rout
## or for interactive use:
## mpirun -machinefile .hosts -np 1 R --no-save

library(Rmpi)
library(doMPI)

cl <- startMPIcluster()  # by default will start one fewer slave
# than elements in .hosts

registerDoMPI(cl)
clusterSize(cl) # just to check

results <- foreach(i = 1:200) %dopar% {
out <- mean(rnorm(1e6))
}

print(results[1:5])

closeCluster(cl)

mpi.quit()
``````

Here's the job script, assuming we are NOT using the cluster:

``````mpirun -machinefile .hosts -np 1 R CMD BATCH -q --no-save doMPI.R doMPI.out
``````

To do a parallel for loop on a single machine, one modifies the setup as seen in Section 3.1.1 of the shared memory tutorial and simply starts R as normal without using mpirun. But one can use doMPI on a single machine if one likes.

## 5.3) Parallelizing tasks in Python

Here's code that uses the pp package to parallelize tasks on a single node. Note that this may not work in an interactive session in base Python but should work interactively in IPython and in batch mode.

``````import numpy
import pp

nCores = 4
job_server = pp.Server(ncpus = nCores, secret = 'mysecretphrase')
# set 'secret' to some passphrase (you need to set it but
#   what it is should not be crucial)
job_server.get_ncpus()

nSmp = 10000000
m = 40
numpy.random.seed(i)
return (i, numpy.mean(numpy.random.normal(0, 1, n)))

# create list of tuples to iterate over
inputs = [(i, nSmp) for i in xrange(m)]
# submit and run jobs using list comprehension
jobs = [job_server.submit(taskFun, invalue, modules = ('numpy',)) for invalue in inputs]
# collect results (will have to wait for longer tasks to finish)
results = [job() for job in jobs]
print(results)
job_server.destroy()
``````

You can do the same operations across multiple nodes using the pp package as well. However, you need to do a bit of work to get the worker nodes ready. See Section 3.2 of the distributed memory tutorial for more details.

## 5.4) Parallel linear algebra using ScaLapack via pbdR in R

Section 3.1.2 of the distributed memory tutorial gives an overview of pbdR, a relatively new effort to enhance R's capability for distributed memory processing.

One of pbdR's capabilities is to provide an R interface to ScaLapack, the parallel version of Lapack.

Here's an example of doing some distributed linear algebra.

``````library(pbdDMAT, quiet = TRUE )

n <- 4096*2

# if you are putting multiple processes on node
# you may want to prevent threading of the linear algebra:
# library(RhpcBLASctl)
# (or do by passing OMP_NUM_THREADS to mpirun)

init.grid()

if(comm.rank()==0) print(date())

# pbd allows for parallel I/O, but here
# we keep things simple and distribute
# an object from one process
if(comm.rank() == 0) {
x <- rnorm(n^2)
dim(x) <- c(n, n)
} else x <- NULL
dx <- as.ddmatrix(x)

timing <- comm.timer(sigma <- crossprod(dx))

if(comm.rank()==0) {
print(date())
print(timing)
}

timing <- comm.timer(out <- chol(sigma))

if(comm.rank()==0) {
print(date())
print(timing)
}

finalize()
``````

We run pbdR code via mpirun, starting all of the R processes through mpirun.

``````export OMP_NUM_THREADS=1
mpirun -machinefile .hosts -x OMP_NUM_THREADS Rscript pbd-linalg.R > pbd-linalg.out
``````

# 6) Random number generation for parallelized jobs

Let's talk through some of the issues, following the material in Section 5 of the shared memory tutorial.