Appendix A — High-performance computing (the old way)

Performance

See the performance chapter for options, settings, and other choices to make parallel and distributed pipelines more efficient.

targets supports parallel and distributed computing with the tar_make_clustermq() and tar_make_future() functions. These functions are like tar_make(), but they allow multiple targets to run simultaneously over parallel workers.[1] These workers can be processes on your local machine, or they can be jobs on a computing cluster. The main process automatically sends a target to a worker as soon as

  1. The worker is available, and
  2. All the target’s upstream dependency targets have been checked or built.

Consider the following sketch of a pipeline.

# _targets.R
library(targets)
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
# R console
tar_visnetwork()

When we run this pipeline with high-performance computing, targets automatically knows to wait for data to finish running before moving on to the other targets. Once data is finished, it moves on to targets fast_fit and slow_fit. If fast_fit finishes before slow_fit, target plot_1 begins even as slow_fit is still running.[2]
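To make the scheduling rule concrete, here is a toy simulation in base R. It is only a sketch and not how targets is implemented: the run times, the simulate_schedule() helper, and the greedy assignment rule are all assumptions made up for illustration. Each target starts as soon as a worker is free and all of its upstream dependencies have finished.

```r
# Toy model of dependency-aware scheduling (base R only, not targets internals).
# Target names mirror the sketch pipeline above; the run times are made up.
deps <- list(
  data = character(0),
  fast_fit = "data",
  slow_fit = "data",
  plot_1 = "fast_fit",
  plot_2 = "slow_fit"
)
seconds <- c(data = 1, fast_fit = 2, slow_fit = 10, plot_1 = 1, plot_2 = 1)

simulate_schedule <- function(deps, seconds, workers = 2) {
  target_names <- names(deps)
  start_time <- setNames(rep(NA_real_, length(target_names)), target_names)
  end_time <- start_time
  free_at <- rep(0, workers) # Time at which each worker becomes available.
  while (anyNA(start_time)) {
    waiting <- target_names[is.na(start_time)]
    # A target is ready once all of its upstream dependencies are scheduled.
    ready <- Filter(function(x) !anyNA(end_time[deps[[x]]]), waiting)
    stopifnot(length(ready) > 0) # Guards against circular dependencies.
    # Earliest time at which each ready target has all dependencies finished.
    ready_at <- vapply(
      ready,
      function(x) max(c(0, end_time[deps[[x]]])),
      numeric(1)
    )
    target <- ready[which.min(ready_at)]
    worker <- which.min(free_at)
    start_time[target] <- max(free_at[worker], ready_at[[target]])
    end_time[target] <- start_time[target] + seconds[[target]]
    free_at[worker] <- end_time[target]
  }
  data.frame(
    target = target_names,
    start = unname(start_time),
    end = unname(end_time)
  )
}

schedule <- simulate_schedule(deps, seconds, workers = 2)
print(schedule)
```

With two workers and these made-up run times, plot_1 starts at time 3, well before slow_fit finishes at time 11, while plot_2 has to wait until time 11.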

The following sections cover the implementation details of parallel and distributed pipelines in targets.

Start small

Even if you plan to create a large-scale heavy-duty pipeline with hundreds of time-consuming parallel targets, it is best to start small. First, create a version of the pipeline with a small number of quick-to-run targets, follow directions in the walkthrough chapter to inspect it, and run it locally with tar_make(). Do not scale up to the full-sized pipeline until you are sure everything is working in the downsized pipeline that runs locally.
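For example, a local test run of the downsized pipeline might look like the following in the R console. This is a sketch that assumes a small _targets.R file already exists in the working directory and that the visNetwork package is installed for the graph. All of the functions shown come from targets.

```r
# R console
library(targets)
tar_manifest()   # Check the target names and commands.
tar_visnetwork() # Inspect the dependency graph.
tar_make()       # Run the downsized pipeline locally in a single process.
tar_outdated()   # List targets that are still out of date, ideally none.
```

Only after this local run succeeds is it worth switching to tar_make_clustermq() or tar_make_future().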

A.1 Clustermq

tar_make_clustermq() uses the clustermq package, and prior familiarity with clustermq is extremely helpful for configuring targets and diagnosing errors. So before you use tar_make_clustermq(), please read the documentation at https://mschubert.github.io/clustermq/ and try out clustermq directly. If you plan to use a scheduler like SLURM or SGE, please configure and experiment with clustermq on your scheduler without targets. And if you later experience issues with tar_make_clustermq(), try to isolate the problem by creating a reproducible example that uses clustermq and not targets. Peeling back layers can help isolate problems and point toward specific solutions, and targets is usually one of the outer layers.
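As a quick way to try clustermq on its own, the sketch below runs a trivial function through clustermq without targets. It assumes clustermq is already installed and uses the local "multiprocess" scheduler; double_it() is a made-up function for illustration. The scheduler option is set before loading the package because some clustermq versions read it at load time.

```r
# R console
options(clustermq.scheduler = "multiprocess") # Local workers, no cluster needed.
library(clustermq)
double_it <- function(x) x * 2 # Hypothetical toy function.
Q(double_it, x = 1:3, n_jobs = 1) # Run double_it() on one clustermq worker.
```

If this works, the call should return a list with the values 2, 4, and 6. Repeating the experiment with your cluster's scheduler and template file is a good next step before involving targets.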

A.1.1 Persistent workers

tar_make_clustermq() uses persistent workers. A persistent worker is an R process that launches early in the pipeline and stays running until the whole pipeline starts to wind down. A persistent worker usually runs multiple targets during its lifecycle, and it is not possible to precisely predict in advance which targets will be assigned to which workers.

A.1.2 Clustermq installation

Persistent workers require the clustermq R package, which in turn requires ZeroMQ. Please refer to the clustermq installation guide for specific instructions.

A.1.3 Automatic configuration with use_targets()

If clustermq is installed, then the use_targets() function configures clustermq automatically. use_targets() tries to detect the cluster you are using and write the appropriate _targets.R settings and clustermq configuration files commensurate with your system resources. On most systems, this should allow you to run tar_make_clustermq() with no extra configuration. If so, you can skip the rest of the clustermq section. If not, read on to learn how to configure clustermq for targets.

A.1.4 Clustermq local configuration

To utilize parallel computing on your local machine, set the clustermq.scheduler option to "multicore" or "multiprocess" in your _targets.R file. See the configuration instructions for details.

# _targets.R
library(targets)
options(clustermq.scheduler = "multiprocess")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Then, run tar_make_clustermq() with the appropriate number of workers. These workers are local R processes that run the outdated targets in the pipeline concurrently. The workers launch as soon as there is an outdated target with deployment = "worker", and they continue running until there are no more targets to run.

# R console
tar_make_clustermq(workers = 2)

A.1.5 Clustermq remote configuration

A cluster is a collection of multiple computers that work together to run computationally demanding workloads. A cluster is capable of running far more tasks simultaneously than a local workstation like a PC or laptop. When configured for a cluster, tar_make_clustermq() uses clustermq to run persistent workers as long-running distributed jobs. To manually set the configuration,

  1. Choose a scheduler supported by clustermq that corresponds to your cluster’s resource manager.
  2. Create a template file that configures the computing requirements and other settings for the cluster.

Supply the scheduler option and template file to the clustermq.scheduler and clustermq.template global options in your target script file (default: _targets.R).

# _targets.R
library(targets)
options(clustermq.scheduler = "sge", clustermq.template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Above, sge.tmpl refers to a template file like the one below.

## From https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}  # Worker name.
#$ -t 1-{{ n_jobs }}  # Submit workers as an array.
#$ -j y               # Combine stdout and stderr into one worker log file.
#$ -o /dev/null       # Worker log files.
#$ -cwd               # Use project root as working directory.
#$ -V                 # Use environment variables.
module load R/3.6.3   # Needed if R is an environment module on the cluster.
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")' # Leave alone.

Then, run tar_make_clustermq() as before. Instead of running locally, the workers will run as jobs on the cluster. Because they are persistent workers, they launch as soon as there is an outdated target with deployment = "worker", and they continue running until they complete their current targets and there are no more targets to assign.

# R console
tar_make_clustermq(workers = 2)

See the examples linked from here to see how this setup works in real-world projects.

A.1.6 Clustermq template file configuration

In addition to configuration options hard-coded in the template file, you can supply custom computing resources with the resources argument of tar_option_set(). As an example, let’s use a wildcard for the number of cores per worker on an SGE cluster. In the template file, supply the {{ num_cores }} wildcard to the -pe smp flag.

#$ -pe smp {{ num_cores | 1 }} # Number of cores per worker (default: 1).
#$ -N {{ job_name }}
#$ -t 1-{{ n_jobs }}
#$ -j y
#$ -o /dev/null
#$ -cwd
#$ -V
module load R/3.6.3
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")'

Then, supply the value of num_cores to the resources option from within the target script file (default: _targets.R). In older versions of targets, resources was a named list. In targets 0.5.0.9000 and above, please create the resources argument with the helpers tar_resources() and tar_resources_clustermq().

# _targets.R
library(targets)
# With older versions of targets:
# tar_option_set(resources = list(num_cores = 2))
# With targets >= 0.5.0.9000:
tar_option_set(
  resources = tar_resources(
    clustermq = tar_resources_clustermq(template = list(num_cores = 2))
  )
)
list(
  tar_target(...),
  ... # more targets
)

Finally, call tar_make_clustermq() normally.

# R console
tar_make_clustermq(workers = 2)

This particular use case comes up when you have custom parallel computing within targets and need to take advantage of multiple cores.

clustermq resources

Remember: with tar_make_clustermq(), the workers are persistent, which means there is not a one-to-one correspondence between workers and targets. In the resources argument of tar_option_set(), the clustermq-specific resources apply to the workers, not the targets directly. So in the resources argument of tar_target(), any clustermq-specific resources will be ignored. tar_option_set() is the only way to set clustermq resources.

A.2 Future

tar_make_future() uses the future package, and prior familiarity with future is extremely helpful for configuring targets and diagnosing errors. So before you use tar_make_future(), please read the documentation at https://future.futureverse.org/ and try out future directly, ideally with backends like future.callr and possibly future.batchtools. If you plan to use a scheduler like SLURM or SGE, please configure and experiment with future on your scheduler without targets. And if you later experience issues with tar_make_future(), try to isolate the problem by creating a reproducible example that uses future and not targets. Same goes for future.batchtools if applicable. Peeling back layers can help isolate problems and point toward specific solutions, and targets is usually one of the outer layers.
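As a quick way to try future on its own, the sketch below launches a single future on a local callr worker without targets. It assumes the future and future.callr packages are already installed. The choice of two workers is arbitrary.

```r
# R console
library(future)
library(future.callr)
plan(callr, workers = 2)  # Each future runs in a fresh background R process.
f <- future(Sys.getpid()) # Launch one future without targets.
value(f)                  # Process ID of the worker that ran the future.
Sys.getpid()              # Process ID of the main session, for comparison.
plan(sequential)          # Restore the default plan.
```

The two process IDs should differ, which confirms that the future ran outside the main session. The same experiment with a future.batchtools plan is a reasonable way to test a cluster setup before running tar_make_future().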

A.2.1 Transient workers

tar_make_future() runs transient workers. That means each target gets its own worker which initializes when the target begins and terminates when the target ends.


A.2.2 Automatic configuration with use_targets()

If future and future.batchtools are installed, then the use_targets() function configures future automatically. use_targets() tries to detect the cluster you are using and write the appropriate _targets.R settings and, if appropriate, the batchtools configuration file commensurate with your system resources. On most systems, this should allow you to run tar_make_future() with no extra configuration. If so, you can skip the rest of the future section. If not, read on to learn how to configure future for targets.

A.2.3 Future installation

Install the future package.

install.packages("future")

If you intend to use a cluster, be sure to install the future.batchtools package too.

install.packages("future.batchtools")

The future ecosystem contains even more packages that extend future’s parallel computing functionality, such as future.callr.

install.packages("future.callr")

A.2.4 Future locally

If you are manually configuring future or future.batchtools for your pipeline, declare a future plan in your target script file (default: _targets.R). The callr plan from the future.callr package is recommended.[3] It is crucial to call future::plan() in the target script file itself: a plan defined interactively before invoking tar_make_future() is not used by the pipeline.

# _targets.R
library(targets)
library(future)
library(future.callr)
plan(callr)
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Then, run tar_make_future() with the desired number of workers. Here, the workers argument specifies the maximum number of transient workers to allow at a given time. Some future plans also have optional workers arguments that set their own caps.

# R console
tar_make_future(workers = 2)

A.2.5 Future remotely

To run transient workers on a cluster, first install the future.batchtools package. Then, set one of the future.batchtools plans in your target script file (default: _targets.R).

# _targets.R
library(targets)
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Here, our template file sge.tmpl is configured for batchtools.

#!/bin/bash
#$ -cwd               # Run in the current working directory.
#$ -j y               # Direct stdout and stderr to the same file.
#$ -o <%= log.file %> # log file
#$ -V                 # Use environment variables.
#$ -N <%= job.name %> # job name
module load R/3.6.3   # Adjust or remove depending on whether R is an environment module.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")' # Leave alone.
exit 0 # Leave alone.

A.2.6 Future configuration

The tar_target(), tar_target_raw(), and tar_option_set() functions accept a resources argument.[4] For example, if our batchtools template file has a wildcard for the number of cores for a job,

#!/bin/bash
#$ -pe smp <%= resources[["num_cores"]] | 1 %> # Wildcard for cores per job.
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
module load R/3.6.3
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0

then you can set the number of cores for an individual target using a target-specific future plan. In the case below, maybe the slow model needs 2 cores to run fast enough. Because of the resources[["num_cores"]] placeholder in the above template file, we can control the number of cores in each target through its local plan.[5]

# _targets.R
library(targets)
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  # With older versions of targets:
  # tar_target(slow_fit, fit_slow_model(data), resources = list(num_cores = 2)),
  # With targets >= 0.5.0.9000:
  tar_target(
    slow_fit,
    fit_slow_model(data),
    resources = tar_resources(
      future = tar_resources_future(
        plan = tweak(
          batchtools_sge,
          template = "sge.tmpl",
          resources = list(num_cores = 2)
        )
      )
    )
  ),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)

Then, run tar_make_future() as usual.

# R console
tar_make_future(workers = 2)

A.3 Advanced

Functions tar_target(), tar_target_raw(), and tar_option_set() support advanced configuration options for heavy-duty pipelines that require high-performance computing.

  • deployment: With the deployment argument, you can choose to run some targets locally on the main process instead of on a high-performance computing worker. This option is suitable for lightweight targets such as R Markdown reports where runtime is quick and a cluster would be excessive.
  • memory: Choose whether to retain a target in memory or remove it from memory whenever it is not needed at the moment. This is a tradeoff between memory consumption and storage read speeds, and like all of the options listed here, you can set it on a target-by-target basis. The default settings consume a lot of memory to avoid frequently reading from storage. To keep memory usage down to a minimum, set memory = "transient" and garbage_collection = TRUE in tar_target() or tar_option_set(). For cloud-based dynamic files such as format = "aws_file", this memory policy applies to temporary local copies of the file in _targets/scratch/: "persistent" means they remain until the end of the pipeline, and "transient" means they get deleted from the file system as soon as possible. The former conserves bandwidth, and the latter conserves local storage.
  • garbage_collection: Choose whether to run base::gc() just before running the target.
  • storage: Choose whether the parallel workers or the main process is responsible for saving the target’s value. For slow network file systems on clusters, storage = "main" is often faster for small numbers of targets. For large numbers of targets or low-bandwidth connections between the main and workers, storage = "worker" is often faster. Always choose storage = "main" if the workers do not have access to the file system with the _targets/ data store.
  • retrieval: Choose whether the parallel workers or the main process is responsible for reading dependency targets from disk. Should usually be set to whatever you choose for storage (default). Always choose retrieval = "main" if the workers do not have access to the file system with the _targets/ data store.
  • format: If your pipeline has large computation, it may also have large data. Consider setting the format argument to help targets store and retrieve your data faster.
  • error: Set error to "continue" to let the rest of the pipeline keep running even if a target encounters an error.
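As an illustration of how these options combine, the sketch below sets pipeline-wide defaults with tar_option_set() and overrides a few of them on individual targets. The specific choices are assumptions for the sake of example, not recommendations, and get_data(), fit_slow_model(), and make_plot() are the same placeholder functions used in the earlier sketches.

```r
# _targets.R
library(targets)
tar_option_set(
  memory = "transient", # Unload targets from memory when not needed.
  storage = "worker",   # Workers save their own return values.
  retrieval = "worker", # Workers read their own dependencies.
  error = "continue"    # Keep running other targets if one errors.
)
list(
  tar_target(data, get_data()),
  tar_target(
    slow_fit,
    fit_slow_model(data),
    garbage_collection = TRUE # Run gc() just before this target.
  ),
  tar_target(
    plot_2,
    make_plot(slow_fit),
    deployment = "main" # Lightweight target: run on the main process.
  )
)
```

Here the workers must be able to reach the file system that holds the _targets/ data store, since both storage and retrieval are set to "worker".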

A.4 Cloud computing

Right now, targets does not have built-in cloud-based distributed computing support. However, future development plans include seamless integration with AWS Batch. As a temporary workaround, it is possible to deploy a burstable SLURM cluster using AWS ParallelCluster and leverage targets’ existing support for traditional schedulers.


  1. Please use tar_make_clustermq() or tar_make_future() instead of multiple concurrent calls to tar_make(). The former is the proper way to use parallel computing in targets, and the latter will probably break the data store.

  2. Unlike drake, targets applies this behavior not only to stem targets, but also to branches of patterns.

  3. Some alternative local future plans are listed here.

  4. The resources argument of tar_target() defaults to tar_option_get("resources"). You can set the default value for all targets using tar_option_set().

  5. In older versions of targets, resources was a named list. In targets version 0.5.0.9000 and above, please create the resources argument with the helpers tar_resources() and tar_resources_future().