# One-Pass Distributed Random Sampling

One important factor in the effectiveness of our predictive models is how recent they are. The sooner our bidders get a new version of the prediction model, the better the decisions they can make. Delays in producing the model cost money through less accurate predictions.

The slowest steps in our modeling pipeline are those that manipulate the full data set, which holds multiple weeks' worth of data. Our sampling process has historically required two full passes over the data set, and so was an obvious target for optimization.

# Why We Sample

Using the full data set to build the model is not practical. Since we train on several weeks of data, totalling hundreds of millions of events, it takes hours to build the model. Worse, the model will not produce good predictions, since it will have learned that almost every value in every feature is dramatically more likely to result in a non-click outcome than a click. To help the model learn, we keep all target events (clicks) and select a number of non-target events (impressions) proportional to the number of clicks.

The question is: how do we sample to achieve a given ratio of impressions to clicks in the training data set?

# Typical Distributed Sampling Approach

If we knew the number of impressions and clicks in the data set, we could compute the fraction of impressions we need to keep to achieve our target ten-to-one ratio, and sample efficiently by performing a random check against each impression. This technique is called stratified sampling: the population is divided into groups, and a different sampling rate is applied to each group (take all clicks, but only a fraction of impressions). For every record, we generate a random number between 0 and 1 and compare it with the sampling rate for its group; if the random number is higher than the rate, we discard the record.

The problem with this approach is that to calculate the sampling rate, we need to know the total number of records in each group. So this approach requires two passes over the data set:

• A first pass to count the number of clicks and the number of impressions (our two groups), from which we can compute the fraction of impressions to keep
• A second pass to sample impressions (we always keep all clicks)
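
The two passes can be sketched as follows. This is a minimal illustration, not our production code: the function name `two_pass_sample`, the in-memory list of `(is_click, payload)` tuples, and the toy data are all assumptions made for the example.

```python
import random


def two_pass_sample(events, target_ratio=10, rng=random):
    """Stratified sampling sketch: keep every click and a random fraction of impressions.

    `events` is a hypothetical in-memory list of (is_click, payload) tuples;
    in a real pipeline each pass would re-read the data set from storage.
    """
    # Pass 1: count the two groups.
    clicks = sum(1 for is_click, _ in events if is_click)
    impressions = len(events) - clicks

    # Fraction of impressions to keep so that kept impressions ~= target_ratio * clicks.
    rate = min(1.0, target_ratio * clicks / impressions) if impressions else 0.0

    # Pass 2: keep all clicks; keep an impression only if its random draw is below the rate.
    sample = [e for e in events if e[0] or rng.random() < rate]
    return sample, rate


# Toy data (assumed for illustration): 100 clicks among 10,000 events.
events = [(i % 100 == 0, i) for i in range(10000)]
sample, rate = two_pass_sample(events, rng=random.Random(0))
kept_clicks = sum(1 for is_click, _ in sample if is_click)
kept_impressions = len(sample) - kept_clicks
print(kept_clicks, kept_impressions, rate)
```

Every click survives, while the number of impressions kept is only approximately ten times the click count, because each impression is an independent random draw.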

Stratified sampling is efficient in terms of memory, since each pass can consider each event independently of the others, but the two passes over the data set come at a considerable I/O cost. Can we do better?

# Reservoir Sampling

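Another sampling technique is reservoir sampling, which can efficiently pick N items out of a population of unknown, presumably large, size without having to hold the whole population in memory at once. It keeps the first N items it sees; each later item then replaces a randomly chosen kept item with probability N divided by the number of items seen so far.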

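```python
import random


class ReservoirSample(object):
    """Efficiently pick N random objects from a stream of unknown length."""

    def __init__(self, sample_size):
        self.sample_size = sample_size
        self.i = 0  # number of elements seen so far
        self.samples = []

    def __iter__(self):
        return iter(self.samples)

    # The method name `add` is assumed; only the body survived in this listing.
    def add(self, element):
        size = len(self.samples)
        if size >= self.sample_size:
            # The reservoir is full: keep the new element with probability
            # sample_size / (self.i + 1). random.randint is inclusive on both
            # ends, so the upper bound is self.i rather than self.i - 1.
            spot = random.randint(0, self.i)
            if spot < size:
                self.samples[spot] = element
        else:
            self.samples.append(element)

        self.i += 1
```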

To use it here, N must be ten times the number of clicks. Computing that number requires the same two passes as stratified sampling: a first pass to count the clicks and derive the desired impression count, and a second to sample impressions.

Furthermore, this approach does not scale: the reservoir might not fit in the memory of a single machine, and there is no easy or obvious way to distribute it across a cluster.

# Iterative Reservoir Sampling

In our data set, clicks are spread over time and we normally have hundreds of impressions between clicks. We can leverage this property by repeatedly applying reservoir sampling to the range of impressions between each pair of clicks, picking ten of the impressions at a time.

In our distributed model building process, each partition of the input data represents a contiguous slice of time, so we assume that each partition contains (roughly) the same ratio of impressions to clicks. This allows us to parallelize this “iterative” reservoir sampling across each partition of the data, with little loss in fidelity. We do so with a Hadoop streaming script:

```python
import sys

from tools.sampling import ReservoirSample

impressions = ReservoirSample(10)
for line in sys.stdin:
    is_click, _ = line.split("\t", 1)
    if is_click == "1":
        # the impressions came before the click, write them first
        for impression_line in impressions:
            sys.stdout.write(impression_line)
        sys.stdout.write(line)
        impressions = ReservoirSample(10)
    else:
        # not a click: offer the impression to the current reservoir
        # (assumes ReservoirSample exposes an add() method)
        impressions.add(line)

# there might be some left
for impression_line in impressions:
    sys.stdout.write(impression_line)
```

With this approach, we achieve a final impression-to-click ratio very close to our ten-to-one goal (usually about 9.98:1), and never have to compute the total number of clicks, so we can do it in a single distributed pass over the data set.

# Adaptive Iterative Reservoir Sampling

This approach only works well if the clicks are relatively evenly spaced, with at least 10 impressions between consecutive clicks. In our data sets, this is usually but not always true. When the data is unevenly distributed, the reservoir is drained before it has a chance to fill, even if the overall number of impressions is sufficient, which leads to error in the achieved sampling ratio.
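
A small simulation makes the failure mode concrete. This is only a sketch under stated assumptions: the helper names `sample_between_clicks` and `achieved_ratio` and the two toy streams are hypothetical, and the reservoir logic is inlined so that the example is self-contained.

```python
import random


def sample_between_clicks(events, reservoir_size=10, rng=random):
    """Yield every click, preceded by at most `reservoir_size` impressions
    sampled uniformly from those seen since the previous click."""
    reservoir, seen = [], 0
    for event in events:
        if event == "click":
            yield from reservoir
            yield event
            reservoir, seen = [], 0
        else:
            if len(reservoir) < reservoir_size:
                reservoir.append(event)
            else:
                spot = rng.randint(0, seen)  # inclusive on both ends
                if spot < reservoir_size:
                    reservoir[spot] = event
            seen += 1
    yield from reservoir  # impressions after the last click


def achieved_ratio(events):
    out = list(sample_between_clicks(events))
    clicks = out.count("click")
    return (len(out) - clicks) / clicks


# Hypothetical streams, each with 4 clicks and 400 impressions overall.
even = (["impression"] * 100 + ["click"]) * 4
bursty = ["impression"] * 400 + ["click"] * 4

print(achieved_ratio(even))    # 10.0
print(achieved_ratio(bursty))  # 2.5
```

Both streams contain more than enough impressions for a ten-to-one sample. In the bursty stream, though, three of the four clicks arrive while the reservoir is empty, so only ten impressions are emitted in total.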

To correct for this case, we modify the algorithm to resize the reservoir after each click so that we still reach the overall target ratio. In effect, whenever a click was preceded by too few impressions, we enlarge the next reservoir so that it keeps extra non-target events to make up for the shortfall:

1. While sampling events, maintain running counts of target (click) and non-target (impression) events.
2. After emitting each target event and the sampled impressions that preceded it, adjust the reservoir size such that, if the next target event were the last, a full reservoir would bring the achieved ratio exactly to the target.

We do this in our streaming script like:

```python
import sys

from tools.sampling import ReservoirSample

targets = non_targets = 0
impressions = ReservoirSample(10)
for line in sys.stdin:
    is_click, _ = line.split("\t", 1)
    if is_click == "1":
        # the impressions came before the click, write them first
        for impression_line in impressions:
            sys.stdout.write(impression_line)
            non_targets += 1
        targets += 1
        sys.stdout.write(line)

        # adjust the size of the next reservoir to maintain
        # our goal 10:1 impression-to-click ratio
        new_reservoir_size = 10 + (10 * targets) - non_targets
        impressions = ReservoirSample(new_reservoir_size)
    else: