Please install the following packages:
install.packages(c("foreach", "doParallel", "doRNG",
"snowFT", "extraDistr", "ggplot2",
"reshape2", "wpp2017"),
dependencies = TRUE)
Many statistical simulations have the following structure:
initialize.rng(...)
for (iteration in 1:N) {
    result[iteration] <- myfunc(...)
}
process(result, ...)
If the calls to myfunc are independent of one another, we can transform the simulation so that the iterations are evaluated in parallel.
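A sketch of the transformed structure in the same pseudocode (initialize.rng.streams and apply.in.parallel are placeholders, not real functions; each of the packages covered below provides its own equivalents):

initialize.rng.streams(...)
result <- apply.in.parallel(1:N, myfunc, ...) # iterations run concurrently
process(result, ...)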
There are many packages in R that work in this fashion. One of the first, snow (Simple Network of Workstations), has recently been re-implemented as part of the R core package parallel, which also provides multicore (forking) variants of the apply functions: mclapply, mcmapply and mcMap.
Load the package and check how many cores you have:
library(parallel)
detectCores() # counts hyperthreaded cores
P <- detectCores(logical = FALSE) # physical cores
P
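As a quick taste of the multicore interface (a minimal sketch; mclapply relies on forking, so on Windows it only runs with mc.cores = 1):

# each of the 2 forked workers evaluates a subset of the elements
res <- mclapply(1:4, function(n) mean(rnorm(n)), mc.cores = 2)
unlist(res)

The remainder of this section uses the cluster (snow-style) interface.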
Start and stop a pool of workers with one worker per core:
cl <- makeCluster(P)
cl
typeof(cl)
length(cl)
cl[[1]]
cl[[P]]
typeof(cl[[P]])
names(cl[[P]])
stopCluster(cl)
# cl[[1]] # gives an error
Socket communication (the default):
cl <- makeCluster(P, type = "PSOCK")
stopCluster(cl)
Other available cluster types:
- type = "FORK": forks the master R process (not available on Windows)
- type = "MPI": message passing via the Rmpi package
- type = "NWS": NetWorkSpaces
Start a cluster that will be used to solve multiple tasks:
cl <- makeCluster(P)
Let’s get each worker to generate as many normally distributed random numbers as its position in the list:
clusterApply(cl, 1:P, fun = rnorm)
The second argument is a sequence whose elements are distributed to the workers, each passed as the first argument to the function fun. In this example, the first worker got the number 1, the second 2, etc., which is passed as the first argument to the rnorm function. Thus, the node cl[[4]], for example, evaluates rnorm(4, mean = 0, sd = 1).
Pass additional arguments to fun:
clusterApply(cl, 1:P, fun = rnorm,
             mean = 10, sd = 2)
We can evaluate a function more times than there are workers. Generate 20 sets of 100,000 random numbers from N(mean = 5, sd = 1) and return the average of each set:
res <- clusterApply(cl, rep(100000, 20),
                    fun = function(x) mean(rnorm(x, mean = 5)))
length(res)
head(res)
mean(unlist(res))
Another way to write the call above:
myfun <- function(r, mean = 0, sd = 1) {
    mean(rnorm(r, mean = mean, sd = sd))
}
res <- clusterApply(cl, rep(100000, 20),
                    fun = myfun, mean = 5)
By default (with socket clusters), each worker starts as a fresh R session with no preloaded packages or data.
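You can verify this with clusterEvalQ (used in more detail below), which evaluates an expression on each worker; only the base set of packages is attached:

clusterEvalQ(cl, search())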
Consider a situation where each worker needs functions from a non-core R package, for example to generate random numbers from the discrete normal distribution implemented in the extraDistr package. The following version will most likely fail, because the workers do not have extraDistr loaded:
myrdnorm <- function(r, mean = 0, sd = 1) {
    rdnorm(r, mean = mean, sd = sd)
}
library(extraDistr)
res <- clusterApply(cl, rep(20, 20),
                    fun = myrdnorm, sd = 6)
Version where each function call loads the library (can be inefficient):
myrdnorm2 <- function(r, mean = 0, sd = 1) {
    library(extraDistr)
    rdnorm(r, mean = mean, sd = sd)
}
res <- clusterApply(cl, rep(10000, 1000),
                    fun = myrdnorm2, sd = 6)
hist(unlist(res))
Restart the cluster to clean its namespace:
stopCluster(cl)
cl <- makeCluster(P)
A better way is to initialize each worker only once, using clusterEvalQ:
myrdnorm <- function(r, mean = 0, sd = 1) {
    rdnorm(r, mean = mean, sd = sd)
}
clusterEvalQ(cl, library(extraDistr))
res <- clusterApply(cl, rep(10000, 1000),
                    fun = myrdnorm, sd = 6)
hist(unlist(res))
Once workers are initialized with a dataset, one can instruct each worker to process a specific part of it. For example, we can sample from a posterior AR(1) distribution given by a parameter sample:
N <- 1000
ar1pars <- data.frame(
    mu = rnorm(N, 1, 0.1),
    rho = runif(N, 0.95, 0.99),
    sigma = runif(N, 0.01, 0.1)
)
# Each task uses one row of the data
myar1 <- function(id, t) {
    pars <- ar1pars[id, ]
    pars$mu + pars$rho * (t - pars$mu) +
        rnorm(100, sd = pars$sigma)
}
clusterExport(cl, "ar1pars") # export the data frame to all workers
res <- clusterApply(cl, 1:N, fun = myar1, t = 1)
length(res)
length(res[[1]])
# Create a matrix from the result
resmat <- do.call(rbind, res)
dim(resmat)
Each row in resmat represents 100 realizations of an AR(1) process with parameters corresponding to one row of the ar1pars data frame.
Parallel versions of the R functions lapply, sapply and apply are called parLapply, parSapply, and parApply. They are implemented on top of clusterApply.
Example of summing rows of a matrix, resulting in a vector (not a list):
parApply(cl, matrix(1:2000, nrow=20), 1, sum)
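The function parSapply simplifies its result in the same way sapply does, e.g. (a small illustration):

parSapply(cl, 1:20, function(n) mean(rnorm(n)))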
The AR(1) example above can be implemented using parApply, which passes only the corresponding row to the worker:
myar1row <- function(pars, t) {
    pars[1] + pars[2] * (t - pars[1]) +
        rnorm(100, sd = pars[3])
}
res <- parApply(cl, ar1pars, 1, myar1row, t = 1)
dim(res)
To apply a function to multiple arguments (analogous to mapply and Map), one can use clusterMap. For example, in each replication, generate a different number of random numbers with a different mean:
myrnorm <- function(r, mean, sd = 1) {
    rnorm(r, mean = mean, sd = sd)
}
N <- 100
res <- clusterMap(cl, myrnorm,
                  r = 1:N, mean = (N:1)*10, sd = 2)
length(res)
head(res, 10)
# plot the mean of results from each task
plot(parSapply(cl, res, mean))
Consider a situation where nodes have very different performance, or where a function takes a different amount of time to finish depending on its input parameters. To plot the cluster usage over time we will use the snow function snow.time.
library(snow)
rnmean <- function(r, mean = 0, sd = 1) {
    mean(rnorm(r, mean = mean, sd = sd))
}
N <- 30
# create a sequence that will trigger a very different
# load for different workers
set.seed(50)
r.seq <- sample(ceiling(exp(seq(7, 14, length=50))), N)
head(r.seq, 10)
ctime <- snow.time(clusterApply(cl, r.seq,
fun = rnmean))
plot(ctime, title = "Usage with clusterApply")
An alternative to clusterApply that processes the tasks more efficiently (using load balancing) is called clusterApplyLB:
ctimeLB <- snow.time(clusterApplyLB(cl, r.seq,
fun = rnmean))
plot(ctimeLB, title = "Usage with clusterApplyLB")
In this case, clusterApplyLB needs about half the time of clusterApply to complete.
Note that by loading the snow library in the first step, we were using snow::clusterApply and snow::clusterApplyLB instead of their equivalents from the parallel package. This is because the parallel versions of these functions currently do not support monitoring cluster usage. To remove snow from the search path, do:
detach(package:snow)
Initializing the random number generator on the master does not make random numbers generated on the cluster reproducible. Evaluate the following two lines multiple times; every time gives a different answer:
set.seed(1)
unlist(clusterApply(cl, rep(5, P), rnorm))
A quick and dirty solution is to initialize each worker with set.seed:
set.seed(1)
s <- clusterApply(cl, sample(1:10000000, P), set.seed)
unlist(clusterApply(cl, rep(5, P), rnorm))
This is a bad solution! The resulting random numbers do not have the desired structural properties; in particular, the streams on different workers are not guaranteed to be independent.
A much better way is to use a parallel random number generator developed exactly for this purpose. The parallel package uses Pierre L’Ecuyer’s RngStreams.
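Under the hood, the workers are switched to the "L'Ecuyer-CMRG" RNG kind and each receives its own stream. The stream seeds can also be advanced manually with parallel's nextRNGStream (a quick peek at the mechanism; not needed in normal use):

RNGkind("L'Ecuyer-CMRG")
set.seed(1)
s1 <- .Random.seed      # seed of the current stream
s2 <- nextRNGStream(s1) # seed of an independent next stream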
Check the workers’ RNG kind:
clusterEvalQ(cl, RNGkind())
Initialize L’Ecuyer’s RNG on each worker, using a seed for reproducibility:
seed <- 1
clusterSetRNGStream(cl, seed)
clusterEvalQ(cl, RNGkind())
Each worker now has its own independent (very long) stream of random numbers available:
do.call(rbind, clusterEvalQ(cl, rnorm(5)))
Compare the reproducibility of clusterApply and clusterApplyLB:
for(i in 1:10) {
    clusterSetRNGStream(cl, seed)
    print(tail(unlist(clusterApply(cl, r.seq,
                                   fun = rnmean))))
}
for(i in 1:10) {
    clusterSetRNGStream(cl, seed)
    print(tail(unlist(clusterApplyLB(cl, r.seq,
                                     fun = rnmean))))
}
clusterApplyLB generates non-reproducible results!
Can we reproduce results on clusters of different sizes? Compare results from our original cluster with results generated on a cluster of size three:
clusterSetRNGStream(cl, seed)
unlist(clusterApply(cl, rep(1, 10), fun = rnorm))
cl3 <- makeCluster(3)
clusterSetRNGStream(cl3, seed)
unlist(clusterApply(cl3, rep(1, 10), fun = rnorm))
Results are only reproducible when evaluated on clusters of the same size, using clusterApply!
Cleanup:
stopCluster(cl)
stopCluster(cl3)
snowFT (Ševčíková, Rossini)
The snowFT package has a solution for both problems above, load balancing and varying cluster size. In addition, it simplifies the interface of snow/parallel.
The main idea for solving the reproducibility issue is to use one RNG stream per task instead of one stream per worker. This makes the process independent of which node evaluates which task, and thus results are always reproducible.
Caution: Currently (as of July 2017), some of the functions in this section do not work in RStudio due to a bug in RStudio. Please use an alternative R UI until it is fixed.
Simplification of the parallel UI: only one function is needed for all steps:
library(snowFT)
seed <- 1
res <- performParallel(P, r.seq, fun = rnmean,
seed = seed, cltype = "SOCK")
tail(unlist(res))
The single call above performed all the steps for us: it created a cluster of P workers, initialized one RNG stream per task, evaluated rnmean on all elements of r.seq, and shut the cluster down.
Reproducible evaluation on clusters of different sizes:
N <- 10
for(p in 2:4)
print(unlist(performParallel(p, rep(1, N),
rnorm, seed = seed, cltype = "SOCK")))
Load-balanced reproducible evaluation:
for(i in 1:10)
print(tail(unlist(performParallel(P, r.seq,
rnmean, seed = seed, cltype = "SOCK"))))
For loading libraries and exporting objects to workers, use the arguments initexpr (or initfun) and export:
myrdnorm <- function(r) {
    rdnorm(r, mean = mean, sd = sd)
}
mean <- 20
sd <- 10
res <- performParallel(P, rep(1000, 100),
myrdnorm, seed = seed, cltype = "SOCK",
initexpr = library(extraDistr),
export = c("mean", "sd"))
hist(unlist(res))
That is equivalent to:
res <- performParallel(P, rep(1000, 100),
                       myrdnorm, seed = seed, cltype = "SOCK",
                       initexpr = {
                           library(extraDistr)
                           mean <- 20
                           sd <- 10
                       })
To eliminate the cltype argument, check if SOCK is the default cluster type. If not, change it for all performParallel calls:
snow::getClusterOption("type")
snow::setDefaultClusterOptions(type = "SOCK")
Dynamic resizing of the cluster: the size of the cluster is stored in the file ".clustersize" in the working directory. You can change the number in that file; the process detects the change and resizes the cluster accordingly. In the example below, the original 2 nodes were expanded to 8 and then shrunk to 4.
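To trigger the resizing while the computation below runs, change the number in the file from a second R session (or any editor) in the same working directory, for example (a sketch, assuming the file holds a single number as described above):

writeLines("8", ".clustersize") # expand the cluster to 8 nodes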
tm <- snow::snow.time(
performParallel(2, rep(r.seq*10, 10),
fun = rnmean, seed = seed)
)
plot(tm)
snowFT allows you to run your function sequentially while preserving reproducibility, which can be useful for debugging (set the cluster size to 0):
# runs sequentially
unlist(performParallel(0, rep(5, 3), fun = rnorm,
seed = seed, ft_verbose = TRUE))
# runs in parallel
unlist(performParallel(3, rep(5, 3), fun = rnorm,
seed = seed, ft_verbose = TRUE))
foreach (Calaway, Weston)
The foreach package provides an alternative looping construct. The following example shows a sequential evaluation:
library(foreach)
n.seq <- sample(1:1000, 10)
res <- foreach(n = n.seq) %do% rnorm(n)
class(res)
length(res)
length(res[[1]])
Results can be conveniently combined, e.g. into a vector:
res <- foreach(n = n.seq, .combine = c) %do%
rnorm(n)
length(res)
or a matrix, or using a function:
res <- foreach(n = rep(100, 10),
.combine = rbind) %do% rnorm(n)
dim(res)
res <- foreach(n = rep(100, 10),
.combine = "+") %do% rnorm(n)
length(res)
Multiple arguments (each segmented across tasks) can be passed:
res <- foreach(n = rep(10, 5), m = (1:5)^2,
.combine = rbind) %do% rnorm(n, mean = m)
res
To run a foreach loop in parallel, register one of the available backends:
- doParallel: uses the R core package parallel (snow functionality)
- doMC: uses the multicore functionality of parallel (works on a single computer only; does not work on Windows)
- doMPI: uses Rmpi
- others: doFuture, doRedis, doAzureParallel
Register a doParallel backend with a cluster of three workers and evaluate the loop with %dopar%:
library(doParallel)
cl <- makeCluster(3)
registerDoParallel(cl)
res <- foreach(n = rep(10, 5), m = (1:5)^2,
.combine = rbind) %dopar% rnorm(n, mean = m)
Get information about the registered backend:
getDoParWorkers()
getDoParName()
There are arguments for passing packages and objects to workers:
mean <- 20
sd <- 10
myrdnorm <- function(r)
rdnorm(r, mean = mean, sd = sd)
res <- foreach(r = rep(1000, 100), .combine = c,
.packages = "extraDistr") %dopar% myrdnorm(r)
hist(res)
For passing objects not defined in the current environment, use the argument .export.
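For example, an object living in the global environment is not exported automatically when the loop sits inside a function (a small sketch; f and gmean are illustrative names):

gmean <- 100
f <- function()
    foreach(r = rep(5, 3), .combine = c,
            .export = "gmean") %dopar% rnorm(r, mean = gmean)
res <- f()

This mirrors the pattern used in the run.foreach benchmark function further below.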
Results are not reproducible out of the box:
set.seed(1)
foreach(n = rep(2, 5), .combine=rbind) %dopar%
rnorm(n)
set.seed(1)
foreach(n = rep(2, 5), .combine=rbind) %dopar%
rnorm(n)
For reproducibility use the doRNG package. It provides independent streams of L’Ecuyer’s RNG:
library(doRNG)
set.seed(1)
res1 <- foreach(n = rep(2, 5),
.combine=rbind) %dorng% rnorm(n)
set.seed(1)
res2 <- foreach(n = rep(2, 5),
.combine=rbind) %dorng% rnorm(n)
identical(res1, res2)
This is equivalent to:
registerDoRNG(1)
res3 <- foreach(n = rep(2, 5),
.combine=rbind) %dopar% rnorm(n)
set.seed(1)
res4 <- foreach(n = rep(2, 5),
.combine=rbind) %dopar% rnorm(n)
identical(res1, res3)
identical(res2, res4)
This way one can make other people’s foreach code reproducible.
Similarly to snowFT, an RNG stream is associated with a task (instead of a node), which provides reproducibility even when replicating the computation on a cluster of a different size, or when running sequentially:
registerDoSEQ()
set.seed(1)
res_seq <- foreach(n = rep(2, 5),
.combine=rbind) %dorng% rnorm(n)
identical(res1, res_seq)
Note: The results would not be identical if the plain %do% operator were used in the call above.
To compare the performance of the various approaches, we will measure run times using system.time. We will use a slightly modified bootstrap example from the doParallel vignette:
dat <- subset(iris, Species != "setosa")
bootstrap <- function(x = NULL) {
    ind <- sample(nrow(dat), 100, replace = TRUE)
    res <- glm(Species ~ ., data = dat[ind,],
               family = binomial)
    coefficients(res)
}
bootstrap()
bootstrap()
For each parallel method, write a function that runs the bootstrap function multiple times on a cluster of a given size. The argument nodes determines the size of the cluster; the argument trials determines how many times to run bootstrap.
# using the parallel package
run.parallel <- function(nodes, trials = 100, seed = 1) {
    cl <- makeCluster(nodes)
    # export the dat object to workers
    clusterExport(cl, "dat")
    clusterSetRNGStream(cl, seed)
    res <- clusterApply(cl, 1:trials, fun = bootstrap)
    stopCluster(cl)
    # convert to a matrix
    do.call(rbind, res)
}
# load-balanced version of run.parallel
run.parallelLB <- function(nodes, trials = 100, seed = 1) {
    cl <- makeCluster(nodes)
    clusterExport(cl, "dat")
    clusterSetRNGStream(cl, seed)
    res <- clusterApplyLB(cl, 1:trials, fun = bootstrap)
    stopCluster(cl)
    do.call(rbind, res)
}
# using snowFT
run.snowFT <- function(nodes, trials = 100, seed = 1) {
    res <- performParallel(nodes, 1:trials, bootstrap,
                           export = "dat", seed = seed)
    do.call(rbind, res)
}
# using foreach with doParallel and doRNG
run.foreach <- function(nodes, trials = 100, seed = 1) {
    cl <- makeCluster(nodes)
    registerDoParallel(cl)
    registerDoRNG(seed)
    res <- foreach(i = 1:trials, .combine = rbind,
                   .export = c("dat", "bootstrap")) %dopar%
        bootstrap()
    stopCluster(cl)
    res[,] # subsetting drops the RNG attributes added by doRNG
}
Note that even though all functions initialize the RNG with the same seed, only run.snowFT and run.foreach give reproducible results when called with different numbers of nodes:
run.snowFT(3, 10)
run.snowFT(4, 10)
run.foreach(5, 10)
run.foreach(10, 10)
but the following two calls give different results:
run.parallel(3, 10)
run.parallel(4, 10)
A function that loops over different numbers of nodes and the various methods, and returns the run times:
runtime <- function(trials,
                    nodes = 1:detectCores(), seed = 1,
                    methods = c("parallel", "parallelLB",
                                "snowFT", "foreach")) {
    tmm <- matrix(NA, nrow = length(nodes),
                  ncol = length(methods),
                  dimnames = list(NULL, methods))
    tm <- data.frame(nodes = nodes, tmm)
    for(i in 1:nrow(tm)) {
        # sequential run is handled at the end
        if (tm$nodes[i] == 1) next
        # iterate over methods
        for(method in methods) {
            t <- system.time(
                do.call(paste0("run.", method),
                        list(nodes = tm$nodes[i],
                             trials = trials, seed = seed))
            )
            tm[i, method] <- t["elapsed"]
        }
        print(tm[i,])
    }
    # add time of a sequential run
    if (1 %in% nodes) {
        tm[1, methods] <- system.time(
            run.snowFT(0, trials, seed)
        )["elapsed"]
        # add column with a linear speedup
        tm <- cbind(tm,
                    "linear speedup" = tm[1, "parallel"] / tm$nodes)
    }
    tm
}
Function for plotting results:
library(ggplot2)
library(reshape2)
show.runtime <- function(t) {
    tl <- melt(t, id.vars = "nodes",
               variable.name = "method")
    g <- ggplot(tl, aes(x = nodes, y = value,
                        color = method)) +
        geom_line() + ylab("time") +
        scale_x_continuous(
            breaks = sort(unique(tl$nodes)))
    print(g)
}
Compute and plot the run times for different numbers of trials:
t <- runtime(50, 1:4)
show.runtime(t)
t <- runtime(1000, c(1,2,4,6))
show.runtime(t)
Look under the hood:
library(snow)
st <- snow.time(run.parallelLB(6, 1000))
plot(st)
A lot of time is spent creating the cluster and on communication.
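The startup cost alone can be checked directly (a rough measurement; it also includes shutting the cluster down):

system.time(stopCluster(makeCluster(6)))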
Results of the same bootstrap function (modified to run a little longer) on a Linux cluster: [figure not shown]
Projecting probabilistic migration (simplified version):
Given current net migration for all countries of the world (from the wpp2017 package, see below), derive reproducible quantiles of net migration in 2100 using an AR(1) process with given parameters. The computation should run in parallel with a task corresponding to processing one country. Use a parallel method of your choice.
A sequential version could look as follows:
# load data
options(stringsAsFactors = FALSE)
data(migration, package = "wpp2017")
data(UNlocations, package = "wpp2017")
# remove records that are not countries
mig <- subset(migration, country_code %in%
    subset(UNlocations, location_type == 4)$country_code)
# extract current migration (units are in thousands)
mig <- mig[, c("country_code", "name", "2010-2015")]
# AR(1) parameters
pars <- c(mu = 0, rho = 0.95, sigma = 0.05)
# quantile
q <- 0.5
# project to 2100 (from 2015 by 5 years)
nperiod <- 17
# number of trajectories
ntraj <- 50000
# parallelize the following loop
res <- c()
for (country in mig$country_code) {
    # current migration for this country
    migprev <- subset(mig,
        country_code == country)[, "2010-2015"]
    # project ntraj trajectories
    # for nperiod time points
    for(time in 1:nperiod) {
        mig.proj <- pars["mu"] + pars["rho"] *
            (migprev - pars["mu"]) +
            rnorm(ntraj, sd = pars["sigma"])
        migprev <- mig.proj
    }
    # compute quantile
    res <- c(res, quantile(mig.proj, q))
}
resdf <- cbind(mig, new = res)
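One possible parallel version is sketched below, using snowFT's performParallel with one task per country (any of the methods covered above would work; mig.country is a name introduced for this sketch, and the remaining arguments are passed to it through ...):

mig.country <- function(country, mig, pars, q, nperiod, ntraj) {
    # current migration for this country
    migprev <- subset(mig, country_code == country)[, "2010-2015"]
    # project ntraj trajectories for nperiod time points
    for(time in 1:nperiod)
        migprev <- pars["mu"] + pars["rho"] * (migprev - pars["mu"]) +
            rnorm(ntraj, sd = pars["sigma"])
    quantile(migprev, q)
}
res <- performParallel(P, mig$country_code, mig.country,
                       seed = 1, mig = mig, pars = pars, q = q,
                       nperiod = nperiod, ntraj = ntraj)
resdf <- cbind(mig, new = unlist(res))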
Questions: Experiment with the cluster size and with the number of trajectories ntraj. How do the run time and the results change?