Mon 02 May 2016

Computing Distributed Groupwise Cumulative Sums in PySpark

When we work on modeling projects, we often need to compute the cumulative sum of a given quantity. At Magnetic, we are especially interested in making sure that our advertising campaigns spend their daily budgets evenly throughout the day. To do this, we need to compute cumulative sums of dollars spent throughout the day in order to identify the moment at which a given campaign has delivered half of its daily budget. Another example where being able to compute a cumulative sum comes in handy is transforming a probability density function into a cumulative distribution function.

Because we deal with large quantities of data, we need to be able to compute cumulative sums in a distributed fashion. Unfortunately, most of the algorithms described in online resources do not work that well when groups are either large (in which case we can run out of memory) or unevenly distributed (in which case the largest group becomes the bottleneck).

Example data set

Here is an example data set to illustrate the different algorithms described in this post. It simulates the continuous spend for 3 different advertising campaigns over time. Each row of the table contains:

Group Time stamp Impression cost Cumulative spend
A 2016-04-27 20:44:26 4.51 ?
B 2016-04-27 20:44:27 1.14 ?
A 2016-04-27 20:44:42 3.19 ?
B 2016-04-27 20:45:11 2.89 ?
B 2016-04-27 20:45:52 3.83 ?
C 2016-04-27 20:46:29 3.46 ?
A 2016-04-27 20:46:31 3.33 ?
A 2016-04-27 20:47:49 1.03 ?
B 2016-04-27 20:48:17 0.81 ?
B 2016-04-27 20:48:19 3.71 ?
B 2016-04-27 20:48:21 1.34 ?
C 2016-04-27 20:48:31 4.02 ?
C 2016-04-27 20:48:57 4.80 ?
A 2016-04-27 20:48:59 0.33 ?
A 2016-04-27 20:49:11 1.64 ?
C 2016-04-27 20:49:12 3.80 ?
C 2016-04-27 20:49:14 4.23 ?
C 2016-04-27 20:49:16 4.00 ?
C 2016-04-27 20:49:48 0.50 ?
A 2016-04-27 20:50:06 1.34 ?
B 2016-04-27 20:50:20 1.51 ?
C 2016-04-27 20:50:37 1.22 ?
C 2016-04-27 20:50:45 3.42 ?
C 2016-04-27 20:51:29 0.63 ?
A 2016-04-27 20:51:52 0.22 ?
C 2016-04-27 20:52:26 4.86 ?
A 2016-04-27 20:52:26 3.15 ?
A 2016-04-27 20:52:32 4.02 ?
A 2016-04-27 20:52:36 4.56 ?

We would like to compute the cumulative spend for each group over time. Once we have this information, we can easily compute the fraction of daily spend by dividing each value by the total daily spend for each group. This last part will not be covered in this post.

Note: Throughout this post I will assume that the data set is sorted in ascending time order. If this were not the case, a simple call to sortBy would do it. For more information on sortBy, as well as all of the other functions used in this post, please refer to the PySpark API. It is important to note that because the data set is sorted along a global order, each group within the data set is guaranteed to be sorted too.
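
For example, assuming the elements of the RDD are the same event dicts used in the code below, and calling the input RDD points_rdd, such a global sort could look like this minimal sketch:

# Minimal sketch: globally sort the RDD by time stamp so that every partition
# (and therefore every group within it) ends up in ascending time order.
points_rdd = points_rdd.sortBy(lambda event: event["time_stamp"])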

Existing solutions

Existing solutions to the grouped cumulative sum problem typically go as follows:

  1. Reformat the elements of the RDD into a (group, value) tuple.
  2. Call groupByKey on the RDD in order to collect all the values associated with a given group together on a single executor.
  3. Sort the values within each group, since the groupByKey call triggers a shuffle and the original order is not guaranteed.
  4. Use flatMap in order to iterate over the per-group sequences and emit new records.

Here is what the code would look like for this approach:

from collections import defaultdict

def to_key_value(event):
    return (
        event["group"],
        {
            "time_stamp": event["time_stamp"],
            "cost": event["cost"],
        }
    )

def cumulative_sum_mapper_partially_distributed_approach(group_evt_seq_pair):
    group, event_sequence = group_evt_seq_pair
    local_cumulative_sum_dict = defaultdict(int)
    for event in sorted(event_sequence, key=lambda d: d["time_stamp"]):
        event["group"] = group
        local_cumulative_sum_dict[group] += event["cost"]
        event["group_cumulative_sum"] = local_cumulative_sum_dict[group]
        yield event

def compute_cumulative_sum_partially_distributed_approach(points_rdd):
    return points_rdd.\
        map(to_key_value).\
        groupByKey().\
        flatMap(cumulative_sum_mapper_partially_distributed_approach)

There are several drawbacks with this solution:

  1. All of the values for a given group are collected in memory on a single executor by groupByKey, so a large group can cause us to run out of memory.
  2. Each group is processed as a single sequence, so when groups are unevenly distributed, the largest group becomes the bottleneck.

In order to get around both of these limitations, we first need to take a closer look at RDD partitions.

Partitions of an RDD

Partitions are an intermediate level in Spark’s data aggregation hierarchy: an RDD is split into several partitions, and each partition contains a subset of the individual elements of the RDD.

Partitions tend to be a more “behind the scenes” Spark abstraction, with Spark users usually operating directly on the elements of a given RDD through functions such as map and reduce.

However, it is possible to operate over partitions using functions such as mapPartitions and mapPartitionsWithIndex. The mapPartitionsWithIndex function makes the index of the partition available to the function used to operate on the individual elements within a partition. This gives us a notion of order across the different partitions that compose an RDD, which will come in very handy in the two-pass cumulative sum algorithm.
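
As a quick illustration, here is a minimal sketch (not part of the cumulative sum code; it reuses the points_rdd name from the rest of this post) that uses mapPartitionsWithIndex to tag each element with the index of the partition it lives in:

def tag_with_partition_index(partition_index, element_stream):
    # Emit a (partition index, element) pair for every element in this partition.
    for element in element_stream:
        yield (partition_index, element)

tagged_rdd = points_rdd.mapPartitionsWithIndex(tag_with_partition_index)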

Two-pass algorithm

Because the RDD is sorted, we know that each partition is sorted in the correct order. We could directly use the mapPartitions function in order to get the cumulative sum within each partition. The problem with this approach is that the cumulative sum for each group starts back at 0 in every partition.

Knowing the offsets at which to start the cumulative sum for each group within each partition would allow us to fix this problem. To do this, we could compute the per-group totals for each partition. Once this is done, we could combine the results to form a global offset dictionary, which could then be made available to each partition via a broadcast variable.
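
To make the idea concrete, here is a toy illustration with made-up numbers (they are not taken from the example data set): suppose a single group spends a total of 5.0 in partition 0, 3.0 in partition 1 and 2.0 in partition 2.

# Hypothetical per-partition spend totals for a single group (made-up numbers).
per_partition_totals = {0: 5.0, 1: 3.0, 2: 2.0}

# The offset for a partition is the sum of the totals of all the partitions before it.
offsets = {}
running_total = 0
for partition_index in range(3):
    offsets[partition_index] = running_total
    running_total += per_partition_totals.get(partition_index, 0)

# offsets is now {0: 0, 1: 5.0, 2: 8.0}: the cumulative sum for this group
# starts at 0 in partition 0, at 5.0 in partition 1 and at 8.0 in partition 2.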

Putting it all together, this is what the algorithm would look like:

  1. Build the offset dictionary.
  2. Compute the cumulative sums for each partition using the offset dictionary to initialize each sum properly.

Here is what the code would look like for this approach:

from collections import defaultdict

def cumulative_sum_for_each_group_per_partition(partition_index, event_stream):
    # Part of the first pass to compute the offset dictionary.
    # This function is applied to each partition in parallel.
    # For each item in this partition, we maintain a per-group running sum.
    # Once we are done iterating, we emit one record per group.
    # This record contains (group id, (partition index, per-group sum within the partition)).
    cumulative_sum_dict = defaultdict(int)
    for event in event_stream:
        group = event["group"]
        cumulative_sum_dict[group] += event["cost"]
    for grp, cumulative_sum in cumulative_sum_dict.iteritems():
        yield (grp, (partition_index, cumulative_sum))

def compute_offsets_per_group_factory(num_partitions):
    # Part of the first pass to compute the offset dictionary
    # This function is run for each group in parallel
    # We get a sequence of (partition index, group sum inside the partition) pairs from which we build a dictionary.
    # We then build a cumulative sum across all partitions for a given group
    def _mapper(partial_sum_stream):
        per_partition_cumulative_sum_dict = dict(partial_sum_stream)
        cumulative_sum = 0
        offset_dict = {}
        for partition_index in range(num_partitions):
            offset_dict[partition_index] = cumulative_sum
            cumulative_sum += per_partition_cumulative_sum_dict.get(partition_index, 0)
        return offset_dict
    return _mapper

def compute_cumulative_sum_per_group_factory(bc_global_offset_dict):
    # Part of the second pass to compute the cumulative sums once we have the offsets
    # We iterate over the events in a partition and maintain a cumulative sum.
    # This cumulative sum is corrected by the per group offsets for a given partition.
    def _mapper(partition_index, event_stream):
        local_cumulative_sum_dict = defaultdict(int)
        for event in event_stream:
            group = event["group"]
            local_cumulative_sum_dict[group] += event["cost"]
            event["group_cumulative_sum"] = local_cumulative_sum_dict[group] + bc_global_offset_dict.value[group][partition_index]
            yield event

    return _mapper

def compute_cumulative_sum_fully_distributed_approach(points_rdd):
    # First pass to compute the cumulative offset dictionary
    compute_offsets_per_group = compute_offsets_per_group_factory(points_rdd.getNumPartitions())

    offsets_per_group = points_rdd.\
        mapPartitionsWithIndex(cumulative_sum_for_each_group_per_partition, preservesPartitioning=True).\
        groupByKey().mapValues(compute_offsets_per_group).\
        collectAsMap()

    # Second pass to compute the cumulative sum using the offset dictionary
    sc = points_rdd.context
    compute_cumulative_sum_per_group = compute_cumulative_sum_per_group_factory(sc.broadcast(offsets_per_group))

    return points_rdd.\
        mapPartitionsWithIndex(compute_cumulative_sum_per_group, preservesPartitioning=True)
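
For reference, here is a minimal usage sketch, not part of the original code: the SparkContext setup, the application name and the tiny in-memory sample (a few rows from the example data set) are illustrative assumptions; in practice points_rdd would be built from real log data.

from pyspark import SparkContext

sc = SparkContext(appName="groupwise-cumulative-sums")

# A few rows from the example data set, represented as the dicts the code above expects.
sample_events = [
    {"group": "A", "time_stamp": "2016-04-27 20:44:26", "cost": 4.51},
    {"group": "B", "time_stamp": "2016-04-27 20:44:27", "cost": 1.14},
    {"group": "A", "time_stamp": "2016-04-27 20:44:42", "cost": 3.19},
]

points_rdd = sc.parallelize(sample_events, 2).sortBy(lambda event: event["time_stamp"])
points_rdd.cache()  # the two-pass approach iterates over the RDD twice

results = compute_cumulative_sum_fully_distributed_approach(points_rdd).collect()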

Once we sort the output of the above code both by group and by time stamp, we get the following data set:

Group Time stamp Impression cost Cumulative spend
B 2016-04-27 20:44:27 1.14 1.14
B 2016-04-27 20:45:11 2.89 4.03
B 2016-04-27 20:45:52 3.83 7.86
B 2016-04-27 20:48:17 0.81 8.67
B 2016-04-27 20:48:19 3.71 12.38
B 2016-04-27 20:48:21 1.34 13.72
B 2016-04-27 20:50:20 1.51 15.23
C 2016-04-27 20:46:29 3.46 3.46
C 2016-04-27 20:48:31 4.02 7.48
C 2016-04-27 20:48:57 4.80 12.28
C 2016-04-27 20:49:12 3.80 16.08
C 2016-04-27 20:49:14 4.23 20.31
C 2016-04-27 20:49:16 4.00 24.31
C 2016-04-27 20:49:48 0.50 24.81
C 2016-04-27 20:50:37 1.22 26.03
C 2016-04-27 20:50:45 3.42 29.45
C 2016-04-27 20:51:29 0.63 30.08
C 2016-04-27 20:52:26 4.86 34.94
A 2016-04-27 20:44:26 4.51 4.51
A 2016-04-27 20:44:42 3.19 7.70
A 2016-04-27 20:46:31 3.33 11.03
A 2016-04-27 20:47:49 1.03 12.06
A 2016-04-27 20:48:59 0.33 12.39
A 2016-04-27 20:49:11 1.64 14.03
A 2016-04-27 20:50:06 1.34 15.37
A 2016-04-27 20:51:52 0.22 15.59
A 2016-04-27 20:52:26 3.15 18.74
A 2016-04-27 20:52:32 4.02 22.76
A 2016-04-27 20:52:36 4.56 27.32

There are some important points to note about this approach:

  1. The data is never grouped by key, so no single group ever needs to fit in memory on one executor, and the work stays evenly spread across the partitions.
  2. The offset dictionary contains one entry per group and per partition, so it is small enough to be collected on the driver and broadcast back to the executors.
  3. The algorithm makes two passes over the data, so caching points_rdd before calling it avoids recomputing the input RDD.

Take-aways

The two-pass cumulative sum algorithm allows us to compute per-group cumulative sums in a truly distributed fashion. This algorithm can be used in many different modeling problems, such as integrating continuous spend or computing cumulative distribution functions.

Tags: python, spark
