# Distributed Metrics for Conversion Model Evaluation

At Magnetic we use logistic regression and Vowpal Wabbit in order to determine the probability of a given impression resulting in either a click or a conversion. In order to decide which variables to include in our models, we need objective metrics to determine if we are doing a good job. Out of these metrics, only the computation of lift quality (in it’s exact form) is not easily parallelizable. In this post, I will show how the computation of lift quality can be re-ordered to make it distributable.

# Does my model work?

We are constantly performing research to decide which attributes to include into our click and conversion prediction models. We do this by running series of experiments on different model variables. These experiments typically have the following steps:

- Select variables of interest
- Train a model
- Evaluate the performance of model on the hold out sample

When evaluating models we try to answer two main questions:

- Does the model rank-order examples correctly? This is evaluated using lift quality
- Is the output of the model a good measure of probability? This is assessed using normalized log-loss

# Serial implementation

Here is a simple class that allows us to compute both lift quality as well as normalized log-loss. The computation assumes that the data set is already sorted by score in descending order. These metrics are computed by first streaming through the entire data set and calling add for each example. Once we have gone through all of our data, we call the computation methods in order to get the metrics of interest.

```
class LightMetrics(object):
# As we iterate through the data set we use the add() method to
# update the running totals Once we have reached the end of the
# data set, we are able to use these running totals in order
# compute the metrics we care about.
def __init__(self, positive_weight=0, cumulative_percent_hits=0, total_weight=0, log_loss_sum=0):
self.positive_weight = positive_weight
self.cumulative_percent_hits = cumulative_percent_hits
self.total_weight = total_weight
self.log_loss_sum = log_loss_sum
def add(self, target, score, weight):
# This method updates the intermediate running totals using
# the information contained in an example of the data set.
#
# Target: 1 if the example is positive, 0 if the example is negative
# Score: The score that the model computed for this example
# Weight: The weight of the example
self.total_weight += weight
self.cumulative_percent_hits += self.positive_weight * weight
if target == 1:
self.positive_weight += weight
self.cumulative_percent_hits += weight * (weight + 1)/2
marginal_log_loss = -math.log(max(sys.float_info.epsilon,score))
else:
marginal_log_loss = -math.log(max(sys.float_info.epsilon,1-score))
self.log_loss_sum += weight * marginal_log_loss
def lift_quality(self):
# Once we have seen all the examples in the data set, this
# method is called to compute lift quality from the
# intermediate running totals.
base_rate = self.base_rate()
if base_rate:
lift_quality = ((2.0*self.cumulative_percent_hits - self.positive_weight) / (self.positive_weight * self.total_weight) - 1) / (1 - base_rate)
return lift_quality
else:
return None
def base_rate(self):
# Compute the weighted rate of positives in the data set.
# This is used as part of the lift quality calculation and the
# normalization of log loss.
if self.positive_weight == 0 or self.total_weight == 0 or self.positive_weight > self.total_weight:
return None
else:
return float(self.positive_weight) / self.total_weight
def log_loss(self):
if self.total_weight == 0:
return None
return self.log_loss_sum / self.total_weight
def normalized_log_loss(self):
# Normalize log-loss by using the entropy in the data set
br = self.base_rate()
if br and br != 1 and br != 0:
return 1 + self.log_loss() / (br*math.log(br)+(1-br)*math.log(1-br))
else:
return None
def get_all_results(self):
# Utility method to get all the metrics in one call
return {
"lift_quality": self.lift_quality(),
"positives": self.positive_weight,
"observations": self.total_weight,
"normalized_log_loss": self.normalized_log_loss(),
}
```

This approach works well on small data sets, however there are several draw-backs when we try to use it at scale

- We need to process to the entire data set in a serial fashion.
- Should we want to compute this on a per-group basis, we would need to maintain a dictionary/hash-map of these data structures. This can become both slow as well as consume a lot of memory
- All of these operations happen on a single node which can lead us to quickly run out both of memory and compute resources.

For these, reasons, a distributed solution becomes extremely useful.

