intro-disk-frame.Rmd
The disk.frame package aims to be the answer to the question: how do I manipulate structured tabular data that doesn't fit into Random Access Memory (RAM)?
In a nutshell, disk.frame makes use of two simple ideas: it splits a larger-than-RAM dataset into chunks, storing each chunk in a separate file inside a folder, and it provides a convenient API to manipulate those chunks.
disk.frame performs a similar role to distributed systems such as Apache Spark, Python's Dask, and Julia's JuliaDB.jl for medium data, i.e. datasets that are too large for RAM but not large enough to qualify as big data.
In this tutorial, we introduce disk.frame, address some common questions, and replicate the sparklyr data manipulation tutorial using disk.frame constructs.
Set-up disk.frame
disk.frame works best if it can process multiple data chunks in parallel. The best way to set up disk.frame so that each CPU core runs a background worker is by using:
setup_disk.frame()
# this will allow unlimited amount of data to be passed from worker to worker
options(future.globals.maxSize = Inf)
The setup_disk.frame() function sets up background workers equal to the number of CPU cores; please note that, by default, hyper-threaded cores are counted as one, not two. Alternatively, one may specify the number of workers using setup_disk.frame(workers = n).
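For example, to use a fixed number of workers (the number chosen here is arbitrary):
# set up exactly 2 background workers instead of one per CPU core
setup_disk.frame(workers = 2)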
Creating disk.frames
The disk.frame package provides convenient functions to convert data.frames and CSVs to disk.frames.
Creating a disk.frame from a data.frame
We can convert a data.frame to a disk.frame using the as.disk.frame function.
library(nycflights13)
library(dplyr)
library(disk.frame)
library(data.table)
# convert the flights data to a disk.frame stored in the folder "tmp_flights.df"
# under the temporary directory, overwriting any existing content
flights.df <- as.disk.frame(
  flights,
  outdir = file.path(tempdir(), "tmp_flights.df"),
  overwrite = TRUE)
flights.df
You should now see a folder called tmp_flights.df with some files in it, namely 1.fst, 2.fst, …, where each fst file is one chunk of the disk.frame.
Creating a disk.frame from CSV
library(nycflights13)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
# load the csv into a disk.frame
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
  csv_path,
  outdir = df_path,
  overwrite = TRUE)
flights.df
If the CSV is too large to read in all at once, we can also use the in_chunk_size option to control how many rows to read in at a time. For example, to read in the data 100,000 rows at a time:
library(nycflights13)
library(disk.frame)
# write a csv
csv_path = file.path(tempdir(), "tmp_flights.csv")
data.table::fwrite(flights, csv_path)
df_path = file.path(tempdir(), "tmp_flights.df")
flights.df <- csv_to_disk.frame(
  csv_path,
  outdir = df_path,
  in_chunk_size = 100000)
flights.df
disk.frame also has a function, zip_to_disk.frame, that can convert every CSV in a zip file to disk.frames.
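A hedged sketch of its use (the zip file name here is hypothetical):
# convert every CSV inside a (hypothetical) zip archive into disk.frames
zip_to_disk.frame("flights_data.zip", outdir = file.path(tempdir(), "zip_flights"))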
Simple dplyr verbs and lazy evaluation
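The flights.df1 object referred to below is the result of a dplyr::select on flights.df; a minimal sketch of that step (the exact columns chosen are an assumption) is:
# lazily select a handful of columns; nothing is computed until collect() is called
flights.df1 <- flights.df %>%
  select(year, month, day, arr_delay, dep_delay)
flights.df1
class(flights.df1)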
The class of flights.df1 is also a disk.frame after the dplyr::select transformation. Also, disk.frame operations are by default (and where possible) lazy, meaning they are not performed right away; instead, disk.frame waits until you call collect. Exceptions to this rule are the *_join operations, which are evaluated eagerly under certain conditions; see Joins for disk.frame in-depth for details.
For lazily constructed disk.frames (e.g. flights.df1), the collect function can be used to bring the results from disk into R, e.g.:
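A minimal sketch, assuming flights.df1 from the select step above:
# bring the lazily evaluated result into RAM and show the first few rows
flights.df1 %>% collect %>% head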
Of course, for larger-than-RAM datasets, one wouldn't call collect on the whole disk.frame (because why would you need disk.frame otherwise?). More likely, one would call collect on a filtered dataset or one summarized with group_by.
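As an illustrative sketch (the filter condition is arbitrary):
# filter each chunk first, then collect only the much smaller matching subset into RAM
flights.df %>%
  filter(dep_delay > 120) %>%
  collect %>%
  head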
Some examples of other dplyr verbs applied:
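The snippets below are illustrative sketches, not the vignette's original code:
# filter: keep only flights with extreme departure delays
flights.df %>% filter(dep_delay > 1000) %>% collect %>% head
# mutate: derive an average speed column from distance and air time
flights.df %>% mutate(speed = distance / air_time * 60) %>% collect %>% head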
Supported dplyr verbs
The chunk_arrange function arranges (sorts) each chunk but not the whole dataset, so use it with caution. Similarly, chunk_summarise creates summary variables within each chunk and hence also needs to be used with caution. In the Group-by section, we demonstrate how to use summarise correctly in the disk.frame context with hard_group_by.
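To illustrate the per-chunk behaviour, the following sketch uses chunk_summarise; it returns one row per chunk rather than a single overall maximum:
# each chunk reports its own maximum departure delay
flights.df %>%
  chunk_summarise(max_dep_delay = max(dep_delay, na.rm = TRUE)) %>%
  collect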
Group-by
{disk.frame} implements the group_by operation with some caveats. In the {disk.frame} framework, only a set of functions is supported in summarize. However, the user can define additional custom group-by functions. For more information, see the group-by documentation.
Restrict input columns for faster processing
One can restrict which input columns to load into memory for each chunk; this can significantly increase the speed of data processing. To restrict the input columns, use the srckeep function, which accepts column names as a character vector.
flights.df %>%
srckeep(c("carrier","dep_delay")) %>%
group_by(carrier) %>%
summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>% # mean follows normal R rules
collect
Input column restriction is one of the most critical efficiencies provided by disk.frame. Because the underlying format allows random access to columns (i.e. it can retrieve only the columns needed for processing), one can drastically reduce the amount of data loaded into RAM by keeping only those columns that are directly used to produce the results.
Joins
disk.frame supports many dplyr joins, including left_join, inner_join, full_join, semi_join, and anti_join.
In all cases, the left dataset (x) must be a disk.frame, and the right dataset (y) can be either a disk.frame or a data.frame. If the right dataset is a disk.frame and the shardkeys differ between the two disk.frames, then two expensive hard group_by operations are performed eagerly, one on each disk.frame, so that the join can be performed correctly.
However, if the right dataset is a data.frame, then hard_group_bys are only performed in the case of full_join.
Note that disk.frame does not support right_join; the user should use left_join instead.
The joins below are performed lazily because airlines.dt is a data.table, not a disk.frame:
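A hedged sketch of such a lazy join (airlines.dt and the join key are assumptions based on the nycflights13 data):
# a data.table version of the airlines lookup table
airlines.dt <- data.table::as.data.table(nycflights13::airlines)
# the join is lazy; it is only evaluated when collect() is called
flights.df %>%
  left_join(airlines.dt, by = "carrier") %>%
  collect %>%
  head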
Window functions and arbitrary functions
{disk.frame} supports all data.frame operations, unlike Spark, which can only perform the operations that it has implemented. Hence, windowing functions like min_rank and rank are supported out of the box.
For the following example, we use hard_group_by, which performs a group-by and also reorganises the chunks so that all records with the same year, month, and day end up in the same chunk. This is typically not advised, as hard_group_by can be slow for large datasets.
# Find the most and least delayed flight each day
bestworst <- flights.df %>%
srckeep(c("year","month","day", "dep_delay")) %>%
hard_group_by(c("year", "month", "day")) %>%
filter(dep_delay == min(dep_delay, na.rm = T) || dep_delay == max(dep_delay, na.rm = T)) %>%
collect
bestworst %>% head
Another example:
ranked <- flights.df %>%
srckeep(c("year","month","day", "dep_delay")) %>%
hard_group_by(c("year", "month", "day")) %>%
filter(min_rank(desc(dep_delay)) <= 2 & dep_delay > 0) %>%
collect
ranked %>% head
One more example:
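As an illustrative sketch under the same set-up (not the vignette's original code), mutate can be used to compare each flight against its day's average:
# within each day, compute each flight's delay relative to that day's mean delay
relative <- flights.df %>%
  srckeep(c("year", "month", "day", "dep_delay")) %>%
  hard_group_by(c("year", "month", "day")) %>%
  mutate(rel_delay = dep_delay - mean(dep_delay, na.rm = TRUE)) %>%
  collect
relative %>% head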
Arbitrary by-chunk processing
One can apply arbitrary transformations to each chunk of the disk.frame by using the delayed function, which evaluates lazily, or the map.disk.frame(lazy = F) function, which evaluates eagerly. For example, to return the number of rows in each chunk:
flights.df1 <- delayed(flights.df, ~nrow(.x))
collect_list(flights.df1) %>% head # returns number of rows for each data.frame in a list
and to do the same with map.disk.frame:
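A hedged sketch of the eager equivalent (for a disk.frame, map dispatches to map.disk.frame):
# lazy = F applies the function to every chunk immediately and returns the results
map(flights.df, ~nrow(.x), lazy = F) %>% head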
The map function can also output the results to another disk.frame folder, e.g.:
# return the first 10 rows of each chunk
flights.df2 <- map(flights.df, ~.x[1:10,], lazy = F, outdir = file.path(tempdir(), "tmp2"), overwrite = T)
flights.df2 %>% head
Notice that {disk.frame} supports the purrr syntax for defining a function using ~.
Sampling
In the disk.frame framework, sampling a proportion of rows within each chunk can be performed using sample_frac.
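For example, a minimal sketch that keeps 1% of the rows in each chunk (the fraction is arbitrary):
# sample 1% of the rows within each chunk, then bring the sample into RAM
flights.df %>%
  sample_frac(0.01) %>%
  collect %>%
  head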