# Group-by in {disk.frame}

The group-by framework of {disk.frame} has been overhauled in v0.3.0. It is now able to perform some group-by-summarize operations in one stage. In this chapter we will cover

1. How to use one-stage group-by
2. Manual two-stage group and hard group-by
3. The architecture of {disk.frame} and its implications for group-by
4. How to define custom one-stage group-by functions and its limitatons

## One-stage Group-by

A one-stage group-by is the same as group-by for data.frames. This would be remarkable, if not for the limitaions imposed by the disk-based nature of {disk.frame}. Before v0.3.0 of {disk.frame}, one-stage group-by was not possible, and the users had to rely to two-stage group-by even for simple operations like mean.

However, now that one-stage group-by is possible, there are still limiations and not all functions are supported out-of-the-box. Hence, in the next chapter we have described how to define custom one-stage group-by functions.

An example of one-stage group-by:

result_from_disk.frame = iris %>%
as.disk.frame %>%
group_by(Species) %>%
summarize(
mean(Petal.Length),
sumx = sum(Petal.Length/Sepal.Width),
sd(Sepal.Width/ Petal.Length),
var(Sepal.Width/ Sepal.Width),
l = length(Sepal.Width/ Sepal.Width + 2),
max(Sepal.Width),
min(Sepal.Width),
median(Sepal.Width)
) %>%
collect

It is important to note that not all functions that can run in data.frame summarize would work automatically. This is because of how {disk.frame} works. Please see the secion on defining your own one-stage-group-by if you wish to learn how to define your own one-stage group-by functions.

### List of supported group-by functions

If a function you need/like is missing, please make a feature request here. It is a limitation that function that depend on the order a column can only obtained using estimated methods.

Function Exact/Estimate Notes
min Exact
max Exact
mean Exact
sum Exact
length Exact
n Exact
n_distinct Exact
sd Exact
var Exact var(x) only cor, cov support planned
any Exact
all Exact
median Estimate
quantile Estimate One quantile only
IQR Estimate

### Notes on One-Stage group-by

The results should be exactly the same as if applying the same group-by operations on a data.frame. If not then please report a bug.

## Group-by notes

The disk.frame implements the chunk_group_by operation with a significant caveat. In the disk.frame framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve group by across chunk we need to put all rows with the same group keys into the same file chunk; this can be achieved with hard_group_by. However, the hard_group_by operation can be VERY TIME CONSUMING computationally and should be avoided if possible.

The hard_group_by operation is best illustrated with an example, suppose a disk.frame has three chunks

# chunk1 = 1.fst
#  id n
#1  a 1
#2  a 2
#3  b 3
#4  d 4

# chunk2 = 2.fst
#  id n
#1  a 4
#2  a 5
#3  b 6
#4  d 7

# chunk3 = 3.fst
#  id n
#1  a 4
#2  b 5
#3  c 6

and notice that the id column contains 3 distinct values "a","b", and "c". To perform hard_group_by(df, by = id) MAY give you the following disk.frame where all the ids with the same values end up in the same chunks.

# chunk1 = 1.fst
#  id n
#1  b 3
#2  b 6

# chunk2 = 2.fst
#  id n
#1  c 6
#2  d 4
#3  d 7

# chunk3 = 3.fst
#  id n
#1  a 1
#2  a 2
#3  a 4
#4  a 5
#5  a 4

Also, notice that there is no guaranteed order for the distribution of the ids to the chunks. The order is random, but each chunk is likely to have a similar number of rows, provided that id does not follow a skewed distribution i.e. where a few distinct values make up the majority of the rows.

Typically, chunk_group_by is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the by variables beforehand; however, if this is not the case then one may need a second stage aggregation to obtain the correct result, see Two-stage group by.

By forcing the user to choose chunk_group_by (within each chunk) and hard_group_by (across all chunks), this ensures that the user is conscious of the choice they are making. In sparklyr the equivalent of a hard_group_by is performed, which we should avoid, where possible, as it is time-consuming and expensive. Hence, disk.frame has chosen to explain the theory and allow the user to make a conscious choice when performing group_by.

suppressMessages(library(disk.frame))
flights.df %>%
hard_group_by(carrier) %>% # notice that hard_group_by needs to be set
chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>%  # mean follows normal R rules
collect %>%
arrange(carrier)

## Two-Stage Group-by

Prior to {disk.frame} v0.3.0, there is no general support for one-stage group-by. Hence a two-stage style group-by is needed. The key is understand is the chunk_group_by which performs group-by within each chunk.

For most group-by tasks, the user can achieve the desired result WITHOUT using hard = TRUE by performing the group by in two stages. For example, suppose you aim to count the number of rows group by carrier, you can set hard = F to find the count within each chunk and then use a second group-by to summaries each chunk’s results into the desired result. For example,

flights.df %>%
chunk_group_by(carrier) %>% # chunk_group_by aggregates within each chunk
chunk_summarize(count = n()) %>%  # mean follows normal R rules
collect %>%  # collect each individul chunks results and row-bind into a data.table
group_by(carrier) %>%
summarize(count = sum(count)) %>%
arrange(carrier)

Because this two-stage approach avoids the expensive hard group_by operation, it is often significantly faster. However, it can be tedious to write; and this is a con of the disk.frame chunking mechanism.

Note: this two-stage approach is similar to a map-reduce operation.

suppressPackageStartupMessages(library(disk.frame))
setup_disk.frame()
flights.df = as.disk.frame(nycflights13::flights)

flights.df %>%
srckeep(c("year","distance")) %>%  # keep only carrier and distance columns
chunk_group_by(year) %>%
chunk_summarise(sum_dist = sum(distance)) %>% # this does a count per chunk
collect

This is two-stage group-by in action

# need a 2nd stage to finalise summing
flights.df %>%
srckeep(c("year","distance")) %>%  # keep only carrier and distance columns
chunk_group_by(year) %>%
chunk_summarise(sum_dist = sum(distance)) %>% # this does a count per chunk
collect %>%
group_by(year) %>%
summarise(sum_dist = sum(sum_dist))

You can mix group-by with other dplyr verbs as below, here is an example of using filter.

# filter
pt = proc.time()
df_filtered <-
flights.df %>%
filter(month == 1)
cat("filtering a < 0.1 took: ", data.table::timetaken(pt), "\n")
nrow(df_filtered)

## Hard group-by

Another way to perform a one-stage group_by is to perform a hard_group_by on a disk.frame. This will rechunk the disk.frame by the by-columns. This is not recommended for performance reasons, as it can be quite slow to rechunk the file chunks on disk.

pt = proc.time()
res1 <- flights.df %>%
srckeep(c("month", "dep_delay")) %>%
filter(month <= 6) %>%
mutate(qtr = ifelse(month <= 3, "Q1", "Q2")) %>%
hard_group_by(qtr) %>% # hard group_by is MUCH SLOWER but avoid a 2nd stage aggregation
chunk_summarise(avg_delay = mean(dep_delay, na.rm = TRUE)) %>%
collect
cat("group-by took: ", data.table::timetaken(pt), "\n")

collect(res1)