Mon 09 Nov 2015

Embarrassingly Serial

The past decade has seen a surge in technologies around “big data,” claiming to make it easy to process large data sets quickly, or at least scalably, by distributing work across a cluster of machines. This is not a story of success with a big data framework. This is a story of a small data set suffering at the hands of big data assumptions, and a warning to developers to check what their big data tools are actually doing for them.

The Problem

Advertising is a highly seasonal business, with the lowest volumes at the start of the year, growing to about three times that volume near the end. The surge in the number of impressions, clicks, and conversions is felt especially acutely in the run-up to Thanksgiving and the shopping season between Thanksgiving and Christmas.

This, in turn, causes problems for our batch data pipelines, as code which seemed to perform well when it was written suddenly behaves badly with fourth-quarter volumes. Recently we noticed a severe performance problem in what we thought was a trivially simple part of the pipeline: sorting about ten million events, grouping them by a computed value, and partitioning the results into six equal-sized groups. This straightforward Pig script was taking upwards of an hour to run:

events = LOAD '/path/to/events' USING PigStorage();

DEFINE predict `score_with_vw.py`
    SHIP('/path/to/score_with_vw.py')
    CACHE('/path/to/model.vw');

scored_events = STREAM events THROUGH predict AS (
    raw_score: double,
    target: int,
    group_key: chararray);

-- Group by the group key, and compute the target
-- (click or conversion) rate over each group
target_rate = FOREACH (GROUP scored_events BY group_key) {
    tries = COUNT_STAR(scored_events);
    targets = SUM(scored_events.target);
    GENERATE group as group_key,
             tries as tries,
             targets as targets,
             ((double)targets / tries) as rate;
}

-- Events whose group has a target rate of exactly 0
-- are handled separately later in the pipeline
SPLIT target_rate INTO top IF rate > 0.0, zero IF rate == 0.0;

-- Sort and split into 6 (approximately) equal-sized quantiles
ordered_top = ORDER top BY rate PARALLEL 6;

RMF $O_QUANTILES;
STORE ordered_top INTO '$O_QUANTILES' USING PigStorage();

zero_grouped = GROUP zero ALL;

RMF $O_ZERO_QUANTILE;
STORE zero_grouped INTO '$O_ZERO_QUANTILE' USING PigStorage();

RMF $O_SCORED;
STORE scored_events INTO '$O_SCORED' USING PigStorage();

Seems simple enough, right? Certainly we’ve written many more complicated scripts, dealing with larger data sets and more sophisticated transformations, and Pig has handled them well, automatically optimizing to use as few mappers, reducers, and map-reduce phases as possible.

Embarrassingly Serial

I suspected that the streaming script might be the problem: it does some heavy lifting with our Vowpal Wabbit predictive models to score events, and some string processing to emit the target and group_key values back to the parent script. While watching the job run, I noticed that during the phase in which Pig calls the streaming command, it was using only one mapper. Our big data framework, designed to process massive data sets efficiently in parallel, was choosing to run a critical section of code in just one thread in a single process.

The culprit was Pig’s split-combination feature, which combines a large number of small input files into a smaller number of larger input splits, each handled by a single mapper. When the input file list is very long (hundreds or thousands of files), this can save a lot of time, as each mapper that needs to be launched carries some constant startup overhead. I believe (but can’t confirm) that the Pig authors thought this a good idea because they understood the performance characteristics of the Pig built-in operators, and deemed it a worthy trade-off. However, when Pig is shelling out to a streaming script (or a UDF, or any other non-Pig code), all bets are off about what the performance characteristics actually are.

In our case, the startup time wasn’t a problem, since we have such a small data set to begin with (the ten million events are spread over 16 files). Instead, the per-line processing time was relatively high, so Pig guessed wrong about the performance characteristics, and combining splits actively worked against us.

Fortunately, this is pretty easy to work around: set the pig.maxCombinedSplitSize property to a relatively small value, 16 megabytes in our case. This resulted in Pig choosing to use 8 mappers rather than 1, an 8x improvement in parallelism for the streaming step, with just one line changed. Nice!
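
In Pig Latin, the whole fix is the single SET statement below, which also appears at the top of the final script later in this post; 16777216 is just 16 megabytes expressed in bytes:

-- Cap combined input splits at 16MB, so that more mappers (and
-- thus more parallel copies of the streaming script) are launched
SET pig.maxCombinedSplitSize 16777216;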

Don’t Repeat Yourself

After this change, the script’s run time had dropped from over an hour to about 15 minutes. That still seemed too long to me, so I decided to dig even deeper. I found that Pig was running a total of 7 map-reduce phases for this short script.
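
Incidentally, Pig can show its execution plan without running anything: the EXPLAIN operator prints the logical, physical, and map-reduce plans used to compute a relation. A minimal sketch, using the ordered_top alias from the script above:

-- Print the plans (including the map-reduce jobs) Pig would use
-- to compute ordered_top, without actually running them
EXPLAIN ordered_top;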

Pig’s logs, though often unhelpfully verbose, pointed me rather quickly in the right direction. In the output of one run, I saw the line “Pig features used in the script: STREAMING” three times. We have only one streaming command in the script, and invoke it only once, so what was happening?

For whatever reason, Pig had decided to run the script three times, once for each of the STORE commands. More accurately, I suspect it was running a subset of the script three times, each subset doing just enough work to produce one output. But as discussed above, the streaming step is by far the heaviest part of the script, and Pig was running it three times.

Worst of all, in one of those three runs, Pig was invoking the streaming step only to store its output directly into HDFS.

I’m not sure why Pig couldn’t infer on its own that it could re-order the work to take advantage of this fact, but it is simple enough to force its hand: immediately STORE the results of the streaming step to HDFS, then re-LOAD them for use in the remainder of the script.

Our final script now looks like:

-- Don't combine small splits to more than 16MB, to
-- gain better parallelism on the streaming step.
SET pig.maxCombinedSplitSize 16777216;

events = LOAD '/path/to/events' USING PigStorage();

DEFINE predict `score_with_vw.py`
    SHIP('/path/to/score_with_vw.py')
    CACHE('/path/to/model.vw');

scored_events = STREAM events THROUGH predict AS (
    raw_score: double,
    target: int,
    group_key: chararray);

RMF $O_SCORED;
STORE scored_events INTO '$O_SCORED' USING PigStorage();

scored_events = LOAD '$O_SCORED' USING PigStorage();

-- Group by the group key, and compute the target
-- (click or conversion) rate over each group
target_rate = FOREACH (GROUP scored_events BY group_key) {
    tries = COUNT_STAR(scored_events);
    targets = SUM(scored_events.target);
    GENERATE group as group_key,
             tries as tries,
             targets as targets,
             ((double)targets / tries) as rate;
}

-- Events whose group has a target rate of exactly 0
-- are handled separately later in the pipeline
SPLIT target_rate INTO top IF rate > 0.0, zero IF rate == 0.0;

-- Sort and split into 6 (approximately) equal-sized quantiles
ordered_top = ORDER top BY rate PARALLEL 6;

RMF $O_QUANTILES;
STORE ordered_top INTO '$O_QUANTILES' USING PigStorage();

zero_grouped = GROUP zero ALL;

RMF $O_ZERO_QUANTILE;
STORE zero_grouped INTO '$O_ZERO_QUANTILE' USING PigStorage();

After adding 2 lines and moving 2 others, our script now runs in about 6 minutes total, an overall speedup of 10x or more.
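
A quick aside for readers less familiar with Pig: the $O_SCORED, $O_QUANTILES, and $O_ZERO_QUANTILE output locations are supplied via parameter substitution when the script is launched, along these lines (the script file name and paths are placeholders, not our real ones):

pig -param O_SCORED=/path/to/scored \
    -param O_QUANTILES=/path/to/quantiles \
    -param O_ZERO_QUANTILE=/path/to/zero_quantile \
    quantiles.pig

The RMF statements before each STORE just clear out any previous output at those locations, since Hadoop refuses to write into an existing output directory.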

Trust, but Verify

I’m sure that the developer who originally wrote this (names have been changed to protect the innocent) didn’t set out to write an inefficient or slow Pig script. More likely, when the script was tested earlier in the year against smaller data sets, the inefficiencies were not as pronounced, and the performance was deemed good enough.

By the same token, I do not write this to criticize the Pig developers for their choices. In most cases I’ve witnessed, Pig does a great job of optimizing, often automatically finding solutions that would take a developer a lot of careful thought to come up with. This is one of the advantages of using a DSL like Pig Latin or SQL for this kind of data transformation.

But as developers, we can’t shirk our responsibility to understand our tools and platform well. We can’t outsource performance entirely. We discovered this misbehaving script because we have extensive timing metrics on each phase of our pipelines.

Ultimately, the best way to achieve maximum utility from any tool set is to become an expert in it, and to know its ins, outs, and quirks. Because we’ve used Pig, Python, and Vowpal Wabbit for so long at Magnetic, I could quickly guess where the problem lay, and was able to confirm it with a quick skim of the logs. Developing this expertise takes time and many failures along the way, but your patience and diligence are rewarded at moments like these, when you can move fluidly between logs and script, and quickly diagnose and remediate what would otherwise be a thorny fourth-quarter performance problem.

Tags: big data, performance
