13  Distributed computing

Package versions

For best results, please use targets version 1.2.0 or higher and crew version 0.3.0 or higher. If you cannot install packages globally, consider creating a local renv package library for your project.

renv::init()
renv::install("crew")
renv::install("targets")
renv::snapshot()
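
To confirm that the installed versions meet the minimums above, a small check like the following may help. This is only a sketch: meets_minimum() is a hypothetical helper written for illustration, not part of targets, crew, or renv.

```r
# Hypothetical helper (not part of any package):
# TRUE if `package` is installed at version `minimum` or newer.
meets_minimum <- function(package, minimum) {
  if (!requireNamespace(package, quietly = TRUE)) {
    return(FALSE) # Package is not installed in the active library.
  }
  utils::packageVersion(package) >= minimum
}

meets_minimum("targets", "1.2.0")
meets_minimum("crew", "0.3.0")
```

If either call returns FALSE, installing or upgrading the package (for example with renv::install() as above) should resolve it.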

Performance

See the performance chapter for options, settings, and other choices to make parallel and distributed pipelines more efficient.

To efficiently process a large and complex pipeline, tar_make() can run multiple targets at the same time. Thanks to integration with crew and blazing fast scheduling from mirai behind the scenes, those targets can run on a variety of high-performance computing platforms, and they can scale out to the hundreds and beyond.

13.1 How it works

  1. Write your pipeline as usual, but set the controller argument of tar_option_set() to the crew controller of your choice.
  2. Run the pipeline with a simple tar_make().

The crew controller from (1) allows tar_make() to launch external R processes called “workers” which can each run one or more targets. By delegating long-running targets to these workers, the local R session is free to focus on other tasks, and the pipeline finishes faster.

13.2 Example

The following _targets.R file uses a local process controller with 2 workers. That means up to 2 workers can be running at any given time, and each worker is a separate R process on the same computer as the local R process.

# _targets.R
library(targets)
library(crew)
tar_option_set(
  controller = crew_controller_local(workers = 2)
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(name = model2, command = run_model2(data)),
  tar_target(name = model3, command = run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1)),
  tar_target(name = plot2, command = plot_model(model2)),
  tar_target(name = plot3, command = plot_model(model3))
)

# R console
tar_visnetwork(targets_only = TRUE)

Run the pipeline with a simple call to tar_make(). Please note that real-life pipelines will have longer execution times, especially for the models.

# R console
tar_make()
#> ▶ dispatched target data
#> ● completed target data [0 seconds]
#> ▶ dispatched target model1
#> ● completed target model1 [0.001 seconds]
#> ▶ dispatched target model2
#> ● completed target model2 [0 seconds]
#> ▶ dispatched target model3
#> ● completed target model3 [0 seconds]
#> ▶ dispatched target plot1
#> ● completed target plot1 [0 seconds]
#> ▶ dispatched target plot2
#> ● completed target plot2 [0.001 seconds]
#> ▶ dispatched target plot3
#> ● completed target plot3 [0 seconds]
#> ▶ ended pipeline [0.089 seconds]

Let’s talk through what happens in the above call to tar_make(). First, tar_make() sends the data target to the crew queue, and a new worker launches to run it. After the data target completes, all three models are ready to begin. A second worker automatically launches to meet the increased demand of the workload, and each of the two workers starts to run a model. After one of the models finishes, its worker is free to run either the downstream plot or the third model. The process continues until all the targets are complete. The workers shut down when the pipeline is done.

13.3 Configuration and auto-scaling

Adding more workers might speed up your pipeline, but not always. Beyond a certain point, the efficiency gains will diminish, and the extra workers will have nothing to do. With proper configuration, you can find the right balance.

As mentioned above, new workers launch automatically in response to increasing demand. By default, they stay running for the duration of the pipeline. However, you can customize the controller to scale down when circumstances allow, which helps avoid wasting resources.[1] The most useful arguments for down-scaling, in order of importance, are:

  1. seconds_idle: automatically shut down a worker if it spends too long waiting for a target.
  2. tasks_max: maximum number of tasks a worker can run before shutting down.
  3. seconds_wall: soft wall time of a worker.

On the other hand, it is not always helpful to eagerly down-scale workers. Because the workload can fluctuate rapidly, some workers may quit and relaunch so often that it creates noticeable overhead.
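
As a rough illustration, the three down-scaling arguments listed above can be set together on a local controller. The fragment below is a sketch, not a recommendation: the specific values (4 workers, 30 seconds, 100 tasks, one hour) are assumptions chosen for the example, and the right settings depend on how long your targets run and how quickly workers can relaunch.

```r
# _targets.R (fragment: the target list is omitted)
library(targets)
library(crew)
tar_option_set(
  controller = crew_controller_local(
    workers = 4,          # Illustrative value.
    seconds_idle = 30,    # Shut down a worker that waits 30 seconds for a target.
    tasks_max = 100,      # Shut down a worker after it has run 100 tasks.
    seconds_wall = 3600   # Soft wall time of one hour per worker.
  )
)
```

A larger seconds_idle reduces the chance that workers quit and relaunch repeatedly, at the cost of keeping idle workers around longer.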

Fortunately, it is straightforward to explore auto-scaling and configuration issues empirically. Simply run your existing configuration with tar_make() and then look at the output from tar_crew(). For example, consider the following pipeline with 1001 quick targets, 10 workers, and a maximum idle time of 3 seconds.

# _targets.R file
library(targets)
controller <- crew::crew_controller_local(
  name = "my_controller",
  workers = 10,
  seconds_idle = 3
)
tar_option_set(controller = controller)
list(
  tar_target(x, seq_len(1000)),
  tar_target(y, x, pattern = map(x))
)

After running tar_make(), tar_crew() shows the following worker metadata:

tar_crew()
#> # A tibble: 10 × 5
#>    controller    worker launches seconds targets
#>    <chr>          <int>    <int>   <dbl>   <int>
#>  1 my_controller      1        1   1.77      998
#>  2 my_controller      2        4   0.076       3
#>  3 my_controller      3        1   0           0
#>  4 my_controller      4        0   0           0
#>  5 my_controller      5        0   0           0
#>  6 my_controller      6        0   0           0
#>  7 my_controller      7        0   0           0
#>  8 my_controller      8        0   0           0
#>  9 my_controller      9        0   0           0
#> 10 my_controller     10        0   0           0

The first worker did most of the work, and the second worker ran only 3 targets. Both the second and third workers launched and self-terminated more often than they ran targets. In fact, the third worker did not run any targets at all. None of the other workers actually launched. For this pipeline, it would be better not to use crew at all, or to set deployment = "main" in tar_target() for the targets that will complete instantly. And if 2 workers were truly busy instead of just 1, it would be reasonable to set workers = 2 and pick a much higher value of seconds_idle.
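
The same reading of the metadata can be done programmatically. The sketch below uses base R on a data frame typed in by hand from the tar_crew() output above (in a real project you would use the tibble returned by tar_crew() directly). The variable names crew_stats, never_launched, churned, and top_share are made up for this example.

```r
# Worker metadata copied by hand from the tar_crew() output above.
crew_stats <- data.frame(
  worker = 1:10,
  launches = c(1L, 4L, 1L, rep(0L, 7)),
  seconds = c(1.77, 0.076, rep(0, 8)),
  targets = c(998L, 3L, rep(0L, 8))
)

# Workers that never launched: the pipeline did not need them.
never_launched <- crew_stats$worker[crew_stats$launches == 0L]

# Workers that launched more often than they ran targets.
churned <- crew_stats$worker[
  crew_stats$launches > 0L & crew_stats$launches > crew_stats$targets
]

# Share of all targets handled by the busiest worker.
top_share <- max(crew_stats$targets) / sum(crew_stats$targets)
```

Here never_launched contains workers 4 through 10, churned contains workers 2 and 3, and top_share is 998 / 1001, which all point to the same conclusion: a single process could have handled this workload.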

But now, suppose the targets in the pipeline take longer to run:

# _targets.R file
library(targets)
controller <- crew::crew_controller_local(
  name = "my_controller",
  workers = 10,
  seconds_idle = 3
)
tar_option_set(controller = controller)
list(
  tar_target(x, seq_len(1000)),
  tar_target(y, Sys.sleep(1), pattern = map(x)) # Run for 1 second.
)

The tar_crew() worker metadata is far more sensible: each worker launched once and accomplished a lot. And because seconds_idle was set to 3, we know each worker instance wasted no more than 3 seconds waiting for new targets to run. As long as resources allow, it is appropriate to have workers = 10 and seconds_idle = 3 for this pipeline: this configuration speeds up the pipeline while avoiding wasted resources.

tar_crew()
#> # A tibble: 10 × 5
#>    controller    worker launches seconds targets
#>    <chr>          <int>    <int>   <dbl>   <int>
#>  1 my_controller      1        1   103.      104
#>  2 my_controller      2        1   100.      100
#>  3 my_controller      3        1   100.      100
#>  4 my_controller      4        1   100.      100
#>  5 my_controller      5        1   100.      100
#>  6 my_controller      6        1   100.      100
#>  7 my_controller      7        1    99.4      99
#>  8 my_controller      8        1   100.      100
#>  9 my_controller      9        1    99.4      99
#> 10 my_controller     10        1    99.4      99
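
The roughly 100 seconds per worker in this table is about what simple arithmetic predicts. The following back-of-the-envelope sketch assumes perfectly even load balancing and ignores worker launch time and scheduling overhead, so it is a lower bound rather than a precise forecast.

```r
n_branches <- 1000       # Dynamic branches of y, one per element of x.
seconds_per_branch <- 1  # Each branch calls Sys.sleep(1).
workers <- 10

# One process running every branch back to back.
serial_seconds <- n_branches * seconds_per_branch

# Ten workers sharing the branches evenly.
parallel_seconds <- ceiling(n_branches / workers) * seconds_per_branch
```

Under these assumptions, serial_seconds is 1000 and parallel_seconds is 100, which is consistent with the seconds column above.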

13.4 Backends

crew supports multiple computing platforms: not just local processes, but also traditional high-performance computing systems and cloud computing services. For example, to run each worker as a job on a Sun Grid Engine cluster, use crew_controller_sge() from the crew.cluster package.

# _targets.R
library(targets)
library(crew.cluster)
tar_option_set(
  controller = crew_controller_sge(
    workers = 3,
    script_lines = "module load R",
    sge_log_output = "log_folder/"
  )
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(name = model2, command = run_model2(data)),
  tar_target(name = model3, command = run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1)),
  tar_target(name = plot2, command = plot_model(model2)),
  tar_target(name = plot3, command = plot_model(model3))
)

If crew.cluster and other official packages do not meet your needs, then you can write your own launcher plugin tailored to your own specific computing environment. crew makes this process straightforward, and the vignette at https://wlandau.github.io/crew/articles/launcher_plugins.html walks through the details step by step.

13.5 Heterogeneous workers

Different targets may have different computing requirements, from memory to GPUs and beyond. You can send different targets to different kinds of workers using crew controller groups. In the _targets.R file below, we create a local process controller alongside a Sun Grid Engine controller with a memory requirement and a GPU. We combine them in a crew controller group, which we supply to the controller argument of tar_option_set(). Next, we use tar_resources() and tar_resources_crew() to tell model2 to run on Sun Grid Engine and all other targets to run on local processes. The deployment = "main" argument tells the plots to avoid worker processes altogether and run on the main central R process.

# _targets.R
library(targets)
library(crew)
library(crew.cluster)
controller_local <- crew_controller_local(
  name = "my_local_controller",
  workers = 2,
  seconds_idle = 10
)
controller_sge <- crew_controller_sge(
  name = "my_sge_controller",
  workers = 3,
  seconds_idle = 15,
  script_lines = "module load R",
  sge_log_output = "log_folder/",
  sge_memory_gigabytes_required = 64,
  sge_gpu = 1
)
tar_option_set(
  controller = crew_controller_group(controller_local, controller_sge),
  resources = tar_resources(
    crew = tar_resources_crew(controller = "my_local_controller")
  )
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(
    name = model2,
    command = run_model2(data),
    resources = tar_resources(
      crew = tar_resources_crew(controller = "my_sge_controller")
    )
  ),
  tar_target(name = model3, command = run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1), deployment = "main"),
  tar_target(name = plot2, command = plot_model(model2), deployment = "main"),
  tar_target(name = plot3, command = plot_model(model3), deployment = "main")
)

13.6 Thanks

The crew package is an extension of mirai, a sleek and sophisticated task scheduler that efficiently processes intense workloads. crew is only possible because of the amazing work by Charlie Gao in packages mirai and nanonext.


  1. Automatic down-scaling also helps comply with wall time restrictions on shared computing clusters. See the arguments of crew_controller_local() for details.