Key {disk.frame} concepts

There are a number of concepts and terminologies that are useful to understand in order to use disk.frame effectively.

What is a disk.frame and what are chunks?

A disk.frame is a folder containing fst files named “1.fst”, “2.fst”, “3.fst” etc. Each of the “.fst” file is called a chunk.

Workers and parallelism

Parallelism in {disk.frame} is achieved using the {future} package. When performing many tasks, {disk.frame} uses multiple workers to perform the tasks in parallel. A worker is an R session.

It is recommended that you run the following immediately after library(disk.frame) to set-up multiple workers. For example:

library(disk.frame)
#> Loading required package: dplyr
#> 
#> Attaching package: 'dplyr'
#> The following objects are masked from 'package:stats':
#> 
#>     filter, lag
#> The following objects are masked from 'package:base':
#> 
#>     intersect, setdiff, setequal, union
#> Loading required package: purrr
#> Registered S3 method overwritten by 'pryr':
#>   method      from
#>   print.bytes Rcpp
#> 
#> ## Message from disk.frame:
#> We have 1 workers to use with disk.frame.
#> To change that, use setup_disk.frame(workers = n) or just setup_disk.frame() to use the defaults.
#> 
#> 
#> It is recommended that you run the following immediately to set up disk.frame with multiple workers in order to parallelize your operations:
#> 
#> 
#> ```r
#> # this will set up disk.frame with multiple workers
#> setup_disk.frame()
#> # this will allow unlimited amount of data to be passed from worker to worker
#> options(future.globals.maxSize = Inf)
#> ```
#> 
#> Attaching package: 'disk.frame'
#> The following objects are masked from 'package:purrr':
#> 
#>     imap, imap_dfr, map, map2
#> The following objects are masked from 'package:base':
#> 
#>     colnames, ncol, nrow
setup_disk.frame()
#> The number of workers available for disk.frame is 6

# this will allow unlimited amount of data to be passed from worker to worker
options(future.globals.maxSize = Inf)

For example, suppose we wish to compute the number of rows for each chunk, we can clearly perform this simultaneously in parallel. The code to do that is

# use only one column is fastest
df[,.N, keep = "first_col"]

or equivalent using the srckeep function

# use only one column is fastest
srckeep(df, "first_col")[,.N, keep = "first_col"]

Say there are n chunks in df, and there are m workers. Then the first m chunks will run chunk[,.N] simultaneously.

To see how many workers are available for {disk.frame} to use, run

future::nbrOfWorkers()

How {disk.frame} works

When df %>% some_fn %>% collect is called. The some_fn is applied to each chunk of df. The collect will row-bind the results from some_fn(chunk)together if the returned value of some_fn is a data.frame, or it will return a list containing the results of some_fn.

The session that receives these results is called the main session. In general, we should try to minimize the amount of data passed from the worker sessions back to the main session, because passing data around can be slow.

Also, please note that there is no communication between the workers, except for workers passing data back to the main session.