Apply the same function to all chunks

`cimap.disk.frame` accepts a two-argument function, where the first argument is a data.frame (the chunk) and the second is the chunk ID.

`lazy` is a convenience function to apply `.f` to every chunk

`delayed` is an alias for `lazy` and is consistent with the naming in Dask and Dagger.jl

cmap(.x, .f, ...)

# S3 method for disk.frame
cmap(
  .x,
  .f,
  ...,
  outdir = NULL,
  keep = NULL,
  chunks = nchunks(.x),
  compress = 50,
  lazy = TRUE,
  overwrite = FALSE,
  vars_and_pkgs = future::getGlobalsAndPackages(.f, envir = parent.frame()),
  .progress = TRUE
)

cmap_dfr(.x, .f, ..., .id = NULL)

# S3 method for disk.frame
cmap_dfr(.x, .f, ..., .id = NULL, use.names = fill, fill = FALSE, idcol = NULL)

cimap(.x, .f, ...)

# S3 method for disk.frame
cimap(
  .x,
  .f,
  outdir = NULL,
  keep = NULL,
  chunks = nchunks(.x),
  compress = 50,
  lazy = TRUE,
  overwrite = FALSE,
  ...
)

cimap_dfr(.x, .f, ..., .id = NULL)

# S3 method for disk.frame
cimap_dfr(
  .x,
  .f,
  ...,
  .id = NULL,
  use.names = fill,
  fill = FALSE,
  idcol = NULL
)

lazy(.x, .f, ...)

# S3 method for disk.frame
lazy(.x, .f, ...)

delayed(.x, .f, ...)

chunk_lapply(...)

map(.x, .f, ...)

# S3 method for disk.frame
map(...)

# S3 method for default
map(.x, .f, ...)

imap_dfr(.x, .f, ..., .id = NULL)

# S3 method for disk.frame
imap_dfr(...)

# S3 method for default
imap_dfr(.x, .f, ..., .id = NULL)

imap(.x, .f, ...)

# S3 method for default
imap(.x, .f, ...)

# S3 method for disk.frame
map_dfr(...)

# S3 method for default
map_dfr(.x, .f, ..., .id = NULL)

Arguments

.x

a disk.frame

.f

a function to apply to each of the chunks

...

for compatibility with `purrr::map`

outdir

the output directory

keep

the columns to keep from the input

chunks

The number of chunks to output

compress

0-100 fst compression ratio

lazy

if TRUE then do this lazily

overwrite

if TRUE removes any existing chunks in the data

vars_and_pkgs

variables and packages to send to a background session. This is typically automatically detected

.progress

A logical, for whether or not to print a progress bar for multiprocess, multisession, and multicore plans. From furrr

.id

not used

use.names

for cmap_dfr's call to data.table::rbindlist. See data.table::rbindlist

fill

for cmap_dfr's call to data.table::rbindlist. See data.table::rbindlist

idcol

for cmap_dfr's call to data.table::rbindlist. See data.table::rbindlist

Examples

cars.df = as.disk.frame(cars)

# return the first column of each chunk lazily
cars2 = cmap(cars.df, function(chunk) {
  chunk[,1]
})

collect(cars2)
#> speed #> 1: 4 #> 2: 4 #> 3: 7 #> 4: 7 #> 5: 8 #> 6: 9 #> 7: 10 #> 8: 10 #> 9: 10 #> 10: 11 #> 11: 11 #> 12: 12 #> 13: 12 #> 14: 12 #> 15: 12 #> 16: 13 #> 17: 13 #> 18: 13 #> 19: 13 #> 20: 14 #> 21: 14 #> 22: 14 #> 23: 14 #> 24: 15 #> 25: 15 #> 26: 15 #> 27: 16 #> 28: 16 #> 29: 17 #> 30: 17 #> 31: 17 #> 32: 18 #> 33: 18 #> 34: 18 #> 35: 18 #> 36: 19 #> 37: 19 #> 38: 19 #> 39: 20 #> 40: 20 #> 41: 20 #> 42: 20 #> 43: 20 #> 44: 22 #> 45: 23 #> 46: 24 #> 47: 24 #> 48: 24 #> 49: 24 #> 50: 25 #> speed
# return the first row of each chunk lazily, using a purrr-style formula
cars2 = cmap(cars.df, ~.x[1,])

collect(cars2)
#> speed dist #> 1: 4 2 #> 2: 11 17 #> 3: 13 46 #> 4: 16 40 #> 5: 19 46 #> 6: 24 70
# return the first row of each chunk eagerly as a list
cmap(cars.df, ~.x[1,], lazy = FALSE)
#> [[1]] #> speed dist #> 1: 4 2 #> #> [[2]] #> speed dist #> 1: 11 17 #> #> [[3]] #> speed dist #> 1: 13 46 #> #> [[4]] #> speed dist #> 1: 16 40 #> #> [[5]] #> speed dist #> 1: 19 46 #> #> [[6]] #> speed dist #> 1: 24 70 #>
# return the first row of each chunk eagerly as a data.table/data.frame by row-binding
cmap_dfr(cars.df, ~.x[1,])
#> speed dist #> 1: 4 2 #> 2: 11 17 #> 3: 13 46 #> 4: 16 40 #> 5: 19 46 #> 6: 24 70
# lazy and delayed are just aliases for cmap(..., lazy = TRUE)
collect(lazy(cars.df, ~.x[1,]))
#> speed dist #> 1: 4 2 #> 2: 11 17 #> 3: 13 46 #> 4: 16 40 #> 5: 19 46 #> 6: 24 70
collect(delayed(cars.df, ~.x[1,]))
#> speed dist #> 1: 4 2 #> 2: 11 17 #> 3: 13 46 #> 4: 16 40 #> 5: 19 46 #> 6: 24 70
# clean up cars.df
delete(cars.df)

cars.df = as.disk.frame(cars)

# .x is the chunk and .y is the ID as an integer
# lazy = TRUE support is not available at the moment
cimap(cars.df, ~.x[, id := .y], lazy = FALSE)
#> [[1]] #> speed dist id #> 1: 4 2 1 #> 2: 4 10 1 #> 3: 7 4 1 #> 4: 7 22 1 #> 5: 8 16 1 #> 6: 9 10 1 #> 7: 10 18 1 #> 8: 10 26 1 #> 9: 10 34 1 #> #> [[2]] #> speed dist id #> 1: 11 17 2 #> 2: 11 28 2 #> 3: 12 14 2 #> 4: 12 20 2 #> 5: 12 24 2 #> 6: 12 28 2 #> 7: 13 26 2 #> 8: 13 34 2 #> 9: 13 34 2 #> #> [[3]] #> speed dist id #> 1: 13 46 3 #> 2: 14 26 3 #> 3: 14 36 3 #> 4: 14 60 3 #> 5: 14 80 3 #> 6: 15 20 3 #> 7: 15 26 3 #> 8: 15 54 3 #> 9: 16 32 3 #> #> [[4]] #> speed dist id #> 1: 16 40 4 #> 2: 17 32 4 #> 3: 17 40 4 #> 4: 17 50 4 #> 5: 18 42 4 #> 6: 18 56 4 #> 7: 18 76 4 #> 8: 18 84 4 #> 9: 19 36 4 #> #> [[5]] #> speed dist id #> 1: 19 46 5 #> 2: 19 68 5 #> 3: 20 32 5 #> 4: 20 48 5 #> 5: 20 52 5 #> 6: 20 56 5 #> 7: 20 64 5 #> 8: 22 66 5 #> 9: 23 54 5 #> #> [[6]] #> speed dist id #> 1: 24 70 6 #> 2: 24 92 6 #> 3: 24 93 6 #> 4: 24 120 6 #> 5: 25 85 6 #>
cimap_dfr(cars.df, ~.x[, id := .y])
#> speed dist id #> 1: 4 2 1 #> 2: 4 10 1 #> 3: 7 4 1 #> 4: 7 22 1 #> 5: 8 16 1 #> 6: 9 10 1 #> 7: 10 18 1 #> 8: 10 26 1 #> 9: 10 34 1 #> 10: 11 17 2 #> 11: 11 28 2 #> 12: 12 14 2 #> 13: 12 20 2 #> 14: 12 24 2 #> 15: 12 28 2 #> 16: 13 26 2 #> 17: 13 34 2 #> 18: 13 34 2 #> 19: 13 46 3 #> 20: 14 26 3 #> 21: 14 36 3 #> 22: 14 60 3 #> 23: 14 80 3 #> 24: 15 20 3 #> 25: 15 26 3 #> 26: 15 54 3 #> 27: 16 32 3 #> 28: 16 40 4 #> 29: 17 32 4 #> 30: 17 40 4 #> 31: 17 50 4 #> 32: 18 42 4 #> 33: 18 56 4 #> 34: 18 76 4 #> 35: 18 84 4 #> 36: 19 36 4 #> 37: 19 46 5 #> 38: 19 68 5 #> 39: 20 32 5 #> 40: 20 48 5 #> 41: 20 52 5 #> 42: 20 56 5 #> 43: 20 64 5 #> 44: 22 66 5 #> 45: 23 54 5 #> 46: 24 70 6 #> 47: 24 92 6 #> 48: 24 93 6 #> 49: 24 120 6 #> 50: 25 85 6 #> speed dist id
# clean up cars.df
delete(cars.df)