A Practical Guide to the Celery Distributed Task Queue
March 20, 2016
Introduction
Celery is an “asynchronous task queue/job queue based on distributed message passing”. The documentation is pretty good, and you should follow their guide to get started. However, the examples are pretty simple and it may not be obvious how to move beyond the basics. This guide demonstrates using Celery for a rudimentary “Map/Reduce” implementation we will call “Celery Soup”.
Choosing a Broker
According to the documentation, RabbitMQ is the recommended message broker. In our experience, Redis has performed better, but your mileage may vary. Using the Django ORM is an easy and effective way to get started using Celery… if you are already using Django. However, for this discussion, I will assume you have followed the “First Steps with Celery” guide from the Celery documentation.
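For reference, here is a minimal sketch of what the application setup in soup.py might look like with a Redis broker. The Redis URLs are assumptions (point them at your own instance), and a result backend is included because retrieving results with get() later on requires one:

from celery import Celery

# hypothetical Redis broker and result backend; adjust for your own instance
app = Celery('soup',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')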
Writing a Task
A Celery task is simply a Python function. From the documentation:
@app.task
def add(x, y):
    return x + y
There are a few pitfalls when writing tasks that may not be apparent from the simple example above. Primarily, you have to remember that a task can be executed anywhere a Celery worker is running, so you cannot count on any data being present on the worker node unless it was passed in as an argument. The other important thing to remember is that tasks should be written so that nothing bad happens if a task gets re-run (in other words, tasks should be idempotent).
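As a quick sketch of the first pitfall (the task names here are hypothetical, not part of the example that follows), compare a task that assumes a file is sitting on the local disk with one that receives everything it needs as an argument:

# Anti-pattern: assumes a file saved on the submitting machine is also
# present on whichever worker node happens to pick up the task.
@app.task
def count_words_bad(path):
    with open(path) as f:
        return len(f.read().split())

# Better: pass the data itself (or a URL to fetch it from) as the argument,
# so the task is self-contained and safe to re-run on any worker.
@app.task
def count_words(text):
    return len(text.split())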
To illustrate, let’s make a slightly less trivial function that downloads a file from a url (using the excellent requests library), parses it (with BeautifulSoup), and creates a word count histogram on the document text. This will be our “map” function:
from collections import Counter

import requests
from bs4 import BeautifulSoup

@app.task
def map(url):
    # fetch the page, strip the markup, and build a word count histogram
    c = Counter()
    r = requests.get(url)
    soup = BeautifulSoup(r.text)
    for word in soup.get_text().split():
        c[word] += 1
    return c
For the reduce step, we will take a list of word count histograms and “reduce” them to a single dictionary. Here I am using the built-in collections.Counter class to implement this functionality:
@app.task
def reduce(counters):
    # merge a list of word count histograms into a single Counter
    res = counters[0]
    for c in counters[1:]:
        res += c
    return res
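The reduce step leans on the fact that Counter objects can be added together, merging counts key by key; a quick illustration from a Python shell:

>>> from collections import Counter
>>> Counter({'the': 3, 'of': 1}) + Counter({'the': 2, 'and': 4})
Counter({'the': 5, 'and': 4, 'of': 1})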
Submitting Task(s)
To test our tasks, it is reasonable (and generally a good idea) to fire up a Python shell and just call the function directly:
>>> soup.map('http://en.wikipedia.org/wiki/Wikipedia:Top_25_Report')
This assumes the tasks above are defined in a module called soup.py. To submit the task to Celery, just add a delay:
>>> soup.map.delay('http://en.wikipedia.org/wiki/Wikipedia:Top_25_Report')
Actually, don’t do that. Not yet, anyway. It will submit the task, but it will submit it to the default Celery queue, which we don’t necessarily want. In order for Celery to execute this task, it is necessary to start up a worker process. We can start a worker on the local node from the command line:
$ celery worker -A soup -Q soup -l info
I have used the -Q flag to tell the worker to only listen for and process tasks submitted to the ‘soup’ queue. This is especially useful when you are sharing a broker… I don’t want my worker to execute other people’s tasks and vice versa. But how do I get my results? Submit the task to the ‘soup’ queue (apply_async takes a queue argument, where delay does not), assign the returned result to a variable, and call get. In this case, the get will block until the task is completed.
>>> r = soup.map.apply_async(('http://en.wikipedia.org/wiki/Wikipedia:Top_25_Report',), queue='soup')
>>> r.get()
…
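The get call blocks, but the AsyncResult returned by a task submission can also be inspected without waiting; these are standard attributes of Celery’s AsyncResult class:

>>> r.ready()       # True once the task has finished
>>> r.successful()  # True if it finished without raising an exception
>>> r.status        # e.g. 'PENDING', 'SUCCESS', or 'FAILURE'
>>> r.result        # the return value, available without blocking once the task is done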
Orchestration
So, now that our task is working, we can start processing documents. This amounts to submitting a bunch of tasks and then collating the results. For example, let’s find the top 25 words from the top 25 Wikipedia articles. From a shell:
>>> import requests
>>> from bs4 import BeautifulSoup
>>> r = requests.get('http://en.wikipedia.org/wiki/Wikipedia:Top_25_Report')
>>> d = BeautifulSoup(r.text)
>>> for table in d.find_all('table', class_='wikitable'):
...     counters = []
...     for link in table.find_all('a'):
...         counters.append(soup.map('http://en.wikipedia.org' + link.get('href')))
...     m = soup.reduce(counters)
...     for k in sorted(m, key=m.get, reverse=True)[:25]:
...         print(k, m[k])
...
the 24345
of 14571
and 12016
to 9504
in 8745
a 8614
^ 6710
The 5937
Retrieved 5009
on 4542
is 4254
for 3933
from 3662
with 3374
as 3297
that 3049
by 2879
was 2643
May 2044
at 2010
2015. 1952
an 1891
be 1876
article 1797
2014. 1672
Now that everything seems to be working, let’s convert this to use Celery (and bring it all together). To implement our data flow, we will use the “chord” primitive. The syntax for using the chord (and any of the other “canvas” primitives) is a little bit different, but it is also the most general way of using Celery and is worth going over.
import requests
from bs4 import BeautifulSoup
from celery import chord

from soup import map, reduce

r = requests.get('http://en.wikipedia.org/wiki/Wikipedia:Top_25_Report')
d = BeautifulSoup(r.text)
for table in d.find_all('table', class_='wikitable'):
    # get a signature for the reduce callback
    callback = reduce.s().set(queue='soup')
    # the header is a list of "map" signatures
    header = [map.s('http://en.wikipedia.org' + link.get('href')).set(queue='soup')
              for link in table.find_all('a')]
    # the chord primitive is a barrier that waits for all of the tasks
    # in the header to complete and provides the input to the callback
    res = chord(header)(callback)
    m = res.get()
    for k in sorted(m, key=m.get, reverse=True)[:25]:
        print(k, m[k])
The task “signature” is created with the s() method of a defined task. Here, we create a list of “map” signatures to submit and assign the “reduce” signature to a callback variable. These get submitted using the chord primitive, and we get the response as before. Notice that using the task signature also allows us to easily set the queue for the tasks.
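As an aside, the same signature-plus-set pattern works for a single task, equivalent to the apply_async call used earlier (a sketch, not the only way to do this):

>>> sig = soup.map.s('http://en.wikipedia.org/wiki/Wikipedia:Top_25_Report').set(queue='soup')
>>> r = sig.delay()
>>> r.get()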
This code can be cleaned up a bit by routing tasks, which is left as an exercise for the reader (a sketch of one possible configuration follows below)… or you can just check out the code. Finally, while I can’t speak to the scalability of the “reduce” step, the “map” part scales quite nicely. In fact, we actually used Celery to great effect with almost 30,000 CPUs to process 1 petabyte of NASA imagery in just 16 hours on the Google Cloud.
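For reference, here is that routing sketch. The setting below is the Celery 3.x style configuration (newer releases spell it task_routes), and the task names follow from the soup module above; treat it as a starting point rather than a definitive setup:

# With routes in place, plain delay()/apply_async() calls on these tasks
# land on the 'soup' queue, so the explicit .set(queue='soup') calls above
# are no longer needed.
CELERY_ROUTES = {
    'soup.map': {'queue': 'soup'},
    'soup.reduce': {'queue': 'soup'},
}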
Written by aliasmrchips next to the Rio Grande in Northern New Mexico. You can follow me on Twitter.