# VIRBs and Sampling Events from Streams

VIRB (Variable Incoming Rate Biased) reservoir sampling is a streaming sampling algorithm that stores a representative fixed-sized sample of events from the recent past (the user specifies the desired mean age of samples), even when the incoming rate varies. It is heavily inspired by reservoir sampling.

# Motivation

When an ad exchange invites us to bid on an auction, our system automatically calculates the predicted probability of a conversion. In the course of deciding what price to bid for a given ad auction, we want to know where this auction ranks in relation to other auctions in terms of predicted conversion probability. If it’s in the bottom 1% of auctions, for example, we might not want to pay much for it. If it’s in the 99th percentile, we will usually want to file a competitive bid.

In order to achieve this we need a way to keep a relatively small (~5,000-10,000) but representative sample of predicted conversion probabilities from the recent past in order to estimate the CDF (cumulative distribution function). We started by modifying reservoir sampling, and after much iteration we ended up with the VIRB (Variable Incoming Rate Biased) sampler.

The details of how we make our bid price decision are interesting, but beyond the scope of this blog post (but watch this blog for more!). Regardless, this is a general-purpose algorithm, potentially useful whether or not you work in ad-tech.

# Classic Reservoir Sampling

How would you go about keeping a random, fixed-size sample from a stream of events of unknown (perhaps infinite) length? A naive (and absurdly terrible) approach would be to store every event, then sample from that massive file when you actually need your sample. Of course, if at the end you will only want to access K samples from that stream, you would ideally never need to store more than K items at any point.

Reservoir sampling offers an elegant and powerful alternative. It is a streaming algorithm that at any point in time stores at most K samples from the stream. It’s one of those solutions that’s obvious after you’ve seen it, but not before.

To describe the algorithm in plain English: given that you’ve already decided how many samples you want to hold on to (let’s call that K), the first K elements in the stream are simply added to your sample (often called the reservoir in this context). The K+1th event, however, will enter the reservoir with probability K / (K+1). If it does so, it will knock out a random event that’s already in the reservoir. To generalize, event number i in the stream (starting from 1) will have a probability of K / i of entering the reservoir and knocking out an older event if i > K.

```class ReservoirClassic(object):
def __init__(self, max_size):
self.samples = []
self.max_size = max_size
self.i = 1

def add(self, element, timestamp):
size = len(self.samples)
if size >= self.max_size:
spot = random.randint(0, self.i - 1)
if spot < size:
self.samples[spot] = (element, timestamp)
else:
self.samples.append((element, timestamp))

self.i += 1
```

The punch-line is: while each new event has a lower probability of entering the sample than previous events, it also has a decreasing probability of being knocked out in the future. These two effects end up evening out, and this algorithm delivers the same result you would get if you stored every single event and then sampled K from them. To be a bit more technical, the resulting samples are distributed uniformly over the entire event stream.

# But…

We don’t want a uniform sample over the entire stream! We want a sample that’s heavily biased towards the present - after all, the ad-tech world moves quickly, and internet traffic at 3 PM is very different from traffic at 3 AM. For the use case that motivated this algorithm we wanted samples to be from the past hour, if not the past 10 minutes.

# VIRB Reservoir Sampling

We couldn’t decide initially whether we wanted events in this sample to be distributed uniformly or exponentially with regards to time - so we created two algorithms. As you’ll see, their code differs in only one substantial way.

Both algorithms aspire to keep the mean age of events in the reservoir at around a constant number of seconds. In both cases, the first K events are added automatically until the reservoir is full. For every subsequent event in the stream you evaluate whether the current mean age is older than the desired mean. If it is, the reservoir needs to replace an old event with a new one.

At this point the two algorithms differ, and in the case of the Exponential VIRB it adds the new event and uses it to replace a randomly selected event that is already in the reservoir. It’s very similar to classic Reservoir sampling aside from the criterion for accepting new events.

The Uniform VIRB is initialized the same way and the same comparison is made when a new event is seen. The difference is that instead of a list of numbers, you have a queue - so every event that is added to the reservoir replaces the oldest event instead of a random one.

