The disk.frame package aims to be the answer to the question: how do I manipulate structured tabular data that doesn’t fit into Random Access Memory (RAM)?
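In a nutshell, disk.frame makes use of two simple ideas:
1) split up a larger-than-RAM dataset into chunks and store each chunk in a separate file inside a folder, and
2) provide a convenient API to manipulate these chunks.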
disk.frame performs a similar role to distributed systems such as Apache Spark, Python’s Dask, and Julia’s JuliaDB.jl for medium data, which are datasets that are too large for RAM but not quite large enough to qualify as big data.
In this tutorial, we introduce disk.frame, address some common questions, and replicate the sparklyr data manipulation tutorial using disk.frame constructs.
To install disk.frame, simply run
install.packages("disk.frame") # when CRAN ready
or
devtools::install_github("xiaodaigh/disk.frame")
Setting up disk.frame
disk.frame works best if it can process multiple data chunks in parallel. The best way to set up disk.frame so that each CPU core runs a background worker is to use
library(disk.frame)
setup_disk.frame()
# allow globals of any size to be exported to the background workers
options(future.globals.maxSize = Inf)
The setup_disk.frame() function sets up background workers equal to the number of CPU cores; please note that, by default, hyper-threaded cores are counted as one, not two.
Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).
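Before choosing n for setup_disk.frame(workers = n), it can help to see what R reports for your machine. The sketch below uses base R's parallel::detectCores(). It is only a diagnostic and not necessarily how setup_disk.frame counts cores. On some platforms the logical argument is ignored or the result is NA. The variable name n_workers is illustrative.

```r
library(parallel)

# Logical cores include hyper-threads; physical cores do not.
# Either count may be NA if the platform cannot report it.
logical_cores  <- detectCores(logical = TRUE)
physical_cores <- detectCores(logical = FALSE)
cat("logical cores:", logical_cores, "| physical cores:", physical_cores, "\n")

# One possible choice: one worker per physical core, falling back to 1.
n_workers <- if (is.na(physical_cores)) 1L else physical_cores
# then, with disk.frame loaded: setup_disk.frame(workers = n_workers)
```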
Basic data operations with disk.frame
The disk.frame package provides convenient functions to convert data.frames and CSVs to disk.frames.
disk.frame from data.frame
We convert a data.frame to a disk.frame using the as.disk.frame function.
library(nycflights13)
library(dplyr)
library(disk.frame)
library(data.table)
# convert the flights data to a disk.frame and store the disk.frame in the folder
# "tmp_flights.df" inside tempdir(), overwriting any existing content if needed
flights.df <- as.disk.frame(
flights,
outdir = file.path(tempdir(), "tmp_flights.df"),
overwrite = TRUE)
flights.df
You should now see a folder called tmp_flights.df inside tempdir() containing files named 1.fst, 2.fst, …, where each .fst file is one chunk of the disk.frame.
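The chunk-per-file layout can be sketched in base R: split a data frame into row blocks and write each block to its own numbered file in a folder. This sketch uses saveRDS and .rds files only as a stand-in for the fst format. The helpers write_chunks and read_chunk are hypothetical and are not part of disk.frame.

```r
# Hypothetical helper: split `df` into `nchunks` row blocks and save each
# block as 1.rds, 2.rds, ... inside `outdir` (disk.frame itself uses .fst).
write_chunks <- function(df, outdir, nchunks) {
  dir.create(outdir, showWarnings = FALSE, recursive = TRUE)
  # assign consecutive rows to chunks 1..nchunks
  chunk_id <- cut(seq_len(nrow(df)), breaks = nchunks, labels = FALSE)
  for (i in seq_len(nchunks)) {
    saveRDS(df[chunk_id == i, , drop = FALSE], file.path(outdir, paste0(i, ".rds")))
  }
  invisible(outdir)
}

# Hypothetical helper: load a single chunk back into RAM.
read_chunk <- function(outdir, i) readRDS(file.path(outdir, paste0(i, ".rds")))

outdir <- file.path(tempdir(), "mtcars_chunks")
write_chunks(mtcars, outdir, nchunks = 4)
list.files(outdir)
```

Only one chunk needs to be in memory at a time, which is the property that lets a chunked store exceed RAM.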
disk.frame from CSV
library(nycflights13)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
# load the csv into a disk.frame
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
csv_path,
outdir = df_path,
overwrite = TRUE)
flights.df
If the CSV is too large to read in at once, we can use the in_chunk_size option to control how many rows to read in at a time. For example, to read in the data 100,000 rows at a time:
library(nycflights13)
library(disk.frame)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
csv_path,
outdir = df_path,
in_chunk_size = 100000,
overwrite = TRUE)
#> Warning: UNRELIABLE VALUE: One of the 'future.apply' iterations
#> ('future_lapply-1') unexpectedly generated random numbers without declaring so.
#> There is a risk that those random numbers are not statistically sound and the
#> overall results might be invalid. To fix this, specify 'future.seed=TRUE'. This
#> ensures that proper, parallel-safe random numbers are produced via the L'Ecuyer-
#> CMRG method. To disable this check, use 'future.seed = NULL', or set option
#> 'future.rng.onMisuse' to "ignore".
flights.df
disk.frame also has a function zip_to_disk.frame that can convert every CSV in a zip file to disk.frames.
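The idea behind in_chunk_size can be sketched in base R. The sketch reads a CSV a fixed number of lines at a time from an open connection, so the whole file is never in memory, and writes each block to its own file. The helper csv_in_chunks is hypothetical and is not how csv_to_disk.frame is implemented. It assumes no field contains an embedded newline.

```r
# Hypothetical helper (not disk.frame's implementation): read `csv_path`
# `chunk_size` rows at a time and save each block as its own .rds file.
csv_in_chunks <- function(csv_path, outdir, chunk_size) {
  dir.create(outdir, showWarnings = FALSE, recursive = TRUE)
  con <- file(csv_path, open = "r")
  on.exit(close(con))
  # parse the header line once; scan() strips the quotes write.csv adds
  header <- scan(text = readLines(con, n = 1), what = "", sep = ",", quiet = TRUE)
  n <- 0L
  repeat {
    lines <- readLines(con, n = chunk_size)  # continues where the last read stopped
    if (length(lines) == 0L) break
    n <- n + 1L
    chunk <- read.csv(text = lines, header = FALSE, col.names = header)
    saveRDS(chunk, file.path(outdir, paste0(n, ".rds")))
  }
  n  # number of chunks written
}

csv_path <- tempfile(fileext = ".csv")
write.csv(mtcars, csv_path, row.names = FALSE)
chunk_dir <- file.path(tempdir(), "csv_chunks")
n_chunks <- csv_in_chunks(csv_path, chunk_dir, chunk_size = 10)
```

Each block infers its column types independently, so a column can end up with different types in different chunks. A production reader has to guard against this.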
dplyr verbs and lazy evaluation
flights.df1 <- select(flights.df, year:day, arr_delay, dep_delay)
flights.df1
class(flights.df1)
#> [1] "disk.frame" "disk.frame.folder"
The class of flights.df1 is also a disk.frame after the dplyr::select transformation. Also, disk.frame operations are by default (and where possible) lazy, meaning they are not performed right away. Instead, disk.frame waits until you call collect. Exceptions to this rule are the *_join operations, which are evaluated eagerly under certain conditions; see Joins for disk.frame in-depth for details.
For lazily constructed disk.frames (e.g. flights.df1), the function collect can be used to bring the results from disk into R, e.g.
collect(flights.df1) %>% head(2)
#> year month day arr_delay dep_delay
#> 1: 2013 1 1 11 2
#> 2: 2013 1 1 20 4
Of course, for larger-than-RAM datasets, one wouldn’t call collect on the whole disk.frame (otherwise, why would you need disk.frame?). More likely, one would call collect on a filtered dataset or one summarized with group_by.
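The lazy pattern can be illustrated with a toy object that records operations instead of running them, and applies them chunk by chunk only when results are collected. This is a minimal sketch, not how disk.frame works internally. The names lazy_frame, add_op and collect_lazy are hypothetical, and the chunks are kept in memory for brevity.

```r
# Minimal sketch of lazy, chunk-wise evaluation (illustration only).
lazy_frame <- function(chunks) {
  structure(list(chunks = chunks, ops = list()), class = "lazy_frame")
}

# Record an operation without running it.
add_op <- function(lf, f) {
  lf$ops <- c(lf$ops, list(f))
  lf
}

# Run every recorded operation on each chunk, then row-bind the results.
collect_lazy <- function(lf) {
  results <- lapply(lf$chunks, function(chunk) {
    for (f in lf$ops) chunk <- f(chunk)
    chunk
  })
  do.call(rbind, results)
}

chunks <- split(mtcars, rep(1:4, each = 8))
lf <- add_op(lazy_frame(chunks), function(d) d[d$cyl == 6, c("mpg", "cyl")])
lf <- add_op(lf, function(d) transform(d, kpl = mpg * 0.425))  # approx. km per litre
# Up to this point nothing has been filtered or computed.
res <- collect_lazy(lf)
```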
Some examples of other dplyr verbs applied:
filter(flights.df, dep_delay > 1000) %>% collect %>% head(2)
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 9 641 900 1301 1242 1530
#> 2: 2013 1 10 1121 1635 1126 1239 1810
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 1272 HA 51 N384HA JFK HNL 640 4983 9 0
#> 2: 1109 MQ 3695 N517MQ EWR ORD 111 719 16 35
#> time_hour
#> 1: 2013-01-09 14:00:00
#> 2: 2013-01-10 21:00:00
mutate(flights.df, speed = distance / air_time * 60) %>% collect %>% head(2)
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 1 517 515 2 830 819
#> 2: 2013 1 1 533 529 4 850 830
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 11 UA 1545 N14228 EWR IAH 227 1400 5 15
#> 2: 20 UA 1714 N24211 LGA IAH 227 1416 5 29
#> time_hour speed
#> 1: 2013-01-01 10:00:00 370.0441
#> 2: 2013-01-01 10:00:00 374.2731
Examples of not fully supported dplyr verbs
The chunk_arrange function arranges (sorts) each chunk but not the whole dataset, so use it with caution. Similarly, chunk_summarize creates summary variables within each chunk and hence also needs to be used with caution.
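Sorting each chunk on its own does not produce a globally sorted result. The base-R sketch below shows this on a toy vector split into two chunks of three values, which stand in for disk.frame chunks.

```r
x <- c(5, 1, 9, 3, 7, 2)
chunks <- split(x, rep(1:2, each = 3))   # chunk 1: 5 1 9, chunk 2: 3 7 2

# chunk_arrange-style: sort inside each chunk, then concatenate
per_chunk <- unlist(lapply(chunks, sort), use.names = FALSE)
per_chunk  # 1 5 9 2 3 7

# a true global sort looks at all values at once
sort(x)    # 1 2 3 5 7 9
```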
# this only sorts within each chunk
chunk_arrange(flights.df, dplyr::desc(dep_delay)) %>% collect %>% head(2)
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 9 641 900 1301 1242 1530
#> 2: 2013 1 10 1121 1635 1126 1239 1810
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 1272 HA 51 N384HA JFK HNL 640 4983 9 0
#> 2: 1109 MQ 3695 N517MQ EWR ORD 111 719 16 35
#> time_hour
#> 1: 2013-01-09 14:00:00
#> 2: 2013-01-10 21:00:00
chunk_summarize(flights.df, mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>% collect
#> mean_dep_delay
#> 1: 12.32700
#> 2: 12.01291
#> 3: 13.81315
#> 4: 12.94445
#> 5: 12.67813
#> 6: 12.05854
One can chain dplyr verbs together like with a data.frame:
c4 <- flights.df %>%
filter(month == 5, day == 17, carrier %in% c('UA', 'WN', 'AA', 'DL')) %>%
select(carrier, dep_delay, air_time, distance) %>%
mutate(air_time_hours = air_time / 60) %>%
collect %>%
arrange(carrier) # arrange should occur after `collect`
c4 %>% head
#> carrier dep_delay air_time distance air_time_hours
#> 1: AA -7 142 1089 2.366667
#> 2: AA -9 186 1389 3.100000
#> 3: AA -6 143 1096 2.383333
#> 4: AA -4 114 733 1.900000
#> 5: AA -2 146 1085 2.433333
#> 6: AA -7 119 733 1.983333
List of supported dplyr verbs
select
rename
filter
chunk_arrange # within each chunk
chunk_group_by # within each chunk
chunk_summarize # within each chunk
group_by # limited functions
summarize # limited functions
mutate
transmute
left_join
inner_join
full_join # careful. Performance!
semi_join
anti_join
Like other distributed data manipulation frameworks, disk.frame utilizes the sharding concept to distribute the data into chunks. For example, “to shard by cust_id” means that all rows with the same cust_id will be stored in the same chunk. This enables chunk_group_by by cust_id to produce the same results as non-chunked data.
The by variables that were used to shard the dataset are called the shardkeys. The sharding is performed by computing a deterministic hash on the shard keys (the by variables) for each row. The hash function produces an integer between 1 and n, where n is the number of chunks.
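Hash-based sharding can be sketched in base R. A deterministic string hash maps each shard key to a chunk number between 1 and n, so every row with the same key lands in the same chunk. The hash below is a djb2-style function chosen only for illustration. disk.frame uses its own hashing, so these chunk numbers will not match disk.frame's. The names hash_key and shard_of are hypothetical.

```r
# Hypothetical shard-key hash (djb2-style); not disk.frame's hash function.
hash_key <- function(key) {
  h <- 5381
  for (code in utf8ToInt(as.character(key))) {
    h <- (h * 33 + code) %% 2^32  # keep values well inside double precision
  }
  h
}

# Map each shard key to a chunk number between 1 and nchunks.
shard_of <- function(keys, nchunks) {
  vapply(keys, function(k) hash_key(k) %% nchunks + 1, numeric(1), USE.NAMES = FALSE)
}

sales <- data.frame(cust_id = c("a", "b", "a", "c", "b", "a"),
                    amount  = c(10, 5, 7, 3, 2, 1))
sales$chunk <- shard_of(sales$cust_id, nchunks = 3)

# Every row of a customer is in one chunk, so a per-chunk group-by total
# equals the group-by total computed on the whole data.
per_chunk <- do.call(rbind, lapply(split(sales, sales$chunk), function(d)
  aggregate(amount ~ cust_id, data = d, FUN = sum)))
per_chunk <- per_chunk[order(per_chunk$cust_id), ]
overall <- aggregate(amount ~ cust_id, data = sales, FUN = sum)
```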
disk.frame implements the group_by operation with some caveats. In the disk.frame framework, only a set of functions is supported in summarize. However, users can define additional custom group-by functions.
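One way to see why only some summary functions work on chunked data is that each must be computable in two stages: a partial result per chunk, then a combine step. The mean is the classic case. Averaging the chunk means is wrong when chunks differ in size, but combining per-chunk sums and counts is exact. The base-R sketch below shows the principle only. It does not show how disk.frame defines its own group-by functions.

```r
x <- c(1, 2, 3, 4, 100)
chunks <- list(x[1:4], x[5])   # unequal chunk sizes: 4 values and 1 value

# Stage 1: per-chunk partial results (sum and count)
partials <- lapply(chunks, function(v) c(sum = sum(v), n = length(v)))

# Stage 2: combine the partials into the exact overall mean
totals <- Reduce(`+`, partials)
two_stage_mean <- unname(totals["sum"] / totals["n"])
two_stage_mean  # 22

# Naively averaging the per-chunk means gives a different (wrong) answer
mean_of_means <- mean(sapply(chunks, mean))
mean_of_means   # 51.25
```

Functions such as the median have no small per-chunk summary that combines exactly in this way.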
flights.df %>%
group_by(carrier) %>%
summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>% # mean follows normal R rules
collect %>%
arrange(carrier)
#> # A tibble: 16 x 3
#> carrier count mean_dep_delay
#> <chr> <int> <dbl>
#> 1 9E 18460 16.7
#> 2 AA 32729 8.59
#> 3 AS 714 5.80
#> 4 B6 54635 13.0
#> 5 DL 48110 9.26
#> 6 EV 54173 20.0
#> 7 F9 685 20.2
#> 8 FL 3260 18.7
#> 9 HA 342 4.90
#> 10 MQ 26397 10.6
#> 11 OO 32 12.6
#> 12 UA 58665 12.1
#> 13 US 20536 3.78
#> 14 VX 5162 12.9
#> 15 WN 12275 17.7
#> 16 YV 601 19.0
One can restrict which input columns to load into memory for each chunk; this can significantly increase the speed of data processing. To restrict the input columns, use the srckeep function, which only accepts column names as a character vector.
flights.df %>%
srckeep(c("carrier","dep_delay")) %>%
group_by(carrier) %>%
summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm = TRUE)) %>% # mean follows normal R rules
collect
#> # A tibble: 16 x 3
#> carrier count mean_dep_delay
#> <chr> <int> <dbl>
#> 1 9E 18460 16.7
#> 2 AA 32729 8.59
#> 3 AS 714 5.80
#> 4 B6 54635 13.0
#> 5 DL 48110 9.26
#> 6 EV 54173 20.0
#> 7 F9 685 20.2
#> 8 FL 3260 18.7
#> 9 HA 342 4.90
#> 10 MQ 26397 10.6
#> 11 OO 32 12.6
#> 12 UA 58665 12.1
#> 13 US 20536 3.78
#> 14 VX 5162 12.9
#> 15 WN 12275 17.7
#> 16 YV 601 19.0
Input column restriction is one of the most critical efficiencies provided by disk.frame. Because the underlying format allows random access to columns (i.e. only the columns used for processing need to be retrieved), one can drastically reduce the amount of data loaded into RAM by keeping only those columns that are directly used to produce the results.
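The same "load only what you use" idea can be sketched with base R's read.csv. The sketch passes "NULL" in colClasses for unwanted columns so they are never stored. A CSV is not columnar, so read.csv still scans every line and the saving here is memory only. The columnar fst format behind disk.frame is what also avoids reading the skipped columns. The variable names are illustrative.

```r
# A small CSV with one column we do not need (tailnum).
csv_path <- tempfile(fileext = ".csv")
write.csv(
  data.frame(carrier = c("UA", "AA", "UA"), dep_delay = c(2, 4, -1),
             tailnum = c("N1", "N2", "N3")),
  csv_path, row.names = FALSE
)

keep <- c("carrier", "dep_delay")              # in the spirit of srckeep(keep)
all_cols <- names(read.csv(csv_path, nrows = 1))
# NA = let read.csv choose the type; "NULL" = skip the column entirely
col_classes <- ifelse(all_cols %in% keep, NA, "NULL")
slim <- read.csv(csv_path, colClasses = col_classes)
names(slim)  # "carrier" "dep_delay"
```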
disk.frame supports many dplyr joins including:
left_join
inner_join
semi_join
anti_join
full_join # requires rechunk on both left and right
In all cases, the left dataset (x) must be a disk.frame, and the right dataset (y) can be either a disk.frame or a data.frame. If the right dataset is a disk.frame and the shardkeys are different between the two disk.frames, then two expensive hard rechunk operations are performed eagerly, one on the left disk.frame and one on the right disk.frame, to perform the joins correctly.
However, if the right dataset is a data.frame, then rechunks are only performed in the case of full_join.
Note that disk.frame does not support right_join; use left_join instead.
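Why the type of the right-hand table matters can be sketched in base R with merge() on an in-memory table split into chunks. Joining each chunk of x against the whole of a small y reproduces the left join exactly, which is why such joins can run lazily, chunk by chunk. A full join done the same way re-adds y's unmatched rows once per chunk, one reason full_join needs the data reorganised first. This illustrates the principle only, not disk.frame's join code. The names x, y and normalise are illustrative.

```r
x <- data.frame(id = 1:6, carrier = c("UA", "AA", "B6", "UA", "DL", "AA"),
                stringsAsFactors = FALSE)
y <- data.frame(carrier = c("AA", "B6", "DL", "UA"),
                name = c("American", "JetBlue", "Delta", "United"),
                stringsAsFactors = FALSE)
x_chunks <- split(x, rep(1:3, each = 2))   # x as three chunks of two rows

# Sort by id and drop row names so results can be compared.
normalise <- function(d) {
  d <- d[order(d$id), c("id", "carrier", "name")]
  rownames(d) <- NULL
  d
}

# Left join chunk by chunk against the whole of y: matches the in-memory join.
left_chunked <- do.call(rbind, lapply(x_chunks, merge, y = y, by = "carrier", all.x = TRUE))
left_full <- merge(x, y, by = "carrier", all.x = TRUE)

# Full join chunk by chunk: each chunk re-adds y's unmatched rows.
full_chunked <- do.call(rbind, lapply(x_chunks, merge, y = y, by = "carrier", all = TRUE))
full_full <- merge(x, y, by = "carrier", all = TRUE)
c(chunked = nrow(full_chunked), in_memory = nrow(full_full))  # 12 vs 6
```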
The joins below are performed lazily because airlines.dt is a data.table, not a disk.frame:
# make airlines a data.table
airlines.dt <- data.table(airlines)
# in-memory dplyr equivalent: flights %>% left_join(airlines, by = "carrier")
flights.df %>%
left_join(airlines.dt, by ="carrier") %>%
collect %>%
head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 1 517 515 2 830 819
#> 2: 2013 1 1 533 529 4 850 830
#> 3: 2013 1 1 542 540 2 923 850
#> 4: 2013 1 1 544 545 -1 1004 1022
#> 5: 2013 1 1 554 600 -6 812 837
#> 6: 2013 1 1 554 558 -4 740 728
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 11 UA 1545 N14228 EWR IAH 227 1400 5 15
#> 2: 20 UA 1714 N24211 LGA IAH 227 1416 5 29
#> 3: 33 AA 1141 N619AA JFK MIA 160 1089 5 40
#> 4: -18 B6 725 N804JB JFK BQN 183 1576 5 45
#> 5: -25 DL 461 N668DN LGA ATL 116 762 6 0
#> 6: 12 UA 1696 N39463 EWR ORD 150 719 5 58
#> time_hour name
#> 1: 2013-01-01 10:00:00 United Air Lines Inc.
#> 2: 2013-01-01 10:00:00 United Air Lines Inc.
#> 3: 2013-01-01 10:00:00 American Airlines Inc.
#> 4: 2013-01-01 10:00:00 JetBlue Airways
#> 5: 2013-01-01 11:00:00 Delta Air Lines Inc.
#> 6: 2013-01-01 10:00:00 United Air Lines Inc.
flights.df %>%
left_join(airlines.dt, by = c("carrier")) %>%
collect %>%
tail
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 9 30 NA 1842 NA NA 2019
#> 2: 2013 9 30 NA 1455 NA NA 1634
#> 3: 2013 9 30 NA 2200 NA NA 2312
#> 4: 2013 9 30 NA 1210 NA NA 1330
#> 5: 2013 9 30 NA 1159 NA NA 1344
#> 6: 2013 9 30 NA 840 NA NA 1020
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: NA EV 5274 N740EV LGA BNA NA 764 18 42
#> 2: NA 9E 3393 JFK DCA NA 213 14 55
#> 3: NA 9E 3525 LGA SYR NA 198 22 0
#> 4: NA MQ 3461 N535MQ LGA BNA NA 764 12 10
#> 5: NA MQ 3572 N511MQ LGA CLE NA 419 11 59
#> 6: NA MQ 3531 N839MQ LGA RDU NA 431 8 40
#> time_hour name
#> 1: 2013-09-30 22:00:00 ExpressJet Airlines Inc.
#> 2: 2013-09-30 18:00:00 Endeavor Air Inc.
#> 3: 2013-10-01 02:00:00 Endeavor Air Inc.
#> 4: 2013-09-30 16:00:00 Envoy Air
#> 5: 2013-09-30 15:00:00 Envoy Air
#> 6: 2013-09-30 12:00:00 Envoy Air
disk.frame supports all data.frame operations, unlike Spark, which can only perform those operations that Spark has implemented. Hence window functions like min_rank and rank are supported out of the box.
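For example:
# rank each flight within each day; chunk_group_by groups within each chunk,
# so a day whose rows span two chunks is ranked separately in each chunk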
ranked <- flights.df %>%
srckeep(c("year","month","day", "dep_delay")) %>%
chunk_group_by(year, month, day) %>%
select(dep_delay) %>%
mutate(rank = rank(desc(dep_delay))) %>%
collect
ranked %>% head
#> year month day dep_delay rank
#> 1: 2013 1 1 2 313
#> 2: 2013 1 1 4 276
#> 3: 2013 1 1 2 313
#> 4: 2013 1 1 -1 440
#> 5: 2013 1 1 -6 742
#> 6: 2013 1 1 -4 633
One can apply arbitrary transformations to each chunk of the disk.frame by using the delayed function, which evaluates lazily, or the cmap.disk.frame(lazy = FALSE) function, which evaluates eagerly. For example, to return the number of rows in each chunk:
flights.df1 <- delayed(flights.df, ~nrow(.x))
collect_list(flights.df1) %>% head # returns number of rows for each data.frame in a list
#> [[1]]
#> [1] 56131
#>
#> [[2]]
#> [1] 56131
#>
#> [[3]]
#> [1] 56131
#>
#> [[4]]
#> [1] 56131
#>
#> [[5]]
#> [1] 56131
#>
#> [[6]]
#> [1] 56121
The same can be done eagerly with cmap.disk.frame(lazy = FALSE).
The cmap function can also output the results to another disk.frame folder, e.g.
# return the first 10 rows of each chunk
flights.df2 <- cmap(flights.df, ~.x[1:10,], lazy = FALSE, outdir = file.path(tempdir(), "tmp2"), overwrite = TRUE)
flights.df2 %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 1 1 517 515 2 830 819
#> 2: 2013 1 1 533 529 4 850 830
#> 3: 2013 1 1 542 540 2 923 850
#> 4: 2013 1 1 544 545 -1 1004 1022
#> 5: 2013 1 1 554 600 -6 812 837
#> 6: 2013 1 1 554 558 -4 740 728
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: 11 UA 1545 N14228 EWR IAH 227 1400 5 15
#> 2: 20 UA 1714 N24211 LGA IAH 227 1416 5 29
#> 3: 33 AA 1141 N619AA JFK MIA 160 1089 5 40
#> 4: -18 B6 725 N804JB JFK BQN 183 1576 5 45
#> 5: -25 DL 461 N668DN LGA ATL 116 762 6 0
#> 6: 12 UA 1696 N39463 EWR ORD 150 719 5 58
#> time_hour
#> 1: 2013-01-01 10:00:00
#> 2: 2013-01-01 10:00:00
#> 3: 2013-01-01 10:00:00
#> 4: 2013-01-01 10:00:00
#> 5: 2013-01-01 11:00:00
#> 6: 2013-01-01 10:00:00
Notice that disk.frame supports the purrr syntax for defining a function using ~.
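A formula such as ~nrow(.x) is shorthand for function(.x) nrow(.x). The base-R sketch below performs that conversion for the simplest one-argument case, to show what the ~ syntax stands for. The helper formula_to_function is hypothetical. It is not purrr's as_mapper(), which also handles .y, ..1 and other forms.

```r
# Hypothetical helper: turn a one-sided formula such as ~nrow(.x) into a
# function of .x (simplest case only).
formula_to_function <- function(f) {
  stopifnot(inherits(f, "formula"), length(f) == 2)  # one-sided: ~ expr
  fn <- function(.x) NULL
  body(fn) <- f[[2]]                  # the expression to the right of ~
  environment(fn) <- environment(f)   # resolve free variables where the formula was written
  fn
}

count_rows <- formula_to_function(~nrow(.x))
count_rows(mtcars)  # 32
first_two <- formula_to_function(~.x[1:2, ])
```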
In the disk.frame framework, sampling a proportion of rows within each chunk can be performed using sample_frac.
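Per-chunk sampling draws the requested fraction from each chunk independently, so no chunk needs to know about the others. The base-R sketch below uses in-memory chunks. The helper chunk_sample_frac is hypothetical, and its per-chunk rounding is an assumption that may differ from how dplyr's sample_frac rounds.

```r
# Hypothetical helper: keep about `frac` of the rows of every chunk,
# sampling each chunk independently.
chunk_sample_frac <- function(chunks, frac) {
  lapply(chunks, function(d) {
    d[sample.int(nrow(d), size = round(nrow(d) * frac)), , drop = FALSE]
  })
}

set.seed(42)
chunks <- split(mtcars, rep(1:4, each = 8))
sampled <- chunk_sample_frac(chunks, frac = 0.25)
sapply(sampled, nrow)  # 2 rows from each chunk of 8
```

Because rounding happens per chunk, the total sample size can differ slightly from frac times the total row count when chunk sizes are not multiples of 1/frac.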
flights.df %>% sample_frac(0.01) %>% collect %>% head
#> year month day dep_time sched_dep_time dep_delay arr_time sched_arr_time
#> 1: 2013 2 1 1348 1345 3 1658 1705
#> 2: 2013 1 17 755 800 -5 856 908
#> 3: 2013 1 8 830 830 0 1116 1106
#> 4: 2013 12 31 1936 1945 -9 2048 2120
#> 5: 2013 8 22 1350 1345 5 1606 1635
#> 6: 2013 12 30 1749 1750 -1 2002 2020
#> arr_delay carrier flight tailnum origin dest air_time distance hour minute
#> 1: -7 AA 1073 N3FTAA LGA MIA 166 1096 13 45
#> 2: -12 US 2118 N958UW LGA BOS 37 184 8 0
#> 3: 10 F9 835 N202FR LGA DEN 247 1620 8 30
#> 4: -32 9E 2950 N8516C JFK BWI 39 184 19 45
#> 5: -29 AA 753 N3BYAA LGA DFW 175 1389 13 45
#> 6: -18 9E 2954 N8516C JFK CHS 112 636 17 50
#> time_hour
#> 1: 2013-02-01 18:00:00
#> 2: 2013-01-17 13:00:00
#> 3: 2013-01-08 13:00:00
#> 4: 2014-01-01 00:00:00
#> 5: 2013-08-22 17:00:00
#> 6: 2013-12-30 22:00:00
One can output a disk.frame by using the write_disk.frame function, e.g.
write_disk.frame(flights.df, outdir = "out")
This will output a disk.frame to the folder “out”.