#' Parallelization in purrr
#'
#' @description
#' `r lifecycle::badge("experimental")`
#'
#' All map functions allow parallelized operation using \CRANpkg{mirai}.
#'
#' Wrap functions passed to the `.f` argument of [map()] and its variants with
#' [in_parallel()].
#'
#' [in_parallel()] is a \pkg{purrr} adverb that plays two roles:
#' * It is a signal to purrr verbs like [map()] to go ahead and perform
#' computations in parallel.
#' * It helps you create self-contained functions that are isolated from your
#' workspace. This is important because the function is packaged up
#' (serialized) to be sent across to parallel processes. Isolation is
#' critical for performance because it prevents accidentally sending very
#' large objects between processes.
#'
#' For maps to actually be performed in parallel, you must also set up daemons
#' with [mirai::daemons()]; otherwise the maps fall back to sequential
#' processing. [mirai::require_daemons()] may be used to enforce the use of
#' parallel processing. See the section 'Daemons settings' below.
#'
#' @param .f A fresh formula or function. "Fresh" here means that it should be
#'   declared in the call to [in_parallel()].
#' @param ... Named arguments to declare in the environment of the function.
#'
#' @return A 'crate' (classed function).
#'
#' @section Creating self-contained functions:
#'
#' * Self-contained functions should call package functions with an explicit
#'   `::` namespace. For instance, `ggplot()` from the ggplot2 package must be
#'   called with its namespace prefix: `ggplot2::ggplot()`. An alternative is
#'   to use `library()` within the function to attach a package to the search
#'   path, which allows subsequent use of package functions without the
#'   explicit namespace.
#'
#' * They should declare any data they depend on. Declare data by supplying
#'   named arguments to `...`. When `.f` is an anonymous function that calls a
#'   locally-defined function, as in `\(x) fun(x)`, `fun` itself must be
#'   supplied to `...`, for example: `in_parallel(\(x) fun(x), fun = fun)`.
#'
#' * Functions (closures) supplied to `...` must themselves be self-contained,
#' as they are modified to share the same closure as the main function. This
#' means that all helper functions and other required variables must also be
#' supplied as further `...` arguments. This applies only for functions
#' directly supplied to `...`: containers (such as lists) are not
#' recursively analysed. In other words, if you supply complex
#' objects to `...` you're at risk of unexpectedly including large objects.
#'
#' [in_parallel()] is a simple wrapper of [carrier::crate()] and you may refer
#' to that package for more details.
#'
#' Example usage:
#' ```r
#' # The function needs to be freshly-defined, so instead of:
#' mtcars |> map_dbl(in_parallel(sum))
#' # Use an anonymous function:
#' mtcars |> map_dbl(in_parallel(\(x) sum(x)))
#'
#' # Package functions need to be explicitly namespaced, so instead of:
#' map(1:3, in_parallel(\(x) vec_init(integer(), x)))
#' # Use :: to namespace all package functions:
#' map(1:3, in_parallel(\(x) vctrs::vec_init(integer(), x)))
#'
#' fun <- function(x) { param + helper(x) }
#' helper <- function(x) { x %% 2 }
#' param <- 5
#' # When operating in parallel, locally-defined functions, along with any
#' # helper functions and other objects they require, will not be found:
#' map(1:3, in_parallel(\(x) fun(x)))
#' # Use the ... argument to supply these objects:
#' map(1:3, in_parallel(\(x) fun(x), fun = fun, helper = helper, param = param))
#' ```
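#'
#' The `library()` alternative mentioned in the first bullet can be sketched
#' in the same way. This is a minimal illustration only, assuming the vctrs
#' package is installed wherever the function runs; `res` is an illustrative
#' name:
#' ```r
#' library(purrr)
#'
#' # Attach the package inside the function body, so that vec_init() can be
#' # found without a namespace prefix when the function is evaluated:
#' res <- map(1:3, in_parallel(\(x) {
#'   library(vctrs)
#'   vec_init(integer(), x)
#' }))
#' ```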
#'
#' @section When to use:
#'
#' Parallelizing a map using 'n' processes does not automatically lead to it
#' taking 1/n of the time. Additional overhead from setting up the parallel task
#' and communicating with parallel processes eats into this benefit, and can
#' outweigh it for very short tasks or those involving large amounts of data.
#'
#' The threshold at which parallelization becomes clearly beneficial will differ
#' according to your individual setup and task, but a rough guide is on the
#' order of 100 microseconds to 1 millisecond for each map iteration.
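#'
#' One rough way to see where a task sits relative to this guide is to time a
#' short sequential run and divide by the number of iterations. The sketch
#' below uses only base R; `slow_task()` is a hypothetical stand-in for your
#' own per-element function, and the 1 millisecond cut-off is simply the upper
#' end of the range quoted above, not a hard rule:
#' ```r
#' # Hypothetical per-element task taking roughly 2 milliseconds
#' slow_task <- function(x) {
#'   Sys.sleep(0.002)
#'   x
#' }
#'
#' n <- 20
#' elapsed <- system.time(lapply(seq_len(n), slow_task))[["elapsed"]]
#' per_iteration <- elapsed / n
#'
#' # TRUE suggests each iteration is slow enough that parallelization may pay off
#' per_iteration > 1e-3
#' ```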
#'
#' @section Daemons settings:
#'
#' How and where parallelization occurs is determined by [mirai::daemons()].
#' This is a function from the \pkg{mirai} package that sets up daemons
#' (persistent background processes that receive parallel computations) on your
#' local machine or across the network.
#'
#' Daemons must be set prior to performing any parallel map operation, otherwise
#' [in_parallel()] will fall back to sequential processing. To ensure that maps
#' are always performed in parallel, place [mirai::require_daemons()] before the
#' map.
#'
#' It is usual to set daemons once per session. You can leave them running on
#' your local machine as they consume almost no resources whilst waiting to
#' receive tasks. The following sets up 6 daemons locally:
#'
#' ```r
#' mirai::daemons(6)
#' ```
#'
#' Function arguments:
#'
#' * `n`: the number of daemons to launch on your local machine, e.g.
#' `mirai::daemons(6)`. As a rule of thumb, for maximum efficiency this should
#' be (at most) one less than the number of cores on your machine, leaving one
#' core for the main R process.
#' * `url` and `remote`: used to set up and launch daemons for distributed
#' computing over the network. See [mirai::daemons()] documentation for more
#' details.
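#'
#' The rule of thumb for `n` could be applied along the following lines. This
#' is a sketch under stated assumptions: it uses `parallel::detectCores()`
#' from base R to count cores, which is one possible choice and not something
#' this documentation prescribes, and `cores` and `n` are illustrative names:
#' ```r
#' cores <- parallel::detectCores()
#'
#' # detectCores() can return NA when the count is unknown, so fall back to a
#' # single daemon in that case; otherwise leave one core for the main process
#' n <- if (is.na(cores)) 1L else max(1L, cores - 1L)
#'
#' mirai::daemons(n)
#' ```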
#'
#' Resetting daemons:
#'
#' Daemons persist for the duration of your session. To reset and tear down any
#' existing daemons:
#'
#' ```r
#' mirai::daemons(0)
#' ```
#'
#' All daemons automatically terminate when your session ends. You do not need
#' to explicitly terminate daemons in this instance, although it is still good
#' practice to do so.
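#'
#' Putting the steps of this section together, a session might look like the
#' following. This is a minimal sketch, assuming the mirai and carrier
#' packages are installed and that no daemons are currently set; the choice of
#' 2 daemons and the name `res` are illustrative:
#' ```r
#' library(purrr)
#'
#' # Set daemons once, before any parallel map
#' mirai::daemons(2)
#'
#' # Signal an error here, rather than silently running sequentially, if no
#' # daemons are set
#' mirai::require_daemons()
#'
#' res <- map_dbl(1:4, in_parallel(\(x) x^2))
#'
#' # Reset and tear down the daemons when finished
#' mirai::daemons(0)
#' ```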
#'
#' Note: if you are using parallel map within a package, do not make any
#' [mirai::daemons()] calls within your package. It should always be
#' up to the user how they wish to set up parallel processing: (i) resources are
#' only known at run-time e.g. availability of local or remote daemons, (ii)
#' packages should make use of existing daemons when already set, rather than
#' reset them, and (iii) it helps prevent inadvertently spawning too many
#' daemons when functions are used recursively within each other.
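#'
#' As an illustration of this note, a package function might wrap its map in
#' [in_parallel()] and make no daemons calls of its own. `fit_all()` below is
#' a hypothetical function, not part of any real package; whether it runs in
#' parallel or sequentially depends entirely on whether the user has set
#' daemons beforehand:
#' ```r
#' # Hypothetical package function: it never calls mirai::daemons()
#' fit_all <- function(dfs) {
#'   purrr::map(
#'     dfs,
#'     purrr::in_parallel(\(df) stats::lm(mpg ~ disp, data = df))
#'   )
#' }
#'
#' # The user decides how (and whether) to set daemons before calling it
#' fits <- fit_all(split(mtcars, mtcars$cyl))
#' ```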
#'
#' @references
#'
#' \pkg{purrr}'s parallelization is powered by \CRANpkg{mirai}. See the
#' [mirai website](https://mirai.r-lib.org/) for more details.
#'
#' @seealso [map()] for usage examples.
#' @aliases parallelization
#' @export
#' @examplesIf interactive() && rlang::is_installed("mirai") && rlang::is_installed("carrier")
#' # Run in interactive sessions only, as this spawns additional processes
#'
#' default_param <- 0.5
#'
#' delay <- function(secs = default_param) {
#'   Sys.sleep(secs)
#' }
#'
#' slow_lm <- function(formula, data) {
#'   delay()
#'   lm(formula, data)
#' }
#'
#' # Example of a 'crate' returned by in_parallel(). The object print method
#' # shows the size of the crate and any objects contained within:
#' crate <- in_parallel(
#'   \(df) slow_lm(mpg ~ disp, data = df),
#'   slow_lm = slow_lm,
#'   delay = delay,
#'   default_param = default_param
#' )
#' crate
#'
#' # Use mirai::mirai() to test that a crate is self-contained
#' # by running it in a daemon and collecting its return value:
#' mirai::mirai(crate(mtcars), crate = crate) |> mirai::collect_mirai()
#'
in_parallel <- function(.f, ...) {
  parallel_pkgs_installed()
  inject(
    carrier::crate(
      !!substitute(.f),
      !!!list(...),
      .parent_env = globalenv(),
      .error_arg = ".f",
      .error_call = environment()
    )
  )
}

running_in_parallel <- function(x) {
  inherits(x, "crate") && parallel_pkgs_installed() && mirai::daemons_set()
}

parallel_pkgs_installed <- function() {
  is.logical(the$parallel_pkgs_installed) ||
    {
      check_installed(
        c("carrier", "mirai"),
        version = c("0.3.0", "2.5.1"),
        reason = "for parallel map."
      )
      the$parallel_pkgs_installed <- TRUE
    }
}