Spreading jobs to reduce worker contention
Recently at work we were having trouble with resource contention in a setup with one job producer and multiple workers. The workload consisted of groups of jobs, where the jobs in each group contended for a single resource but not for other groups’ resources. On hearing this, my brain decided to come up with a scheduling scheme to reduce contention, whether I wanted it to or not.
We have a job dispatcher which generates groups of jobs, with a varying number of jobs in each group. Each group contains jobs which are related (they contend for a shared resource as a part of their processing) but logically independent (they can execute in any order, and other parts of their processing benefit hugely from parallelism). Jobs in different groups don’t contend for any shared resource. Hence, we’d like to interleave the jobs from one group with jobs from other groups.
One option is to run the dispatcher to completion, then randomly shuffle all the jobs, but that’s silly because the first jobs could be processing while the dispatcher is still working. We’d also lose the caching benefits of having jobs in a group run close to each other in time. The jobs may contend for a resource, but we’d like to keep those resources from contending for cache space!
Another option is to take only some of the groups at a time and shuffle them, but the groups are of (sometimes hugely) different lengths, so it’s hard to know how many we need to get a good mix in a shuffle. We could read groups until we have enough that the largest group is well-shuffled, but that could go on forever if we keep reading larger groups. And if we stop short, we could get a very bad mix.
The scheme my brain came up with has three logical parts: a dispatcher, a set of feeder queues, and a spreader.
The dispatcher generates groups of jobs. Each group contains zero or more jobs.
Each feeder queue asks for a group of jobs from the dispatcher, and hands out each of the jobs inside to the spreader one at a time. When the group it’s working on is empty, it asks the dispatcher for another group of jobs. When the dispatcher has no more groups of jobs to give, the feeder stops handing out jobs.
The spreader asks each feeder, in turn, for a job, which it passes along to the message queue. Because the jobs in each group will come from exactly one feeder, this keeps the jobs in each group spaced out in the message queue, thus avoiding the contention we were so worried about at the beginning.
As an example, let’s say we have 2 workers (a truly magic number). Because some jobs are faster than others, we’d like to spread out the jobs so that each group’s jobs occur every 3 jobs in the final output (2 workers plus a fudge factor, since not all jobs run for the same length of time). We’ll have 3 feeders, so the first 3 groups from the dispatcher will go one into each of the 3 feeders. The spreader will start taking jobs from each feeder in turn. As each feeder’s group empties, that feeder will get the next group from the dispatcher.
Effectively what we’re doing is lazily bin-packing the incoming groups of jobs from the dispatcher into feeder queues.
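To make the three parts concrete, here is a minimal Python sketch of the scheme as described above. It is not the code from my repository: the function name spread, the inner feeder generator, and the sample job names are all made up for this illustration, and it assumes the dispatcher is any iterable of groups, each of which is an iterable of jobs.

```python
from collections import deque


def spread(dispatcher, num_feeders):
    """Yield jobs by asking each of `num_feeders` feeders in turn.

    Hypothetical sketch: `dispatcher` is any iterable of groups, and each
    group is an iterable of jobs.
    """
    pool = iter(dispatcher)  # shared, so each group lands in exactly one feeder

    def feeder():
        # Ask the dispatcher for a group, hand out its jobs one at a time,
        # then ask for the next group. Ends when the dispatcher is exhausted.
        for group in pool:
            yield from group

    feeders = deque(feeder() for _ in range(num_feeders))
    while feeders:
        current = feeders.popleft()
        try:
            job = next(current)
        except StopIteration:
            continue  # nothing left for this feeder; retire it
        feeders.append(current)  # back of the line for the next round
        yield job


# Five made-up groups of different sizes, spread across 3 feeders.
groups = [
    ["a1", "a2", "a3", "a4"],
    ["b1", "b2"],
    ["c1", "c2", "c3"],
    ["d1", "d2"],
    ["e1"],
]
order = list(spread(groups, 3))
print(" ".join(order))  # a1 b1 c1 a2 b2 c2 a3 d1 c3 a4 d2 e1
```

In this run every group’s jobs end up at least 3 positions apart: the second feeder picks up group d as soon as group b runs dry, so d1 and d2 slot into the positions b would have used.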
For cases where each group is much smaller than the total number of jobs and the number of groups is much larger than the number of feeders, we are basically guaranteed to have the desired spread between jobs. If there are larger job groups at the tail end, they might get less spread out, because the other feeders run dry and have nothing left to interleave with them. Arranging for the bigger job groups to be produced first mitigates this issue.
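The tail effect is easy to see by measuring, for each group, the smallest distance between two of its jobs in the output. The sketch below is only an illustration: spread here is a compact itertools-based round-robin written so the snippet runs on its own (not the repository code), and make_group, min_gaps, and the group sizes are all invented for the example.

```python
from itertools import chain


def spread(dispatcher, num_feeders):
    # Compact illustrative spreader: each feeder is a chain over the shared
    # group iterator, so it pulls a new group only when its current one ends.
    pool = iter(dispatcher)
    feeders = [chain.from_iterable(pool) for _ in range(num_feeders)]
    while feeders:
        for feeder in list(feeders):
            try:
                yield next(feeder)
            except StopIteration:
                feeders.remove(feeder)  # dispatcher exhausted for this feeder


def make_group(name, size):
    # A job is just a (group name, index) tuple in this sketch.
    return [(name, i) for i in range(size)]


def min_gaps(jobs):
    """Map each group name to the smallest gap between two of its jobs."""
    last_seen, gaps = {}, {}
    for position, (group, _index) in enumerate(jobs):
        if group in last_seen:
            gap = position - last_seen[group]
            gaps[group] = min(gap, gaps.get(group, gap))
        last_seen[group] = position
    return gaps


small = [make_group(f"s{n}", 2) for n in range(1, 7)]  # six groups of 2 jobs
big = make_group("big", 4)                              # one group of 4 jobs

gap_big_last = min_gaps(spread(small + [big], 3))["big"]
gap_big_first = min_gaps(spread([big] + small, 3))["big"]
print(gap_big_last)   # 1: the big group runs back to back at the tail
print(gap_big_first)  # 3: the big group is interleaved with the small ones
```

With the big group last, the other two feeders have already run dry by the time it starts, so its jobs come out consecutively. With it first, there are still small groups around to fill the gaps.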
At work the job producer is written in PHP, but I’m much happier in Python. My first implementation was in Python, written on my Android phone in the Android Scripting Environment (which has Python but not PHP), and I’ve managed to create a not-quite-so-horrible version for PHP. If you can improve on the PHP (or Python!) code, I’d be interested to see it!
In the code I’ve called the dispatcher a “blockpool”, the feeders are “feeders”, and the spreader is…well…the whole thing. I wrote the code before this exposition and it seems silly to go back and change the names.
Each of the implementations expects an iterator (of some sort) for the dispatcher, and an iterator (of some sort) for each group of jobs. Generating everything up-front as in the examples defeats the laziness, so in production you’re encouraged to return a live iterator that lazily generates jobs instead of a dumb list.
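As a rough idea of what a live iterator could look like in Python, here is a sketch using generators. The names lazy_dispatcher, jobs_in_group, and the produced list are hypothetical and exist only to show when work actually happens. A single chained feeder stands in for the spreader to keep the example short.

```python
from itertools import chain, islice

produced = []  # records which groups the dispatcher has actually generated


def jobs_in_group(group_id, size):
    # Jobs are created one at a time, only when a feeder asks for them.
    for job_id in range(size):
        yield (group_id, job_id)


def lazy_dispatcher(num_groups, jobs_per_group):
    # A group is generated only when a feeder asks for another one.
    for group_id in range(num_groups):
        produced.append(group_id)
        yield jobs_in_group(group_id, jobs_per_group)


groups = lazy_dispatcher(num_groups=1000, jobs_per_group=5)
feeder = chain.from_iterable(groups)  # one feeder draining group after group
first_seven = list(islice(feeder, 7))

print(first_seven[-1])  # (1, 1)
print(produced)         # [0, 1]: only two of the 1000 groups were generated
```

Note that jobs_in_group is a separate function on purpose. A generator expression written inline in the dispatcher loop would look up group_id late, and with several feeders holding groups at once it could report the wrong group.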
I find the requirements for creating PHP iterators to be burdensome, and have provided a pair of PHP<->Python iterator converter classes for PHP to ease the pain. The PHP solution implements a Python-style iterator in PHP. If you’re okay consuming that directly, go for it. Otherwise you’ll want to wrap it in a Py2PHPIterator class so you can make use of foreach. You might want to use it for your own dispatcher to simplify the implementation, too.
I’ve shown the progression of code from first-implementation to I’ve-thought-about-this-too-much. There’s an urban legend (StackOverflow doesn’t know if it’s a myth) that the number of defects per LoC is constant regardless of language. Even if that’s not the case, it’s hard to argue that spreader3_documented.php (≅39LoC, plus helpers) is less likely to have bugs than spreader4_documented.py (≅9LoC, complete). Once you understand what spreader4_documented.py is doing, there’s really nowhere for bugs to manifest.
You can prove for yourself that the code works by running spreader_driver.php and spreader_driver.py. Each prints out the list of expected results at the end (used to assert that the code is working correctly) which you can diff to see that the PHP and Python do the same thing in the end.
It seemed to be the easiest way to upload a dozen files for the world to see, and be able to update them easily. I’m not expecting anyone to fork this, but if you want to use the idea in your own job queueing that’d be awesome, and I’d love to hear about it.
– Taavi Burns (firstname at firstnamelastname dot ca)