Group-by in {disk.frame}

The group-by framework of {disk.frame} has been overhauled in v0.3.0. It is now able to perform some group-by-summarize operations in one stage. In this chapter we will cover

  1. How to use one-stage group-by
  2. Manual two-stage group and hard group-by

In the custom one-stage group-by chapter, we will cover

  • The architecture of disk.frame and its implications for group-by
  • How to define custom one-stage group-by functions and its limitations

One-stage Group-by

A one-stage group-by is the same as group-by for data.frames. This would be unremarkable, if not for the limitations imposed by the disk-based nature of disk.frame. Before v0.3.0 of disk.frame, one-stage group-by was not possible, and the user had to rely on two-stage group-by even for simple operations like mean.

However, now that one-stage group-by is possible, there are still limitations and not all functions are supported out-of-the-box. Hence, in the next chapter we will describe how to define custom one-stage group-by functions.

An example of one-stage group-by:

result_from_disk.frame = iris %>% 
  as.disk.frame %>% 
  group_by(Species) %>% 
  summarize(
    mean(Petal.Length), 
    sumx = sum(Petal.Length/Sepal.Width), 
    sd(Sepal.Width/ Petal.Length), 
    var(Sepal.Width/ Sepal.Width), 
    l = length(Sepal.Width/ Sepal.Width + 2),
    max(Sepal.Width), 
    min(Sepal.Width), 
    median(Sepal.Width)
    ) %>% 
  collect

It is important to note that not all functions that can run in dplyr::summarize would work automatically. This is because of how disk.frame works. Please see the next chapter on defining custom one-stage group-by functions to learn more.

List of supported group-by functions

If a function you need/like is missing, please make a feature request here. It is a limitation that function that depend on the order a column can only obtained using estimated methods.

Function Exact/Estimate Notes
min Exact
max Exact
mean Exact
sum Exact
length Exact
n Exact
n_distinct Exact
sd Exact
var Exact var(x) only cor, cov support planned
any Exact
all Exact
median Estimate
quantile Estimate One quantile only
IQR Estimate

Notes on One-Stage group-by

The results should be exactly the same as if applying the same group-by operations on a data.frame. If not then please report a bug.

Group-by notes

The disk.frame implements the chunk_group_by operation with a significant caveat. In the disk.frame framework, group-by happens WITHIN each chunk and not ACROSS chunks. To achieve group by across chunk we need to put all rows with the same group keys into the same file chunk; this can be achieved with hard_group_by. However, the hard_group_by operation can be VERY TIME CONSUMING computationally and should be avoided if possible.

The hard_group_by operation is best illustrated with an example, suppose a disk.frame has three chunks

# chunk1 = 1.fst
#  id n
#1  a 1
#2  a 2
#3  b 3
#4  d 4

# chunk2 = 2.fst
#  id n
#1  a 4
#2  a 5
#3  b 6
#4  d 7

# chunk3 = 3.fst
#  id n
#1  a 4
#2  b 5
#3  c 6

and notice that the id column contains 3 distinct values "a","b", and "c". To perform hard_group_by(df, by = id) MAY give you the following disk.frame where all the ids with the same values end up in the same chunks.

# chunk1 = 1.fst
#  id n
#1  b 3
#2  b 6

# chunk2 = 2.fst
#  id n
#1  c 6
#2  d 4
#3  d 7

# chunk3 = 3.fst
#  id n
#1  a 1
#2  a 2
#3  a 4
#4  a 5
#5  a 4

Also, notice that there is no guaranteed order for the distribution of the ids to the chunks. The order is random, but each chunk is likely to have a similar number of rows, provided that id does not follow a skewed distribution i.e. where a few distinct values make up the majority of the rows.

Typically, chunk_group_by is performed WITHIN each chunk. This is not an issue if the chunks have already been sharded on the by variables beforehand; however, if this is not the case then one may need a second stage aggregation to obtain the correct result, see Two-stage group by.

By forcing the user to choose chunk_group_by (within each chunk) and hard_group_by (across all chunks), this ensures that the user is conscious of the choice they are making. In sparklyr the equivalent of a hard_group_by is performed, which we should avoid, where possible, as it is time-consuming and expensive. Hence, disk.frame has chosen to explain the theory and allow the user to make a conscious choice when performing group_by.

suppressMessages(library(disk.frame))
flights.df %>%
  hard_group_by(carrier) %>% # notice that hard_group_by needs to be set
  chunk_summarize(count = n(), mean_dep_delay = mean(dep_delay, na.rm=T)) %>%  # mean follows normal R rules
  collect %>% 
  arrange(carrier)

Two-Stage Group-by

Prior to disk.frame v0.3.0, there is no general support for one-stage group-by. Hence a two-stage style group-by is needed. The key is understand is the chunk_group_by which performs group-by within each chunk.

For most group-by tasks, the user can achieve the desired result WITHOUT using hard = TRUE by performing the group by in two stages. For example, suppose you aim to count the number of rows group by carrier, you can set hard = F to find the count within each chunk and then use a second group-by to summaries each chunk’s results into the desired result. For example,

flights.df %>%
  chunk_group_by(carrier) %>% # `chunk_group_by` aggregates within each chunk
  chunk_summarize(count = n()) %>%  # mean follows normal R rules
  collect %>%  # collect each individul chunks results and row-bind into a data.table
  group_by(carrier) %>% 
  summarize(count = sum(count)) %>% 
  arrange(carrier)

Because this two-stage approach avoids the expensive hard group_by operation, it is often significantly faster. However, it can be tedious to write; and this is a con of the disk.frame chunking mechanism.

Note: this two-stage approach is similar to a map-reduce operation.

flights.df = as.disk.frame(nycflights13::flights)

flights.df %>%
  srckeep(c("year","distance")) %>%  # keep only carrier and distance columns
  chunk_group_by(year) %>% 
  chunk_summarise(sum_dist = sum(distance)) %>% # this does a count per chunk
  collect

This is two-stage group-by in action

# need a 2nd stage to finalise summing
flights.df %>%
  srckeep(c("year","distance")) %>%  # keep only carrier and distance columns
  chunk_group_by(year) %>% 
  chunk_summarise(sum_dist = sum(distance)) %>% # this does a count per chunk
  collect %>% 
  group_by(year) %>% 
  summarise(sum_dist = sum(sum_dist))

You can mix group-by with other dplyr verbs as below, here is an example of using filter.

# filter
pt = proc.time()
df_filtered <-
  flights.df %>% 
  filter(month == 1)
cat("filtering a < 0.1 took: ", data.table::timetaken(pt), "\n")
nrow(df_filtered)

Hard group-by

Another way to perform a one-stage group_by is to perform a hard_group_by on a disk.frame. This will rechunk the disk.frame by the by-columns. This is not recommended for performance reasons, as it can be quite slow to rechunk the file chunks on disk.

pt = proc.time()
res1 <- flights.df %>% 
  srckeep(c("month", "dep_delay")) %>% 
  filter(month <= 6) %>% 
  mutate(qtr = ifelse(month <= 3, "Q1", "Q2")) %>% 
  hard_group_by(qtr) %>% # hard group_by is MUCH SLOWER but avoid a 2nd stage aggregation
  chunk_summarise(avg_delay = mean(dep_delay, na.rm = TRUE)) %>% 
  collect
cat("group-by took: ", data.table::timetaken(pt), "\n")

collect(res1)