Mon 02 May 2016

Computing Distributed Groupwise Cumulative Sums in PySpark

When we work on modeling projects, we often need to compute the cumulative sum of a given quantity. At Magnetic, we are especially interested in making sure that our advertising campaigns spend their daily budgets evenly throughout the day. To do this, we need to compute cumulative sums of dollars spent throughout the day in order to identify the moment at which a given campaign has delivered half of its daily budget. Another example where being able to compute a cumulative sum comes in handy is transforming a probability density function into a cumulative distribution function.

Because we deal with large quantities of data, we need to be able to compute cumulative sums in a distributed fashion. Unfortunately, most of the algorithms described in online resources do not work well when groups are either large (in which case we can run out of memory) or unevenly distributed (in which case the largest group becomes the bottleneck).

Example data set

Here is an example data set to illustrate the different algorithms described in this post. It simulates the continuous spend of 3 different advertising campaigns over time. Each row of the table contains:

Group Time stamp Impression cost Cumulative spend
A 2016-04-27 20:44:26 4.51 ?
B 2016-04-27 20:44:27 1.14 ?
A 2016-04-27 20:44:42 3.19 ?
B 2016-04-27 20:45:11 2.89 ?
B 2016-04-27 20:45:52 3.83 ?
C 2016-04-27 20:46:29 3.46 ?
A 2016-04-27 20:46:31 3.33 ?
A 2016-04-27 20:47:49 1.03 ?
B 2016-04-27 20:48:17 0.81 ?
B 2016-04-27 20:48:19 3.71 ?
B 2016-04-27 20:48:21 1.34 ?
C 2016-04-27 20:48:31 4.02 ?
C 2016-04-27 20:48:57 4.80 ?
A 2016-04-27 20:48:59 0.33 ?
A 2016-04-27 20:49:11 1.64 ?
C 2016-04-27 20:49:12 3.80 ?
C 2016-04-27 20:49:14 4.23 ?
C 2016-04-27 20:49:16 4.00 ?
C 2016-04-27 20:49:48 0.50 ?
A 2016-04-27 20:50:06 1.34 ?
B 2016-04-27 20:50:20 1.51 ?
C 2016-04-27 20:50:37 1.22 ?
C 2016-04-27 20:50:45 3.42 ?
C 2016-04-27 20:51:29 0.63 ?
A 2016-04-27 20:51:52 0.22 ?
C 2016-04-27 20:52:26 4.86 ?
A 2016-04-27 20:52:26 3.15 ?
A 2016-04-27 20:52:32 4.02 ?
A 2016-04-27 20:52:36 4.56 ?

We would like to compute the cumulative spend for each group over time. Once we have this information, we can easily compute the fraction of daily spend by dividing each value by the group's total daily spend. This last part will not be covered in this post.

Note: Throughout this post I will assume that the data set is sorted ascending in time. If this were not the case, a simple call to sortBy would take care of it. For more information on sortBy, as well as all of the other functions used in this post, please refer to the PySpark API. It is important to note that because the data set is sorted along a global order, each group within the data set is guaranteed to be sorted too.

Existing solutions

Existing solutions to the grouped cumulative sum problem typically go as follows:

  1. Reformat the elements of the RDD into a (group, value) tuple.
  2. Call groupByKey on the RDD in order to collect all the values associated with a given group in a single place.
  3. Sort the values within each group, since the groupByKey call triggers a shuffle and the order is not guaranteed.
  4. Use flatMap in order to iterate over the per-group sequences and emit new records.

Here is what the code would look like for this approach:

def to_key_value(event):
    return (
        event["group"],
        {
            "time_stamp": event["time_stamp"],
            "cost": event["cost"],
        },
    )

def cumulative_sum_mapper_partially_distributed_approach(group_evt_seq_pair):
    group, event_sequence = group_evt_seq_pair
    cumulative_sum = 0
    for event in sorted(event_sequence, key=lambda d: d["time_stamp"]):
        event["group"] = group
        cumulative_sum += event["cost"]
        event["group_cumulative_sum"] = cumulative_sum
        yield event

def compute_cumulative_sum_partially_distributed_approach(points_rdd):
    return points_rdd.\
        map(to_key_value).\
        groupByKey().\
        flatMap(cumulative_sum_mapper_partially_distributed_approach)

There are several drawbacks with this solution:

  1. groupByKey collects all the values for a given group into the memory of a single executor, so a large group can cause us to run out of memory.
  2. Each group is processed by a single task, so when groups are unevenly distributed the largest group becomes the bottleneck.

In order to get around both of these limitations, we first need to take a closer look at RDD partitions.

Partitions of an RDD

Partitions are an intermediate level in Spark’s data aggregation hierarchy: an RDD is composed of partitions, and each partition is in turn composed of individual elements.

Partitions tend to be a more “behind the scenes” Spark abstraction, with Spark users usually operating directly on the elements of a given RDD through functions such as map and reduce.

However, it is possible to operate over partitions using functions such as mapPartitions and mapPartitionsWithIndex. The mapPartitionsWithIndex function makes the index of the partition available to the function used to operate on individual elements within a partition. This gives us a notion of order across the different partitions that compose an RDD, which will come in very handy in the two-pass cumulative sum algorithm.
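To build intuition for what mapPartitionsWithIndex provides, here is a small pure-Python sketch of its behavior, with a list of lists standing in for an RDD's partitions (the function names here are illustrative, not part of the Spark API):

```python
def map_partitions_with_index(partitions, func):
    # Emulates RDD.mapPartitionsWithIndex on a local list of partitions:
    # func receives the partition index and an iterator over that partition.
    for index, partition in enumerate(partitions):
        for item in func(index, iter(partition)):
            yield item

def tag_with_partition(index, items):
    # Attach the partition index to every element.
    for item in items:
        yield (index, item)

partitions = [["a", "b"], ["c"], ["d", "e"]]
print(list(map_partitions_with_index(partitions, tag_with_partition)))
# [(0, 'a'), (0, 'b'), (1, 'c'), (2, 'd'), (2, 'e')]
```

Because each element now knows which partition it came from, we can reason about ordering across partitions, not just within one.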

Two-pass algorithm

Because the RDD is sorted, we know that each partition is sorted in the correct order. We could directly use the mapPartitions function to compute the cumulative sum within each partition. The problem with this approach is that the cumulative sum for each partition starts at 0, rather than at the total accumulated in the preceding partitions.

Knowing the offsets at which to start the cumulative sum for each group within each partition would allow us to fix this problem. To do this, we compute the per-group totals for each partition. Once this is done, we combine the results to form a global offset dictionary, which can then be made available to each partition via a broadcast variable.

Putting it all together, this is what the algorithm would look like:

  1. Build the offset dictionary.
  2. Compute the cumulative sums for each partition using the offset dictionary to initialize each sum properly.

Here is what the code would look like for this approach.

from collections import defaultdict

def cumulative_sum_for_each_group_per_partition(partition_index, event_stream):
    # Part of the first pass to compute the offset dictionary.
    # This function is applied to each partition in parallel.
    # For each item in this partition, we maintain a per-group running sum.
    # Once we are done iterating, we emit one record per group.
    # This record contains (group id, (partition index, per-group sum inside the partition)).
    cumulative_sum_dict = defaultdict(int)
    for event in event_stream:
        group = event["group"]
        cumulative_sum_dict[group] += event["cost"]
    for grp, cumulative_sum in cumulative_sum_dict.items():
        yield (grp, (partition_index, cumulative_sum))

def compute_offsets_per_group_factory(num_partitions):
    # Part of the first pass to compute the offset dictionary
    # This function is run for each group in parallel
    # We get a sequence of (partition index, group sum inside the partition) pairs from which we build a dictionary.
    # We then build a cumulative sum across all partitions for a given group
    def _mapper(partial_sum_stream):
        per_partition_cumulative_sum_dict = dict(partial_sum_stream)
        cumulative_sum = 0
        offset_dict = {}
        for partition_index in range(num_partitions):
            offset_dict[partition_index] = cumulative_sum
            cumulative_sum += per_partition_cumulative_sum_dict.get(partition_index, 0)
        return offset_dict
    return _mapper

def compute_cumulative_sum_per_group_factory(bc_global_offset_dict):
    # Part of the second pass to compute the cumulative sums once we have the offsets
    # We iterate over the events in a partition and maintain a cumulative sum.
    # This cumulative sum is corrected by the per group offsets for a given partition.
    def _mapper(partition_index, event_stream):
        local_cumulative_sum_dict = defaultdict(int)
        for event in event_stream:
            group = event["group"]
            local_cumulative_sum_dict[group] += event["cost"]
            event["group_cumulative_sum"] = local_cumulative_sum_dict[group] + bc_global_offset_dict.value[group][partition_index]
            yield event

    return _mapper

def compute_cumulative_sum_fully_distributed_approach(points_rdd):
    # First pass to compute the cumulative offset dictionary
    compute_offsets_per_group = compute_offsets_per_group_factory(points_rdd.getNumPartitions())

    offsets_per_group = points_rdd.\
        mapPartitionsWithIndex(cumulative_sum_for_each_group_per_partition, preservesPartitioning=True).\
        groupByKey().\
        mapValues(compute_offsets_per_group).\
        collectAsMap()

    # Second pass to compute the cumulative sum using the offset dictionary
    sc = points_rdd.context
    compute_cumulative_sum_per_group = compute_cumulative_sum_per_group_factory(sc.broadcast(offsets_per_group))

    return points_rdd.\
        mapPartitionsWithIndex(compute_cumulative_sum_per_group, preservesPartitioning=True)
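The offset logic from the first pass is plain Python, so it can be checked locally without a Spark cluster. Repeating the factory here so the snippet is self-contained, this is what the offsets look like for a group whose per-partition totals are 10 in partition 0 and 5 in partition 2 (made-up numbers for illustration):

```python
def compute_offsets_per_group_factory(num_partitions):
    # Same function as in the first pass above.
    def _mapper(partial_sum_stream):
        per_partition_cumulative_sum_dict = dict(partial_sum_stream)
        cumulative_sum = 0
        offset_dict = {}
        for partition_index in range(num_partitions):
            offset_dict[partition_index] = cumulative_sum
            cumulative_sum += per_partition_cumulative_sum_dict.get(partition_index, 0)
        return offset_dict
    return _mapper

compute_offsets = compute_offsets_per_group_factory(num_partitions=3)
print(compute_offsets([(0, 10), (2, 5)]))
# {0: 0, 1: 10, 2: 10}
```

The group has no events in partition 1, so partition 1 simply inherits the running total accumulated so far, which is exactly the behavior we want for partitions a group skips.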

Once we sort the output of the above code both by group and by time stamp, we get the following data set:

Group Time stamp Impression cost Cumulative spend
B 2016-04-27 20:44:27 1.14 1.14
B 2016-04-27 20:45:11 2.89 4.03
B 2016-04-27 20:45:52 3.83 7.86
B 2016-04-27 20:48:17 0.81 8.67
B 2016-04-27 20:48:19 3.71 12.38
B 2016-04-27 20:48:21 1.34 13.72
B 2016-04-27 20:50:20 1.51 15.23
C 2016-04-27 20:46:29 3.46 3.46
C 2016-04-27 20:48:31 4.02 7.48
C 2016-04-27 20:48:57 4.80 12.28
C 2016-04-27 20:49:12 3.80 16.08
C 2016-04-27 20:49:14 4.23 20.31
C 2016-04-27 20:49:16 4.00 24.31
C 2016-04-27 20:49:48 0.50 24.81
C 2016-04-27 20:50:37 1.22 26.03
C 2016-04-27 20:50:45 3.42 29.45
C 2016-04-27 20:51:29 0.63 30.08
C 2016-04-27 20:52:26 4.86 34.94
A 2016-04-27 20:44:26 4.51 4.51
A 2016-04-27 20:44:42 3.19 7.70
A 2016-04-27 20:46:31 3.33 11.03
A 2016-04-27 20:47:49 1.03 12.06
A 2016-04-27 20:48:59 0.33 12.39
A 2016-04-27 20:49:11 1.64 14.03
A 2016-04-27 20:50:06 1.34 15.37
A 2016-04-27 20:51:52 0.22 15.59
A 2016-04-27 20:52:26 3.15 18.74
A 2016-04-27 20:52:32 4.02 22.76
A 2016-04-27 20:52:36 4.56 27.32

There are some important points to note about this approach:

  1. The data set is processed twice, but the only records shuffled across the cluster are the small per-group, per-partition totals used to build the offset dictionary.
  2. No single group ever needs to fit into the memory of one executor, and the work stays evenly spread across partitions regardless of how the groups are distributed.

The two-pass algorithm allows us to compute per-group cumulative sums in a truly distributed fashion. This algorithm can be used in many different modeling problems, such as integrating continuous spend or computing cumulative distribution functions.

Tags: python, spark