```class BaseVIRB(object):
def __init__(self, max_size, mean_age):
self.max_size = max_size
self.desired_mean_age = float(mean_age)
self.current_sum_ts = 0.0

def add(self, element, timestamp):
pass

class ExpVIRB(BaseVIRB):
def __init__(self, max_size, mean_age):
super(ExpVIRB, self).__init__(max_size, mean_age)
self.samples = []

def add(self, element, timestamp):
"""
If we're adding an element, add the new timestamp to
current_sum_ts, subtract the one we're about to remove
"""
if len(self.samples) < self.max_size:
self.current_sum_ts += timestamp
self.samples.append((element, timestamp))
elif timestamp - (self.current_sum_ts / self.max_size) > self.desired_mean_age:
spot = random.randint(0, int(self.max_size) - 1)
self.current_sum_ts += timestamp - self.samples[spot]
self.samples[spot] = (element, timestamp)

class UnifVIRB(BaseVIRB):
def __init__(self, max_size, mean_age):
super(UnifVIRB, self).__init__(max_size, mean_age)
self.samples = deque(maxlen=max_size)

def add(self, element, timestamp):
"""
If we're adding an element, add the new timestamp to
current_sum_ts, subtract the one we're about to remove
"""
if len(self.samples) < self.max_size:
self.current_sum_ts += timestamp
self.samples.append((element, timestamp))
elif timestamp - (self.current_sum_ts / self.max_size) > self.desired_mean_age:
self.current_sum_ts += timestamp - self.samples
self.samples.append((element, timestamp)) # this pushes out the oldest event
```

A few notes on VIRBs:

1. Given the data stream, the Uniform VIRB is deterministic. There’s not a single (pseudo)-random variable called in evaluating whether to add/remove events from the reservoir. The Exponential VIRB is not deterministic because it removes old events randomly.

2. With a fast, constant incoming rate both algorithms converge to simply adding every nth item, and (with one big exception) respond very well to changing the incoming rate drastically mid-stream. But what if the incoming rate of events in a stream is one per hour, and we are sampling from these events with a Uniform or Exponential VIRB of size 1,000 and a desired mean age of 10 minutes? In this case the incoming rate is far too low to satisfy both parameters, and by design both the Exponential and Uniform VIRBs will still keep 1,000 samples but will not keep the mean age at the desired level.

3. Generally, if the incoming rate falls below the minimum necessary level (which is `max_size / (2 * mean_age)` events per second for the Uniform VIRB and `max_size / mean_age` for the Exponential VIRB) the Uniform VIRB’s samples will become the last `max_size` events, while the Exponential VIRB will resemble the steady state of Aggarwal’s sampler see below with parameter `p_in = 1`. If the incoming rate rises again above the minimum necessary level the VIRB will spend some time accepting every new event until the desired mean is restored.

## Specifying the Distribution of the VIRBs’ Samples

The VIRBs outlined above are fine if you know what mean age you want. But what if you have a more specific desire, like “95% of the events in my sample should be from the past 10 minutes”? To find the mean age that corresponds to these numbers, we derived the following formulas from the cumulative distribution functions of each distribution.

```def exp_mean_age_from_percentile(percentile, age):
"""
Answers the question: If <percentile> of my samples from an Exponential
distribution are within <age> seconds, what's the mean age?
Derived from the Exponential Distribution CDF
"""
return -age / math.log(1.0 - percentile)

def unif_mean_age_from_percentile(percentile, age):
"""
Answers the question: If <percentile> of my samples from an Uniform
distribution are within <age> seconds, what's the mean age?
Derived from the Uniform Distribution CDF
"""
return age * 0.5 / percentile
```

For example, if we want our reservoir to hold 1,000 events whose ages are exponentially distributed, 95% of which occurred in the past 10 minutes, we can simply write:

```my_exp_VIRB = ExpVIRB(1000, exp_mean_age_from_percentile(0.95, 10*60))

for event_value, timestamp in stream_of_events:
my_exp_VIRB.add(event_value, timestamp)
```

# Standing on the Shoulders of Giants