# Need for a distributed solution

The first thought that comes to mind is to use PySpark to distribute these computations. Looking at the code above, two things become clear. First, the calculation of log-loss fits well with PySpark’s distributed computation model. Second, the distribution of the calculation of lift quality is a little bit more tricky. This is because we need to maintain the running cumulative sum of positive weights.

Lift quality can be expressed as a function of ROC AUC, which is implemented as part of PySpark’s MLlib. Sadly, there are three problems with MLlib’s implementation:

- It does not support per group ROC AUC
- It does support weighted ROC AUC
- It relies on trapezoidal integration and is therefore not exact.

It seems that if we want an implementation that does what we want, we are going to write it ourselves.

# Reformulating serial code

By taking a second look at the calculation of lift quality we see that we need to obtain three quantities: `sum of positives`

, `sum of weights`

& `cumulative percent hits`

.
The first two terms can be computed easily in parallel.
The calculation of `cumulative percent hits`

is more difficult.
This is because the calculation of a given `cumulative percent hits`

value depends both the previous `cumulative percent hits`

value as well as the previous value of `sum of positives`

.

If we take a moment to write out the update formula for `cumulative percent hits`

, this is what we get.

It turns out that this formula can be re-casted into the following way:

In order to ensure that this is correct, we can use a proof by recurrence:

In fact, we can rearrange the formula even further to get this form:

To simplify the formulas, I have assumed that all the data set contained a single group. The algorithm works on a per-group basis.

So where does this leave us? If we can compute the cumulative sums of weights for each element in the data set, we have a fully distributable algorithm. Luckily, cumulative sums can be computed efficiently in parallel (see this previous blog post for more information). Here is what the code would look like:

```
def cumulative_sum_for_each_group_per_partition(partition_index, event_stream):
# Part of the first pass to compute the offset dictionary.
# This function is applied to each partition in parallel.
# For each item in this partition, we maintain a per-group running sum.
# Once we are done iterating, we emit one record per group.
# This record contains (the group id, (the partition index, the per group sum inside the partition)
cumulative_sum_dict = defaultdict(int)
for event in event_stream:
group = event["group"]
cumulative_sum_dict[group] += event["weight"]
for grp, cumulative_sum in cumulative_sum_dict .iteritems():
yield (grp, (partition_index, cumulative_sum))
def compute_offsets_per_group_factory(num_partitions):
# Part of the first pass to compute the offset dictionary
# This function is run for each group in parallel
# We get a sequence of (partition index, group sum inside the partition) pairs from which we build a dictionary.
# We then build a cumulative sum across all partitions for a given group
def _mapper(partial_sum_stream):
per_partition_cumulative_sum_dict = dict(partial_sum_stream)
cumulative_sum = 0
offset_dict = {}
for partition_index in range(num_partitions):
offset_dict[partition_index] = cumulative_sum
cumulative_sum += per_partition_cumulative_sum_dict.get(partition_index, 0)
return offset_dict
return _mapper
def compute_cumulative_sum_per_group_factory(bc_global_offset_dict):
# Part of the second pass to compute the cumulative sums once we have the offsets
# We iterate over the events in a partition and maintain a cumulative sum.
# This cumulative sum is corrected by the per group offsets for a given partition.
def _mapper(partition_index, event_stream):
local_cumulative_sum_dict = defaultdict(int)
for event in event_stream:
group = event["group"]
local_cumulative_sum_dict[group] += event["weight"]
event["group_cumulative_sum"] = local_cumulative_sum_dict[group] + bc_global_offset_dict.value[group][partition_index]
yield event
return _mapper
def compute_cumulative_sum_fully_distributed_approach(points_rdd):
# First pass to compute the cumulative offset dictionary
compute_offsets_per_group = compute_offsets_per_group_factory(points_rdd.getNumPartitions())
offsets_per_group = points_rdd.\
mapPartitionsWithIndex(cumulative_sum_for_each_group_per_partition, preservesPartitioning=True).\
groupByKey().mapValues(compute_offsets_per_group).\
collectAsMap()
# Second pass to compute the cumulative sum using the offset dictionary
sc = points_rdd.context
compute_cumulative_sum_per_group = compute_cumulative_sum_per_group_factory(sc.broadcast(offsets_per_group))
return points_rdd.\
mapPartitionsWithIndex(compute_cumulative_sum_per_group, preservesPartitioning=True)
```

