group-by.Rmd
{disk.frame}
The group-by framework of {disk.frame}
has been overhauled in v0.3.0. It is now able to perform some group-by-summarize operations in one stage. In this chapter we will cover
{disk.frame}
and its implications for group-byA one-stage group-by is the same as group-by for data.frames. This would be remarkable, if not for the limitaions imposed by the disk-based nature of {disk.frame}
. Before v0.3.0 of {disk.frame}
, one-stage group-by was not possible, and the users had to rely to two-stage group-by even for simple operations like mean
.
However, now that one-stage group-by is possible, there are still limiations and not all functions are supported out-of-the-box. Hence, in the next chapter we have described how to define custom one-stage group-by functions.
An example of one-stage group-by:
result_from_disk.frame = iris %>%
as.disk.frame %>%
group_by(Species) %>%
summarize(
mean(Petal.Length),
sumx = sum(Petal.Length/Sepal.Width),
sd(Sepal.Width/ Petal.Length),
var(Sepal.Width/ Sepal.Width),
l = length(Sepal.Width/ Sepal.Width + 2),
max(Sepal.Width),
min(Sepal.Width),
median(Sepal.Width)
) %>%
collect
It is important to note that not all functions that can run in data.frame
summarize
would work automatically. This is because of how {disk.frame}
works. Please see the secion on defining your own one-stage-group-by if you wish to learn how to define your own one-stage group-by functions.
If a function you need/like is missing, please make a feature request here. It is a limitation that function that depend on the order a column can only obtained using estimated methods.
Function | Exact/Estimate | Notes |
---|---|---|
min |
Exact | |
max |
Exact | |
mean |
Exact | |
sum |
Exact | |
length |
Exact | |
n |
Exact | |
n_distinct |
Exact | |
sd |
Exact | |
var |
Exact |
var(x) only cor, cov support planned
|
any |
Exact | |
all |
Exact | |
median |
Estimate | |
quantile |
Estimate | One quantile only |
IQR |
Estimate |
The results should be exactly the same as if applying the same group-by operations on a data.frame
. If not then please report a bug.
The disk.frame
implements the chunk_group_by
operation with a significant caveat. In the disk.frame
framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve group by across chunk we need to put all rows with the same group keys into the same file chunk; this can be achieved with hard_group_by
. However, the hard_group_by
operation can be VERY TIME CONSUMING computationally and should be avoided if possible.
The hard_group_by
operation is best illustrated with an example, suppose a disk.frame
has three chunks
# chunk1 = 1.fst
# id n
#1 a 1
#2 a 2
#3 b 3
#4 d 4
# chunk2 = 2.fst
# id n
#1 a 4
#2 a 5
#3 b 6
#4 d 7
# chunk3 = 3.fst
# id n
#1 a 4
#2 b 5
#3 c 6
and notice that the id
column contains 3 distinct values "a"
,"b"
, and "c"
. To perform hard_group_by(df, by = id)
MAY give you the following disk.frame
where all the id
s with the same values end up in the same chunks.
# chunk1 = 1.fst
# id n
#1 b 3
#2 b 6
# chunk2 = 2.fst
# id n
#1 c 6
#2 d 4
#3 d 7
# chunk3 = 3.fst
# id n
#1 a 1
#2 a 2
#3 a 4
#4 a 5
#5 a 4
Also, notice that there is no guaranteed order for the distribution of the id
s to the chunks. The order is random, but each chunk is likely to have a similar number of rows, provided that id
does not follow a skewed distribution i.e. where a few distinct values make up the majority of the rows.
Typically, chunk_group_by
is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the by
variables beforehand; however, if this is not the case then one may need a second stage aggregation to obtain the correct result, see Two-stage group by.
By forcing the user to choose chunk_group_by
(within each chunk) and hard_group_by
(across all chunks), this ensures that the user is conscious of the choice they are making. In sparklyr
the equivalent of a hard_group_by
is performed, which we should avoid, where possible, as it is time-consuming and expensive. Hence, disk.frame
has chosen to explain the theory and allow the user to make a conscious choice when performing group_by
.
Prior to {disk.frame}
v0.3.0, there is no general support for one-stage group-by. Hence a two-stage style group-by is needed. The key is understand is the chunk_group_by
which performs group-by
within each chunk.
For most group-by tasks, the user can achieve the desired result WITHOUT using hard = TRUE
by performing the group by in two stages. For example, suppose you aim to count the number of rows group by carrier
, you can set hard = F
to find the count within each chunk and then use a second group-by to summaries each chunk’s results into the desired result. For example,
flights.df %>%
chunk_group_by(carrier) %>% # `chunk_group_by` aggregates within each chunk
chunk_summarize(count = n()) %>% # mean follows normal R rules
collect %>% # collect each individul chunks results and row-bind into a data.table
group_by(carrier) %>%
summarize(count = sum(count)) %>%
arrange(carrier)
Because this two-stage approach avoids the expensive hard group_by
operation, it is often significantly faster. However, it can be tedious to write; and this is a con of the disk.frame
chunking mechanism.
Note: this two-stage approach is similar to a map-reduce operation.
flights.df = as.disk.frame(nycflights13::flights)
flights.df %>%
srckeep(c("year","distance")) %>% # keep only carrier and distance columns
chunk_group_by(year) %>%
chunk_summarise(sum_dist = sum(distance)) %>% # this does a count per chunk
collect
This is two-stage group-by in action
# need a 2nd stage to finalise summing
flights.df %>%
srckeep(c("year","distance")) %>% # keep only carrier and distance columns
chunk_group_by(year) %>%
chunk_summarise(sum_dist = sum(distance)) %>% # this does a count per chunk
collect %>%
group_by(year) %>%
summarise(sum_dist = sum(sum_dist))
You can mix group-by with other dplyr verbs as below, here is an example of using filter
.
Another way to perform a one-stage group_by
is to perform a hard_group_by
on a disk.frame
. This will rechunk the disk.frame
by the by-columns. This is not recommended for performance reasons, as it can be quite slow to rechunk the file chunks on disk.
pt = proc.time()
res1 <- flights.df %>%
srckeep(c("month", "dep_delay")) %>%
filter(month <= 6) %>%
mutate(qtr = ifelse(month <= 3, "Q1", "Q2")) %>%
hard_group_by(qtr) %>% # hard group_by is MUCH SLOWER but avoid a 2nd stage aggregation
chunk_summarise(avg_delay = mean(dep_delay, na.rm = TRUE)) %>%
collect
cat("group-by took: ", data.table::timetaken(pt), "\n")
collect(res1)
If you like disk.frame and want to speed up its development or perhaps you have a feature request? Please consider sponsoring {disk.frame} on Open Collective. Your logo will show up here with a link to your website.
Do you need help with machine learning and data science in R, Python, or Julia? I am available for Machine Learning/Data Science/R/Python/Julia consulting! Email me