vignettes/02-intro-disk-frame.Rmd
The disk.frame package aims to be the answer to the question: how do I manipulate structured tabular data that doesn’t fit into Random Access Memory (RAM)?
In a nutshell, disk.frame makes use of two simple ideas: split a larger-than-RAM dataset into chunks stored as files in a folder, and provide a convenient API to manipulate those chunks. disk.frame performs a similar role to distributed systems such as Apache Spark, Python’s Dask, and Julia’s JuliaDB.jl for medium data: datasets that are too large for RAM but not quite large enough to qualify as big data.
In this tutorial, we introduce disk.frame, address some common questions, and replicate the sparklyr data manipulation tutorial using disk.frame constructs.
Simply run
install.packages("disk.frame") # when CRAN ready
or
devtools::install_github("xiaodaigh/disk.frame")
Set-up disk.frame
disk.frame works best if it can process multiple data chunks in parallel. The best way to set up disk.frame so that each CPU core runs a background worker is by using
setup_disk.frame()
# this will allow unlimited amount of data to be passed from worker to worker
options(future.globals.maxSize = Inf)
The setup_disk.frame() call sets up background workers equal to the number of CPU cores; please note that, by default, hyper-threaded cores are counted as one, not two. Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).
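For example, to run with an explicit number of workers (2 here, purely for illustration) instead of relying on the default detection:
# set up 2 background workers explicitly
setup_disk.frame(workers = 2)
# this will allow unlimited amount of data to be passed from worker to worker
options(future.globals.maxSize = Inf)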
Creating a disk.frame
The disk.frame package provides convenient functions to convert data.frames and CSVs to disk.frames.
Creating a disk.frame from a data.frame
We convert a data.frame to a disk.frame using the as.disk.frame function.
library(nycflights13)
library(dplyr)
library(disk.frame)
library(data.table)
# convert the flights data to a disk.frame and store the disk.frame in the folder
# "tmp_flights.df" under tempdir(), overwriting any existing content if needed
flights.df <- as.disk.frame(
  flights,
  outdir = file.path(tempdir(), "tmp_flights.df"),
  overwrite = TRUE)
flights.df
You should now see a folder called tmp_flights.df (under tempdir()) with some files in it, namely 1.fst, 2.fst, …, where each fst file is one chunk of the disk.frame.
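You can verify this from R by listing the output folder chosen above (using base R's list.files):
# list the chunk files that make up the disk.frame
list.files(file.path(tempdir(), "tmp_flights.df"))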
Creating a disk.frame from a CSV
library(nycflights13)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
# load the csv into a disk.frame
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
  csv_path,
  outdir = df_path,
  overwrite = T)
flights.df
If the CSV is too large to read in, then we can also use the in_chunk_size option to control how many rows to read in at once. For example, to read in the data 100,000 rows at a time:
library(nycflights13)
library(disk.frame)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
  csv_path,
  outdir = df_path,
  in_chunk_size = 100000)
#> Warning: UNRELIABLE VALUE: One of the 'future.apply' iterations
#> ('future_lapply-1') unexpectedly generated random numbers without declaring so.
#> There is a risk that those random numbers are not statistically sound and the
#> overall results might be invalid. To fix this, specify 'future.seed=TRUE'. This
#> ensures that proper, parallel-safe random numbers are produced via the L'Ecuyer-
#> CMRG method. To disable this check, use 'future.seed = NULL', or set option
#> 'future.rng.onMisuse' to "ignore".
#> Warning: UNRELIABLE VALUE: One of the 'future.apply' iterations
#> ('future_lapply-2') unexpectedly generated random numbers without declaring so.
#> There is a risk that those random numbers are not statistically sound and the
#> overall results might be invalid. To fix this, specify 'future.seed=TRUE'. This
#> ensures that proper, parallel-safe random numbers are produced via the L'Ecuyer-
#> CMRG method. To disable this check, use 'future.seed = NULL', or set option
#> 'future.rng.onMisuse' to "ignore".
flights.df
disk.frame also has a function zip_to_disk.frame that can convert every CSV in a zip file to disk.frames.
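A hedged sketch of how that might look (the zip path here is hypothetical, and the argument order assumes the zip file comes first followed by outdir; check ?zip_to_disk.frame):
# hypothetical zip file containing one or more CSVs
zip_path <- file.path(tempdir(), "flights_csvs.zip")
flights_zip.df <- zip_to_disk.frame(
  zip_path,
  outdir = file.path(tempdir(), "tmp_flights_zip.df"))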
dplyr verbs and lazy evaluation
flights.df1 <- select(flights.df, year:day, arr_delay, dep_delay)
flights.df1
class(flights.df1)
#> [1] "disk.frame" "disk.frame.folder"
The class of flights.df1 is still disk.frame after the dplyr::select transformation. Also, disk.frame operations are by default (and where possible) lazy, meaning they are not performed right away; instead, disk.frame waits until you call collect. Exceptions to this rule are the *_join operations, which are evaluated eagerly under certain conditions; see Joins for disk.frame in-depth for details.
For lazily constructed disk.frames (e.g. flights.df1), the function collect can be used to bring the results from disk into R, e.g.
collect(flights.df1) %>% head(2)
#> year month day arr_delay dep_delay
#> 1: 2013 1 1 11 2
#> 2: 2013 1 1 20 4
Of course, for larger-than-RAM datasets, one wouldn’t call collect on the whole disk.frame (why would you need disk.frame otherwise?). More likely, one would call collect on a filtered dataset or one summarized with group_by.
Some examples of other dplyr verbs applied:
filter(flights.df, dep_delay > 1000) %>% collect %>% head(2)
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 9 641 900 1301 1242 1530
#> 2: 2013 1 10 1121 1635 1126 1239 1810
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 1272 HA 51 N384HA JFK HNL 640 4983 9 0
#> 2: 1109 MQ 3695 N517MQ EWR ORD 111 719 16 35
#> time_hour
#> 1: 2013-01-09 14:00:00
#> 2: 2013-01-10 21:00:00
mutate(flights.df, speed = distance / air_time * 60) %>% collect %>% head(2)
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 1 517 515 2 830 819
#> 2: 2013 1 1 533 529 4 850 830
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 11 UA 1545 N14228 EWR IAH 227 1400 5 15
#> 2: 20 UA 1714 N24211 LGA IAH 227 1416 5 29
#> time_hour speed
#> 1: 2013-01-01 10:00:00 370.0441
#> 2: 2013-01-01 10:00:00 374.2731
Chunk-wise dplyr verbs
The chunk_arrange function arranges (sorts) rows within each chunk but not across the whole dataset, so use it with caution. Similarly, chunk_summarize creates summary variables within each chunk and hence also needs to be used with caution.
# this only sorts within each chunk
chunk_arrange(flights.df, dplyr::desc(dep_delay)) %>% collect %>% head(2)
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 9 641 900 1301 1242 1530
#> 2: 2013 1 10 1121 1635 1126 1239 1810
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 1272 HA 51 N384HA JFK HNL 640 4983 9 0
#> 2: 1109 MQ 3695 N517MQ EWR ORD 111 719 16 35
#> time_hour
#> 1: 2013-01-09 14:00:00
#> 2: 2013-01-10 21:00:00
chunk_summarize(flights.df, mean_dep_delay = mean(dep_delay, na.rm =T)) %>% collect
#> mean_dep_delay
#> 1: 12.32700
#> 2: 12.01291
#> 3: 13.81315
#> 4: 12.94445
#> 5: 12.67813
#> 6: 12.05854
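Because chunk_summarize only aggregates within each chunk, an overall figure has to be assembled from the per-chunk pieces. A minimal sketch of this two-stage pattern, using per-chunk sums and counts so the overall mean is exact:
# stage 1: per-chunk sums and non-missing counts (computed chunk by chunk)
# stage 2: combine the per-chunk pieces in RAM after collect
flights.df %>%
  chunk_summarize(
    sum_dep_delay = sum(dep_delay, na.rm = TRUE),
    n_dep_delay = sum(!is.na(dep_delay))) %>%
  collect %>%
  summarize(mean_dep_delay = sum(sum_dep_delay) / sum(n_dep_delay))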
One can chain dplyr verbs together just like with a data.frame:
c4 <- flights.df %>%
  filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
  select(carrier, dep_delay, air_time, distance) %>%
  mutate(air_time_hours = air_time / 60) %>%
  collect %>%
  arrange(carrier) # arrange should occur after `collect`
c4 %>% head
#> carrier dep_delay air_time distance air_time_hours
#> 1: AA -7 142 1089 2.366667
#> 2: AA -9 186 1389 3.100000
#> 3: AA -6 143 1096 2.383333
#> 4: AA -4 114 733 1.900000
#> 5: AA -2 146 1085 2.433333
#> 6: AA -7 119 733 1.983333
Supported dplyr verbs
select
rename
filter
chunk_arrange # within each chunk
chunk_group_by # within each chunk
chunk_summarize # within each chunk
group_by # limited functions
summarize # limited functions
mutate
transmute
left_join
inner_join
full_join # careful. Performance!
semi_join
anti_join
Like other distributed data manipulation frameworks, disk.frame utilizes the sharding concept to distribute the data into chunks. For example, “to shard by cust_id” means that all rows with the same cust_id will be stored in the same chunk. This enables chunk_group_by by cust_id to produce the same results as grouping the non-chunked data.
The by variables that were used to shard the dataset are called the shardkeys. The sharding is performed by computing a deterministic hash on the shardkeys (the by variables) for each row. The hash function produces an integer between 1 and n, where n is the number of chunks.
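As a sketch of how sharding might be applied in practice, one can shard at creation time (this assumes as.disk.frame exposes a shardby argument; check ?as.disk.frame):
# shard by carrier so that all rows for a given carrier land in the same chunk
flights_by_carrier.df <- as.disk.frame(
  flights,
  outdir = file.path(tempdir(), "tmp_flights_by_carrier.df"),
  shardby = "carrier", # assumed argument name; see ?as.disk.frame
  overwrite = TRUE)

# chunk_group_by(carrier) now matches a full group_by(carrier),
# because no carrier is split across chunks
flights_by_carrier.df %>%
  chunk_group_by(carrier) %>%
  chunk_summarize(mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>%
  collect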
disk.frame implements the group_by operation with some caveats. In the disk.frame framework, only a set of functions are supported in summarize. However, the user can also define custom group-by functions.
flights.df %>%
  group_by(carrier) %>%
  summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm = T)) %>% # mean follows normal R rules
  collect %>%
  arrange(carrier)
#> # A tibble: 16 x 3
#> carrier count mean_dep_delay
#> <chr> <int> <dbl>
#> 1 9E 18460 16.7
#> 2 AA 32729 8.59
#> 3 AS 714 5.80
#> 4 B6 54635 13.0
#> 5 DL 48110 9.26
#> 6 EV 54173 20.0
#> 7 F9 685 20.2
#> 8 FL 3260 18.7
#> 9 HA 342 4.90
#> 10 MQ 26397 10.6
#> 11 OO 32 12.6
#> 12 UA 58665 12.1
#> 13 US 20536 3.78
#> 14 VX 5162 12.9
#> 15 WN 12275 17.7
#> 16 YV 601 19.0
One can restrict which input columns to load into memory for each chunk; this can significantly increase the speed of data processing. To restrict the input columns, use the srckeep function, which accepts column names as a character vector.
flights.df %>%
  srckeep(c("carrier", "dep_delay")) %>%
  group_by(carrier) %>%
  summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm = T)) %>% # mean follows normal R rules
  collect
#> # A tibble: 16 x 3
#> carrier count mean_dep_delay
#> <chr> <int> <dbl>
#> 1 9E 18460 16.7
#> 2 AA 32729 8.59
#> 3 AS 714 5.80
#> 4 B6 54635 13.0
#> 5 DL 48110 9.26
#> 6 EV 54173 20.0
#> 7 F9 685 20.2
#> 8 FL 3260 18.7
#> 9 HA 342 4.90
#> 10 MQ 26397 10.6
#> 11 OO 32 12.6
#> 12 UA 58665 12.1
#> 13 US 20536 3.78
#> 14 VX 5162 12.9
#> 15 WN 12275 17.7
#> 16 YV 601 19.0
Input column restriction is one of the most critical efficiencies provided by disk.frame. Because the underlying fst format allows random access to columns (i.e. retrieving only the columns used for processing), one can drastically reduce the amount of data loaded into RAM by keeping only those columns that are directly needed to produce the results.
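To see the effect on your own machine, you can time the same aggregation with and without srckeep (a rough sketch; actual timings depend on hardware and chunk layout):
# every column of each chunk is read from disk
system.time(
  flights.df %>%
    group_by(carrier) %>%
    summarize(mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>%
    collect)

# only the two columns needed for the result are read from disk
system.time(
  flights.df %>%
    srckeep(c("carrier", "dep_delay")) %>%
    group_by(carrier) %>%
    summarize(mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>%
    collect)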
disk.frame supports many dplyr joins, including:
left_join
inner_join
semi_join
anti_join
full_join # requires rechunk on both left and right
In all cases, the left dataset (x) must be a disk.frame, and the right dataset (y) can be either a disk.frame or a data.frame. If the right dataset is a disk.frame and the shardkeys are different between the two disk.frames, then two expensive hard rechunk operations are performed eagerly, one on the left disk.frame and one on the right disk.frame, to perform the join correctly.
However, if the right dataset is a data.frame, then rechunks are only performed in the case of full_join.
Note that disk.frame does not support right_join; the user should use left_join (with the operands swapped) instead.
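One way to obtain the equivalent of a right join is to put the table whose rows you want to keep on the left; a sketch (the left side must be a disk.frame, so the smaller table is converted first):
# equivalent of right_join(flights.df, airlines, by = "carrier"):
# keep every row of airlines by putting it on the left of a left_join
airlines.df <- as.disk.frame(
  airlines,
  outdir = file.path(tempdir(), "tmp_airlines.df"),
  overwrite = TRUE)
airlines.df %>%
  left_join(flights.df, by = "carrier") %>% # may trigger rechunks (see above)
  collect %>%
  head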
The joins below are performed lazily, because airlines.dt is a data.table, not a disk.frame:
# make airlines a data.table
airlines.dt <- data.table(airlines)
# flights %>% left_join(airlines, by = "carrier") #
flights.df %>%
  left_join(airlines.dt, by = "carrier") %>%
  collect %>%
  head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 1 517 515 2 830 819
#> 2: 2013 1 1 533 529 4 850 830
#> 3: 2013 1 1 542 540 2 923 850
#> 4: 2013 1 1 544 545 -1 1004 1022
#> 5: 2013 1 1 554 600 -6 812 837
#> 6: 2013 1 1 554 558 -4 740 728
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 11 UA 1545 N14228 EWR IAH 227 1400 5 15
#> 2: 20 UA 1714 N24211 LGA IAH 227 1416 5 29
#> 3: 33 AA 1141 N619AA JFK MIA 160 1089 5 40
#> 4: -18 B6 725 N804JB JFK BQN 183 1576 5 45
#> 5: -25 DL 461 N668DN LGA ATL 116 762 6 0
#> 6: 12 UA 1696 N39463 EWR ORD 150 719 5 58
#> time_hour name
#> 1: 2013-01-01 10:00:00 United Air Lines Inc.
#> 2: 2013-01-01 10:00:00 United Air Lines Inc.
#> 3: 2013-01-01 10:00:00 American Airlines Inc.
#> 4: 2013-01-01 10:00:00 JetBlue Airways
#> 5: 2013-01-01 11:00:00 Delta Air Lines Inc.
#> 6: 2013-01-01 10:00:00 United Air Lines Inc.
flights.df %>%
  left_join(airlines.dt, by = c("carrier")) %>%
  collect %>%
  tail
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 9 30 NA 1842 NA NA 2019
#> 2: 2013 9 30 NA 1455 NA NA 1634
#> 3: 2013 9 30 NA 2200 NA NA 2312
#> 4: 2013 9 30 NA 1210 NA NA 1330
#> 5: 2013 9 30 NA 1159 NA NA 1344
#> 6: 2013 9 30 NA 840 NA NA 1020
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: NA EV 5274 N740EV LGA BNA NA 764 18 42
#> 2: NA 9E 3393 JFK DCA NA 213 14 55
#> 3: NA 9E 3525 LGA SYR NA 198 22 0
#> 4: NA MQ 3461 N535MQ LGA BNA NA 764 12 10
#> 5: NA MQ 3572 N511MQ LGA CLE NA 419 11 59
#> 6: NA MQ 3531 N839MQ LGA RDU NA 431 8 40
#> time_hour name
#> 1: 2013-09-30 22:00:00 ExpressJet Airlines Inc.
#> 2: 2013-09-30 18:00:00 Endeavor Air Inc.
#> 3: 2013-10-01 02:00:00 Endeavor Air Inc.
#> 4: 2013-09-30 16:00:00 Envoy Air
#> 5: 2013-09-30 15:00:00 Envoy Air
#> 6: 2013-09-30 12:00:00 Envoy Air
disk.frame supports all data.frame operations, unlike Spark, which can only perform those operations that it has implemented. Hence, windowing functions like min_rank and rank are supported out of the box.
For example
# rank each flight's departure delay within each day
ranked <- flights.df %>%
  srckeep(c("year", "month", "day", "dep_delay")) %>%
  chunk_group_by(year, month, day) %>%
  select(dep_delay) %>%
  mutate(rank = rank(desc(dep_delay))) %>%
  collect
ranked %>% head
#> year month day dep_delay rank
#> 1: 2013 1 1 2 313
#> 2: 2013 1 1 4 276
#> 3: 2013 1 1 2 313
#> 4: 2013 1 1 -1 440
#> 5: 2013 1 1 -6 742
#> 6: 2013 1 1 -4 633
One can apply arbitrary transformations to each chunk of the disk.frame by using the delayed function, which evaluates lazily, or the cmap.disk.frame(lazy = F) function, which evaluates eagerly. For example, to return the number of rows in each chunk:
flights.df1 <- delayed(flights.df, ~nrow(.x))
collect_list(flights.df1) %>% head # returns number of rows for each data.frame in a list
#> [[1]]
#> [1] 56131
#>
#> [[2]]
#> [1] 56131
#>
#> [[3]]
#> [1] 56131
#>
#> [[4]]
#> [1] 56131
#>
#> [[5]]
#> [1] 56131
#>
#> [[6]]
#> [1] 56121
and to do the same with cmap.disk.frame
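A minimal sketch of the equivalent call, kept lazy and collected with collect_list so it mirrors the delayed example above (see ?cmap for the eager lazy = F form):
# cmap attaches the function to each chunk lazily, just like delayed
flights.df1_cmap <- cmap(flights.df, ~nrow(.x))
collect_list(flights.df1_cmap) %>% head # returns number of rows for each chunk in a list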
The cmap function can also output the results to another disk.frame folder, e.g.
# return the first 10 rows of each chunk
flights.df2 <- cmap(flights.df, ~.x[1:10,], lazy = F, outdir = file.path(tempdir(), "tmp2"), overwrite = T)
flights.df2 %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 1 517 515 2 830 819
#> 2: 2013 1 1 533 529 4 850 830
#> 3: 2013 1 1 542 540 2 923 850
#> 4: 2013 1 1 544 545 -1 1004 1022
#> 5: 2013 1 1 554 600 -6 812 837
#> 6: 2013 1 1 554 558 -4 740 728
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 11 UA 1545 N14228 EWR IAH 227 1400 5 15
#> 2: 20 UA 1714 N24211 LGA IAH 227 1416 5 29
#> 3: 33 AA 1141 N619AA JFK MIA 160 1089 5 40
#> 4: -18 B6 725 N804JB JFK BQN 183 1576 5 45
#> 5: -25 DL 461 N668DN LGA ATL 116 762 6 0
#> 6: 12 UA 1696 N39463 EWR ORD 150 719 5 58
#> time_hour
#> 1: 2013-01-01 10:00:00
#> 2: 2013-01-01 10:00:00
#> 3: 2013-01-01 10:00:00
#> 4: 2013-01-01 10:00:00
#> 5: 2013-01-01 11:00:00
#> 6: 2013-01-01 10:00:00
Notice that disk.frame supports the purrr syntax for defining a function using ~.
In the disk.frame framework, sampling a proportion of rows within each chunk can be performed using sample_frac.
flights.df %>% sample_frac(0.01) %>% collect %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 2 1 1348 1345 3 1658 1705
#> 2: 2013 1 17 755 800 -5 856 908
#> 3: 2013 1 8 830 830 0 1116 1106
#> 4: 2013 12 31 1936 1945 -9 2048 2120
#> 5: 2013 8 22 1350 1345 5 1606 1635
#> 6: 2013 12 30 1749 1750 -1 2002 2020
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: -7 AA 1073 N3FTAA LGA MIA 166 1096 13 45
#> 2: -12 US 2118 N958UW LGA BOS 37 184 8 0
#> 3: 10 F9 835 N202FR LGA DEN 247 1620 8 30
#> 4: -32 9E 2950 N8516C JFK BWI 39 184 19 45
#> 5: -29 AA 753 N3BYAA LGA DFW 175 1389 13 45
#> 6: -18 9E 2954 N8516C JFK CHS 112 636 17 50
#> time_hour
#> 1: 2013-02-01 18:00:00
#> 2: 2013-01-17 13:00:00
#> 3: 2013-01-08 13:00:00
#> 4: 2014-01-01 00:00:00
#> 5: 2013-08-22 17:00:00
#> 6: 2013-12-30 22:00:00
One can output a disk.frame by using the write_disk.frame function, e.g.
write_disk.frame(flights.df, outdir="out")
This will output a disk.frame to the folder “out” under the current working directory.
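More typically, one writes out the result of a lazy transformation; a sketch (assuming write_disk.frame accepts an overwrite argument like the other output functions shown above):
# persist a lazily filtered disk.frame to its own folder
flights.df %>%
  filter(dep_delay > 1000) %>%
  write_disk.frame(
    outdir = file.path(tempdir(), "out_severe_delays.df"),
    overwrite = TRUE) # assumed argument; see ?write_disk.frame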