Bloom Filter-Assisted Joins with PySpark
One of the most attractive features of Spark is the fine-grained control it gives you over what to broadcast to every executor, with very simple code. When I first studied broadcast variables, my thinking centered on map-side joins and other obvious candidates. I’ve since expanded my understanding of just how much flexibility broadcast variables can offer.
Our category lift analysis was recently ported from Apache Pig to PySpark and has benefited enormously from the careful application of broadcast variables. This job joins 30 days of search data to 30 days of customer page view data by cookie. The goal is to find search category patterns amongst users who visited our advertisers’ websites.
The process loads only the few fields it needs from the data sources when creating the initial Spark resilient distributed datasets (RDDs). The projected searches RDD is about 88GB compressed. Its records are simple 2-tuples of the form (cookie, category), and it is about 10x larger than the projected page views RDD, whose records are 2-tuples of the form (cookie, page_id). The job primarily cares about the intersection of the two by cookie, so persisting and operating over the entire searches RDD is inefficient when we expect only a tiny fraction of it to survive the inner join.
We addressed this inefficiency by using the PyBloom library directly within PySpark. We chose this library because it is lightweight in terms of dependencies and of the files that need to be shipped to our cluster. After the Spark job loads the page views RDD and persists it to memory, we collect back to the driver a bloom filter that represents the set of all cookies in the page views RDD. We then use this bloom filter to prune rows of the larger dataset that we know won’t be joined, making the eventual join far cheaper.
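PyBloom handles the mechanics for us, but a toy version makes the pruning argument concrete. The sketch below is not PyBloom’s implementation: TinyBloomFilter and prune_then_join are hypothetical names, the bit array is stored in a Python int, the k bit positions come from salted SHA-256 digests, and plain lists stand in for RDDs. It shows the property the job relies on: every cookie that was added tests positive, so pruning never drops a row the inner join would have kept.

```python
import hashlib


class TinyBloomFilter:
    """Toy bloom filter (hypothetical, not PyBloom): m bits kept in an int, k hash positions per key."""

    def __init__(self, num_bits=1024, num_hashes=5):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = 0

    def _positions(self, key):
        # Derive k bit positions from salted SHA-256 digests of the key
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def __contains__(self, key):
        # Always True for added keys; occasionally True for others (false positive)
        return all(self.bits >> pos & 1 for pos in self._positions(key))


def prune_then_join(searches, page_views):
    """Drop searches whose cookie cannot appear in page_views, then inner-join by cookie."""
    bloom = TinyBloomFilter()
    for cookie, _ in page_views:
        bloom.add(cookie)
    # Pruning step: may keep a few false positives, never drops a true match
    candidates = [(c, cat) for c, cat in searches if c in bloom]
    pages_by_cookie = {}
    for cookie, page_id in page_views:
        pages_by_cookie.setdefault(cookie, []).append(page_id)
    # The inner join discards any false positives that slipped through
    joined = [(c, (cat, page)) for c, cat in candidates for page in pages_by_cookie.get(c, [])]
    return candidates, joined


page_views = [("c1", "p1"), ("c2", "p2")]
searches = [("c1", "shoes"), ("c3", "books"), ("c2", "travel"), ("c4", "autos")]
candidates, joined = prune_then_join(searches, page_views)
print(joined)  # [('c1', ('shoes', 'p1')), ('c2', ('travel', 'p2'))]
```

Whether c3 or c4 happens to survive pruning as a false positive, the joined output is the same, which is exactly why the filter is safe to apply before the join.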
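The first step is to generate a function that creates a bloom filter for all cookies seen in one partition. The capacity is initialized to the number of unique users expected in our 30-day page views dataset. Every partial filter is sized for the whole dataset rather than for its own partition, because bloom filters can only be unioned when their capacity and error rate match. build_partial_cookie_bloom returns a closure suitable for mapPartitions(), which yields a single (None, bloom_filter) pair built from the cookies seen in a given partition.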
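```python
from pybloom import BloomFilter


def build_partial_cookie_bloom(capacity, error_rate=0.001):
    """Return a mapPartitions() function that builds one bloom filter per partition."""
    def _build_partial_bloom(page_visits):
        bloom_filter = BloomFilter(
            capacity=capacity,
            error_rate=error_rate,
        )
        # page_visits iterates over the (cookie, page_id) rows of one partition
        for cookie, _ in page_visits:
            bloom_filter.add(cookie)
        # Emit exactly one record per partition, under the shared key None
        yield (None, bloom_filter)
    return _build_partial_bloom
```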
Now we have a function we can use to build bloom filters across page view partitions. Since the bloom filters emitted from every partition are configured with the same capacity and false-positive rate, we can union them together with reduceByKey(). Because every partition emits its filter under the same single key, None, we end up with a single (None, bloom_filter) row in the RDD, holding a bloom filter that contains every cookie observed across every partition.
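The union step works because the union of two bloom filters built with the same bit-array size and the same hash functions is simply the bitwise OR of their bit arrays, which is why every partial filter must share capacity and error rate. The pure-Python sketch below uses no Spark: partition_filter, union_filters, might_contain and the salted SHA-256 hashing are hypothetical stand-ins rather than PyBloom internals, and functools.reduce plays the role of reduceByKey() on the single None key. It builds one filter per partition, folds them together, and compares the result with a filter built over all rows at once.

```python
import hashlib
from functools import reduce

NUM_BITS = 2048  # every partial filter must share the bit-array size...
NUM_HASHES = 4   # ...and the hash functions, or OR-ing them is meaningless


def bit_positions(key):
    # k positions from salted SHA-256 digests (hypothetical hashing scheme)
    for seed in range(NUM_HASHES):
        digest = hashlib.sha256(f"{seed}:{key}".encode("utf-8")).digest()
        yield int.from_bytes(digest[:8], "big") % NUM_BITS


def partition_filter(partition):
    """Bloom filter, as an int bit array, for the cookies in one partition."""
    bits = 0
    for cookie, _ in partition:
        for pos in bit_positions(cookie):
            bits |= 1 << pos
    return bits


def union_filters(bits_a, bits_b):
    # Union of two same-shaped bloom filters is the bitwise OR of their bits
    return bits_a | bits_b


def might_contain(bits, key):
    return all(bits >> pos & 1 for pos in bit_positions(key))


partitions = [
    [("c1", "p1"), ("c2", "p2")],
    [("c3", "p7")],
    [],  # an empty partition contributes an all-zero filter
]
merged = reduce(union_filters, map(partition_filter, partitions))
built_at_once = partition_filter([row for part in partitions for row in part])
print(merged == built_at_once)  # True
```

The comparison is exact equality of the bit arrays, not just matching membership answers: OR-ing per-partition filters loses nothing compared with building one filter centrally.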
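```python
def merge_bloom(bloom_in1, bloom_in2):
    # Valid only because every partial filter shares capacity and error_rate
    return bloom_in1.union(bloom_in2)


# capacity, error_rate, page_visits and spark_context are defined earlier in the job
generate_bloom = build_partial_cookie_bloom(
    capacity=capacity,
    error_rate=error_rate,
)
# Every partition emits under the key None, so reduceByKey() leaves exactly one row
merged_rows = (
    page_visits.mapPartitions(generate_bloom)
    .reduceByKey(merge_bloom)
    .collect()
)
# collect() returns [(None, bloom_filter)]; broadcast the filter itself, not the list
_, bloom_filter = merged_rows[0]
bloom_filter_broadcast = spark_context.broadcast(bloom_filter)
```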
As an aside, we’ve found that managing the number of partitions on which you create bloom filters is very important for overall wall-clock performance. If you have too many partitions, collecting and unioning the bloom filters can take a long time; if you have too few, you sacrifice parallelism, and building each bloom filter takes longer than it could. We recommend using a number of partitions equal to the number of executors as a good starting point.
Once we have the bloom filter as a broadcast variable, we can discard, as early as possible in the searches RDD workflow, the search events that we know won’t join to any page views. The bloom filter variable has a deliberately generic name because filter_searches_by_cookies doesn’t care what generated the filter. Its only job is to emit the (cookie, category) tuples whose cookie is contained in the bloom filter.
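```python
def filter_searches_by_cookies(bloom_filter_broadcast):
    def _filter(searches):
        # Read .value inside the task, on the executor. Reading it on the driver
        # would pickle the filter into this closure, bypassing the broadcast.
        bloom_filter = bloom_filter_broadcast.value
        for cookie, category in searches:
            if cookie in bloom_filter:
                yield (cookie, category)
    return _filter


# Pass the Broadcast object itself, not the bloom filter it wraps
filtered_searches_rdd = searches_rdd.mapPartitions(
    filter_searches_by_cookies(bloom_filter_broadcast)
)
```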
The nice thing about this approach is that the few non-matching search events that sneak past the filter as false positives (roughly 0.1% of them with the default error_rate of 0.001) are still discarded by the inner join. Hence, when the page views user cardinality is high enough to require a large-capacity bloom filter, we can configure a higher error rate as a trade-off to keep the bloom filter’s space requirements manageable.
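To see how much space a higher error rate buys, the standard sizing formulas for a bloom filter holding n keys at false-positive rate p are m = -n ln(p) / (ln 2)^2 bits and k = (m / n) ln 2 hash functions. The sketch below evaluates them for a hypothetical capacity of 100 million cookies; the post does not give the real figure. PyBloom divides its bits into slices and rounds differently, so treat these results as estimates rather than PyBloom’s exact allocation.

```python
import math


def bloom_size(capacity, error_rate):
    """Textbook optimal bloom filter size (an estimate, not PyBloom's exact layout).

    Returns (m bits, k hash functions) for n = capacity keys at rate p = error_rate.
    """
    bits = math.ceil(-capacity * math.log(error_rate) / math.log(2) ** 2)
    hashes = max(1, round(bits / capacity * math.log(2)))
    return bits, hashes


# Hypothetical capacity of 100 million distinct cookies
for p in (0.001, 0.01, 0.05):
    bits, hashes = bloom_size(100_000_000, p)
    print(f"error_rate={p}: {bits / 8 / 2**20:.0f} MiB, k={hashes}")
```

At that capacity, relaxing the error rate from 0.1% to 1% shrinks the broadcast filter from roughly 171 MiB to roughly 114 MiB and the hash count from 10 to 7, while the extra false positives are still removed by the join.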
On our production cluster, the PySpark version of this analysis that uses broadcast bloom filters runs in about half the time of the Pig version over the same datasets. Admittedly, the decrease in runtime is not entirely due to this broadcast variable trick, but it was a major contributing factor.