Mon 09 May 2016

Distributed Metrics for Conversion Model Evaluation

At Magnetic we use logistic regression, via Vowpal Wabbit, to estimate the probability that a given impression will result in a click or a conversion. To decide which variables to include in our models, we need objective metrics that tell us whether we are doing a good job. Of these metrics, only the computation of lift quality (in its exact form) is not easily parallelizable. In this post, I will show how the computation of lift quality can be reordered to make it distributable.

Does my model work?

We are constantly performing research to decide which attributes to include in our click and conversion prediction models. We do this by running a series of experiments on different model variables. These experiments typically have the following steps:

  1. Select variables of interest
  2. Train a model
  3. Evaluate the performance of the model on a held-out sample

When evaluating models we try to answer two main questions:

  1. How well does the model rank impressions, i.e. does it assign higher scores to impressions that actually end in a click or a conversion? Lift quality answers this question.
  2. How accurate are the predicted probabilities themselves? Normalized log-loss answers this question.

Serial implementation

Here is a simple class that computes both lift quality and normalized log-loss. The computation assumes that the data set is already sorted by score in descending order. The metrics are computed by first streaming through the entire data set and calling add for each example. Once we have gone through all of our data, we call the computation methods to get the metrics of interest.

import math
import sys

class LightMetrics(object):
    # As we iterate through the data set we use the add() method to
    # update the running totals. Once we have reached the end of the
    # data set, we use these running totals to compute the metrics we
    # care about.

    def __init__(self, positive_weight=0, cumulative_percent_hits=0, total_weight=0, log_loss_sum=0):
        self.positive_weight = positive_weight
        self.cumulative_percent_hits = cumulative_percent_hits
        self.total_weight = total_weight
        self.log_loss_sum = log_loss_sum

    def add(self, target, score, weight):
        # This method updates the intermediate running totals using
        # the information contained in an example of the data set.
        #
        # Target: 1 if the example is positive, 0 if the example is negative
        # Score: The score that the model computed for this example
        # Weight: The weight of the example

        self.total_weight += weight
        self.cumulative_percent_hits += self.positive_weight * weight

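        # A positive example of weight w behaves like w unit examples:
        # among themselves these unit examples contribute
        # 1 + 2 + ... + w = w*(w+1)/2 to the cumulative percent hits.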
        if target == 1:
            self.positive_weight += weight
            self.cumulative_percent_hits += weight * (weight + 1)/2
            marginal_log_loss = -math.log(max(sys.float_info.epsilon,score))

        else:
            marginal_log_loss = -math.log(max(sys.float_info.epsilon,1-score))

        self.log_loss_sum += weight * marginal_log_loss

    def lift_quality(self):
        # Once we have seen all the examples in the data set, this
        # method is called to compute lift quality from the
        # intermediate running totals.
        base_rate = self.base_rate()
        if base_rate:
            lift_quality = ((2.0*self.cumulative_percent_hits - self.positive_weight) / (self.positive_weight * self.total_weight) - 1) / (1 - base_rate)
            return lift_quality
        else:
            return None

    def base_rate(self):
        # Compute the weighted rate of positives in the data set.
        # This is used as part of the lift quality calculation and the
        # normalization of log loss.
        if self.positive_weight == 0 or self.total_weight == 0 or self.positive_weight > self.total_weight:
            return None
        else:
            return float(self.positive_weight) / self.total_weight

    def log_loss(self):
        if self.total_weight == 0:
            return None
        return self.log_loss_sum / self.total_weight

    def normalized_log_loss(self):
        # Normalize log-loss by using the entropy in the data set
        br = self.base_rate()
        if br and br != 1 and br != 0:
            return 1 + self.log_loss() / (br*math.log(br)+(1-br)*math.log(1-br))
        else:
            return None

    def get_all_results(self):
        # Utility method to get all the metrics in one call
        return {
            "lift_quality": self.lift_quality(),
            "positives": self.positive_weight,
            "observations": self.total_weight,
            "normalized_log_loss": self.normalized_log_loss(),
        }
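
As a quick illustration, here is how the class might be driven over a small, hand-made data set (the targets, scores, and weights below are invented, and the examples are already sorted by score in descending order):

metrics = LightMetrics()
examples = [
    (1, 0.90, 1),  # (target, score, weight)
    (0, 0.75, 2),
    (1, 0.40, 1),
    (0, 0.10, 3),
]
for target, score, weight in examples:
    metrics.add(target, score, weight)
print(metrics.get_all_results())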

This approach works well on small data sets; however, there are several drawbacks when we try to use it at scale:

  1. The data set must first be sorted by score in descending order, which is expensive for large hold-out samples.
  2. A single process has to stream through every example, so evaluation time grows linearly with the size of the data set.
  3. We want these metrics for many experiment groups at once, which multiplies the amount of sequential work.

For these reasons, a distributed solution becomes extremely useful.

Need for a distributed solution

The first thought that comes to mind is to use PySpark to distribute these computations. Looking at the code above, two things become clear. First, the calculation of log-loss fits well with PySpark’s distributed computation model: it is a plain sum of per-example terms. Second, distributing the calculation of lift quality is trickier, because we need to maintain the running cumulative sum of positive weights.
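
For example, computing the weighted log-loss this way is a straightforward map-and-reduce. This is a minimal sketch, assuming a points_rdd of example dictionaries shaped like the ones described later in the post:

import math
import sys

def weighted_log_loss_term(point):
    # Per-example weighted log-loss contribution; the predicted
    # probability is clamped away from zero, as in the serial code.
    p = point["score"] if point["target"] == 1 else 1 - point["score"]
    return point["weight"] * -math.log(max(sys.float_info.epsilon, p))

log_loss = points_rdd.map(weighted_log_loss_term).sum() / \
    points_rdd.map(lambda point: point["weight"]).sum()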

Lift quality can be expressed as a function of ROC AUC, which is implemented as part of PySpark’s MLlib. Sadly, there are three problems with MLlib’s implementation for our use case:

  1. It does not support example weights.
  2. It computes a single metric over a whole RDD, so evaluating many experiment groups would require one pass over the data per group.
  3. It returns only the AUC value itself, while we also need the per-group base rates and total weights to turn AUC into lift quality and to normalize log-loss.

It seems that if we want an implementation that does what we want, we are going to have to write it ourselves.

Reformulating serial code

Taking a second look at the calculation of lift quality, we see that we need three quantities: the sum of positive weights, the sum of all weights, and the cumulative percent hits. The first two can easily be computed in parallel. The cumulative percent hits is harder, because each cumulative percent hits value depends on both the previous cumulative percent hits value and the previous sum of positive weights.

If we take a moment to write out the update formula for cumulative percent hits as the serial code computes it, this is what we get:

$$\mathrm{CPH}_n = \mathrm{CPH}_{n-1} + P_{n-1} w_n + t_n \frac{w_n (w_n + 1)}{2}, \qquad P_n = P_{n-1} + t_n w_n$$

Here $w_n$ is the weight of example $n$, $t_n \in \{0, 1\}$ is its target, $P_n$ is the positive weight accumulated so far, and $\mathrm{CPH}_0 = P_0 = 0$. Unrolling the recurrence gives a more readable form:

$$\mathrm{CPH}_N = \sum_{n=1}^{N} P_{n-1} w_n + \sum_{n=1}^{N} t_n \frac{w_n (w_n + 1)}{2}$$

It turns out that this formula can be recast in the following way:

$$\mathrm{CPH}_N = \sum_{n=1}^{N} t_n \left[ w_n (W_N - C_n) + \frac{w_n (w_n + 1)}{2} \right]$$

where $C_n = \sum_{k=1}^{n} w_k$ is the cumulative sum of weights up to and including example $n$, and $W_N = C_N$ is the total weight.

To make sure this is correct, we can use a proof by recurrence. Both forms are zero when $N = 0$. When example $N$ is appended, $W$ grows by $w_N$, so every earlier term of the recast sum grows by $t_n w_n w_N$; summed over $n < N$, this adds $P_{N-1} w_N$. The new term itself contributes $t_N w_N (w_N + 1)/2$, since $W_N - C_N = 0$ for the last example. The total increment is therefore $P_{N-1} w_N + t_N w_N (w_N + 1)/2$, which is exactly the serial update, so the two forms agree for every $N$.

In fact, we can rearrange the formula even further to get the form used in the Spark code:

$$\mathrm{CPH}_N = \sum_{n=1}^{N} \left[ t_n w_n (W_N - C_n) + \frac{t_n w_n}{2} + \frac{t_n w_n^2}{2} \right]$$

Each term now depends only on the example itself, the total weight $W_N$, and the cumulative weight $C_n$ at that example, all of which we can compute in parallel.

To simplify the formulas, I have assumed that the whole data set contains a single group. The actual algorithm works on a per-group basis.
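
As a sanity check on the rearranged formula, we can compare it against the serial implementation on a toy, single-group data set. The helper below is mine, not part of the production code, and it assumes the points are already sorted by score in descending order:

def closed_form_cph(points):
    # Rearranged formula: each positive example contributes
    # w*(W - C) + w/2 + w^2/2, where C is the inclusive cumulative
    # sum of weights and W is the total weight.
    total_weight = sum(p["weight"] for p in points)
    cumulative_weight = 0
    cph = 0.0
    for p in points:
        cumulative_weight += p["weight"]
        if p["target"] == 1:
            w = p["weight"]
            cph += w * (total_weight - cumulative_weight) + 0.5 * w + 0.5 * w * w
    return cph

points = [
    {"target": 1, "score": 0.9, "weight": 2},
    {"target": 0, "score": 0.6, "weight": 3},
    {"target": 1, "score": 0.2, "weight": 1},
]
serial = LightMetrics()
for p in points:
    serial.add(p["target"], p["score"], p["weight"])
assert closed_form_cph(points) == serial.cumulative_percent_hits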

So where does this leave us? If we can compute the cumulative sums of weights for each element in the data set, we have a fully distributable algorithm. Luckily, cumulative sums can be computed efficiently in parallel (see this previous blog post for more information). Here is what the code would look like:

from collections import defaultdict

def cumulative_sum_for_each_group_per_partition(partition_index, event_stream):
    # Part of the first pass to compute the offset dictionary.
    # This function is applied to each partition in parallel.
    # For each item in this partition, we maintain a per-group running sum.
    # Once we are done iterating, we emit one record per group.
    # This record contains (group id, (partition index, per-group sum inside the partition)).
    cumulative_sum_dict = defaultdict(int)
    for event in event_stream:
        group = event["group"]
        cumulative_sum_dict[group] += event["weight"]
    for grp, cumulative_sum in cumulative_sum_dict.items():
        yield (grp, (partition_index, cumulative_sum))

def compute_offsets_per_group_factory(num_partitions):
    # Part of the first pass to compute the offset dictionary
    # This function is run for each group in parallel
    # We get a sequence of (partition index, group sum inside the partition) pairs from which we build a dictionary.
    # We then build a cumulative sum across all partitions for a given group
    def _mapper(partial_sum_stream):
        per_partition_cumulative_sum_dict = dict(partial_sum_stream)
        cumulative_sum = 0
        offset_dict = {}
        for partition_index in range(num_partitions):
            offset_dict[partition_index] = cumulative_sum
            cumulative_sum += per_partition_cumulative_sum_dict.get(partition_index, 0)
        return offset_dict
    return _mapper
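
# To make the offset bookkeeping concrete: suppose a group has
# per-partition weight sums {0: 5, 2: 3} across 3 partitions (numbers
# invented for illustration). Each partition's offset is the total
# weight in all earlier partitions:
#
#     mapper = compute_offsets_per_group_factory(num_partitions=3)
#     mapper([(0, 5), (2, 3)])  # -> {0: 0, 1: 5, 2: 5}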

def compute_cumulative_sum_per_group_factory(bc_global_offset_dict):
    # Part of the second pass to compute the cumulative sums once we have the offsets
    # We iterate over the events in a partition and maintain a cumulative sum.
    # This cumulative sum is corrected by the per group offsets for a given partition.
    def _mapper(partition_index, event_stream):
        local_cumulative_sum_dict = defaultdict(int)
        for event in event_stream:
            group = event["group"]
            local_cumulative_sum_dict[group] += event["weight"]
            event["group_cumulative_sum"] = local_cumulative_sum_dict[group] + bc_global_offset_dict.value[group][partition_index]
            yield event

    return _mapper

def compute_cumulative_sum_fully_distributed_approach(points_rdd):
    # First pass to compute the cumulative offset dictionary
    compute_offsets_per_group = compute_offsets_per_group_factory(points_rdd.getNumPartitions())

    offsets_per_group = points_rdd.\
        mapPartitionsWithIndex(cumulative_sum_for_each_group_per_partition, preservesPartitioning=True).\
        groupByKey().mapValues(compute_offsets_per_group).\
        collectAsMap()

    # Second pass to compute the cumulative sum using the offset dictionary
    sc = points_rdd.context
    compute_cumulative_sum_per_group = compute_cumulative_sum_per_group_factory(sc.broadcast(offsets_per_group))

    return points_rdd.\
        mapPartitionsWithIndex(compute_cumulative_sum_per_group, preservesPartitioning=True)
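
Before moving on, here is a hypothetical driver for the two-pass cumulative sum (the data and the local Spark context are invented for illustration). The important detail is that the RDD is sorted by score in descending order first, since the running sums assume that partition order follows score order:

from pyspark import SparkContext

sc = SparkContext("local[4]", "cumulative-sum-demo")
points = [
    {"group": "exp1", "target": 1, "score": 0.92, "weight": 1},
    {"group": "exp1", "target": 0, "score": 0.35, "weight": 2},
    {"group": "exp2", "target": 1, "score": 0.81, "weight": 1},
    {"group": "exp2", "target": 0, "score": 0.40, "weight": 1},
]
points_rdd = sc.parallelize(points, 2).sortBy(lambda point: -point["score"])

for point in compute_cumulative_sum_fully_distributed_approach(points_rdd).collect():
    print(point["group"], point["score"], point["group_cumulative_sum"])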

We now have all the pieces of the algorithm; we just need to assemble them in PySpark.

Putting it all together

Let’s recap the operations we will need to perform:

  1. Sort the data set by score in descending order.
  2. Compute the per-group cumulative sum of weights using the two-pass algorithm above.
  3. Compute the total weight of each group.
  4. Map each example to its partial metric contributions.
  5. Reduce the partial contributions by group to obtain the per-group running totals.

After the aggregation step, we are in the same state as if we had streamed through the entire data set in a sequential fashion.

Here is how the remaining pieces of the algorithm could be implemented:

def compute_partial_metrics_factory(total_weight_per_group_dict):
    # Compute the partial metrics for a given point in the RDD. This
    # is very similar to the add() method of the LightMetrics class.
    def _mapper(point):
        group = point["group"]
        cumulative_percent_hits = (total_weight_per_group_dict[group] - point["group_cumulative_sum"]) * point["target"] * point["weight"] \
            + 0.5 * point["target"] * point["weight"] \
            + 0.5 * point["target"] * point["weight"] * point["weight"]

        if point["target"]:
            log_loss = -math.log(max(sys.float_info.epsilon, point["score"]))

        else:
            log_loss = -math.log(max(sys.float_info.epsilon, 1 - point["score"]))

        partial_metrics = {
            "cumulative_percent_hits": cumulative_percent_hits,
            "positive_weight": point["weight"] if point["target"] else 0,
            "total_weight": point["weight"],
            "log_loss_sum": log_loss * point["weight"],
        }

        return (group, partial_metrics)

    return _mapper

def metric_reducer(l, r):
    # Reducer to aggregate the partial results in order to get the
    # per-group intermediate values
    return {
        "cumulative_percent_hits": l["cumulative_percent_hits"] + r["cumulative_percent_hits"],
        "positive_weight": l["positive_weight"] + r["positive_weight"],
        "total_weight": l["total_weight"] + r["total_weight"],
        "log_loss_sum": l["log_loss_sum"] + r["log_loss_sum"],
    }

def reformat_metrics(group_partial_metric_pair):
    # Use the intermediate results in order to create a LightMetrics
    # object for each group and compute the metrics
    group, aggregated_partial_metrics = group_partial_metric_pair
    metrics = LightMetrics(**aggregated_partial_metrics).get_all_results()
    metrics["group"] = group
    return metrics

def compute_metrics_per_group(points_rdd):
    # Interface to the outside world. This is the function that will
    # be called to get the metrics. The elements of points_rdd are
    # dictionaries of the following form:
    #
    # {  "group": hashable variable that identifies the group of the example
    #    "target": 1 if the example is positive, 0 otherwise,
    #    "score": output of the prediction model,
    #    "weight": weight of the example }

    points_rdd_cumulative_sum_by_group = compute_cumulative_sum_fully_distributed_approach(points_rdd)

    # Build a dictionary containing the total weights per group
    total_weight_per_group = points_rdd_cumulative_sum_by_group.\
        map(lambda point: (point["group"], point["weight"])).\
        reduceByKey(lambda l, r: r + l).\
        collectAsMap()

    compute_partial_metrics = compute_partial_metrics_factory(total_weight_per_group)

    # Perform the per group metric calculation
    return points_rdd_cumulative_sum_by_group.\
        map(compute_partial_metrics).\
        reduceByKey(metric_reducer).\
        map(reformat_metrics)
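
Finally, the whole pipeline might be exercised like this, reusing the hypothetical points_rdd from the cumulative-sum sketch above:

for group_metrics in compute_metrics_per_group(points_rdd).collect():
    print(group_metrics["group"],
          group_metrics["lift_quality"],
          group_metrics["normalized_log_loss"])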

Takeaway

By distributing the calculation of lift quality we are now able to compute all model evaluation metrics in parallel. This new implementation allows us to cut down the model evaluation time by a factor of 5, enabling us to carry out faster experiments and discover better models.

Tags: data science, spark, python