We now have all the pieces of the algorithm, we need to assemble them in Pyspark

# Putting it all together

Let’s recap the operations what we will need to perform:

- Compute the cumulative sum of weights per group for each item in data set
- Obtain the total weights per group
- Perform the intermediate operations for each example in the data set
- Aggregate everything in order to get the per-group intermediate state (i.e.
`cumulative percent hits`

,`sum of positives`

,`sum of weights`

) - Use the per-group intermediate state to create an instance of the Metrics class and call the get_all_results() method

After the aggregation step, we are in the same state as if we had streamed through the entire data set in a sequential fashion.

Here is how the remaining pieces of the algorithm could be implemented:

```
def compute_partial_metrics_factory(total_weight_per_group_dict):
# Compute the partial metrics for a given point in the RDD This is
# very similar to the add method of the LightMetrics class
def _mapper(point):
group = point["group"]
cumulative_percent_hits = (total_weight_per_group_dict[group] - point["group_cumulative_sum"]) * point["target"] * point["weight"] \
+ 0.5 * point["target"] * point["weight"] \
- 0.5 * point["target"] * point["weight"] * point["weight"]
if point["target"]:
log_loss = -math.log(max(sys.float_info.epsilon, point["score"]))
else:
log_loss = -math.log(max(sys.float_info.epsilon, 1 - point["score"]))
partial_metrics = {
"cumulative_percent_hits": cumulative_percent_hits,
"positive_weight": point["weight"] if point["target"] else 0,
"total_weight": point["weight"],
"log_loss_sum": log_loss * point["weight"],
}
return (group, partial_metrics)
return _mapper
def metric_reducer(l, r):
# Reducer to aggregate the partial results in order to get the
# per-group intermediate values
return {
"cumulative_percent_hits": l["cumulative_percent_hits"] + r["cumulative_percent_hits"],
"positive_weight": l["positive_weight"] + r["positive_weight"],
"total_weight": l["total_weight"] + r["total_weight"],
"log_loss_sum": l["log_loss_sum"] + r["log_loss_sum"],
}
def reformat_metrics(group_partial_metric_pair):
# Use the intermediate results in order to create a LightMetrics
# object for each group and compute the metrics
group, aggregated_partial_metrics = group_partial_metric_pair
metrics = LightMetrics(**aggregated_partial_metrics).get_all_results()
metrics["group"] = group
return metrics
def compute_metrics_per_group(points_rdd):
# Interface to the outside world This is the function that will be
# called in order to get the metrics The elements of the
# points_rdd are dictionaries of the following form:
#
# { "group": hashable variable that identifies the group of the example
# "target": 1 if the example is positive, 0 otherwise,
# "score": output of the prediction model,
# "weight": weight of the example }
sc = points_rdd.context
points_rdd_cumulative_sum_by_group = compute_cumulative_sum_fully_distributed_approach(sc, points_rdd)
# Build a dictionary containing the total weights per group
total_weight_per_group = points_rdd_cumulative_sum_by_group.\
map(lambda point: (point["group"], point["weight"])).\
reduceByKey(lambda l, r: r + l).\
collectAsMap()
compute_partial_metrics = compute_partial_metrics_factory(total_weight_per_group)
# Perform the per group metric calculation
return points_rdd_cumulative_sum_by_group.\
map(compute_partial_metrics).\
reduceByKey(metric_reducer).\
map(reformat_metrics)
```

# Take away

By distributing the calculation of lift quality we are now able to compute all model evaluation metrics in parallel. This new implementation allows us to cut down the model evaluation time by a factor of 5, enabling us to carry out faster experiments and discover better models.