13 Distributed computing
For best results, please use targets version 1.2.0 or higher and crew version 0.3.0 or higher. If you cannot install packages globally, consider creating a local renv package library for your project:
# R console
renv::init()
renv::install("crew")
renv::install("targets")
renv::snapshot()
See the performance chapter for options, settings, and other choices to make parallel and distributed pipelines more efficient.
To efficiently process a large and complex pipeline, tar_make() can run multiple targets at the same time. Thanks to integration with crew and blazing-fast scheduling from mirai behind the scenes, those targets can run on a variety of high-performance computing platforms, and they can scale out to hundreds of parallel workers and beyond.
13.1 How it works
1. Write your pipeline as usual, but set the controller argument of tar_option_set() to the crew controller of your choice.
2. Run the pipeline with a simple tar_make().
The crew controller from step 1 allows tar_make() to launch external R processes called “workers”, each of which can run one or more targets. Because long-running targets are delegated to these workers, the local R session is free to focus on other tasks, and the pipeline finishes faster.
13.2 Example
The following _targets.R file uses a local process controller with 2 workers. That means up to 2 workers can run at any given time, and each worker is a separate R process on the same computer as the local R process.
# _targets.R
library(targets)
library(tarchetypes)
library(crew)
tar_option_set(
  controller = crew_controller_local(workers = 2)
)
tar_source() # Source the scripts in R/ that define get_data(), run_model1(), etc.
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(name = model2, command = run_model2(data)),
  tar_target(name = model3, command = run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1)),
  tar_target(name = plot2, command = plot_model(model2)),
  tar_target(name = plot3, command = plot_model(model3))
)
# R console
tar_visnetwork(targets_only = TRUE)
Run the pipeline with a simple call to tar_make(). Please note that real-life pipelines will have longer execution times, especially for the models.
# R console
tar_make()
#> ▶ dispatched target data
#> ● completed target data [0.001 seconds, 48 bytes]
#> ▶ dispatched target model1
#> ● completed target model1 [0 seconds, 48 bytes]
#> ▶ dispatched target model2
#> ● completed target model2 [0 seconds, 48 bytes]
#> ▶ dispatched target model3
#> ● completed target model3 [0 seconds, 48 bytes]
#> ▶ dispatched target plot1
#> ● completed target plot1 [0 seconds, 48 bytes]
#> ▶ dispatched target plot2
#> ● completed target plot2 [0.001 seconds, 48 bytes]
#> ▶ dispatched target plot3
#> ● completed target plot3 [0 seconds, 48 bytes]
#> ▶ ended pipeline [0.093 seconds]
Let’s talk through what happens in the above call to tar_make(). First, tar_make() sends the data target to the crew queue, and a new worker launches to run it. After the data target completes, all three models are ready to begin. A second worker automatically launches to meet the increased demand of the workload, and each of the two workers starts to run a model. After one of the models finishes, its worker is free to run either the downstream plot or the third model. The process continues until all the targets are complete. The workers shut down when the pipeline is done.
13.3 Configuration and auto-scaling
Adding more workers might speed up your pipeline, but not always. Beyond a certain point, the efficiency gains will diminish, and the extra workers will have nothing to do. With proper configuration, you can find the right balance.
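One rough way to see where the gains stop is to count how many targets could run side by side. The sketch below is a hypothetical, hand-written illustration in base R (it does not call any targets function): it encodes the dependency graph of the pipeline from the example above as a named list, assigns each target a depth, and counts the targets at each depth. For that pipeline, the widest level holds 3 targets, which suggests that workers beyond about 3 would mostly sit idle. This is only a heuristic; the real schedule also depends on how long each target takes.

```r
# Hypothetical dependency list for the example pipeline above:
# each target name maps to the names of its upstream dependencies.
deps <- list(
  data = character(0),
  model1 = "data",
  model2 = "data",
  model3 = "data",
  plot1 = "model1",
  plot2 = "model2",
  plot3 = "model3"
)

# Depth of a target: 1 if it has no dependencies,
# otherwise 1 + the largest depth among its dependencies.
target_depth <- function(name) {
  upstream <- deps[[name]]
  if (length(upstream) == 0) {
    return(1L)
  }
  1L + max(vapply(upstream, target_depth, integer(1)))
}

depths <- vapply(names(deps), target_depth, integer(1))

# Number of targets at each depth: a rough indication of how many
# targets could run at once.
width <- table(depths)
print(width)
max(width)
```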
As mentioned above, new workers launch automatically in response to increasing demand. By default, they stay running for the duration of the pipeline. However, you can customize the controller to scale down when circumstances allow, which helps avoid wasting resources.[1] The most useful arguments for down-scaling, in order of importance, are:
1. seconds_idle: automatically shut down a worker if it spends too long waiting for a target.
2. tasks_max: maximum number of tasks a worker can run before shutting down.
3. seconds_wall: soft wall time of a worker.
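For instance, a local controller that combines all three down-scaling arguments might look like the following sketch. The numbers are arbitrary placeholders rather than recommendations, the controller name is hypothetical, and the tiny x/y pipeline only exists to make the file complete. See the help page of crew_controller_local() for the full argument list.

```r
# _targets.R (sketch; argument values are illustrative placeholders)
library(targets)
library(crew)
tar_option_set(
  controller = crew_controller_local(
    name = "downscaling_controller", # hypothetical name
    workers = 4,
    seconds_idle = 30,   # shut down a worker after 30 idle seconds,
    tasks_max = 100,     # or after it has run 100 tasks,
    seconds_wall = 3600  # or after roughly one hour (soft limit)
  )
)
list(
  tar_target(x, seq_len(10)),
  tar_target(y, x^2, pattern = map(x))
)
```

Whichever limit a worker hits first triggers its shutdown, and a new worker launches later if demand returns.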
On the other hand, it is not always helpful to eagerly down-scale workers. Because the workload can fluctuate rapidly, some workers may quit and relaunch so often that it creates noticeable overhead.
Fortunately, it is straightforward to explore auto-scaling and configuration issues empirically. Simply run your existing configuration with tar_make() and then look at the output from tar_crew(). For example, consider the following pipeline with 1001 quick targets, 10 workers, and a maximum idle time of 3 seconds.
# _targets.R file
library(targets)
library(tarchetypes)
controller <- crew::crew_controller_local(
  name = "my_controller",
  workers = 10,
  seconds_idle = 3
)
tar_option_set(controller = controller)
list(
  tar_target(x, seq_len(1000)),
  tar_target(y, x, pattern = map(x))
)
After running tar_make(), tar_crew() shows the following worker metadata:
tar_crew()
#> # A tibble: 10 × 5
#> controller worker launches seconds targets
#> <chr> <int> <int> <dbl> <int>
#> 1 my_controller 1 1 1.77 998
#> 2 my_controller 2 4 0.076 3
#> 3 my_controller 3 1 0 0
#> 4 my_controller 4 0 0 0
#> 5 my_controller 5 0 0 0
#> 6 my_controller 6 0 0 0
#> 7 my_controller 7 0 0 0
#> 8 my_controller 8 0 0 0
#> 9 my_controller 9 0 0 0
#> 10 my_controller 10 0 0 0
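A table this small is easy to read by eye, but for a larger pipeline a few lines of base R can flag wasteful workers. In the sketch below, the numbers are copied by hand from the output above so the code runs without targets installed; in a real project you would start from the data frame that tar_crew() returns instead.

```r
# Worker metadata copied by hand from the tar_crew() output above.
# (In a real project: metrics <- targets::tar_crew() after tar_make().)
metrics <- data.frame(
  worker = 1:10,
  launches = c(1L, 4L, 1L, rep(0L, 7)),
  seconds = c(1.77, 0.076, 0, rep(0, 7)),
  targets = c(998L, 3L, 0L, rep(0L, 7))
)

# Workers that launched more often than they ran targets.
wasteful <- subset(metrics, launches > targets)
print(wasteful)

# Workers that never launched at all.
never_launched <- metrics$worker[metrics$launches == 0]
print(never_launched)
```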
The first worker did most of the work, and the second worker ran only 3 targets. Both the second and third workers launched and self-terminated more often than they ran targets. In fact, the third worker did not run any targets at all. None of the other workers launched. For this pipeline, it would be better not to use crew at all, or to set deployment = "main" in tar_target() for the targets that will complete instantly. And if 2 workers were truly busy instead of just 1, it would be reasonable to set workers = 2 and pick a much higher value of seconds_idle.
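As a sketch of the deployment = "main" option, the same _targets.R file could keep its controller (for any slower targets added later) while the two instant targets bypass the workers and run in the main R process. This is one possible arrangement, not the only one.

```r
# _targets.R file (sketch of the deployment = "main" option)
library(targets)
library(tarchetypes)
controller <- crew::crew_controller_local(
  name = "my_controller",
  workers = 10,
  seconds_idle = 3
)
tar_option_set(controller = controller)
list(
  # Both targets finish almost instantly, so run them in the main
  # R process instead of sending them to crew workers.
  tar_target(x, seq_len(1000), deployment = "main"),
  tar_target(y, x, pattern = map(x), deployment = "main")
)
```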
But now, suppose the targets in the pipeline take longer to run:
# _targets.R file
library(targets)
library(tarchetypes)
controller <- crew::crew_controller_local(
  name = "my_controller",
  workers = 10,
  seconds_idle = 3
)
tar_option_set(controller = controller)
list(
  tar_target(x, seq_len(1000)),
  tar_target(y, Sys.sleep(1), pattern = map(x)) # Each branch sleeps for 1 second.
)
The tar_crew() worker metadata is far more sensible: each worker launched once and accomplished a lot. And because seconds_idle was set to 3, we know each worker instance wasted no more than 3 seconds waiting for new targets to run. As long as resources allow, it is appropriate to have workers = 10 and seconds_idle = 3 for this pipeline: this configuration speeds up the pipeline while avoiding wasted resources.
tar_crew()
#> # A tibble: 10 × 5
#> controller worker launches seconds targets
#> <chr> <int> <int> <dbl> <int>
#> 1 my_controller 1 1 103. 104
#> 2 my_controller 2 1 100. 100
#> 3 my_controller 3 1 100. 100
#> 4 my_controller 4 1 100. 100
#> 5 my_controller 5 1 100. 100
#> 6 my_controller 6 1 100. 100
#> 7 my_controller 7 1 99.4 99
#> 8 my_controller 8 1 100. 100
#> 9 my_controller 9 1 99.4 99
#> 10 my_controller 10 1 99.4 99
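A quick base R check on these numbers confirms that each worker launched once, that together the workers ran all 1001 targets, and that each worker completed roughly one target per second, as expected from Sys.sleep(1). The values are copied by hand from the table above, so the seconds are rounded as printed.

```r
# Worker metadata copied by hand from the second tar_crew() output above.
metrics <- data.frame(
  worker = 1:10,
  launches = rep(1L, 10),
  seconds = c(103, 100, 100, 100, 100, 100, 99.4, 100, 99.4, 99.4),
  targets = c(104L, 100L, 100L, 100L, 100L, 100L, 99L, 100L, 99L, 99L)
)

# Total targets across all workers, and throughput of each worker.
total_targets <- sum(metrics$targets)
targets_per_second <- metrics$targets / metrics$seconds

print(total_targets)
print(round(targets_per_second, 2))
```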
13.4 Backends
crew supports multiple computing platforms: not just local processes, but also traditional high-performance computing systems and cloud computing services. For example, to run each worker as a job on a Sun Grid Engine cluster, use crew_controller_sge() from the crew.cluster package.
# _targets.R
library(targets)
library(tarchetypes)
library(crew.cluster)
tar_option_set(
  controller = crew_controller_sge(
    workers = 3,
    script_lines = "module load R", # Shell lines to run before each worker starts.
    sge_log_output = "log_folder/"
  )
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(name = model2, command = run_model2(data)),
  tar_target(name = model3, command = run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1)),
  tar_target(name = plot2, command = plot_model(model2)),
  tar_target(name = plot3, command = plot_model(model3))
)
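Other schedulers follow the same pattern. As a hedged sketch, assuming your cluster runs SLURM instead of Sun Grid Engine, the controller above could be swapped for crew_controller_slurm() from crew.cluster, with slurm_log_output playing the role of sge_log_output. Please check the help page of crew_controller_slurm() in your installed version for scheduler-specific options such as memory and time limits.

```r
# _targets.R (sketch: SLURM instead of Sun Grid Engine)
library(targets)
library(tarchetypes)
library(crew.cluster)
tar_option_set(
  controller = crew_controller_slurm(
    workers = 3,
    script_lines = "module load R", # site-specific setup, as in the SGE example
    slurm_log_output = "log_folder/"
  )
)
tar_source()
# The file ends with the same list() of tar_target() calls as the SGE example.
```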
If crew.cluster and other official packages do not meet your needs, then you can write your own launcher plugin tailored to your own specific computing environment. crew makes this process straightforward, and the vignette at https://wlandau.github.io/crew/articles/launcher_plugins.html walks through the details step by step.
13.5 Heterogeneous workers
Different targets may have different computing requirements, from memory to GPUs and beyond. You can send different targets to different kinds of workers using crew controller groups. In the _targets.R file below, we create a local process controller alongside a Sun Grid Engine controller with a memory requirement and a GPU. We combine them in a crew controller group, which we supply to the controller argument of tar_option_set(). Next, we use tar_resources() and tar_resources_crew() to tell model2 to run on Sun Grid Engine and all other targets to run on local processes by default. The deployment = "main" argument tells the plots to avoid worker processes altogether and run in the main R process.
# _targets.R
library(targets)
library(tarchetypes)
library(crew)
library(crew.cluster)
controller_local <- crew_controller_local(
  name = "my_local_controller",
  workers = 2,
  seconds_idle = 10
)
controller_sge <- crew_controller_sge(
  name = "my_sge_controller",
  workers = 3,
  seconds_idle = 15,
  script_lines = "module load R",
  sge_log_output = "log_folder/",
  sge_memory_gigabytes_required = 64,
  sge_gpu = 1
)
tar_option_set(
  controller = crew_controller_group(controller_local, controller_sge),
  # By default, send targets to the local process controller.
  resources = tar_resources(
    crew = tar_resources_crew(controller = "my_local_controller")
  )
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(
    name = model2,
    command = run_model2(data),
    # Override the default: run model2 on Sun Grid Engine.
    resources = tar_resources(
      crew = tar_resources_crew(controller = "my_sge_controller")
    )
  ),
  tar_target(name = model3, command = run_model3(data)),
  # The plots run in the main R process instead of on workers.
  tar_target(name = plot1, command = plot_model(model1), deployment = "main"),
  tar_target(name = plot2, command = plot_model(model2), deployment = "main"),
  tar_target(name = plot3, command = plot_model(model3), deployment = "main")
)
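To try out the routing logic on a single machine before touching a cluster, one option is to build the group from two local controllers. In this sketch, the controller names "light" and "heavy", the worker counts, and the trivial targets are all hypothetical; the point is only that tar_resources_crew(controller = ...) picks a controller from the group by its name.

```r
# _targets.R (sketch: two local controllers in one group, for testing)
library(targets)
library(crew)
controller_light <- crew_controller_local(name = "light", workers = 2)
controller_heavy <- crew_controller_local(name = "heavy", workers = 1)
tar_option_set(
  controller = crew_controller_group(controller_light, controller_heavy),
  # Default: send targets to the "light" controller.
  resources = tar_resources(
    crew = tar_resources_crew(controller = "light")
  )
)
list(
  tar_target(small, seq_len(10)),
  tar_target(
    big,
    sum(small),
    # Override the default for this target only.
    resources = tar_resources(
      crew = tar_resources_crew(controller = "heavy")
    )
  )
)
```

After tar_make(), the controller column of tar_crew() should show which controllers' workers actually ran targets.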
13.6 Resource usage
The autometric package can monitor the CPU and memory consumption of the various processes in a targets pipeline, both local processes and parallel workers. Please read https://wlandau.github.io/crew/articles/logging.html for details and examples.
13.7 Thanks
The crew package is an extension of mirai, a sleek and sophisticated task scheduler that efficiently processes intense workloads. crew is only possible because of the amazing work by Charlie Gao in packages mirai and nanonext.
[1] Automatic down-scaling also helps comply with wall time restrictions on shared computing clusters. See the arguments of crew_controller_local() for details.