::init()
renv::install("crew")
renv::install("targets")
renv::snapshot() renv
13 Distributed computing
For best results, please use targets
version 1.2.0
or higher and crew
version 0.3.0
or higher. If you cannot install packages globally, consider creating a local renv
package library for your project.
See the performance chapter for options, settings, and other choices to make parallel and distributed pipelines more efficient.
To efficiently process a large and complex pipeline, tar_make()
can run multiple targets at the same time. Thanks to integration with crew
and blazing fast scheduling from mirai
behind the scenes, those targets can run on a variety of high-performance computing platforms, and they can scale out to the hundreds and beyond.
13.1 How it works
- Write your pipeline as usual, but set the
controller
argument oftar_option_set
to thecrew
controller of your choice. - Run the pipeline with a simple
tar_make()
.
The crew
controller from (1) allows tar_make()
to launch external R processes called “workers” which can each run one or more targets. By delegating long-running targets to these workers, the local R session is free to focus on other tasks, and the pipeline finishes faster.
13.2 Example
The following _targets.R
file uses a local process controller with 2 workers. That means up to 2 workers can be running at any given time, and each worker is an separate R process on the same computer as the local R process.
# _targets.R
library(targets)
library(tarchetypes)
library(crew)
tar_option_set(
controller = crew_controller_local(workers = 2)
)tar_source()
list(
tar_target(name = data, command = get_data()),
tar_target(name = model1, command = run_model1(data)),
tar_target(name = model2, command = run_model2(data)),
tar_target(name = model3, command = run_model3(data)),
tar_target(name = plot1, command = plot_model(model1)),
tar_target(name = plot2, command = plot_model(model2)),
tar_target(name = plot3, command = plot_model(model3))
)
# R console
tar_visnetwork(targets_only = TRUE)
Run the pipeline with a simple call to tar_make()
. Please note that real-life pipelines will have longer execution times, especially for the models.
# R console
tar_make()
#> + data dispatched
#> ✔ data completed [0ms, 46 B]
#> + model1 dispatched
#> ✔ model1 completed [0ms, 46 B]
#> + model2 dispatched
#> ✔ model2 completed [1ms, 46 B]
#> + model3 dispatched
#> ✔ model3 completed [0ms, 46 B]
#> + plot1 dispatched
#> ✔ plot1 completed [0ms, 46 B]
#> + plot2 dispatched
#> ✔ plot2 completed [1ms, 46 B]
#> + plot3 dispatched
#> ✔ plot3 completed [0ms, 46 B]
#> ✔ ended pipeline [187ms, 7 completed, 0 skipped]
Let’s talk through what happens in the above call to tar_make()
. First, a new worker launches and sends the data
target to the crew
queue. After the data
target completes, all three models are ready to begin. A second worker automatically launches to meet the increased demand of the workload, and each of the two workers starts to run a model. After one of the models finishes, its worker is free to either run the downstream plot or the third model. The process continues until all the targets are complete. The workers shut down when the pipeline is done.
13.3 Configuration and auto-scaling
Adding more workers might speed up your pipeline, but not always. Beyond a certain point, the efficiency gains will diminish, and the extra workers will have nothing to do. With proper configuration, you can find the right balance.
As mentioned above, new workers launch automatically in response to increasing demand. By default, they stay running for the duration of the pipeline. However, you can customize the controller to scale down when circumstances allow, which helps help avoid wasting resources1 The most useful arguments for down-scaling, in order of importance, are:
seconds_idle
: automatically shut down a worker if it spends too long waiting for a target.tasks_max
: maximum number of tasks a worker can run before shutting down.seconds_wall
: soft wall time of a worker.
On the other hand, it is not always helpful to eagerly down-scale workers. Because the workload can fluctuate rapidly, some workers may quit and relaunch so often that it creates noticeable overhead. crew
and its plugins try to set reasonable defaults, but you may need to adjust for optimal efficiency.
13.4 Plugins
crew
is a platform for multiple computing platforms, not just local processes, but also traditional high-performance computing systems and cloud computing services. For example, to run each worker as a job on a Sun Grid Engine cluster, use crew_controller_sge()
from the crew.cluster
package.
# _targets.R
library(targets)
library(tarchetypes)
library(crew.cluster)
tar_option_set(
controller = crew_controller_sge(
workers = 3,
options_cluster = crew_options_sge(
script_lines = "module load R",
log_output = "log_folder/"
)
)
)tar_source()
list(
tar_target(name = data, command = get_data()),
tar_target(name = model1, command = run_model1(data)),
tar_target(name = model2, command = run_model2(data)),
tar_target(name = model3, command = run_model3(data)),
tar_target(name = plot1, command = plot_model(model1)),
tar_target(name = plot2, command = plot_model(model2)),
tar_target(name = plot3, command = plot_model(model3))
)
If crew.cluster
and other official packages do not meet your needs, then you can write your own launcher plugin tailored to your own specific computing environment. crew
makes this process straightforward, and the vignette at https://wlandau.github.io/crew/articles/launcher_plugins.html walks through the details step by step.
13.5 Heterogeneous workers
Different targets may have different computing requirements, from memory to GPUs and beyond. You can send different targets to different kinds of workers using crew
controller groups. In the _targets.R
file below, we create a local process controller alongside a Sun Grid Engine controller a memory requirement and a GPU. We combine them in a crew
controller group which we supply to the controller
argument of tar_option_set
. Next, we use tar_resources()
and tar_resources_crew()
to tell model2
to run on Sun Grid Engine and all other targets to run on local processes. The deployment = "main"
argument tells the plots to avoid worker processes altogether and run on the main central R process.
# _targets.R
library(targets)
library(tarchetypes)
library(crew)
library(crew.cluster)
<- crew_controller_local(
controller_local name = "my_local_controller",
workers = 2,
seconds_idle = 10
)<- crew_controller_sge(
controller_sge name = "my_sge_controller",
workers = 3,
seconds_idle = 15,
options_cluster = crew_options_sge(
script_lines = "module load R",
log_output = "log_folder/",
memory_gigabytes_required = 64,
gpu = 1
)
)tar_option_set(
controller = crew_controller_group(controller_local, controller_sge),
resources = tar_resources(
crew = tar_resources_crew(controller = "my_local_controller")
)
)tar_source()
list(
tar_target(name = data, command = get_data()),
tar_target(name = model1, command = run_model1(data)),
tar_target(
name = model2,
command = run_model2(data),
resources = tar_resources(
crew = tar_resources_crew(controller = "my_sge_controller")
)
),tar_target(name = model3, run_model3(data)),
tar_target(name = plot1, command = plot_model(model1), deployment = "main"),
tar_target(name = plot2, command = plot_model(model2), deployment = "main"),
tar_target(name = plot3, command = plot_model(model3), deployment = "main")
)
13.6 Resource usage
The autometric
package can monitor the CPU and memory consumption of the various processes in a targets
pipeline, both local processes and parallel workers. Please read https://wlandau.github.io/crew/articles/logging.html for details and examples.
13.7 Thanks
The crew
package is an extension of mirai
, a sleek and sophisticated task scheduler that efficiently processes intense workloads. crew
is only possible because of the amazing work by Charlie Gao in packages mirai
and nanonext
.
Automatic down-scaling also helps comply with wall time restrictions on shared computing clusters. See the arguments of
crew_controller_local()
for details.↩︎