-
Notifications
You must be signed in to change notification settings - Fork 292
Expand file tree
/
Copy pathmap.R
More file actions
348 lines (329 loc) · 10.2 KB
/
map.R
File metadata and controls
348 lines (329 loc) · 10.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
#' Apply a function to each element of a vector
#'
#' @description
#' The map functions transform their input by applying a function to
#' each element of a list or atomic vector and returning an object of
#' the same length as the input.
#'
#' * `map()` always returns a list. See the [modify()] family for
#' versions that return an object of the same type as the input.
#'
#' * `map_lgl()`, `map_int()`, `map_dbl()` and `map_chr()` return an
#' atomic vector of the indicated type (or die trying). For these functions,
#' `.f` must return a length-1 vector of the appropriate type.
#'
#' * `map_vec()` simplifies to the common type of the output. It works with
#' most types of simple vectors like Date, POSIXct, factors, etc.
#'
#' * `walk()` calls `.f` for its side-effect and returns
#' the input `.x`.
#'
#' @param .x A list or atomic vector.
#' @param .f A function, specified in one of the following ways:
#'
#' * A named function, e.g. `mean`.
#' * An anonymous function, e.g. `\(x) x + 1` or `function(x) x + 1`.
#' * A formula, e.g. `~ .x + 1`. Use `.x` to refer to the first
#' argument. No longer recommended.
#' * A string, integer, or list, e.g. `"idx"`, `1`, or `list("idx", 1)` which
#' are shorthand for `\(x) pluck(x, "idx")`, `\(x) pluck(x, 1)`, and
#' `\(x) pluck(x, "idx", 1)` respectively. Optionally supply `.default` to
#' set a default value if the indexed element is `NULL` or does not exist.
#'
#' `r lifecycle::badge("experimental")`
#'
#' Wrap a function with [in_parallel()] to declare that it should be performed
#' in parallel. See [in_parallel()] for more details.
#' Use of `...` is not permitted in this context.
#'
#' @param ... Additional arguments passed on to the mapped function.
#'
#' We now generally recommend against using `...` to pass additional
#' (constant) arguments to `.f`. Instead use a shorthand anonymous function:
#'
#' ```R
#' # Instead of
#' x |> map(f, 1, 2, collapse = ",")
#' # do:
#' x |> map(\(x) f(x, 1, 2, collapse = ","))
#' ```
#'
#' This makes it easier to understand which arguments belong to which
#' function and will tend to yield better error messages.
#'
#' @param .progress Whether to show a progress bar. Use `TRUE` to turn on
#' a basic progress bar, use a string to give it a name, or see
#' [progress_bars] for more details.
#' @returns
#' The output length is determined by the length of the input.
#' The output names are determined by the input names.
#' The output type is determined by the suffix:
#'
#' * No suffix: a list; `.f()` can return anything.
#'
#' * `_lgl()`, `_int()`, `_dbl()`, `_chr()` return a logical, integer, double,
#' or character vector respectively; `.f()` must return a compatible atomic
#' vector of length 1.
#'
#' * `_vec()` returns an atomic or S3 vector, the same type that `.f` returns.
#' `.f` can return pretty much any type of vector, as long as it's length 1.
#'
#' * `walk()` returns the input `.x` (invisibly). This makes it easy to
#' use in a pipe. The return value of `.f()` is ignored.
#'
#' Any errors thrown by `.f` will be wrapped in an error with class
#' [purrr_error_indexed].
#' @export
#' @family map variants
#' @seealso [map_if()] for applying a function to only those elements
#' of `.x` that meet a specified condition.
#' @examples
#' # Compute normal distributions from an atomic vector
#' 1:10 |>
#' map(rnorm, n = 10)
#'
#' # You can also use an anonymous function
#' 1:10 |>
#' map(\(x) rnorm(10, x))
#'
#' # Simplify output to a vector instead of a list by computing the mean of the distributions
#' 1:10 |>
#' map(rnorm, n = 10) |> # output a list
#' map_dbl(mean) # output an atomic vector
#'
#' # Using set_names() with character vectors is handy to keep track
#' # of the original inputs:
#' set_names(c("foo", "bar")) |> map_chr(paste0, ":suffix")
#'
#' # Working with lists
#' favorite_desserts <- list(Sophia = "banana bread", Eliott = "pancakes", Karina = "chocolate cake")
#' favorite_desserts |> map_chr(\(food) paste(food, "rocks!"))
#'
#' # Extract by name or position
#' # .default specifies value for elements that are missing or NULL
#' l1 <- list(list(a = 1L), list(a = NULL, b = 2L), list(b = 3L))
#' l1 |> map("a", .default = "???")
#' l1 |> map_int("b", .default = NA)
#' l1 |> map_int(2, .default = NA)
#'
#' # Supply multiple values to index deeply into a list
#' l2 <- list(
#' list(num = 1:3, letters[1:3]),
#' list(num = 101:103, letters[4:6]),
#' list()
#' )
#' l2 |> map(c(2, 2))
#'
#' # Use a list to build an extractor that mixes numeric indices and names,
#' # and .default to provide a default value if the element does not exist
#' l2 |> map(list("num", 3))
#' l2 |> map_int(list("num", 3), .default = NA)
#'
#' # Working with data frames
#' # Use map_lgl(), map_dbl(), etc to return a vector instead of a list:
#' mtcars |> map_dbl(sum)
#'
#' # A more realistic example: split a data frame into pieces, fit a
#' # model to each piece, summarise and extract R^2
#' mtcars |>
#' split(mtcars$cyl) |>
#' map(\(df) lm(mpg ~ wt, data = df)) |>
#' map(summary) |>
#' map_dbl("r.squared")
#'
#' @examplesIf interactive() && rlang::is_installed("mirai") && rlang::is_installed("carrier")
#' # Run in interactive sessions only as spawns additional processes
#'
#' # To use parallelized map:
#' # 1. Set daemons (number of parallel processes) first:
#' mirai::daemons(2)
#'
#' # 2. Wrap .f with in_parallel():
#' mtcars |> map_dbl(in_parallel(\(x) mean(x)))
#'
#' # Note that functions from packages should be fully qualified with `pkg::`
#' # or call `library(pkg)` within the function
#' 1:10 |>
#' map(in_parallel(\(x) vctrs::vec_init(integer(), x))) |>
#' map_int(in_parallel(\(x) { library(vctrs); vec_size(x) }))
#'
#' # A locally-defined function (or any required variables)
#' # should be passed via ... of in_parallel():
#' slow_lm <- function(formula, data) {
#' Sys.sleep(0.5)
#' lm(formula, data)
#' }
#'
#' mtcars |>
#' split(mtcars$cyl) |>
#' map(in_parallel(\(df) slow_lm(mpg ~ disp, data = df), slow_lm = slow_lm))
#'
#' # Tear down daemons when no longer in use:
#' mirai::daemons(0)
#'
map <- function(.x, .f, ..., .progress = FALSE) {
# Thin wrapper: run the shared map_() engine asking for "list" output.
# Keep this a direct call: map_() fills in its `.purrr_user_env` and
# `.purrr_error_call` defaults with caller_env(2) / caller_env(), which are
# resolved relative to the frame that calls map_().
map_("list", .x, .f, ..., .progress = .progress)
}
#' @rdname map
#' @export
map_lgl <- function(.x, .f, ..., .progress = FALSE) {
# Thin wrapper: run the shared map_() engine asking for "logical" output.
# Must remain a direct call, since map_() derives its default environments
# from the calling frame via caller_env().
map_("logical", .x, .f, ..., .progress = .progress)
}
#' @rdname map
#' @export
map_int <- function(.x, .f, ..., .progress = FALSE) {
# Thin wrapper: run the shared map_() engine asking for "integer" output.
# Must remain a direct call, since map_() derives its default environments
# from the calling frame via caller_env().
map_("integer", .x, .f, ..., .progress = .progress)
}
#' @rdname map
#' @export
map_dbl <- function(.x, .f, ..., .progress = FALSE) {
# Thin wrapper: run the shared map_() engine asking for "double" output.
# Must remain a direct call, since map_() derives its default environments
# from the calling frame via caller_env().
map_("double", .x, .f, ..., .progress = .progress)
}
#' @rdname map
#' @export
map_chr <- function(.x, .f, ..., .progress = FALSE) {
# Thin wrapper: run the shared map_() engine asking for "character" output.
# Must remain a direct call, since map_() derives its default environments
# from the calling frame via caller_env().
map_("character", .x, .f, ..., .progress = .progress)
}
# Shared engine behind map() and the typed map_*() wrappers.
#
# Arguments:
# * `.type`: output mode requested by the wrapper, e.g. "list" or "double".
# * `.x`, `.f`, `...`, `.progress`: forwarded unchanged from the wrapper.
# * `.purrr_user_env`: defaults to caller_env(2); passed to as_progress() and
#   vctrs_vec_compat().
# * `.purrr_error_call`: defaults to caller_env(); used as the `call` for
#   errors raised on behalf of the wrapper.
#
# Returns whatever mmap_() (parallel path) or map_impl (sequential path)
# produces.
map_ <- function(
.type,
.x,
.f,
...,
.progress = FALSE,
.purrr_user_env = caller_env(2),
.purrr_error_call = caller_env()
) {
# NOTE(review): as_progress() is defined elsewhere; presumably it validates
# and normalises the user's `.progress` value -- confirm.
.progress <- as_progress(
.progress,
user_env = .purrr_user_env,
caller_env = .purrr_error_call
)
# NOTE(review): vctrs_vec_compat() is defined elsewhere; presumably it adapts
# legacy inputs so that the vec_assert() below accepts them -- confirm.
.x <- vctrs_vec_compat(.x, .purrr_user_env)
vec_assert(.x, arg = ".x", call = .purrr_error_call)
# Functions flagged as parallel take a separate path; note that `.f` is handed
# over before as_mapper() is applied.
if (running_in_parallel(.f)) {
return(mmap_(.x, .f, .progress, .type, .purrr_error_call, ...))
}
.f <- as_mapper(.f, ...)
n <- vec_size(.x)
names <- vec_names(.x)
# `i` starts at 0L, which with_indexed_errors() reads as "not inside the loop".
# It is passed below as a lazy argument, so the handler sees whatever value `i`
# has in this frame when an error is first handled.
# NOTE(review): map_impl receives environment(), so it presumably reads the
# locals here by name (`.x`, `.f`, `i`, ...) and advances `i` -- confirm before
# renaming anything.
i <- 0L
with_indexed_errors(
i = i,
names = names,
error_call = .purrr_error_call,
call_with_cleanup(map_impl, environment(), .type, .progress, n, names, i)
)
}
# Parallel counterpart of the sequential mapping loop, used when `.f` has been
# marked for parallel execution.
#
# Arguments:
# * `.x`, `.f`: input and function, handed to mirai::mirai_map() as is.
# * `.progress`: progress specification; translated into collection options.
# * `.type`: requested output mode; anything other than "list" is simplified
#   to `vector(mode = .type)`.
# * `error_call`: call reported in errors raised here.
# * `...`: must be empty; extra arguments are rejected with an error.
#
# Returns the collected results, as a list or as a vector of mode `.type`.
mmap_ <- function(.x, .f, .progress, .type, error_call, ...) {
  # Extra arguments are not forwarded to the parallel function, so fail loudly
  # instead of silently dropping them.
  if (...length() > 0L) {
    cli::cli_abort(
      "Can't use `...` with parallelized functions.",
      call = error_call
    )
  }

  m <- mirai::mirai_map(.x, .f)

  # Translate `.progress` into the `options` argument of collect_mirai():
  # FALSE -> ".stop" only; any other logical -> ".stop" plus ".progress";
  # a string or list -> a list carrying the progress configuration itself.
  options <- if (isFALSE(.progress)) {
    ".stop"
  } else if (is.logical(.progress)) {
    c(".stop", ".progress")
  } else if (is.character(.progress) || is.list(.progress)) {
    list(.stop = TRUE, .progress = .progress)
  } else {
    cli::cli_abort(
      "Unknown cli progress bar configuration, see manual.",
      call = error_call
    )
  }

  # `interrupt_expr` is only evaluated if the collection is interrupted, at
  # which point the outstanding map is stopped.
  x <- with_parallel_indexed_errors(
    mirai::collect_mirai(m, options = options),
    interrupt_expr = mirai::stop_mirai(m),
    error_call = error_call
  )

  # The typed variants need an atomic vector rather than the collected list.
  # NOTE(review): the local is deliberately still called `x` in case
  # simplify_impl() labels errors using its argument expression -- confirm.
  if (.type != "list") {
    x <- simplify_impl(x, ptype = vector(mode = .type), error_call = error_call)
  }
  x
}
#' @rdname map
#' @param .ptype If `NULL`, the default, the output type is the common type
#' of the elements of the result. Otherwise, supply a "prototype" giving
#' the desired type of output.
#' @export
map_vec <- function(.x, .f, ..., .ptype = NULL, .progress = FALSE) {
# Step 1: collect one result per element as a plain list.
out <- map(.x, .f, ..., .progress = .progress)
# Step 2: combine the list into a single vector; `.ptype = NULL` leaves the
# choice of output type to simplify_impl().
# NOTE(review): simplify_impl() is defined elsewhere. If it labels errors with
# the expression passed as its first argument, renaming `out` would change
# user-facing messages -- confirm before refactoring.
simplify_impl(out, ptype = .ptype)
}
#' @rdname map
#' @export
walk <- function(.x, .f, ..., .progress = FALSE) {
# Run map() purely for the side effects of `.f`; its list result is dropped.
map(.x, .f, ..., .progress = .progress)
# Return the original input invisibly so walk() can sit in the middle of a pipe.
invisible(.x)
}
# Evaluate `expr`; if it raises an error while an element is being processed,
# re-signal it as a "purrr_error_indexed" error that records the position
# (and name, when there is one) of that element.
#
# Arguments:
# * `expr`: the code to run.
# * `i`: index of the element being processed; 0L means "outside the loop",
#   in which case the original error is left to propagate untouched. It is
#   deliberately not forced up front: it is only read once an error occurs.
# * `names`: optional names of the input, used to look up the element's name.
# * `error_call`: call reported by the wrapping error.
with_indexed_errors <- function(
  expr,
  i,
  names = NULL,
  error_call = caller_env()
) {
  rethrow_with_index <- function(cnd) {
    if (i == 0L) {
      # Error happened before or after loop
      return(invisible(NULL))
    }

    # Only a non-missing, non-empty name is worth reporting.
    has_name <- !is.null(names) && !is.na(names[[i]]) && names[[i]] != ""
    name <- if (has_name) names[[i]] else NULL

    # `{i}` and `{name}` are interpolated by cli from this frame.
    bullets <- c(i = "In index: {i}.")
    if (has_name) {
      bullets <- c(bullets, i = "With name: {name}.")
    }

    cli::cli_abort(
      bullets,
      location = i,
      name = name,
      parent = cnd,
      call = error_call,
      class = "purrr_error_indexed"
    )
  }

  withCallingHandlers(expr, error = rethrow_with_index)
}
# Evaluate `expr`; any error it raises is re-signalled as a
# "purrr_error_indexed" error built from the `location`, `name` and `parent`
# fields carried by the original condition.
#
# Arguments:
# * `expr`: the code to run.
# * `interrupt_expr`: code evaluated only if an interrupt is signalled while
#   `expr` runs (it is never forced otherwise).
# * `error_call`: call reported by the wrapping error.
with_parallel_indexed_errors <- function(
  expr,
  interrupt_expr = NULL,
  error_call = caller_env()
) {
  on_error <- function(cnd) {
    location <- cnd$location
    iname <- cnd$name

    # `{location}` and `{iname}` are interpolated by cli from this frame.
    bullets <- c(i = "In index: {location}.")
    if (length(iname) && nzchar(iname)) {
      bullets <- c(bullets, i = "With name: {iname}.")
    }

    cli::cli_abort(
      bullets,
      location = location,
      name = iname,
      parent = cnd$parent,
      call = error_call,
      class = "purrr_error_indexed"
    )
  }

  on_interrupt <- function(cnd) {
    # Touching the argument here is what triggers its evaluation.
    interrupt_expr
  }

  withCallingHandlers(expr, error = on_error, interrupt = on_interrupt)
}
#' Indexed errors (`purrr_error_indexed`)
#'
#' @description
#'
#' ```{r, child = "man/rmd/indexed-error.Rmd"}
#' ```
#'
#' @keywords internal
#' @name purrr_error_indexed
NULL