13  Distributed computing

Package versions

For best results, please use targets version 1.2.0 or higher and crew version 0.3.0 or higher. If you cannot install packages globally, consider creating a local renv package library for your project.

renv::init()
renv::install("crew")
renv::install("targets")
renv::snapshot()
Performance

See the performance chapter for options, settings, and other choices to make parallel and distributed pipelines more efficient.

To efficiently process a large and complex pipeline, tar_make() can run multiple targets at the same time. Thanks to integration with crew and blazing fast scheduling from mirai behind the scenes, those targets can run on a variety of high-performance computing platforms, and they can scale out to the hundreds and beyond.

13.1 How it works

  1. Write your pipeline as usual, but set the controller argument of tar_option_set to the crew controller of your choice.
  2. Run the pipeline with a simple tar_make().

The crew controller from (1) allows tar_make() to launch external R processes called “workers” which can each run one or more targets. By delegating long-running targets to these workers, the local R session is free to focus on other tasks, and the pipeline finishes faster.

13.2 Example

The following _targets.R file uses a local process controller with 2 workers. That means up to 2 workers can be running at any given time, and each worker is an separate R process on the same computer as the local R process.

# _targets.R
library(targets)
library(tarchetypes)
library(crew)
tar_option_set(
  controller = crew_controller_local(workers = 2)
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(name = model2, command = run_model2(data)),
  tar_target(name = model3, command = run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1)),
  tar_target(name = plot2, command = plot_model(model2)),
  tar_target(name = plot3, command = plot_model(model3))
)
# R console
tar_visnetwork(targets_only = TRUE)

Run the pipeline with a simple call to tar_make(). Please note that real-life pipelines will have longer execution times, especially for the models.

# R console
tar_make()
#> + data dispatched
#> ✔ data completed [0ms, 46 B]
#> + model1 dispatched
#> ✔ model1 completed [0ms, 46 B]
#> + model2 dispatched
#> ✔ model2 completed [1ms, 46 B]
#> + model3 dispatched
#> ✔ model3 completed [0ms, 46 B]
#> + plot1 dispatched
#> ✔ plot1 completed [0ms, 46 B]
#> + plot2 dispatched
#> ✔ plot2 completed [1ms, 46 B]
#> + plot3 dispatched
#> ✔ plot3 completed [0ms, 46 B]
#> ✔ ended pipeline [187ms, 7 completed, 0 skipped]

Let’s talk through what happens in the above call to tar_make(). First, a new worker launches and sends the data target to the crew queue. After the data target completes, all three models are ready to begin. A second worker automatically launches to meet the increased demand of the workload, and each of the two workers starts to run a model. After one of the models finishes, its worker is free to either run the downstream plot or the third model. The process continues until all the targets are complete. The workers shut down when the pipeline is done.

13.3 Configuration and auto-scaling

Adding more workers might speed up your pipeline, but not always. Beyond a certain point, the efficiency gains will diminish, and the extra workers will have nothing to do. With proper configuration, you can find the right balance.

As mentioned above, new workers launch automatically in response to increasing demand. By default, they stay running for the duration of the pipeline. However, you can customize the controller to scale down when circumstances allow, which helps help avoid wasting resources1 The most useful arguments for down-scaling, in order of importance, are:

  1. seconds_idle: automatically shut down a worker if it spends too long waiting for a target.
  2. tasks_max: maximum number of tasks a worker can run before shutting down.
  3. seconds_wall: soft wall time of a worker.

On the other hand, it is not always helpful to eagerly down-scale workers. Because the workload can fluctuate rapidly, some workers may quit and relaunch so often that it creates noticeable overhead. crew and its plugins try to set reasonable defaults, but you may need to adjust for optimal efficiency.

13.4 Plugins

crew is a platform for multiple computing platforms, not just local processes, but also traditional high-performance computing systems and cloud computing services. For example, to run each worker as a job on a Sun Grid Engine cluster, use crew_controller_sge() from the crew.cluster package.

# _targets.R
library(targets)
library(tarchetypes)
library(crew.cluster)
tar_option_set(
  controller = crew_controller_sge(
    workers = 3,
    options_cluster = crew_options_sge(
      script_lines = "module load R",
      log_output = "log_folder/"
    )
  )
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(name = model2, command = run_model2(data)),
  tar_target(name = model3, command = run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1)),
  tar_target(name = plot2, command = plot_model(model2)),
  tar_target(name = plot3, command = plot_model(model3))
)

If crew.cluster and other official packages do not meet your needs, then you can write your own launcher plugin tailored to your own specific computing environment. crew makes this process straightforward, and the vignette at https://wlandau.github.io/crew/articles/launcher_plugins.html walks through the details step by step.

13.5 Heterogeneous workers

Different targets may have different computing requirements, from memory to GPUs and beyond. You can send different targets to different kinds of workers using crew controller groups. In the _targets.R file below, we create a local process controller alongside a Sun Grid Engine controller a memory requirement and a GPU. We combine them in a crew controller group which we supply to the controller argument of tar_option_set. Next, we use tar_resources() and tar_resources_crew() to tell model2 to run on Sun Grid Engine and all other targets to run on local processes. The deployment = "main" argument tells the plots to avoid worker processes altogether and run on the main central R process.

# _targets.R
library(targets)
library(tarchetypes)
library(crew)
library(crew.cluster)
controller_local <- crew_controller_local(
  name = "my_local_controller",
  workers = 2,
  seconds_idle = 10
)
controller_sge <- crew_controller_sge(
  name = "my_sge_controller",
  workers = 3,
  seconds_idle = 15,
  options_cluster = crew_options_sge(
    script_lines = "module load R",
    log_output = "log_folder/",
    memory_gigabytes_required = 64,
    gpu = 1
  )
)
tar_option_set(
  controller = crew_controller_group(controller_local, controller_sge),
  resources = tar_resources(
    crew = tar_resources_crew(controller = "my_local_controller")
  )
)
tar_source()
list(
  tar_target(name = data, command = get_data()),
  tar_target(name = model1, command = run_model1(data)),
  tar_target(
    name = model2,
    command = run_model2(data),
    resources = tar_resources(
      crew = tar_resources_crew(controller = "my_sge_controller")
    )
  ),
  tar_target(name = model3, run_model3(data)),
  tar_target(name = plot1, command = plot_model(model1), deployment = "main"),
  tar_target(name = plot2, command = plot_model(model2), deployment = "main"),
  tar_target(name = plot3, command = plot_model(model3), deployment = "main")
)

13.6 Resource usage

The autometric package can monitor the CPU and memory consumption of the various processes in a targets pipeline, both local processes and parallel workers. Please read https://wlandau.github.io/crew/articles/logging.html for details and examples.

13.7 Thanks

The crew package is an extension of mirai, a sleek and sophisticated task scheduler that efficiently processes intense workloads. crew is only possible because of the amazing work by Charlie Gao in packages mirai and nanonext.


  1. Automatic down-scaling also helps comply with wall time restrictions on shared computing clusters. See the arguments of crew_controller_local() for details.↩︎