I’d be remiss not to mention that we’re not the first people to think about this problem - a paper by Charu C. Aggarwal outlined a beautiful algorithm with interesting properties. Assuming a constant incoming rate of events you can keep the distribution of event ages relatively constant, and his algorithm starts to look a lot like our exponentially distributed VIRB (eventually Aggarwal’s algorithm will converge to a state of adding a `1 / p_in` proportion of incoming events). However, our algorithm deals much better with variable incoming rates.

To summarize the algorithm brifly: a new event will be added to the reservoir with probability `p_in`, otherwise it will be ignored. Next, it will replace an older event with a probability of `current_reservoir_size / max_size`. Otherwise, it will be added without removing a previous event and will expand the size of the reservoir.

```class AggarwalReservoir(object):
"""
On Biased Reservoir Sampling in the Presence of Stream Evolution - Charu C. Aggarwal
Algorithm 3.1
"""
def __init__(self, max_size, p_in=1.0):
self.samples = []
self.max_size = max_size
self.p_in = p_in

def add(self, element, timestamp):
if random.random() < self.p_in:
spot = random.randint(0, self.max_size - 1)
if spot < len(self.samples):
self.samples[spot] = (element, timestamp)
else:
self.samples.append((element, timestamp))
```

This algorithm differs from the Exponential VIRB in two major ways:

1. Both VIRBs respond automatically to changes in the incoming rate, keeping the mean age at the desired level. The mean (or the nth percentile) age in Aggarwal’s sampler will change with any shift in the incoming rate.

2. Aggarwal’s aglorithm has a substantial waiting period before the sample size approaches the maximum sample size, while ours does so much faster. There are tradeoffs to this decision, but we felt that this was better for our purposes.

# Simulated Performance

We created a simulated 24 hour stream of events where every 6 hours the incoming rate changed. The rates were 10, 1, 30 and 15 events per second. We then tested all four sampling algorithms discussed here (Reservoir, Aggarwal’s, and the Exponential and Uniform VIRB samplers) to see if they behaved as expected with varying incoming rates.

Below are charts showing the distribution of ages at various points throughout the simulation for each algorithm. At 6 hours on the x-axis the incoming rate fell from 10 to 1 event per second, which as previously discussed was far too low for the VIRBs to adjust to. But outside of that period the VIRB samplers adjusted automatically to any change in the incoming rate - notice how seamless the transition was between 30 and 15 events per second in their charts at 18 hours, and contrast that with Aggarwal’s algorithm. Timeline from the Exponential VIRB algorithm Timeline of samples from the Uniform VIRB algorithm

Aggarwal’s chart looks very similar to that of the Exponential VIRB. The difference is that the distribution of Aggarwal’s ages directly dependended on the incoming stream, while the VIRB was able to dynamically limit input to keep the mean stable. Timeline from Aggarwal’s algorithm

Classic Reservoir sampling had only a small response to these changes. The maximum age increased almost linearly, while the mean and median responded strongly to random replacement of samples. Timeline from the classic Reservoir algorithm

Below are the histograms of the ages of each algorithm’s samples taken at the end of the simulation. As you can see, classic reservoir sampling kept a uniform distribution of all samples since the beginning of the stream - but was not uniform with respect to time. Aggarwal’s algorithm was exponentially distributed just like the Exponential VIRB, but it kept a stable distribution of ages only when the incoming rate was constant - unlike the VIRBs, it couldn’t dynamically respond to changing incoming rates. Histogram of samples from Aggarwal’s algorithm Histogram of samples from the classic Reservoir algorithm Histogram of samples from the Exponential VIRB algorithm Histogram of samples from the Uniform VIRB algorithm

# In Summary

The classic Reservoir sampling algorithm is perfect if you want a random uniform sample across all events in a stream of data. Our in-house algorithms, the Exponential and Uniform VIRB samplers, are useful if you want to keep samples from the recent past even if the incoming rate varies wildly. Aggarwal’s algorithm is elegant and has some interesting properties, but it doesn’t respond smoothly to changing incoming rates and has a substantial waiting period until the number of samples in the reservoir approaches the maximum number.

Collaborators in developing this algorithm include Dan Crosta, Sam Steingold, and Vladimir Vladimirov.

Tags: data science, python

# We are Hiring!

We have in Engineering, including:

Apply online and see all positions at our job board.