Appendix A — High-performance computing (the old way)
See the performance chapter for options, settings, and other choices to make parallel and distributed pipelines more efficient.
`targets` supports parallel and distributed computing with the `tar_make_clustermq()` and `tar_make_future()` functions. These functions are like `tar_make()`, but they allow multiple targets to run simultaneously over parallel workers.¹ These workers can be processes on your local machine, or they can be jobs on a computing cluster. The main process automatically sends a target to a worker as soon as

- The worker is available, and
- All the target's upstream dependency targets have been checked or built.
Consider the following sketch of a pipeline.

```r
# _targets.R
library(targets)
library(tarchetypes)
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
```

```r
# R console
tar_visnetwork()
```
When we run this pipeline with high-performance computing, `targets` automatically knows to wait for `data` to finish running before moving on to the other targets. Once `data` is finished, it moves on to targets `fast_fit` and `slow_fit`. If `fast_fit` finishes before `slow_fit`, target `plot_1` begins even as `slow_fit` is still running.²
The following sections cover the implementation details of parallel and distributed pipelines in `targets`.

Even if you plan to create a large-scale heavy-duty pipeline with hundreds of time-consuming parallel targets, it is best to start small. First, create a version of the pipeline with a small number of quick-to-run targets, follow the directions in the walkthrough chapter to inspect it, and run it locally with `tar_make()`. Do not scale up to the full-sized pipeline until you are sure everything works in the downsized pipeline that runs locally.
A.1 Clustermq
`tar_make_clustermq()` uses the `clustermq` package, and prior familiarity with `clustermq` is extremely helpful for configuring `targets` and diagnosing errors. So before you use `tar_make_clustermq()`, please read the documentation at https://mschubert.github.io/clustermq/ and try out `clustermq` directly. If you plan to use a scheduler like SLURM or SGE, please configure and experiment with `clustermq` on your scheduler without `targets`. And if you later experience issues with `tar_make_clustermq()`, try to isolate the problem by creating a reproducible example that uses `clustermq` and not `targets`. Peeling back layers can help isolate problems and point toward specific solutions, and `targets` is usually one of the outer layers.
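For instance, here is a minimal `clustermq`-only smoke test you could adapt for such a reproducible example. The function and inputs are placeholders; the point is to exercise `clustermq::Q()` with `targets` out of the picture.

```r
# R console: exercise clustermq directly, without targets.
library(clustermq)
options(clustermq.scheduler = "multiprocess") # or your cluster scheduler
# Send a simple function to 2 workers and collect the results.
Q(function(x) x^2, x = 1:4, n_jobs = 2) # returns list(1, 4, 9, 16)
```

If a test like this fails, the problem lies in the `clustermq` setup itself, not in `targets`.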
A.1.1 Persistent workers
`tar_make_clustermq()` uses persistent workers. A persistent worker is an R process that launches early in the pipeline and stays running until the whole pipeline starts to wind down. A persistent worker usually runs multiple targets during its lifecycle, and it is not possible to precisely predict in advance which targets will be assigned to which workers. The video clip below visualizes the concept.
A.1.2 Clustermq installation
Persistent workers require the `clustermq` R package, which in turn requires ZeroMQ. Please refer to the `clustermq` installation guide for specific instructions.
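Assuming the ZeroMQ system library is already available on your machine, installing the R package itself is usually a plain CRAN install.

```r
# R console
install.packages("clustermq")
```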
A.1.3 Automatic configuration with use_targets()
If `clustermq` is installed, then the `use_targets()` function configures `clustermq` automatically. `use_targets()` tries to detect the cluster you are using and write the appropriate `_targets.R` settings and `clustermq` configuration files commensurate with your system resources. On most systems, this should allow you to run `tar_make_clustermq()` with no extra configuration. If so, you can skip the rest of the `clustermq` section. If not, read on to learn how to configure `clustermq` for `targets`.
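The call itself is trivial; exactly which files it writes depends on the scheduler it detects, so the comment below is an assumption rather than a guarantee.

```r
# R console, from the project root
library(targets)
use_targets() # writes _targets.R and, if needed, a scheduler template file
```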
A.1.4 Clustermq local configuration
To use parallel computing on your local machine, set the `clustermq.scheduler` option to `"multicore"` or `"multiprocess"` in your `_targets.R` file. See the configuration instructions for details.
```r
# _targets.R
library(targets)
options(clustermq.scheduler = "multiprocess")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_clustermq()` with the appropriate number of workers. These workers are local R processes that run concurrently to build the outdated targets in the pipeline. The workers launch as soon as there is an outdated target with `deployment = "worker"`, and they continue running until there are no more targets to run.
```r
# R console
tar_make_clustermq(workers = 2)
```
A.1.5 Clustermq remote configuration
A cluster is a collection of multiple computers that work together to run computationally demanding workloads. A cluster is capable of running far more tasks simultaneously than a local workstation like a PC or laptop. When configured for a cluster, `tar_make_clustermq()` uses `clustermq` to run persistent workers as long-running distributed jobs. To manually set the configuration,

- Choose a scheduler listed here that corresponds to your cluster's resource manager.
- Create a template file that configures the computing requirements and other settings for the cluster.
- Supply the scheduler option and template file to the `clustermq.scheduler` and `clustermq.template` global options in your target script file (default: `_targets.R`).
```r
# _targets.R
library(targets)
options(clustermq.scheduler = "sge", clustermq.template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
```
Above, `sge.tmpl` refers to a template file like the one below.
```sh
## From https://github.com/mschubert/clustermq/wiki/SGE
#$ -N {{ job_name }}  # Worker name.
#$ -t 1-{{ n_jobs }}  # Submit workers as an array.
#$ -j y               # Combine stdout and stderr into one worker log file.
#$ -o /dev/null       # Worker log files.
#$ -cwd               # Use project root as working directory.
#$ -V                 # Use environment variables.
module load R/3.6.3   # Needed if R is an environment module on the cluster.
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")' # Leave alone.
```
Then, run `tar_make_clustermq()` as before. Instead of running locally, the workers will run as jobs on the cluster. Because they are persistent workers, they launch as soon as there is an outdated target with `deployment = "worker"`, and they continue running until they complete their current targets and there are no more targets to assign.
```r
# R console
tar_make_clustermq(workers = 2)
```
See the examples linked here for how this setup works in real-world projects.
A.1.6 Clustermq template file configuration
In addition to the configuration options hard-coded in the template file, you can supply custom computing resources with the `resources` argument of `tar_option_set()`. As an example, let's use a wildcard for the number of cores per worker on an SGE cluster. In the template file, supply the `{{ num_cores }}` wildcard to the `-pe smp` flag.
```sh
#$ -pe smp {{ num_cores | 1 }} # Number of cores per worker (defaults to 1).
#$ -N {{ job_name }}
#$ -t 1-{{ n_jobs }}
#$ -j y
#$ -o /dev/null
#$ -cwd
#$ -V
module load R/3.6.3
CMQ_AUTH={{ auth }} R --no-save --no-restore -e 'clustermq:::worker("{{ main }}")'
```
Then, supply the value of `num_cores` to the `resources` option from within the target script file (default: `_targets.R`). In older versions of `targets`, `resources` was a named list. In `targets` 0.5.0.9000 and above, please create the `resources` argument with the helpers `tar_resources()` and `tar_resources_clustermq()`.
```r
# _targets.R
library(targets)
# With older versions of targets:
# tar_option_set(resources = list(num_cores = 2))
# With targets >= 0.5.0.9000:
tar_option_set(
  resources = tar_resources(
    clustermq = tar_resources_clustermq(template = list(num_cores = 2))
  )
)
list(
  tar_target(...),
  # more targets
)
```
Finally, call `tar_make_clustermq()` normally.
```r
# R console
tar_make_clustermq(workers = 2)
```
This particular use case comes up when you have custom parallel computing within individual targets and need each worker to have multiple cores.
clustermq resources

Remember: with `tar_make_clustermq()`, the workers are persistent, which means there is not a one-to-one correspondence between workers and targets. In the `resources` argument of `tar_option_set()`, the `clustermq`-specific resources apply to the workers, not the targets directly. So in the `resources` argument of `tar_target()`, any `clustermq`-specific resources will be ignored. `tar_option_set()` is the only way to set `clustermq` resources.
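To make the contrast concrete, here is a sketch of the right and wrong places to put these settings; the commented-out `tar_target()` call is a hypothetical example of a setting that would be silently ignored.

```r
# _targets.R
library(targets)
# Correct: clustermq resources apply pipeline-wide via tar_option_set().
tar_option_set(
  resources = tar_resources(
    clustermq = tar_resources_clustermq(template = list(num_cores = 2))
  )
)
# Ignored: clustermq resources set on an individual target have no effect.
# tar_target(
#   slow_fit,
#   fit_slow_model(data),
#   resources = tar_resources(
#     clustermq = tar_resources_clustermq(template = list(num_cores = 4))
#   )
# )
```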
A.2 Future
`tar_make_future()` uses the `future` package, and prior familiarity with `future` is extremely helpful for configuring `targets` and diagnosing errors. So before you use `tar_make_future()`, please read the documentation at https://future.futureverse.org/ and try out `future` directly, ideally with backends like `future.callr` and possibly `future.batchtools`. If you plan to use a scheduler like SLURM or SGE, please configure and experiment with `future` on your scheduler without `targets`. And if you later experience issues with `tar_make_future()`, try to isolate the problem by creating a reproducible example that uses `future` and not `targets`. Same goes for `future.batchtools` if applicable. Peeling back layers can help isolate problems and point toward specific solutions, and `targets` is usually one of the outer layers.
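As with `clustermq`, a tiny `future`-only example is a good way to isolate problems. Here is one minimal smoke test, assuming the `future.callr` backend; the computation itself is just a placeholder.

```r
# R console: exercise future directly, without targets.
library(future)
library(future.callr)
plan(callr) # each future runs in a fresh callr R process
f <- future(Sys.getpid()) # evaluate in a background process
value(f)                  # a PID different from the current session's
```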
A.2.1 Transient workers
`tar_make_future()` runs transient workers. That means each target gets its own worker, which initializes when the target begins and terminates when the target ends. The following video clip demonstrates the concept.
A.2.2 Automatic configuration with use_targets()
If `future` and `future.batchtools` are installed, then the `use_targets()` function configures `future` automatically. `use_targets()` tries to detect the cluster you are using and write the appropriate `_targets.R` settings and, if appropriate, the `batchtools` configuration file commensurate with your system resources. On most systems, this should allow you to run `tar_make_future()` with no extra configuration. If so, you can skip the rest of the `future` section. If not, read on to learn how to configure `future` for `targets`.
A.2.3 Future installation
Install the `future` package.

```r
install.packages("future")
```

If you intend to use a cluster, be sure to install the `future.batchtools` package too.

```r
install.packages("future.batchtools")
```

The `future` ecosystem contains even more packages that extend `future`'s parallel computing functionality, such as `future.callr`.

```r
install.packages("future.callr")
```
A.2.4 Future locally
If you are manually configuring `future`/`future.batchtools` for your pipeline, declare a `future` plan in your target script file (default: `_targets.R`). The `callr` plan from the `future.callr` package is recommended.³ It is crucial to call `future::plan()` in the target script file itself: a plan defined interactively before invoking `tar_make_future()` is not applied to the pipeline.
```r
# _targets.R
library(targets)
library(future)
library(future.callr)
plan(callr)
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_future()` with the desired number of workers. Here, the `workers` argument specifies the maximum number of transient workers to allow at a given time. Some `future` plans also have optional `workers` arguments that set their own caps.
```r
# R console
tar_make_future(workers = 2)
```
A.2.5 Future remotely
To run transient workers on a cluster, first install the `future.batchtools` package. Then, set one of these plans in your target script file (default: `_targets.R`).
```r
# _targets.R
library(targets)
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  tar_target(slow_fit, fit_slow_model(data)),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
```
Here, our template file `sge.tmpl` is configured for `batchtools`.
```sh
#!/bin/bash
#$ -cwd               # Run in the current working directory.
#$ -j y               # Direct stdout and stderr to the same file.
#$ -o <%= log.file %> # Log file.
#$ -V                 # Use environment variables.
#$ -N <%= job.name %> # Job name.
module load R/3.6.3   # Needed if R is an environment module on the cluster.
Rscript -e 'batchtools::doJobCollection("<%= uri %>")' # Leave alone.
exit 0 # Leave alone.
```
A.2.6 Future configuration
The `tar_target()`, `tar_target_raw()`, and `tar_option_set()` functions accept a `resources` argument.⁴ For example, if our `batchtools` template file has a wildcard for the number of cores for a job,
```sh
#!/bin/bash
#$ -pe smp <%= resources[["num_cores"]] | 1 %> # Wildcard for cores per job.
#$ -cwd
#$ -j y
#$ -o <%= log.file %>
#$ -V
#$ -N <%= job.name %>
module load R/3.6.3
Rscript -e 'batchtools::doJobCollection("<%= uri %>")'
exit 0
```
then you can set the number of cores for an individual target using a target-specific `future` plan. In the case below, maybe the slow model needs 2 cores to run fast enough. Because of the `resources[["num_cores"]]` placeholder in the above template file, we can control the number of cores of each target through its local plan.⁵
```r
# _targets.R
library(targets)
library(future)
library(future.batchtools)
plan(batchtools_sge, template = "sge.tmpl")
list(
  tar_target(data, get_data()),
  tar_target(fast_fit, fit_small_model(data)),
  # With older versions of targets:
  # tar_target(slow_fit, fit_slow_model(data), resources = list(num_cores = 2)),
  # With targets >= 0.5.0.9000:
  tar_target(
    slow_fit,
    fit_slow_model(data),
    resources = tar_resources(
      future = tar_resources_future(
        plan = tweak(
          batchtools_sge,
          template = "sge.tmpl",
          resources = list(num_cores = 2)
        )
      )
    )
  ),
  tar_target(plot_1, make_plot(fast_fit)),
  tar_target(plot_2, make_plot(slow_fit))
)
```
Then, run `tar_make_future()` as usual.

```r
# R console
tar_make_future(workers = 2)
```
A.3 Advanced
Functions `tar_target()`, `tar_target_raw()`, and `tar_option_set()` support advanced configuration options for heavy-duty pipelines that require high-performance computing. A sketch combining several of these options follows the list below.
- `deployment`: With the `deployment` argument, you can choose to run some targets locally on the main process instead of on a high-performance computing worker. This option is suitable for lightweight targets such as R Markdown reports where runtime is quick and a cluster would be excessive.
- `memory`: Choose whether to retain a target in memory or remove it from memory whenever it is not needed at the moment. This is a tradeoff between memory consumption and storage read speeds, and like all of the options listed here, you can set it on a target-by-target basis. The default settings consume a lot of memory to avoid frequently reading from storage. To keep memory usage down to a minimum, set `memory = "transient"` and `garbage_collection = TRUE` in `tar_target()` or `tar_option_set()`. For cloud-based dynamic files such as `format = "aws_file"`, this memory policy applies to temporary local copies of the file in `_targets/scratch/`: `"persistent"` means they remain until the end of the pipeline, and `"transient"` means they get deleted from the file system as soon as possible. The former conserves bandwidth, and the latter conserves local storage.
- `garbage_collection`: Choose whether to run `base::gc()` just before running the target.
- `storage`: Choose whether the parallel workers or the main process is responsible for saving the target's value. For slow network file systems on clusters, `storage = "main"` is often faster for small numbers of targets. For large numbers of targets or low-bandwidth connections between the main process and the workers, `storage = "worker"` is often faster. Always choose `storage = "main"` if the workers do not have access to the file system with the `_targets/` data store.
- `retrieval`: Choose whether the parallel workers or the main process is responsible for reading dependency targets from disk. Should usually be set to whatever you choose for `storage` (the default). Always choose `retrieval = "main"` if the workers do not have access to the file system with the `_targets/` data store.
- `format`: If your pipeline has large computation, it may also have large data. Consider setting the `format` argument to help `targets` store and retrieve your data faster.
- `error`: Set `error` to `"continue"` to let the rest of the pipeline keep running even if a target encounters an error.
A.4 Cloud computing
Right now, `targets` does not have built-in cloud-based distributed computing support. However, future development plans include seamless integration with AWS Batch. As a temporary workaround, it is possible to deploy a burstable SLURM cluster using AWS ParallelCluster and leverage `targets`' existing support for traditional schedulers.
1. Please use `tar_make_clustermq()` or `tar_make_future()` instead of multiple concurrent calls to `tar_make()`. The former is the proper way to use parallel computing in `targets`, and the latter will probably break the data store.↩︎
2. Unlike `drake`, `targets` applies this behavior not only to stem targets, but also to branches of patterns.↩︎
3. Some alternative local `future` plans are listed here.↩︎
4. The `resources` argument of `tar_target()` defaults to `tar_option_get("resources")`. You can set the default value for all targets using `tar_option_set()`.↩︎
5. In older versions of `targets`, `resources` was a named list. In `targets` version 0.5.0.9000 and above, please create the `resources` argument with the helpers `tar_resources()` and `tar_resources_future()`.↩︎