Chapter 13 High-performance computing

targets supports high-performance computing with the tar_make_clustermq() and tar_make_future() functions. These functions are like tar_make(), but they allow multiple targets to run simultaneously over parallel workers. These workers can be processes on your local machine, or they can be jobs on a computing cluster. The main process automatically sends a target to a worker as soon as

  1. The worker is available, and
  2. All the target’s upstream dependency targets have been checked or built.

Practical real-world examples of high-performance computing in targets are linked from here. But for the purposes of explaining the mechanics of the package, consider the following sketch of a pipeline.

# _targets.R
library(targets)
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
# R console
tar_visnetwork()

When we run this pipeline with high-performance computing, targets automatically knows to wait for data to finish running before moving on to the other targets. Once data is finished, it moves on to targets fast_fit and slow_fit. If fast_fit finishes before slow_fit, target plot_1 begins even as slow_fit is still running. Unlike drake, targets applies this behavior not only to stem targets, but also to branches of patterns.
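
For example, the same scheduling applies to the individual branches of a dynamically branched target. The sketch below is a hypothetical illustration (the file names and the fit_model() function are assumptions, not part of the example pipeline above):

```r
# _targets.R
library(targets)
list(
  tar_target(file, c("a.csv", "b.csv", "c.csv")),        # Hypothetical data files.
  tar_target(data, read.csv(file), pattern = map(file)), # One branch per file.
  tar_target(fit, fit_model(data), pattern = map(data))  # Branches run on parallel workers.
)
```

Each branch of data can start as soon as a worker is free, and each branch of fit can start as soon as its corresponding branch of data finishes, even while sibling branches are still running.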

The following sections cover the mechanics and configuration details of high-performance computing in targets.

13.1 Clustermq

tar_make_clustermq() uses the clustermq package, and prior familiarity with clustermq is extremely helpful for configuring targets and diagnosing errors. So before you use tar_make_clustermq(), please read the documentation at https://mschubert.github.io/clustermq/ and try out clustermq directly. If you plan to use a scheduler like SLURM or SGE, please configure and experiment with clustermq on your scheduler without targets. And if you later experience issues with tar_make_clustermq(), try to isolate the problem by creating a reproducible example that uses clustermq and not targets. Peeling back layers can help isolate problems and point toward specific solutions, and targets is usually one of the outer layers.

13.1.1 Persistent workers

tar_make_clustermq() uses persistent workers. That means all the parallel processes launch together as soon as there is a target to build, and they keep running until the pipeline winds down. The video clip below visualizes the concept.

13.1.2 Clustermq installation

Persistent workers require the clustermq R package, which in turn requires ZeroMQ. Please refer to the clustermq installation guide for specific instructions.
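
Once ZeroMQ is available on your system, clustermq itself installs from CRAN as usual:

```r
install.packages("clustermq")
```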

13.1.3 Clustermq locally

When you write your target script file (default: _targets.R), be sure to set the clustermq.scheduler global option to a local scheduler such as "multiprocess". Many of the supported schedulers and their configuration details are listed here.

# _targets.R
library(targets)
options(clustermq.scheduler = "multiprocess")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Then, run tar_make_clustermq() with the appropriate number of workers.

# R console
tar_make_clustermq(workers = 2)

13.1.4 Clustermq remotely

For parallel computing on a cluster,

  1. Choose a scheduler listed here that corresponds to your cluster’s resource manager.
  2. Create a template file that configures the computing requirements and other settings for the cluster.

Supply the scheduler option and template file to the clustermq.scheduler and clustermq.template global options in your target script file (default: _targets.R).

# _targets.R
library(targets)
options(clustermq.scheduler = "sge", clustermq.template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Above, sge.tmpl refers to a template file like the one below.

## From https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}  # Worker name.
#$ -t 1-{{ n_jobs }}  # Submit workers as an array.
#$ -j y               # Combine stdout and stderr into one worker log file.
#$ -o /dev/null       # Worker log files.
#$ -cwd               # Use project root as working directory.
#$ -V                 # Use environment variables.
module load R/3.6.3   # Needed if R is an environment module on the cluster.
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")' # Leave alone.

Then, run tar_make_clustermq() as before.

# R console
tar_make_clustermq(workers = 2)

See the examples linked from here for how this setup works in real-world projects.

13.1.5 Clustermq configuration

In addition to configuration options hard-coded in the template file, you can supply custom computing resources with the resources argument of tar_option_set(). As an example, let’s use a wildcard for the number of cores per worker on an SGE cluster. In the template file, supply the {{ num_cores }} wildcard to the -pe smp flag.

#$ -pe smp {{ num_cores | 1 }} # Number of cores per worker (default 1).
#$ -N {{ job_name }}
#$ -t 1-{{ n_jobs }}
#$ -j y
#$ -o /dev/null
#$ -cwd
#$ -V
module load R/3.6.3
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")'

Then, supply the value of num_cores to the resources option from within the target script file (default: _targets.R). In older versions of targets, resources was a named list. In targets 0.5.0.9000 and above, please create the resources argument with the helpers tar_resources() and tar_resources_clustermq().

# _targets.R
# With older versions of targets:
# tar_option_set(resources = list(num_cores = 2))
# With targets >= 0.5.0.9000:
tar_option_set(
  resources = tar_resources(
    clustermq = tar_resources_clustermq(template = list(num_cores = 2))
  )
)
list(
  tar_target(...),
  ... # more targets
)

Finally, call tar_make_clustermq() normally.

# R console
tar_make_clustermq(workers = 2)

This particular use case comes up when a target’s own command runs parallel computation and needs multiple cores on its worker.
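
As a sketch of that use case, a function called from a target’s command might fork over the cores requested through the num_cores wildcard. Here, fit_one() and the list of datasets are assumptions, and parallel::mclapply() forks only on Unix-like systems such as typical cluster nodes:

```r
# R/functions.R
fit_all <- function(datasets, num_cores = 2) {
  # Fork one process per core. num_cores should agree with the value
  # supplied to the {{ num_cores }} wildcard in the template file.
  parallel::mclapply(datasets, fit_one, mc.cores = num_cores)
}
```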

13.2 Future

tar_make_future() uses the future package, and prior familiarity with future is extremely helpful for configuring targets and diagnosing errors. So before you use tar_make_future(), please read the documentation at https://future.futureverse.org/ and try out future directly, ideally with backends like future.callr and possibly future.batchtools. If you plan to use a scheduler like SLURM or SGE, please configure and experiment with future on your scheduler without targets. And if you later experience issues with tar_make_future(), try to isolate the problem by creating a reproducible example that uses future and not targets. Same goes for future.batchtools if applicable. Peeling back layers can help isolate problems and point toward specific solutions, and targets is usually one of the outer layers.

13.2.1 Transient workers

tar_make_future() runs transient workers. That means each target gets its own worker which initializes when the target begins and terminates when the target ends. The following video clip demonstrates the concept.


13.2.2 Future installation

Install the future package.

install.packages("future")

If you intend to use a cluster, be sure to install the future.batchtools package too.

install.packages("future.batchtools")

The future ecosystem contains even more packages that extend future’s parallel computing functionality, such as future.callr.

install.packages("future.callr")

13.2.3 Future locally

To parallelize targets over multiple processes on your local machine, declare a future plan in your target script file (default: _targets.R). The callr plan from the future.callr package is recommended.15 It is crucial to call future::plan() in the target script file itself: a plan declared interactively before invoking tar_make_future() is not applied to the pipeline.

# _targets.R
library(targets)
library(future)
library(future.callr)
plan(callr)
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Then, run tar_make_future() with the desired number of workers. Here, the workers argument specifies the maximum number of transient workers to allow at a given time. Some future plans also have optional workers arguments that set their own caps.

# R console
tar_make_future(workers = 2)

13.2.4 Future remotely

To run transient workers on a cluster, first install the future.batchtools package. Then, set one of these plans in your target script file (default: _targets.R).

# _targets.R
library(targets)
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Here, our template file sge.tmpl is configured for batchtools.

#!/bin/bash
#$ -cwd               # Run in the current working directory.
#$ -j y               # Direct stdout and stderr to the same file.
#$ -o <%= log.file %> # Log file.
#$ -V                 # Use environment variables.
#$ -N <%= job.name %> # Job name.
module load R/3.6.3   # Needed if R is an environment module on the cluster.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")' # Leave alone.
exit 0 # Leave alone.

13.2.5 Future configuration

The tar_target(), tar_target_raw(), and tar_option_set() functions accept a resources argument.16 For example, if our batchtools template file has a wildcard for the number of cores for a job,

#!/bin/bash
#$ -pe smp <%= resources[["num_cores"]] | 1 %> # Wildcard for cores per job.
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
module load R/3.6.3
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0

then you can set the number of cores for an individual target using a target-specific future plan. In the case below, maybe the slow model needs 2 cores to run fast enough. Because of the resources[["num_cores"]] placeholder in the above template file, we can control the number of cores in each target through its local plan.17

# _targets.R
library(targets)
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  # With older versions of targets:
  # tar_target(slow_fit, fit_slow_model(data), resources = list(num_cores = 2)),
  # With targets >= 0.5.0.9000:
  tar_target(
    slow_fit,
    fit_slow_model(data),
    resources = tar_resources(
      future = tar_resources_future(
        plan = tweak(
          batchtools_sge,
          template = "sge.tmpl",
          resources = list(num_cores = 2)
        )
      )
    )
  ),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Then, run tar_make_future() as usual.

# R console
tar_make_future(workers = 2)

13.3 Advanced

Functions tar_target(), tar_target_raw(), and tar_option_set() support advanced configuration options for heavy-duty pipelines that require high-performance computing.

  • deployment: With the deployment argument, you can choose to run some targets locally on the main process instead of on a high-performance computing worker. This option is suitable for lightweight targets such as R Markdown reports where runtime is quick and a cluster would be excessive.
  • memory: Choose whether to retain a target in memory or remove it from memory when it is not immediately needed. This is a tradeoff between memory consumption and storage read speeds, and like all of the options listed here, you can set it on a target-by-target basis. The default settings consume a lot of memory in order to avoid frequent reads from storage. To keep memory usage to a minimum, set memory = "transient" and garbage_collection = TRUE in tar_target() or tar_option_set(). For cloud-based dynamic files such as format = "aws_file", this memory policy applies to temporary local copies of the file in _targets/scratch/: "persistent" means they remain until the end of the pipeline, and "transient" means they are deleted from the file system as soon as possible. The former conserves bandwidth, and the latter conserves local storage.
  • garbage_collection: Choose whether to run base::gc() just before running the target.
  • storage: Choose whether the parallel workers or the main process is responsible for saving the target’s value. For slow network file systems on clusters, storage = "main" is often faster for small numbers of targets. For large numbers of targets or low-bandwidth connections between the main and workers, storage = "worker" is often faster. Always choose storage = "main" if the workers do not have access to the file system with the _targets/ data store.
  • retrieval: Choose whether the parallel workers or the main process is responsible for reading dependency targets from disk. Should usually be set to whatever you choose for storage (default). Always choose retrieval = "main" if the workers do not have access to the file system with the _targets/ data store.
  • format: Pipelines with heavy computation often produce large data as well. Consider setting the format argument to help targets store and retrieve your data faster.
  • error: Set error to "continue" to let the rest of the pipeline keep running even if a target encounters an error.
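
For illustration, several of these options can be combined in a single target script. Below, get_data() and render_report() are assumed user-defined functions, and format = "qs" requires the qs package:

```r
# _targets.R
library(targets)
tar_option_set(
  memory = "transient",      # Drop target values from memory when not needed.
  garbage_collection = TRUE, # Run gc() before each target runs.
  storage = "worker",        # Workers save their own return values.
  retrieval = "worker",      # Workers load their own dependencies.
  error = "continue"         # Keep running other targets if one errors.
)
list(
  tar_target(data, get_data(), format = "qs"),                 # Fast serialization format.
  tar_target(report, render_report(data), deployment = "main") # Lightweight; runs locally.
)
```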

  15. Some alternative local future plans are listed here.↩︎

  16. The resources argument of tar_target() defaults to tar_option_get("resources"). You can set the default value for all targets using tar_option_set().↩︎

  17. In older versions of targets, resources was a named list. In targets version 0.5.0.9000 and above, please create the resources argument with the helpers tar_resources() and tar_resources_future().↩︎

Copyright Eli Lilly and Company