Quick Start - replicating dplyr’s tutorial on nycflights13

The disk.frame package aims to be the answer to the question: how do I manipulate structured tabular data that doesn’t fit into Random Access Memory (RAM)?

In a nutshell, disk.frame makes use of two simple ideas:

  1. split up a larger-than-RAM dataset into chunks and store each chunk in a separate file inside a folder and
  2. provide a convenient API to manipulate these chunks

disk.frame performs a similar role to distributed systems such as Apache Spark, Python’s Dask, and Julia’s JuliaDB.jl for medium data: datasets that are too large for RAM but not large enough to qualify as big data.

In this tutorial, we introduce disk.frame, address some common questions, and replicate the sparklyr data manipulation tutorial using disk.frame constructs.

Installation

Simply run

install.packages("disk.frame") # when CRAN ready

or

devtools::install_github("xiaodaigh/disk.frame")

Set-up disk.frame

disk.frame works best if it can process multiple data chunks in parallel. The best way to set up disk.frame so that each CPU core runs a background worker is to call

setup_disk.frame()

# this allows an unlimited amount of data to be passed from worker to worker
options(future.globals.maxSize = Inf)

The setup_disk.frame() function sets up background workers equal to the number of CPU cores; note that, by default, hyper-threaded cores are counted as one, not two.

Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).
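
For example, a minimal sketch that requests two workers explicitly and then verifies the worker count via the future package (which the options call above already relies on):

setup_disk.frame(workers = 2)
# nbrOfWorkers() is from the future package; it should now report 2
future::nbrOfWorkers()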

Basic Data Operations with disk.frame

The disk.frame package provides convenient functions to convert data.frames and CSVs to disk.frames.

Creating a disk.frame from data.frame

We convert a data.frame to a disk.frame using the as.disk.frame function.

library(nycflights13)
library(dplyr)
library(disk.frame)
library(data.table)

# convert the flights data to a disk.frame and store the disk.frame in the folder
# "tmp_flights" and overwrite any content if needed
flights.df <- as.disk.frame(
  flights, 
  outdir = file.path(tempdir(), "tmp_flights.df"),
  overwrite = TRUE)
flights.df
#> path: "C:\Users\RTX2080\AppData\Local\Temp\RtmpaQzEEk/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???

You should now see a folder called tmp_flights.df with some files in it, namely 1.fst, 2.fst, etc., where each .fst file is one chunk of the disk.frame.
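
One can verify this with base R (the exact temporary path will differ on your machine):

list.files(file.path(tempdir(), "tmp_flights.df"))
# should list 1.fst through 6.fst, one file per chunk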

Creating a disk.frame from CSV

library(nycflights13)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)

# load the csv into a disk.frame
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
  csv_path, 
  outdir = df_path,
  overwrite = T)
  
flights.df
#> path: "C:\Users\RTX2080\AppData\Local\Temp\RtmpaQzEEk/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???

If the CSV is too large to read in, then we can also use the in_chunk_size option to control how many rows to read in at once. For example, to read in the data 100,000 rows at a time:

library(nycflights13)
library(disk.frame)

# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")

data.table::fwrite(flights, csv_path)

df_path = file.path(tempdir(), "tmp_flights.df")

flights.df <- csv_to_disk.frame(
  csv_path, 
  outdir = df_path, 
  in_chunk_size = 100000)
#>  -----------------------------------------------------
#> Stage 1 of 2: splitting the file C:\Users\RTX2080\AppData\Local\Temp\RtmpaQzEEk/tmp_flights.csv into smallers files:
#> Destination: C:\Users\RTX2080\AppData\Local\Temp\RtmpaQzEEk\file34803c7fa78
#>  -----------------------------------------------------
#> Stage 1 of 2 took: 0.170s elapsed (0.080s cpu)
#>  -----------------------------------------------------
#> Stage 2 of 2: Converting the smaller files into disk.frame
#>  -----------------------------------------------------
#> csv_to_disk.frame: Reading multiple input files.
#> Please use `colClasses = `  to set column types to minimize the chance of a failed read
#> =================================================
#> 
#>  -----------------------------------------------------
#> -- Converting CSVs to disk.frame -- Stage 1 of 2:
#> 
#> Converting 4 CSVs to 6 disk.frames each consisting of 6 chunks
#> 
#> -- Converting CSVs to disk.frame -- Stage 1 or 2 took: 0.970s elapsed (0.070s cpu)
#>  -----------------------------------------------------
#> 
#>  -----------------------------------------------------
#> -- Converting CSVs to disk.frame -- Stage 2 of 2:
#> 
#> Row-binding the 6 disk.frames together to form one large disk.frame:
#> Creating the disk.frame at C:\Users\RTX2080\AppData\Local\Temp\RtmpaQzEEk/tmp_flights.df
#> 
#> Appending disk.frames:
#> Stage 2 of 2 took: 0.630s elapsed (0.080s cpu)
#>  -----------------------------------------------------
#> Stage 1 & 2 in total took: 1.600s elapsed (0.150s cpu)
#> Stage 2 of 2 took: 1.610s elapsed (0.160s cpu)
#>  -----------------------------------------------------
#> Stage 2 & 2 took: 1.780s elapsed (0.240s cpu)
#>  -----------------------------------------------------
  
flights.df
#> path: "C:\Users\RTX2080\AppData\Local\Temp\RtmpaQzEEk/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???

disk.frame also has a function zip_to_disk.frame that can convert every CSV in a zip file to disk.frames.
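
A hedged sketch of its usage, assuming a zip file flights.zip containing one or more CSVs (its arguments largely mirror those of csv_to_disk.frame):

# not run: requires an actual zip file on disk
# zip_to_disk.frame(
#   "flights.zip",
#   outdir = file.path(tempdir(), "tmp_flights_zip.df"))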

Simple dplyr verbs and lazy evaluation

flights.df1 <- select(flights.df, year:day, arr_delay, dep_delay)
flights.df1
#> path: "C:\Users\RTX2080\AppData\Local\Temp\RtmpaQzEEk/tmp_flights.df"
#> nchunks: 6
#> nrow (at source): 336776
#> ncol (at source): 19
#> nrow (post operations): ???
#> ncol (post operations): ???
class(flights.df1)
#> [1] "disk.frame"        "disk.frame.folder"

The class of flights.df1 is still disk.frame after the dplyr::select transformation. Also, disk.frame operations are by default (and where possible) lazy, meaning they are not performed right away; instead, disk.frame waits until you call collect. Exceptions to this rule are the *_join operations, which are evaluated eagerly under certain conditions; see Joins for disk.frame in-depth for details.

For lazily constructed disk.frames (e.g. flights.df1), the collect function can be used to bring the results from disk into R, e.g.

collect(flights.df1) %>% head
#>    year month day arr_delay dep_delay
#> 1: 2013     1   1        11         2
#> 2: 2013     1   1        20         4
#> 3: 2013     1   1        33         2
#> 4: 2013     1   1       -18        -1
#> 5: 2013     1   1       -25        -6
#> 6: 2013     1   1        12        -4

Of course, for larger-than-RAM datasets, one wouldn’t call collect on the whole disk.frame (otherwise, why would you need disk.frame?). More likely, one would call collect on a filtered dataset or one summarized with group_by.

Some examples of other dplyr verbs applied:

filter(flights.df, dep_delay > 1000) %>% collect %>% head
#>   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013     1   9      641            900      1301     1242           1530
#> 2 2013     1  10     1121           1635      1126     1239           1810
#> 3 2013     6  15     1432           1935      1137     1607           2120
#> 4 2013     7  22      845           1600      1005     1044           1815
#> 5 2013     9  20     1139           1845      1014     1457           2210
#>   arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1      1272      HA     51  N384HA    JFK  HNL      640     4983    9
#> 2      1109      MQ   3695  N517MQ    EWR  ORD      111      719   16
#> 3      1127      MQ   3535  N504MQ    JFK  CMH       74      483   19
#> 4       989      MQ   3075  N665MQ    JFK  CVG       96      589   16
#> 5      1007      AA    177  N338AA    JFK  SFO      354     2586   18
#>   minute            time_hour
#> 1      0 2013-01-09T14:00:00Z
#> 2     35 2013-01-10T21:00:00Z
#> 3     35 2013-06-15T23:00:00Z
#> 4      0 2013-07-22T20:00:00Z
#> 5     45 2013-09-20T22:00:00Z
mutate(flights.df, speed = distance / air_time * 60) %>% collect %>% head
#>   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013     1   1      517            515         2      830            819
#> 2 2013     1   1      533            529         4      850            830
#> 3 2013     1   1      542            540         2      923            850
#> 4 2013     1   1      544            545        -1     1004           1022
#> 5 2013     1   1      554            600        -6      812            837
#> 6 2013     1   1      554            558        -4      740            728
#>   arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1        11      UA   1545  N14228    EWR  IAH      227     1400    5
#> 2        20      UA   1714  N24211    LGA  IAH      227     1416    5
#> 3        33      AA   1141  N619AA    JFK  MIA      160     1089    5
#> 4       -18      B6    725  N804JB    JFK  BQN      183     1576    5
#> 5       -25      DL    461  N668DN    LGA  ATL      116      762    6
#> 6        12      UA   1696  N39463    EWR  ORD      150      719    5
#>   minute            time_hour    speed
#> 1     15 2013-01-01T10:00:00Z 370.0441
#> 2     29 2013-01-01T10:00:00Z 374.2731
#> 3     40 2013-01-01T10:00:00Z 408.3750
#> 4     45 2013-01-01T10:00:00Z 516.7213
#> 5      0 2013-01-01T11:00:00Z 394.1379
#> 6     58 2013-01-01T10:00:00Z 287.6000

Examples of NOT fully supported dplyr verbs

The chunk_arrange function arranges (sorts) each chunk but not the whole dataset, so use it with caution. Similarly, chunk_summarise creates summary variables within each chunk and hence also needs to be used with caution. In the Grouping section below, we demonstrate how to use summarise correctly in the disk.frame context with hard group-bys.

# this only sorts within each chunk
chunk_arrange(flights.df, dplyr::desc(dep_delay)) %>% collect %>% head
#>   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013     1   9      641            900      1301     1242           1530
#> 2 2013     1  10     1121           1635      1126     1239           1810
#> 3 2013     1   1      848           1835       853     1001           1950
#> 4 2013     5  19      713           1700       853     1007           1955
#> 5 2013     1  13     1809            810       599     2054           1042
#> 6 2013     5  16     2233           1340       533       59           1640
#>   arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1      1272      HA     51  N384HA    JFK  HNL      640     4983    9
#> 2      1109      MQ   3695  N517MQ    EWR  ORD      111      719   16
#> 3       851      MQ   3944  N942MQ    JFK  BWI       41      184   18
#> 4       852      AA    257  N3HEAA    JFK  LAS      323     2248   17
#> 5       612      DL    269  N322NB    JFK  ATL      116      760    8
#> 6       499      AA    753  N3ERAA    LGA  DFW      184     1389   13
#>   minute            time_hour
#> 1      0 2013-01-09T14:00:00Z
#> 2     35 2013-01-10T21:00:00Z
#> 3     35 2013-01-01T23:00:00Z
#> 4      0 2013-05-19T21:00:00Z
#> 5     10 2013-01-13T13:00:00Z
#> 6     40 2013-05-16T17:00:00Z
chunk_summarize(flights.df, mean_dep_delay = mean(dep_delay, na.rm = T)) %>% collect
#>   mean_dep_delay
#> 1       12.32700
#> 2       12.01291
#> 3       13.81315
#> 4       12.94445
#> 5       12.67813
#> 6       12.05854

Piping

One can chain dplyr verbs together as with a data.frame:

c4 <- flights.df %>%
  filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
  select(carrier, dep_delay, air_time, distance) %>%
  mutate(air_time_hours = air_time / 60) %>%
  collect %>%
  arrange(carrier) # arrange should occur after `collect`

c4  %>% head
#>   carrier dep_delay air_time distance air_time_hours
#> 1      AA        -7      142     1089       2.366667
#> 2      AA        -9      186     1389       3.100000
#> 3      AA        -6      143     1096       2.383333
#> 4      AA        -4      114      733       1.900000
#> 5      AA        -2      146     1085       2.433333
#> 6      AA        -7      119      733       1.983333

List of supported dplyr verbs

select
rename
filter
chunk_arrange # within each chunk
chunk_group_by # within each chunk
chunk_summarise/chunk_summarize # within each chunk
mutate
transmute
left_join
inner_join
full_join # careful. Performance!
semi_join
anti_join
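
As a brief sketch, here are two verbs from this list that are not demonstrated elsewhere in this tutorial, rename and transmute (the new column names are arbitrary):

flights.df %>%
  rename(dep_delay_min = dep_delay) %>%
  transmute(year, month, day, dep_delay_hours = dep_delay_min / 60) %>%
  collect %>%
  head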

Sharding and distribution of chunks

Like other distributed data manipulation frameworks, disk.frame utilizes the sharding concept to distribute the data into chunks. For example, “to shard by cust_id” means that all rows with the same cust_id will be stored in the same chunk. This enables chunk_group_by by cust_id to produce the same results as a group-by on non-chunked data.

The by variables that were used to shard the dataset are called the shardkeys. The sharding is performed by computing a deterministic hash on the shard keys (the by variables) for each row. The hash function produces an integer between 1 and n, where n is the number of chunks.
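
Conceptually, the assignment of rows to chunks works as in the following toy illustration (this is not disk.frame’s actual hash function):

# toy sketch: deterministically map each shard key to a chunk id in 1..n
n_chunks <- 3
toy_hash <- function(key) sum(utf8ToInt(key)) # stand-in for a real hash
chunk_id <- function(key) (toy_hash(key) %% n_chunks) + 1
sapply(c("a", "b", "a", "c", "b"), chunk_id)
# identical keys always map to the same chunk id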

Grouping

disk.frame implements the chunk_group_by operation with a significant caveat: in the disk.frame framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve a group-by across chunks, we need to put all rows with the same group keys into the same file chunk; this can be achieved with hard_group_by. However, the hard_group_by operation can be VERY TIME-CONSUMING computationally and should be avoided if possible.

The hard_group_by operation is best illustrated with an example. Suppose a disk.frame has three chunks:

# chunk1 = 1.fst
#  id n
#1  a 1
#2  a 2
#3  b 3
#4  d 4

# chunk2 = 2.fst
#  id n
#1  a 4
#2  a 5
#3  b 6
#4  d 7

# chunk3 = 3.fst
#  id n
#1  a 4
#2  b 5
#3  c 6

and notice that the id column contains 4 distinct values: "a", "b", "c", and "d". Performing hard_group_by(df, by = id) MAY give you the following disk.frame, where all rows with the same id end up in the same chunk.

# chunk1 = 1.fst
#  id n
#1  b 3
#2  b 6

# chunk2 = 2.fst
#  id n
#1  c 6
#2  d 4
#3  d 7

# chunk3 = 3.fst
#  id n
#1  a 1
#2  a 2
#3  a 4
#4  a 5
#5  a 4

Also, notice that there is no guaranteed order for the distribution of ids to chunks. The order is essentially random, but each chunk is likely to end up with a similar number of rows, provided that id does not follow a skewed distribution, i.e. one where a few distinct values make up the majority of the rows.

Typically, chunk_group_by is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the by variables beforehand; however, if this is not the case, then a second-stage aggregation may be needed to obtain the correct result; see Two-stage group by.

By forcing the user to choose between chunk_group_by (within each chunk) and hard_group_by (across all chunks), disk.frame ensures that the user is conscious of the choice they are making. In sparklyr, the equivalent of a hard_group_by is performed, which we should avoid where possible, as it is time-consuming and expensive. Hence, disk.frame has chosen to explain the theory and allow the user to make a conscious choice when performing a group-by.

flights.df %>%
  hard_group_by(carrier) %>% # notice that hard_group_by is needed here
  chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>%  # mean follows normal R rules
  collect %>% 
  arrange(carrier)
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Appending disk.frames:
#> # A tibble: 16 x 3
#>    carrier count mean_dep_delay
#>    <chr>   <int>          <dbl>
#>  1 9E      18460          16.7 
#>  2 AA      32729           8.59
#>  3 AS        714           5.80
#>  4 B6      54635          13.0 
#>  5 DL      48110           9.26
#>  6 EV      54173          20.0 
#>  7 F9        685          20.2 
#>  8 FL       3260          18.7 
#>  9 HA        342           4.90
#> 10 MQ      26397          10.6 
#> 11 OO         32          12.6 
#> 12 UA      58665          12.1 
#> 13 US      20536           3.78
#> 14 VX       5162          12.9 
#> 15 WN      12275          17.7 
#> 16 YV        601          19.0

Two-stage group by

For most group-by tasks, the user can achieve the desired result WITHOUT using hard_group_by by performing the group-by in two stages. For example, suppose you aim to count the number of rows grouped by carrier: you can use chunk_group_by to find the count within each chunk, and then use a second group-by to summarize each chunk’s results into the desired result. For example,

flights.df %>%
  chunk_group_by(carrier) %>% # `chunk_group_by` aggregates within each chunk
  chunk_summarize(count = n()) %>% # count the rows within each chunk
  collect %>%  # collect each individual chunk's results and row-bind into a data.table
  group_by(carrier) %>% 
  summarize(count = sum(count)) %>% 
  arrange(carrier)
#> # A tibble: 16 x 2
#>    carrier count
#>    <chr>   <int>
#>  1 9E      18460
#>  2 AA      32729
#>  3 AS        714
#>  4 B6      54635
#>  5 DL      48110
#>  6 EV      54173
#>  7 F9        685
#>  8 FL       3260
#>  9 HA        342
#> 10 MQ      26397
#> 11 OO         32
#> 12 UA      58665
#> 13 US      20536
#> 14 VX       5162
#> 15 WN      12275
#> 16 YV        601

Because this two-stage approach avoids the expensive hard group-by operation, it is often significantly faster. However, it can be tedious to write, which is a drawback of the disk.frame chunking mechanism.

Note: this two-stage approach is similar to a map-reduce operation.
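
The same pattern extends beyond counts whenever the per-chunk results can be recombined. For instance, here is a sketch of a two-stage grouped mean, computed from per-chunk sums and non-missing counts:

flights.df %>%
  chunk_group_by(carrier) %>%
  chunk_summarize(
    sum_dep_delay = sum(dep_delay, na.rm = TRUE), # per-chunk sum
    n_dep_delay = sum(!is.na(dep_delay))) %>%     # per-chunk non-missing count
  collect %>%
  group_by(carrier) %>%
  summarize(mean_dep_delay = sum(sum_dep_delay) / sum(n_dep_delay))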

Restrict input columns for faster processing

One can restrict which input columns to load into memory for each chunk; this can significantly increase the speed of data processing. To restrict the input columns, use the srckeep function, which accepts column names as a character vector.

flights.df %>%
  srckeep(c("carrier","dep_delay")) %>%
  hard_group_by(carrier) %>% 
  chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>%  # mean follows normal R rules
  collect
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Hashing...
#> Appending disk.frames:
#> # A tibble: 16 x 3
#>    carrier count mean_dep_delay
#>    <chr>   <int>          <dbl>
#>  1 B6      54635          13.0 
#>  2 FL       3260          18.7 
#>  3 MQ      26397          10.6 
#>  4 US      20536           3.78
#>  5 HA        342           4.90
#>  6 9E      18460          16.7 
#>  7 UA      58665          12.1 
#>  8 AA      32729           8.59
#>  9 DL      48110           9.26
#> 10 EV      54173          20.0 
#> 11 VX       5162          12.9 
#> 12 WN      12275          17.7 
#> 13 YV        601          19.0 
#> 14 OO         32          12.6 
#> 15 AS        714           5.80
#> 16 F9        685          20.2

Input column restriction is one of the most critical efficiencies provided by disk.frame. Because the underlying fst format allows random access to columns (i.e. it can retrieve only the columns used for processing), one can drastically reduce the amount of data loaded into RAM by keeping only those columns that are directly used to produce the results.

Joins

disk.frame supports many dplyr joins including:

left_join
inner_join
semi_join
anti_join
full_join # requires hard_group_by on both left and right

In all cases, the left dataset (x) must be a disk.frame, and the right dataset (y) can be either a disk.frame or a data.frame. If the right dataset is a disk.frame and the shardkeys are different between the two disk.frames then two expensive hard group_by operations are performed eagerly, one on the left disk.frame and one on the right disk.frame to perform the joins correctly.

However, if the right dataset is a data.frame then hard_group_bys are only performed in the case of full_join.

Note that disk.frame does not support right_join; the user should use left_join instead.

The joins below are performed lazily because airlines.dt is a data.table, not a disk.frame:

# make airlines a data.table
airlines.dt <- data.table(airlines)
# dplyr equivalent: flights %>% left_join(airlines, by = "carrier")
flights.df %>% 
  left_join(airlines.dt, by ="carrier") %>% 
  collect %>% 
  head
#>    year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013     1   1      517            515         2      830
#> 2: 2013     1   1      533            529         4      850
#> 3: 2013     1   1      542            540         2      923
#> 4: 2013     1   1      544            545        -1     1004
#> 5: 2013     1   1      554            600        -6      812
#> 6: 2013     1   1      554            558        -4      740
#>    sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1:            819        11      UA   1545  N14228    EWR  IAH      227
#> 2:            830        20      UA   1714  N24211    LGA  IAH      227
#> 3:            850        33      AA   1141  N619AA    JFK  MIA      160
#> 4:           1022       -18      B6    725  N804JB    JFK  BQN      183
#> 5:            837       -25      DL    461  N668DN    LGA  ATL      116
#> 6:            728        12      UA   1696  N39463    EWR  ORD      150
#>    distance hour minute            time_hour                   name
#> 1:     1400    5     15 2013-01-01T10:00:00Z  United Air Lines Inc.
#> 2:     1416    5     29 2013-01-01T10:00:00Z  United Air Lines Inc.
#> 3:     1089    5     40 2013-01-01T10:00:00Z American Airlines Inc.
#> 4:     1576    5     45 2013-01-01T10:00:00Z        JetBlue Airways
#> 5:      762    6      0 2013-01-01T11:00:00Z   Delta Air Lines Inc.
#> 6:      719    5     58 2013-01-01T10:00:00Z  United Air Lines Inc.
flights.df %>% 
  left_join(airlines.dt, by = c("carrier", "carrier")) %>% 
  collect %>% 
  tail
#>    year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013     9  30       NA           1842        NA       NA
#> 2: 2013     9  30       NA           1455        NA       NA
#> 3: 2013     9  30       NA           2200        NA       NA
#> 4: 2013     9  30       NA           1210        NA       NA
#> 5: 2013     9  30       NA           1159        NA       NA
#> 6: 2013     9  30       NA            840        NA       NA
#>    sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1:           2019        NA      EV   5274  N740EV    LGA  BNA       NA
#> 2:           1634        NA      9E   3393            JFK  DCA       NA
#> 3:           2312        NA      9E   3525            LGA  SYR       NA
#> 4:           1330        NA      MQ   3461  N535MQ    LGA  BNA       NA
#> 5:           1344        NA      MQ   3572  N511MQ    LGA  CLE       NA
#> 6:           1020        NA      MQ   3531  N839MQ    LGA  RDU       NA
#>    distance hour minute            time_hour                     name
#> 1:      764   18     42 2013-09-30T22:00:00Z ExpressJet Airlines Inc.
#> 2:      213   14     55 2013-09-30T18:00:00Z        Endeavor Air Inc.
#> 3:      198   22      0 2013-10-01T02:00:00Z        Endeavor Air Inc.
#> 4:      764   12     10 2013-09-30T16:00:00Z                Envoy Air
#> 5:      419   11     59 2013-09-30T15:00:00Z                Envoy Air
#> 6:      431    8     40 2013-09-30T12:00:00Z                Envoy Air

Window functions and arbitrary functions

disk.frame supports all data.frame operations, unlike Spark, which can only perform those operations that it has implemented. Hence, window functions like rank are supported out of the box.

# Find the most and least delayed flight each day
bestworst <- flights.df %>%
   srckeep(c("year","month","day", "dep_delay")) %>%
   chunk_group_by(year, month, day) %>%
   select(dep_delay) %>%
   filter(dep_delay == min(dep_delay, na.rm = T) | dep_delay == max(dep_delay, na.rm = T)) %>% # use vectorized |, not ||
   collect
#> Adding missing grouping variables: `year`, `month`, `day`
#> Warning in min(dep_delay, na.rm = T): no non-missing arguments to min;
#> returning Inf
#> Warning in max(dep_delay, na.rm = T): no non-missing arguments to max;
#> returning -Inf
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
   

bestworst %>% head
#> # A tibble: 6 x 4
#> # Groups:   year, month, day [1]
#>    year month   day dep_delay
#>   <int> <int> <int>     <int>
#> 1  2013     2     7       148
#> 2  2013     2     7        -2
#> 3  2013     2     7         1
#> 4  2013     2     7         6
#> 5  2013     2     7        -2
#> 6  2013     2     7        -2
# Rank each flight within each day
ranked <- flights.df %>%
  srckeep(c("year","month","day", "dep_delay")) %>%
  chunk_group_by(year, month, day) %>%
  select(dep_delay) %>%
  mutate(rank = rank(desc(dep_delay))) %>%
  collect
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`
#> Adding missing grouping variables: `year`, `month`, `day`

ranked %>% head
#> # A tibble: 6 x 5
#> # Groups:   year, month, day [1]
#>    year month   day dep_delay  rank
#>   <int> <int> <int>     <int> <dbl>
#> 1  2013     1     1         2   313
#> 2  2013     1     1         4   276
#> 3  2013     1     1         2   313
#> 4  2013     1     1        -1   440
#> 5  2013     1     1        -6   742
#> 6  2013     1     1        -4   633

Arbitrary by-chunk processing

One can apply arbitrary transformations to each chunk of the disk.frame by using the delayed function, which evaluates lazily, or the map function with lazy = F, which evaluates eagerly. For example, to return the number of rows in each chunk:

flights.df1 <- delayed(flights.df, ~nrow(.x))
collect_list(flights.df1) %>% head # returns number of rows for each data.frame in a list
#> [[1]]
#> [1] 56131
#> 
#> [[2]]
#> [1] 56131
#> 
#> [[3]]
#> [1] 56131
#> 
#> [[4]]
#> [1] 56131
#> 
#> [[5]]
#> [1] 56131
#> 
#> [[6]]
#> [1] 56121

and to do the same with map.disk.frame

map(flights.df, ~nrow(.x), lazy = F) %>% head
#> [[1]]
#> [1] 56131
#> 
#> [[2]]
#> [1] 56131
#> 
#> [[3]]
#> [1] 56131
#> 
#> [[4]]
#> [1] 56131
#> 
#> [[5]]
#> [1] 56131
#> 
#> [[6]]
#> [1] 56121

The map function can also output the results to another disk.frame folder, e.g.

# return the first 10 rows of each chunk
flights.df2 <- map(flights.df, ~.x[1:10,], lazy = F, outdir = file.path(tempdir(), "tmp2"), overwrite = T)

flights.df2 %>% head
#>    year month day dep_time sched_dep_time dep_delay arr_time
#> 1: 2013     1   1      517            515         2      830
#> 2: 2013     1   1      533            529         4      850
#> 3: 2013     1   1      542            540         2      923
#> 4: 2013     1   1      544            545        -1     1004
#> 5: 2013     1   1      554            600        -6      812
#> 6: 2013     1   1      554            558        -4      740
#>    sched_arr_time arr_delay carrier flight tailnum origin dest air_time
#> 1:            819        11      UA   1545  N14228    EWR  IAH      227
#> 2:            830        20      UA   1714  N24211    LGA  IAH      227
#> 3:            850        33      AA   1141  N619AA    JFK  MIA      160
#> 4:           1022       -18      B6    725  N804JB    JFK  BQN      183
#> 5:            837       -25      DL    461  N668DN    LGA  ATL      116
#> 6:            728        12      UA   1696  N39463    EWR  ORD      150
#>    distance hour minute            time_hour
#> 1:     1400    5     15 2013-01-01T10:00:00Z
#> 2:     1416    5     29 2013-01-01T10:00:00Z
#> 3:     1089    5     40 2013-01-01T10:00:00Z
#> 4:     1576    5     45 2013-01-01T10:00:00Z
#> 5:      762    6      0 2013-01-01T11:00:00Z
#> 6:      719    5     58 2013-01-01T10:00:00Z

Notice that disk.frame supports the purrr syntax for defining a function using ~.
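
The formula ~nrow(.x) is shorthand for an anonymous function, so the following should be equivalent:

map(flights.df, function(chunk) nrow(chunk), lazy = F) %>% head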

Sampling

In the disk.frame framework, sampling a proportion of rows within each chunk can be performed using sample_frac.

flights.df %>% sample_frac(0.01) %>% collect %>% head
#>   year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1 2013     5  17     1856           1815        41     2120           2039
#> 2 2013     1   5      731            735        -4      856            858
#> 3 2013     5  25     1950           1700       170     2122           1830
#> 4 2013    12  20      852            815        37     1058           1040
#> 5 2013     1  15     1928           1935        -7     2241           2240
#> 6 2013    12  27     1455           1500        -5     1713           1740
#>   arr_delay carrier flight tailnum origin dest air_time distance hour
#> 1        41      9E   3691  N8771A    JFK  CHS       91      636   18
#> 2        -2      B6     20  N281JB    JFK  ROC       57      264    7
#> 3       172      MQ   4323  N682MQ    JFK  ORF       47      290   17
#> 4        18      9E   2921  N936XJ    JFK  MSP      164     1029    8
#> 5         1      AA    791  N3ANAA    LGA  DFW      228     1389   19
#> 6       -27      MQ   3202  N613MQ    JFK  IND      104      665   15
#>   minute            time_hour
#> 1     15 2013-05-17T22:00:00Z
#> 2     35 2013-01-05T12:00:00Z
#> 3      0 2013-05-25T21:00:00Z
#> 4     15 2013-12-20T13:00:00Z
#> 5     35 2013-01-16T00:00:00Z
#> 6      0 2013-12-27T20:00:00Z

Writing Data

One can output a disk.frame by using the write_disk.frame function, e.g.

write_disk.frame(flights.df, outdir="out")

This will output a disk.frame to the folder “out”.
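
A related sketch: an existing disk.frame folder can be re-attached later, assuming the disk.frame() constructor, which takes a folder path:

# re-attach the disk.frame written above
out.df <- disk.frame("out")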

# clean up the temporary files created in this tutorial
fs::dir_delete(file.path(tempdir(), "tmp_flights.df"))
fs::dir_delete(file.path(tempdir(), "tmp2"))
fs::file_delete(file.path(tempdir(), "tmp_flights.csv"))