Custom one-stage group-by

At a glance

{disk.frame} allows the user to enable create custom one-stage group-by functions. To make a function fn one stage. One needs to define two functions

  1. fn_df.chunk_agg.disk.frame which applies the itself to each chunk
  2. fn_df.collected_agg.disk.frame which accpets a list of returns from fn_df.chunk_agg.disk.frame and finalize the computation.

For example, to make mean a one-stage group-by function, {disk.frame} has defined mean_df.chunk_agg.disk.frame and mean_df.collected_agg.disk.frame, which we will illustrate with examples below.

But first, we shall explain some theory behind {disk.frame} to help you better understand “why does {disk.frame} do it like that?”.

How does {disk.frame} work

One may ask, how come only a few functions are supported for one-stage group-by? And why are some functions like median only produce estimates instead of producing the exact figure? To answer these question, we need to have an understanding of how {disk.frame} works.

A disk.frame is organized as chunks stored on disk. Each chunk is a file stored in fst format. The {future} package is used to apply the same function to each chunk, each of these operations are carried out in a separate R session. These R sessions cannot communicate with each other during the execution of the operations.

Once the operation has been performed the results will be bought back to the session from which the operation was called. This is the only point of interprocess communication. The process of making group-by in one stage does require some additional work.

To summarize, the two phases of a df %>% some_fn %>% collect operation is

  1. The some_fn is applied to each chunk, and the result is assumed to be a data.frame
  2. collect then row-binds (rbind/bind_rows/rbindlist) the results together to form a data.frame in the main session

How group-by works

Except for passing the result back to the main session, communication between worker sessions are not allowed. This limits how group-by operations can be performed, hence why group-by can be done in two stages for many functions. However, R’s meta-programming abilities allows us to rewrite code to that automatically perform the two-stage group-bys. For example, consider:

we can use meta-programming to transform that to

Basically, we are “compiling” one-stage group-by code to two-stage group-by code, and then executing it.

For mean, it’s trickier, as one needs to keep track on the numerator and the denominator separately in computing mean(x) = sum(x)/length(x).

Therefore, {disk.frame} compiles

to

where mean.chunk_agg.disk.frame defines what needs to be done to each chunk, as you can see, the return value is a vector where the elements are named sumx and lengthx. Here is an example implementation of mean.chunk_agg.disk.frame

mean_df.chunk_agg.disk.frame <- function(x, na.rm = FALSE, ...) {
  sumx = sum(x, na.rm = na.rm)
  lengthx = length(x) - ifelse(na.rm, sum(is.na(x)), 0)
  c(sumx = sumx, lengthx = lengthx)
}

because the return value is not a scalar, we need to write it in a list (line 3).

The mean_df.collected_agg.disk.frame receives a list of outputs from mean.chunk_agg.disk.frame. Recall that mean.chunk_agg.disk.frame returns a vector for each chunk, so the input to mean.collected_agg.disk.frame is a list of vectors

mean_df.collected_agg.disk.frame <- function(listx) {
  sum(sapply(listx, function(x) x["sumx"]))/sum(sapply(listx, function(x) x["lengthx"]))
}

How to define your own one-stage group-by function

Now that we have seen two examples, namely sum and mean, we are ready summarize how group-by functions are implemented.

Given the below

{disk.frame} compiles it to

df %>% 
  chunk_group_by(grp1) %>% 
  chunk_summarize(__tmp1__ = list(fn_df.chunk_agg.disk.frame(x))) %>% 
  collect %>% 
  group_by(grp1) %>% 
  chunk_summarize(namex = fn_df.chunk_agg.disk.frame(__tmp1__))

Based on the above information, to make fn a one-stage group-by function, the user has to

  1. Define fn_df.chunk_agg.disk.frame which is a function to be applied at each chunk
  2. Define fn_df.collected_agg.disk.frame which is a function to be applied to a list containing the returns from fn.chunk_agg.disk.frame applied on each chunk

Example of implementing sum:

  1. Define sum_df.chunk_agg.disk.frame
  1. Define sum_df.collected_agg.disk.frame, which needs to accept a list of sum(x, na.rm), but sum(x, na.rm) is just a numeric, so

Example of implementing n_distinct:

The n_distinct function counts the number of distint values from a vector x

  1. Define n_distinct_df.chunk_agg.disk.frame, to return a list of unique values. Because the same value can appear in multiple chunks, so to ensure that we don’t double count, we simply return all the unique values from each chunk which is then deduplicated in the next phase
  1. Define n_distinct_df.collected_agg.disk.frame, which deduplicates the unique values

Limitations

We have seen that {disk.frame} performs operations in two phases

  1. apply the same function to each chunk
  2. row-bind the results

and there are no communication between the sessions that applies the functions at chunk level.

Hence, it is generally difficult to compute rank based summarizations like median exactly. Hence most rank based calculations are estimates only. This is also true of distributed data system like Spark whose median function is also estimates only.

Another limitation for now is that summarization that is more complext then f(x) is not supported. E.g. sum(x) + 1, sum(x + mean(x)), sum(x) + mean(x), and fn(sum(x)) are not yet supported as arguments in the summarize function.

Advertisements

Interested in learning {disk.frame} in a structured course?

Please register your interest at:

https://leanpub.com/c/taminglarger-than-ramwithdiskframe

Open Collective

If you like disk.frame and want to speed up its development or perhaps you have a feature request? Please consider sponsoring {disk.frame} on Open Collective. Your logo will show up here with a link to your website.

Backers

Thank you to all our backers! 🙏 [Become a backer]

Backers on Open Collective

Sponsors

[Become a sponsor]

Sponsors on Open Collective

Contact me for consulting

Do you need help with machine learning and data science in R, Python, or Julia? I am available for Machine Learning/Data Science/R/Python/Julia consulting! Email me