Chapter 13 High-performance computing

This chapter provides guidance on time-consuming drake workflows and high-level parallel computation.

13.1 Start small

Before you jump into high-performance computing with a large workflow, consider running a downsized version to debug and test things first. That way, you can avoid consuming lots of computing resources until you are reasonably sure everything works. Create a test plan with drake_plan(max_expand = SMALL_NUMBER) before scaling up to the full set of targets, and take temporary shortcuts in your commands so your targets build more quickly for test mode. See this section on plans for details.
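
For example, here is a minimal sketch of a downsized test plan. max_expand caps the number of targets generated by transforms such as map(), and simulate() is a hypothetical stand-in for one of your own functions.

library(drake)

plan <- drake_plan(
  data = target(
    simulate(n), # simulate() is hypothetical; substitute your own function.
    transform = map(n = c(1e2, 1e4, 1e6, 1e8))
  ),
  max_expand = 2
)

plan # Only 2 of the 4 data targets are generated.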

13.2 Let make() schedule your targets

When it comes time to activate high-performance computing, drake launches its own parallel workers and sends targets to those workers. The workers can be local processes or jobs on a cluster. drake uses your project’s implicit dependency graph to figure out which targets can run in parallel and which ones need to wait for dependencies.

library(drake)
load_mtcars_example() # from https://github.com/wlandau/drake-examples/tree/main/mtcars
vis_drake_graph(my_plan)

You do not need to micromanage how targets are scheduled, and you do not need to run simultaneous instances of make().

13.3 The main process

make() takes care of the jobs it launches, but make() itself is a job too, and it is your responsibility to manage it.

13.3.1 Main process on a cluster

Most clusters will let you submit make() as a job on a compute node. Let’s consider the Sun Grid Engine (SGE) as an example. First, we create a script that calls make() (or r_make()).

# make.R
source("R/packages.R")
source("R/functions.R")
source("R/plan.R")
options(
  clustermq.scheduler = "sge",
  # Created by drake_hpc_template_file("sge_clustermq.tmpl"):
  clustermq.template = "sge_clustermq.tmpl"
)
make(
  plan,
  parallelism = "clustermq",
  jobs = 8,
  console_log_file = "drake.log"
)

Then, we create a shell script (say, run.sh) to call make.R. This script may look different if you use a different scheduler such as SLURM.

#!/bin/bash
# run.sh
#$ -j y       # combine stdout/error in one file
#$ -o log.out # output file
#$ -cwd       # use pwd as work dir
#$ -V         # use environment variables
module load R # Comment out if R is not an environment module.
R CMD BATCH --no-save make.R

Finally, to run the whole workflow, we call qsub.

qsub run.sh

And here is what happens:

  1. A new job starts on the cluster with the configuration flags next to #$ in run.sh.
  2. run.sh opens R and runs make.R.
  3. make.R invokes drake using the make() function.
  4. make() launches 8 new jobs on the cluster.

So 9 simultaneous jobs run on the cluster, and we avoid burdening the head node / login node.

13.3.2 Local main process

Alternatively, you can run make() in a persistent background process. The following should work in the Mac/Linux terminal/shell.

nohup nice -19 R CMD BATCH --no-save make.R &

where:

  • nohup: Keep the job running even if you log out of the machine.
  • nice -19: This is a low-priority job that should not consume many resources. Other processes should take priority.
  • R CMD BATCH: Run the R script in a fresh R session.
  • --no-save: do not save the workspace in a .RData file.
  • &: Run this job in the background so you can do other stuff in the terminal window.

Alternatives to nohup include screen and Byobu.

13.4 Parallel backends

Choose the parallel backend with the parallelism argument and set the jobs argument to scale the work appropriately.

make(my_plan, parallelism = "future", jobs = 2)

The two primary backends with long-term support are clustermq and future. If you can install ZeroMQ, the best choice is usually clustermq. (It is faster than future.) However, future is more accessible: it does not require ZeroMQ, it supports parallel computing on Windows, it can work with more restrictive wall time limits on clusters, and it can deploy targets to Docker images (drake_example("Docker-psock")).

13.5 The clustermq backend

13.5.1 Persistent workers

make(parallelism = "clustermq", jobs = 2) launches 2 persistent parallel workers. The main process assigns targets to workers, and the workers simultaneously traverse the dependency graph.

13.5.2 Installation

Persistent workers require the clustermq R package, which in turn requires ZeroMQ. Please refer to the clustermq installation guide for specific instructions.
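
For example:

install.packages("clustermq") # CRAN release
# Alternatively, install the GitHub development version.
remotes::install_github("mschubert/clustermq")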

13.5.3 On your local machine

To run your targets in parallel over the cores of your local machine, set the global option below and run make().

options(clustermq.scheduler = "multicore")
make(plan, parallelism = "clustermq", jobs = 2)

13.5.4 On a cluster

Set the clustermq global options to register your computing resources. For SLURM:

options(clustermq.scheduler = "slurm", clustermq.template = "slurm_clustermq.tmpl")

Here, slurm_clustermq.tmpl is a template file with configuration details. Use drake_hpc_template_file() to write one of the available examples.

drake_hpc_template_file("slurm_clustermq.tmpl") # Write the file slurm_clustermq.tmpl.

After modifying slurm_clustermq.tmpl by hand to meet your needs, call make() as usual.

make(plan, parallelism = "clustermq", jobs = 4)

13.6 The future backend

13.6.1 Transient workers

make(parallelism = "future", jobs = 2) launches transient workers to build your targets. When a target is ready to build, the main process creates a fresh worker to build it, and the worker terminates when the target is done. jobs = 2 means that at most 2 transient workers are allowed to run at a given time.


13.6.2 Installation

Install the future package.

install.packages("future") # CRAN release
# Alternatively, install the GitHub development version.
devtools::install_github("HenrikBengtsson/future", ref = "develop")

If you intend to use a cluster, be sure to install the future.batchtools package too. The future ecosystem contains even more packages that extend future’s parallel computing functionality, such as future.callr.
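
For example:

install.packages("future.batchtools") # transient workers on clusters
install.packages("future.callr")      # callr-based local workers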

13.6.3 On your local machine

First, select a future plan to tell future how to create the workers. See this table for descriptions of the core options.

future::plan(future::multiprocess) 

Next, run make().

make(plan, parallelism = "future", jobs = 2)

13.6.4 On a cluster

Install the future.batchtools package and use this list to select a future plan that matches your resources. You will also need a compatible template file with configuration details. As with clustermq, drake can generate some examples:

drake_hpc_template_file("slurm_batchtools.tmpl") # Edit by hand.

Next, register the template file with a plan.

library(future.batchtools)
future::plan(batchtools_slurm, template = "slurm_batchtools.tmpl")

Finally, run make().

make(plan, parallelism = "future", jobs = 2)

13.7 Advanced options

13.7.1 Selectivity

Some targets build so quickly that it is not worth sending them to parallel workers. To run these targets locally in the main process, define a special hpc column of your drake plan. Below, NA and TRUE are treated the same, and make(plan, parallelism = "clustermq") only sends model_1 and model_2 to parallel workers.

drake_plan(
  model = target(
    crazy_long_computation(index),
    transform = map(index = c(1, 2))
  ),
  accuracy = target(
    summarize_accuracy(model),
    transform = combine(model),
    hpc = FALSE
  ),
  specificity = target(
    summarize_specificity(model),
    transform = combine(model),
    hpc = FALSE
  ),
  report = target(
    render(knitr_in("results.Rmd"), output_file = file_out("results.html")),
    hpc = FALSE
  )
)

13.7.2 Memory options

By default, make() keeps targets in memory during runtime. Some targets are dependencies of other targets downstream, while others no longer need to stay in memory at all. The memory_strategy argument of make() lets you choose the tradeoff that best suits your project. Options:

  • "speed": Once a target is loaded in memory, just keep it there. This choice maximizes speed and hogs memory.
  • "memory": Just before building each new target, unload everything from memory except the target’s direct dependencies. This option conserves memory, but it sacrifices speed because each new target needs to reload any previously unloaded targets from storage.
  • "lookahead": Just before building each new target, search the dependency graph to find targets that will not be needed for the rest of the current make() session. In this mode, targets are only in memory if they need to be loaded, and we avoid superfluous reads from the cache. However, searching the graph takes time, and it could even double the computational overhead for large projects.

13.7.3 Storage options

In make(caching = "main"), the workers send the targets to the main process, and the main process stores them one by one in the cache. caching = "main" is compatible with all storr cache formats, including the more esoteric ones like storr_dbi() and storr_environment().

In make(caching = "worker"), the parallel workers are responsible for writing the targets to the cache. Some output-heavy projects can benefit from this form of parallelism. However, it can sometimes add slowness on clusters due to lag from network file systems. And there are additional restrictions:

  • All the workers must have the same file system and the same working directory as the main process.
  • Only the default storr_rds() cache may be used. Other formats like storr_dbi() and storr_environment() cannot accommodate parallel cache operations.
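
For example, assuming the default storr_rds() cache on a shared file system:

make(plan, parallelism = "clustermq", jobs = 4, caching = "worker")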

See the storage chapter for details.

13.7.4 The template argument for persistent workers

For more control and flexibility in the clustermq backend, you can parameterize your template file and use the template argument of make(). For example, suppose you want to programmatically set the number of “slots” (basically cores) per job on an SGE system (clustermq guide to SGE setup here). Begin with a parameterized template file sge_clustermq.tmpl with a custom n_slots placeholder.

# File: sge_clustermq.tmpl
# Modified from https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}               # job name
#$ -t 1-{{ n_jobs }}               # submit jobs as array
#$ -j y                            # combine stdout/error in one file
#$ -o {{ log_file | /dev/null }}   # output file
#$ -cwd                            # use pwd as work dir
#$ -V                              # use environment variables
#$ -pe smp {{ n_slots | 1 }}       # request n_slots cores per job
module load R
ulimit -v $(( 1024 * {{ memory | 4096 }} ))
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'

Then when you run make(), use the template argument to set n_slots.

options(clustermq.scheduler = "sge", clustermq.template = "sge_clustermq.tmpl")
library(drake)
load_mtcars_example()
make(
  my_plan,
  parallelism = "clustermq",
  jobs = 16,
  template = list(n_slots = 4) # Request 4 cores per persistent worker.
)

Custom placeholders like n_slots are processed with the infuser package.

13.7.5 The resources column for transient workers

Different targets may need different resources. For example,

plan <- drake_plan(
  data = download_data(),
  model = big_machine_learning_model(data)
)

The model needs a GPU and multiple CPU cores, and the data only needs the bare minimum resources. Declare these requirements with target(), as below. This is equivalent to adding a new list column to the plan, where each element is a named list for the resources argument of future::future().

plan <- drake_plan(
  data = target(
    download_data(),
    resources = list(cores = 1, gpus = 0)
  ),
  model = target(
    big_machine_learning_model(data),
    resources = list(cores = 4, gpus = 1)
  )
)

plan

str(plan$resources)

Next, plug the names of your resources into the brew patterns of your batchtools template file. The following sge_batchtools.tmpl file shows how to do it, but the file itself probably requires modification before it will work with your own machine.

#!/bin/bash
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
#$ -pe smp <%= resources[["cores"]] %> # CPU cores
#$ -l gpu=<%= resources[["gpus"]] %>   # GPUs.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0

Finally, register the template file and run your project.

library(drake)
library(future.batchtools)
future::plan(batchtools_sge, template = "sge_batchtools.tmpl")
make(plan, parallelism = "future", jobs = 2)

13.7.6 Parallel computing within targets

To recruit parallel processes within individual targets, we recommend the future.callr and furrr packages. Usage details depend on the parallel backend you choose for make(). If you must write custom code with mclapply(), please read the subsection below on locked bindings/environments.

13.7.6.1 Locally

Use future.callr and furrr normally.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:2, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:2, function(x) Sys.getpid())
)

# Tell the drake targets to spawn local callr processes.
future::plan(future.callr::callr)

# Build the targets.
make(plan)

# Process IDs of the local workers of x:
readd(x)

If you happen to be using r_make() and want to use parallelism within targets, future::plan(future.callr::callr) needs to go inside _drake.R before the call to drake_config(). In that case your _drake.R file would look like this:

library(drake)

plan <- drake_plan(
  x = furrr::future_map_int(1:2, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:2, function(x) Sys.getpid())
)

future::plan(future.callr::callr)

drake_config(plan)

This will allow you to call r_make() as normal. Configuring your _drake.R file like this is necessary because r_make() launches an external R process that runs _drake.R followed by make(). See this section for more information on r_make().
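
Then, from your interactive session:

library(drake)
r_make() # Runs _drake.R in a fresh R process, then make().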

13.7.6.2 Persistent workers

Each persistent worker needs its own future::plan(), which we set with the prework argument of make(). The following example uses SGE. To learn about templates for other clusters, please consult the clustermq documentation.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:2, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:2, function(x) Sys.getpid())
)

# Write a template file for clustermq.
writeLines(
  c(
    "#!/bin/bash",
    "#$ -N {{ job_name }}               # job name",
    "#$ -t 1-{{ n_jobs }}               # submit jobs as array",
    "#$ -j y                            # combine stdout/error in one file",
    "#$ -o {{ log_file | /dev/null }}   # output file",
    "#$ -cwd                            # use pwd as work dir",
    "#$ -V                              # use environment variables",
    "#$ -pe smp 4                       # request 4 cores per job",
    "module load R-qualified/3.5.2      # if loading R from an environment module",
    "ulimit -v $(( 1024 * {{ memory | 4096 }} ))",
    "CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker(\"{{ master }}\")'"
  ),
  "sge_clustermq.tmpl"
)

# Register the scheduler and template file with clustermq.
options(
  clustermq.scheduler = "sge",
  clustermq.template = "sge_clustermq.tmpl"
)

# Build the targets.
make(
  plan,
  parallelism = "clustermq",
  jobs = 2,
  # Each of the two workers can spawn up to 4 local processes.
  prework = quote(future::plan(future.callr::callr))
)

# Process IDs of the local workers of x:
readd(x) 

13.7.6.3 Transient workers

As explained in the future vignette, we can nest our future plans. Each target gets its own remote job, and each job can spawn up to 4 local callr processes. The following example uses SGE. To learn about templates for other clusters, please consult the future.batchtools documentation.

library(drake)

# The targets just collect the process IDs of the callr processes.
plan <- drake_plan(
  x = furrr::future_map_int(1:2, function(x) Sys.getpid()),
  y = furrr::future_map_int(1:2, function(x) Sys.getpid())
)

# Write a template file for future.batchtools.
writeLines(
  c(
    "#!/bin/bash",
    "#$ -cwd                # use pwd as work dir",
    "#$ -j y                # combine stdout/error in one file",
    "#$ -o <%= log.file %>  # output file",
    "#$ -V                  # use environment variables",
    "#$ -N <%= job.name %>  # job name",
    "#$ -pe smp 4           # 4 cores per job",
    "module load R          # if loading R from an environment module",
    "Rscript -e 'batchtools::doJobCollection(\"<%= uri %>\")'",
    "exit 0"
  ),
  "sge_batchtools.tmpl"
)

# In our nested plans, each target gets its own remote SGE job,
# and each worker can spawn up to 4 `callr` processes.
future::plan(
  list(
    future::tweak(
      future.batchtools::batchtools_sge,
      template = "sge_batchtools.tmpl"
    ),
    future.callr::callr
  )
)

# Build the targets.
make(plan, parallelism = "future", jobs = 2)

# Process IDs of the local workers of x:
readd(x)

13.7.6.4 Number of local workers per target

By default, future::availableCores() determines the number of local callr workers. To better manage resources, you may wish to further restrict the number of callr workers for all targets in the plan, e.g. future::plan(future.callr::callr, workers = 4L) or:

future::plan(
  list(
    future::tweak(
      future.batchtools::batchtools_sge,
      template = "sge_batchtools.tmpl"
    ),
    future::tweak(future.callr::callr, workers = 4L)
  )
)

Alternatively, you can use chunking to prevent individual targets from using too many workers, e.g. furrr::future_map(.options = furrr::future_options(scheduling = 4)). Here, the scheduling argument sets the average number of futures per worker.
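
Below is a minimal sketch of this kind of chunking, using the future_options() API mentioned above (newer furrr releases rename it furrr_options()).

library(furrr)

# Two local callr workers.
future::plan(future.callr::callr, workers = 2L)

# With scheduling = 4, furrr aims for an average of 4 chunks per worker
# instead of one big chunk per worker.
future_map_dbl(1:8, sqrt, .options = future_options(scheduling = 4))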

13.7.6.5 Locked binding/environment errors

Some workflows unavoidably use mclapply(), which is known to modify the global environment against drake’s will. If you are stuck, there are two workarounds.

  1. Use make(lock_envir = FALSE).
  2. Use the envir argument of make(). That way, drake locks your special custom environment instead of the global environment.

# Load the main example: https://github.com/wlandau/drake-examples
library(drake)
drake_example("main")
setwd("main")

# Define and populate a special custom environment.
envir <- new.env(parent = globalenv())
source("R/packages.R", local = envir)
source("R/functions.R", local = envir)
source("R/plan.R", local = envir)

# Check the contents of your environments.
ls(envir) # Should contain your functions and plan.
ls()      # The global environment should only have what you started with.

# Build the targets using your custom environment
make(envir$plan, envir = envir)