Required Packages

Please install the following packages:

install.packages(c("foreach", "doParallel", "doRNG", 
                   "snowFT", "extraDistr", "ggplot2", 
                   "reshape2", "wpp2017"), 
                 dependencies = TRUE)

Structure of Statistical Simulations

Many statistical simulations have the following structure:

initialize.rng(...)
for (iteration in 1:N) {
    result[iteration] <- myfunc(...)
}
process(result,...)

If calls of myfunc are independent of one another, we can transform the simulation as follows:
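
In pseudocode, the master distributes the N iterations among the workers, each worker evaluates its share of the myfunc calls, and the master collects the results (a sketch; run.in.parallel stands for any of the parallel apply functions introduced below):

initialize.rng.on.each.worker(...)
result <- run.in.parallel(1:N, myfunc, ...)
process(result,...)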

Master-slave Paradigm

There are many packages in R that work in this fashion. One of the first, the snow package (Simple Network of Workstations), was recently re-implemented as the R core package parallel.

Package parallel

Setup

  • Load the package and check how many cores you have:

    library(parallel)
    detectCores() # counts logical cores (incl. hyperthreading)
    P <- detectCores(logical = FALSE) # physical cores
    P

Start and Stop a Cluster

  • Start and stop a pool of workers with one worker per core:

    cl <- makeCluster(P)
    cl
    typeof(cl)
    length(cl)
    cl[[1]]
    cl[[P]]
    typeof(cl[[P]])
    names(cl[[P]])
    stopCluster(cl)
    # cl[[1]] # gives an error

Types of Clusters

  • Socket communication (default):

    cl <- makeCluster(P, type = "PSOCK")
    stopCluster(cl)
    • Workers start with an empty environment (i.e. new R process).
    • Available for all OS platforms.
  • Fork: type = "FORK"
    • Workers are complete copies of the master process (see the sketch after this list).
    • Not available on Windows.
  • MPI: type = "MPI"
    • Requires the Rmpi package (and MPI) to be installed.
  • NetWorkSpaces: type = "NWS"
    • Requires the nws package (from Revolution Computing) to be installed.
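
To illustrate the fork behavior, here is a minimal sketch (macOS/Linux only; cl.fork and x are names introduced here): fork workers inherit objects from the master without any explicit export:

x <- 5
cl.fork <- makeCluster(2, type = "FORK")
clusterEvalQ(cl.fork, x) # each worker sees the master's x
stopCluster(cl.fork)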

Evaluating a Function on the Cluster

  • Start a cluster that will be used to solve multiple tasks:

    cl <- makeCluster(P)
  • Let’s get each worker to generate as many normally distributed random numbers as its position in the list:

    clusterApply(cl, 1:P, fun = rnorm)

    The second argument is a sequence whose elements are distributed to the workers; each element is passed to fun as its first argument. In this example, the first worker received the number 1, the second the number 2, and so on. Thus, node cl[[4]], for example, evaluates rnorm(4, mean = 0, sd = 1).

  • Pass additional arguments to fun:

    clusterApply(cl, 1:P, fun = rnorm, 
                mean = 10, sd = 2)
  • Evaluate a function more times than there are workers: generate 20 sets of 100,000 random numbers from N(mean = 5, sd = 1) and return the average of each set:

    res <- clusterApply(cl, rep(100000, 20), 
            fun = function(x) mean(rnorm(x, mean = 5)))
    length(res)
    head(res)
    mean(unlist(res))

  • Another way to write the call above:

    myfun <- function(r, mean = 0, sd = 1) {
        mean(rnorm(r, mean = mean, sd = sd))
    }
    res <- clusterApply(cl, rep(100000, 20), 
                    fun = myfun, mean = 5)

Initialization of Workers

By default (i.e. with socket clusters), each worker corresponds to a fresh start of an R session, with no libraries or data preloaded.
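
A quick check of this behavior (a sketch using the cluster cl from above; x is a name introduced here):

x <- 5
clusterEvalQ(cl, exists("x")) # FALSE: workers do not see x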

Loading Libraries

Consider a situation when each worker needs to use functions from a non-core R package.

  • Generate random numbers from a discrete normal distribution implemented in the extraDistr package:

    • This version will most likely fail because the workers do not have extraDistr loaded:

      myrdnorm <- function(r, mean = 0, sd = 1) {
          rdnorm(r, mean = mean, sd = sd)
      }
      library(extraDistr)
      res <- clusterApply(cl, rep(20, 20), 
                      fun = myrdnorm, sd = 6)
    • Version where each function call loads the library (can be inefficient):

      myrdnorm2 <- function(r, mean = 0, sd = 1) {
          library(extraDistr)
          rdnorm(r, mean = mean, sd = sd)
      }
      
      res <- clusterApply(cl, rep(10000, 1000), 
                      fun = myrdnorm2, sd = 6)
      hist(unlist(res))
    • Restart the cluster to get clean worker environments:

      stopCluster(cl)
      cl <- makeCluster(P)
    • A better way is to initialize each worker only once, using clusterEvalQ:

      myrdnorm <- function(r, mean = 0, sd = 1) {
          rdnorm(r, mean = mean, sd = sd)
      }
      
      clusterEvalQ(cl, library(extraDistr))
      res <- clusterApply(cl, rep(10000, 1000), 
                      fun = myrdnorm, sd = 6)
      hist(unlist(res))

Sharing Data

Initialization of workers with data can be useful to avoid passing data as arguments, especially if these are big datasets. The advantage is that the procedure is done only once per worker.

  • clusterEvalQ can be used to initialize data in the workers’ environments:

    myrdnorm <- function(r) {
        rdnorm(r, mean = mean, sd = sd)
    }
    
    clusterEvalQ(cl, {
        library(extraDistr)
        mean <- 10
        sd <- 5
    })
    res <- clusterApply(cl, rep(10000, 1000), 
                        fun = myrdnorm)
    hist(unlist(res))
    
    stopCluster(cl)
    cl <- makeCluster(P)

    The same approach can be used to instruct workers to read data files or source a script.
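
    For example (a sketch; "setup.R" and "mydata.rda" are hypothetical files, assumed to be accessible from the workers' working directory):

    clusterEvalQ(cl, {
        source("setup.R")   # run a script on each worker
        load("mydata.rda")  # load a dataset on each worker
    })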

  • Another way of passing data to workers is using clusterExport. It exports given objects from the environment of the master to the workers’ environment:

    myrdnorm <- function(r) {
        rdnorm(r, mean = mean, sd = sd)
    }
    mean <- 20
    sd <- 10
    
    clusterExport(cl, c("mean", "sd"))
    clusterEvalQ(cl, library(extraDistr))
    
    res <- clusterApply(cl, rep(10000, 1000), 
                        fun = myrdnorm)
    hist(unlist(res))

Data Partitioning

  • Once workers are initialized with a dataset, one can instruct each worker to process a specific part of it. For example, given a sample of AR(1) parameters (e.g. from a posterior distribution), we can simulate the corresponding AR(1) values, one parameter set per task:

    N <- 1000
    ar1pars <- data.frame(
        mu = rnorm(N, 1, 0.1),
        rho = runif(N, 0.95, 0.99),
        sigma = runif(N, 0.01, 0.1)
    )
    # Each task uses one row of the data
    myar1 <- function(id, t) {
        pars <- ar1pars[id, ]
        pars$mu + pars$rho * (t - pars$mu) + 
                    rnorm(100, sd = pars$sigma)
    }
    clusterExport(cl, "ar1pars")
    res <- clusterApply(cl, 1:N, fun = myar1, t = 1)
    length(res)
    length(res[[1]])
    # Create a matrix from the result
    resmat <- do.call(rbind, res)
    dim(resmat)

    Each row in resmat represents 100 realizations of an AR(1) process with parameters corresponding to one row of the ar1pars data frame.

Higher Level Functions

Parallel versions of R functions lapply, sapply and apply are called parLapply, parSapply, and parApply. They are implemented on top of clusterApply.
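
For example, the earlier averaging task can be written with parSapply, which simplifies the result to a numeric vector (a sketch, reusing the cluster cl):

res <- parSapply(cl, rep(100000, 20),
                 function(x) mean(rnorm(x, mean = 5)))
length(res) # a vector of 20 averages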

  • Example of summing rows of a matrix, resulting in a vector (not a list):

    parApply(cl, matrix(1:2000, nrow=20), 1, sum)
  • The AR(1) example above can be implemented using parApply which passes only the corresponding row to the worker:

    myar1row <- function(pars, t) {
        pars[1] + pars[2] * (t - pars[1]) + 
                    rnorm(100, sd = pars[3])
    }    
    res <- parApply(cl, ar1pars, 1, myar1row, t = 1)
    dim(res)
  • To apply a function to multiple arguments (analogous to mapply and Map), one can use clusterMap. For example, in each replication, generate a different number of random numbers with different means:

    myrnorm <- function(r, mean, sd = 1) {
        rnorm(r, mean = mean, sd = sd)
    }
    N <- 100
    res <- clusterMap(cl, myrnorm, 
            r = 1:N, mean = (N:1)*10, sd = 2)
    length(res)
    head(res, 10)
    # plot the mean of results from each task
    plot(parSapply(cl, res, mean))

Load Balancing

Consider a situation in which the nodes differ in performance, or in which a function takes a different amount of time to finish depending on its input parameters. To plot cluster usage over time we will use the snow function snow.time.

library(snow)
rnmean <- function(r, mean = 0, sd = 1) {
        mean(rnorm(r, mean = mean, sd = sd))
}
N <- 30
# create a sequence that will trigger a very different 
# load for different workers
set.seed(50)
r.seq <- sample(ceiling(exp(seq(7, 14, length=50))), N)
head(r.seq, 10)
ctime <- snow.time(clusterApply(cl, r.seq, 
                                fun = rnmean))
plot(ctime, title = "Usage with clusterApply")

An alternative to clusterApply that processes the tasks more efficiently is called clusterApplyLB:

ctimeLB <- snow.time(clusterApplyLB(cl, r.seq, 
                                    fun = rnmean))
plot(ctimeLB, title = "Usage with clusterApplyLB")

In this case, clusterApplyLB needs about half the time of clusterApply to complete.

Note that by loading the snow library in the first step, we used snow::clusterApply and snow::clusterApplyLB instead of their equivalents from the parallel package. This is because the parallel versions of these functions currently do not support monitoring cluster usage. To remove snow from the search path, do:

detach(package:snow)

Random Numbers and Reproducibility

Initializing a random number generator on the master does not make results reproducible when random numbers are generated on a cluster. Evaluate the following two lines multiple times; each time they give a different answer:

set.seed(1)
unlist(clusterApply(cl, rep(5, P), rnorm))

A quick and dirty solution is to initialize each worker with set.seed:

set.seed(1)
s <- clusterApply(cl, sample(1:10000000, P), set.seed)
unlist(clusterApply(cl, rep(5, P), rnorm))

This is a bad solution! The resulting random numbers do not have the desired structural properties; in particular, the streams on different workers are not guaranteed to be independent.

A much better way is to use a parallel random number generator developed exactly for this purpose. The parallel package uses Pierre L’Ecuyer’s RngStreams (the "L'Ecuyer-CMRG" generator).

  • Check the workers’ RNG kind:

    clusterEvalQ(cl, RNGkind())
  • Initialize L’Ecuyer’s RNG on each worker, using a seed for reproducibility:

    seed <- 1
    clusterSetRNGStream(cl, seed)
    clusterEvalQ(cl, RNGkind())
  • Each worker now has its own independent (very long) stream of random numbers available:

    do.call(rbind, clusterEvalQ(cl, rnorm(5)))
  • Compare reproducibility of clusterApply and clusterApplyLB:

    for(i in 1:10) {
        clusterSetRNGStream(cl, seed)
        print(tail(unlist(clusterApply(cl, r.seq, 
                                       fun = rnmean))))
    }
    for(i in 1:10) {
        clusterSetRNGStream(cl, seed)
        print(tail(unlist(clusterApplyLB(cl, r.seq, 
                                        fun = rnmean))))
    }

    clusterApplyLB generates non-reproducible results! Which worker (and thus which RNG stream) evaluates a given task depends on timing, which varies between runs.

  • Can we reproduce results on clusters of different sizes? Compare results from our original cluster with results generated on a cluster of size three:

    clusterSetRNGStream(cl, seed)
    unlist(clusterApply(cl, rep(1, 10), fun = rnorm))
    
    cl3 <- makeCluster(3)
    clusterSetRNGStream(cl3, seed)
    unlist(clusterApply(cl3, rep(1, 10), fun = rnorm))

    Results are only reproducible when evaluated on clusters of the same size using clusterApply!

  • Cleanup:

    stopCluster(cl)
    stopCluster(cl3)    

Useful Packages

snowFT

(Ševčíková, Rossini)

The snowFT package has a solution for the last two issues above: reproducibility under load balancing and across cluster sizes. In addition, it simplifies the interface to snow/parallel.

The main idea for solving the reproducibility issue is to use one RNG stream per task instead of one stream per worker. This makes the result independent of the node on which a task is evaluated, and thus always reproducible.

Caution: Currently (as of July 2017), some of the functions in this section will not work in RStudio due to a bug in RStudio. Please use an alternative R UI until it is fixed.

  • Simplification of the parallel UI: only one function needed for all steps:

    library(snowFT)
    seed <- 1
    res <- performParallel(P, r.seq, fun = rnmean, 
                    seed = seed, cltype = "SOCK")
    tail(unlist(res))
    The following steps were performed by the function above:
    • A socket cluster of P nodes created.
    • Each node initialized with L’Ecuyer’s RNG.
    • The given function performed on the cluster in a load-balancing manner.
    • Cluster stopped.
    • Results returned.
  • Reproducible evaluation on clusters of different sizes:

    N <- 10
    for(p in 2:4)
        print(unlist(performParallel(p, rep(1, N), 
                rnorm, seed = seed, cltype = "SOCK")))
  • Load-balanced reproducible evaluation:

    for(i in 1:10) 
        print(tail(unlist(performParallel(P, r.seq, 
            rnmean, seed = seed, cltype = "SOCK"))))
  • For loading libraries and exporting objects to workers use arguments initexpr (or initfun) and export:

    myrdnorm <- function(r) {
        rdnorm(r, mean = mean, sd = sd)
    }
    mean <- 20
    sd <- 10
    
    res <- performParallel(P, rep(1000, 100), 
                myrdnorm, seed = seed, cltype = "SOCK",
                initexpr = library(extraDistr),
                export = c("mean", "sd"))
    hist(unlist(res))

    That is equivalent to

    res <- performParallel(P, rep(1000, 100), 
                myrdnorm, seed = seed, cltype = "SOCK",
                initexpr = { library(extraDistr); 
                             mean <- 20; sd <- 10
                            }
                )
  • To eliminate the cltype argument, check if SOCK is the default type. If not, change it for all performParallel calls:

    snow::getClusterOption("type")
    snow::setDefaultClusterOptions(type = "SOCK")
  • Dynamic resizing of the cluster. The size of the cluster is stored in the file “.clustersize” in the working directory. If you change the number in that file while the computation is running, the process detects it and resizes the cluster accordingly. In the example below, the original 2 nodes were expanded to 8 and then shrunk to 4.

    tm <- snow::snow.time(
            performParallel(2, rep(r.seq*10, 10), 
                fun = rnmean, seed = seed)
        )
    plot(tm)

  • snowFT allows you to run your function sequentially while preserving reproducibility, which can be useful for debugging (set the cluster size to 0):

    # runs sequentially
    unlist(performParallel(0, rep(5, 3), fun = rnorm, 
                seed = seed, ft_verbose = TRUE))
    # runs in parallel
    unlist(performParallel(3, rep(5, 3), fun = rnorm, 
                seed = seed, ft_verbose = TRUE))

foreach

(Calaway, Weston)

Looping construct

  • Provides an alternative looping construct. The following example shows a sequential evaluation:

    library(foreach)
    n.seq <- sample(1:1000, 10)
    res <- foreach(n = n.seq) %do% rnorm(n)
    class(res)
    length(res)
    length(res[[1]])
  • Results can be conveniently combined, e.g. into a vector:

    res <- foreach(n = n.seq, .combine = c) %do% 
                            rnorm(n)
    length(res)

    or a matrix, or using a function:

    res <- foreach(n = rep(100, 10), 
               .combine = rbind) %do% rnorm(n)
    dim(res)
    res <- foreach(n = rep(100, 10), 
               .combine = "+") %do% rnorm(n)
    length(res)
  • Multiple arguments can be passed; they are iterated over jointly, each segmented across tasks:

    res <- foreach(n = rep(10, 5), m = (1:5)^2, 
             .combine = rbind) %do% rnorm(n, mean = m)
    res

Parallel backends

  • To run loops in parallel, one needs to register a parallel backend which defines the lower level parallel interface to be used.
  • There are several parallel backends available, e.g. doParallel, doMC, and doMPI; here we use doParallel:
    library(doParallel)
    cl <- makeCluster(3)
    registerDoParallel(cl)
    res <- foreach(n = rep(10, 5), m = (1:5)^2, 
        .combine = rbind) %dopar% rnorm(n, mean = m)
  • Get info about the cluster:

    getDoParWorkers()
    getDoParName()
  • There are arguments for passing packages and objects to workers:

    mean <- 20
    sd <- 10
    myrdnorm <- function(r) 
        rdnorm(r, mean = mean, sd = sd)
    
    res <- foreach(r = rep(1000, 100), .combine = c, 
        .packages = "extraDistr") %dopar% myrdnorm(r)
    hist(res)

    For passing objects not defined in the current environment use the argument .export.
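
    For example, a minimal sketch (run_sim is a name introduced here; myrdnorm, mean and sd are reused from above): when the loop runs inside a function, .export ensures that mean and sd are shipped to the workers:

    run_sim <- function() {
        foreach(r = rep(1000, 100), .combine = c,
            .packages = "extraDistr",
            .export = c("mean", "sd")) %dopar% myrdnorm(r)
    }
    hist(run_sim())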

Random numbers and reproducibility

  • Results are not reproducible out of the box:

    set.seed(1)
    foreach(n = rep(2, 5), .combine=rbind) %dopar% 
                    rnorm(n)
    set.seed(1)
    foreach(n = rep(2, 5), .combine=rbind) %dopar% 
                    rnorm(n)
  • For reproducibility use the package doRNG. It uses independent streams of L’Ecuyer’s RNG:

    library(doRNG)
    set.seed(1)
    res1 <- foreach(n = rep(2, 5), 
                .combine=rbind) %dorng% rnorm(n)
    set.seed(1)
    res2 <- foreach(n = rep(2, 5), 
                .combine=rbind) %dorng% rnorm(n)
    identical(res1, res2)
  • This is equivalent to:

    registerDoRNG(1)
    res3 <- foreach(n = rep(2, 5), 
                .combine=rbind) %dopar% rnorm(n)
    set.seed(1)
    res4 <- foreach(n = rep(2, 5), 
                .combine=rbind) %dopar% rnorm(n)
    identical(res1, res3)
    identical(res2, res4)

    This way one can make other people’s foreach code reproducible.

  • Similarly to snowFT, a task (instead of a node) is associated with an RNG stream, which provides reproducibility even when replicating the computation on a cluster of different size, or when run sequentially:

    registerDoSEQ()
    set.seed(1)
    res_seq <- foreach(n = rep(2, 5), 
                   .combine=rbind) %dorng% rnorm(n)
    identical(res1, res_seq)

    Note: The results would not be identical if plain %do% were used in the call above.


Challenge

Projecting probabilistic migration (simplified version):

Given current net migration for all countries of the world (from the wpp2017 package, see below), derive reproducible quantiles of net migration in 2100 using an AR(1) process with given parameters. The computation should run in parallel with a task corresponding to processing one country. Use a parallel method of your choice.

A sequential version could look as follows:

# load data 
options(stringsAsFactors = FALSE)
data(migration, package = "wpp2017")
data(UNlocations, package = "wpp2017")
# remove records that are not countries
mig <- subset(migration, country_code %in% subset(
    UNlocations, location_type == 4)$country_code)
# extract current migration (units are in thousand)
mig <- mig[, c("country_code", "name", "2010-2015")]

# AR(1) parameters
pars <- c(mu = 0, rho = 0.95, sigma = 0.05)

# quantile 
q <- 0.5
# project to 2100 (from 2015 by 5 years)
nperiod <- 17 
# number of trajectories
ntraj <- 50000

# parallelize the following loop
res <- c()
for (country in mig$country_code) {
    # current migration for this country
    migprev <- subset(mig, 
           country_code == country)[,"2010-2015"]
    # project ntraj trajectories 
    # for nperiod time points
    for(time in 1:nperiod) {
        mig.proj <- pars["mu"] + pars["rho"] * 
                    (migprev - pars["mu"]) + 
                    rnorm(ntraj, sd = pars["sigma"])    
        migprev <- mig.proj
    }
    # compute quantile
    res <- c(res, quantile(mig.proj, q))
}
resdf <- cbind(mig, new = res)

Questions:

  1. Which country has the highest median migration forecast in 2100?
  2. Which country has the lowest 5% quantile in 2100?
  3. Can you reproduce the same results when running on a cluster of different size?
  4. How does the run time differ when running on a cluster of different size?
  5. Explore when parallel processing gets inefficient as you decrease the number of trajectories ntraj.