Understanding Celery for async task processing

Bharat Kalluri / 2020-09-30

Why do I need to care about asynchronous task processing?

Often you need to queue work for processing without holding the user on the website. If a user requests a report, you cannot expect them to stare at a loading screen for 10 minutes waiting for a success or failure. This is where asynchronous task processing helps. Using an async task processor, you can queue the task and immediately send back an OK to the client. Ten minutes later, once the report is ready, you send an email to the customer.
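The queue-and-return pattern can be sketched with nothing but the standard library. This is not Celery: an in-process queue stands in for the broker, a thread stands in for the worker, and the names build_report, handle_request and sent_emails are made up for illustration.

```python
import queue
import threading

task_queue = queue.Queue()   # stand-in for the broker
sent_emails = []             # stand-in for an email service


def build_report(user: str) -> str:
    """Pretend this is the slow, 10 minute job."""
    return f"report for {user}"


def worker() -> None:
    """Background worker: pull jobs off the queue until told to stop."""
    while True:
        user = task_queue.get()
        if user is None:              # sentinel: shut the worker down
            task_queue.task_done()
            break
        sent_emails.append((user, build_report(user)))
        task_queue.task_done()


def handle_request(user: str) -> str:
    """Web handler: enqueue the job and answer immediately."""
    task_queue.put(user)
    return "OK"


thread = threading.Thread(target=worker, daemon=True)
thread.start()

response = handle_request("alice")    # returns without waiting for the report
task_queue.join()                     # demo only: wait until the job is done
task_queue.put(None)
thread.join()
print(response, sent_emails)
```

In a real deployment the queue lives in a separate broker process and the worker runs on its own, which is the part Celery handles for you.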

What does Celery do?

Celery is an insanely powerful Python task scheduler. It is almost magical how much it can do with so little code.

The unit of work in Celery is called a task. Any function can be turned into a task with the @shared_task or @app.task decorator (assuming you have already defined app, as shown in the documentation). A broker is needed to carry messages between your application and the workers; supported brokers include Amazon SQS, RabbitMQ and Redis. Tasks can be scheduled, run after a delay, or combined into really complex workflows.

What really interests me is the complex workflows.

Hypothetical use case to demo Celery's power

Let us say you have a service that sends emails from a CSV file. This hypothetical service lets people upload a CSV file with two columns, email and contents. Here is what you are supposed to do:

  • Perform validation on each row. The file can have millions of rows, so we do not want the user to wait after the file upload.

  • For each row in the file:

      • Check that the uploading user is allowed to send emails to this address and that the address is verified.

      • Send out an email with the corresponding contents.

  • Send an email back to the customer saying the bulk email campaign has completed successfully.
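As a rough idea of what the per-row validation step might involve, here is a minimal sketch using the standard csv module. The function name validate_rows, the loose email pattern and the "contents must not be empty" rule are all assumptions made for illustration; the real rules depend on the service.

```python
import csv
import io
import re

# Deliberately loose pattern (an assumption): something@something.tld
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def validate_rows(csv_file):
    """Split rows into valid ones and (line_number, reason) errors."""
    valid, errors = [], []
    reader = csv.DictReader(csv_file)
    # Line 1 is the header; this numbering assumes no multi-line fields.
    for line_number, row in enumerate(reader, start=2):
        email = (row.get("email") or "").strip()
        contents = (row.get("contents") or "").strip()
        if not EMAIL_RE.match(email):
            errors.append((line_number, "invalid email"))
        elif not contents:
            errors.append((line_number, "empty contents"))
        else:
            valid.append({"email": email, "contents": contents})
    return valid, errors


sample = io.StringIO(
    "email,contents\n"
    "a@gmail.com,Hello A\n"
    "not-an-email,Hello B\n"
    "b@protonmail.com,\n"
)
valid, errors = validate_rows(sample)
print(valid)
print(errors)
```

Because DictReader streams the file row by row, the same loop works for a file with millions of rows, which is exactly why it belongs in a background task rather than in the upload request.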

Here we can see four tasks: a validation task, a verified-check task, a send-email task, and a final task that sends a success email back to the customer. The workflow might look something like this:

+-------------------------------------------+
|                                           |
|               validate_file               |
|                                           |
+-------------------------------------------+
                      |
          +-----------------------+
          |                       |
          |                       |
  +---------------+       +---------------+
  | verified_check|       | verified_check|
  |       |       |       |       |       |
  |       |       |       |       |       |
  |       v       |       |       v       |
  |  send_email   |       |  send_email   |
  +---------------+       +---------------+
          |                       |
          |                       |
          +-----------------------+
                      |
                      |
           +---------------------+
           |  send_success_email |
           +---------------------+

Although this is not hyper complex, it is still fairly complicated. Broadly speaking, we have to:

  • execute a task
  • then execute a group of chains, each chain made of two tasks
  • and, once the whole group completes, execute one final task.

This is key: verified_check and send_email form a chain, so they must be executed in sequence. The chains themselves, however, should be executed in parallel. The vocabulary is very important.

I love the name the Celery authors gave to the part of Celery that designs these workflows: canvas. It is a canvas on which we can draw distributed workflows.

The canvas has many primitives; we will go over two of the important ones.

  • chain: primitives executed in series.
  • group: primitives executed in parallel.

Note that chains can be grouped, groups can be chained, and so on.
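To make the series/parallel distinction concrete, here is a plain-Python model of the two primitives. It is only an analogy, not how Celery implements them: run_chain and run_group are hypothetical helpers, and a thread pool stands in for distributed workers.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce


def run_chain(steps, first_arg):
    """Run steps in series, feeding each result into the next step."""
    return reduce(lambda value, step: step(value), steps, first_arg)


def run_group(jobs):
    """Run independent zero-argument jobs in parallel; results keep job order."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(job) for job in jobs]
        return [future.result() for future in futures]


def verified_check(email):
    return email.lower()


def send_email(email):
    return f"sent to {email}"


emails = ["A@example.com", "B@example.com"]

# A group of chains: each chain runs in series, the chains run in parallel.
results = run_group(
    [lambda e=email: run_chain([verified_check, send_email], e) for email in emails]
)
print(results)
```

Within each chain, send_email only ever sees the output of verified_check for the same address, while the two chains do not depend on each other at all.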

Here are our functions:

# file: tasks_email.py
from datetime import datetime

from celery import Celery, chain, group

# Redis is both the broker and the result backend; the backend is needed
# because a group followed by another task runs as a chord.
app = Celery('tasks_email', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')


@app.task
def process_email_file():
    list_of_emails = ['a@gmail.com', 'b@protonmail.com']
    final_chain = chain(
        # One verified_check -> send_email chain per address, all run in parallel.
        # The timestamp is sent as an ISO string so it survives JSON serialization.
        group(
            chain(verified_check.s(i), send_email.s(datetime.now().isoformat()))
            for i in list_of_emails
        ),
        # Immutable signature: ignores the results of the group.
        success_on_complete_email.si(),
    )
    final_chain.delay()
    return list_of_emails


@app.task
def verified_check(msg: str):
    print(f"verifying email {msg} at {datetime.now()}")
    return msg


@app.task
def send_email(msg: str, now: str):
    print(f"Sending email at {now} for {msg}")
    return msg


@app.task
def success_on_complete_email():
    print(f"Sending bulk send complete email to customer at {datetime.now()} !")


if __name__ == "__main__":
    process_email_file.delay()

To run this, first start the Celery worker, which will pick up the tasks and run them for you in the background:

celery -A tasks_email worker

Then, in another terminal, run the file:

python tasks_email.py

Here is the output in my celery worker instance

[2020-10-09 20:57:22,568: WARNING/ForkPoolWorker-6] verifying email a@gmail.com at 2020-10-09 20:57:22.568351
[2020-10-09 20:57:22,582: WARNING/ForkPoolWorker-6] verifying email b@protonmail.com at 2020-10-09 20:57:22.582849
[2020-10-09 20:57:22,586: WARNING/ForkPoolWorker-6] Sending email at 2020-10-09T20:57:22.525777 for a@gmail.com
[2020-10-09 20:57:22,588: WARNING/ForkPoolWorker-6] Sending email at 2020-10-09T20:57:22.526285 for b@protonmail.com
[2020-10-09 20:57:22,593: WARNING/ForkPoolWorker-6] Sending bulk send complete email to customer at 2020-10-09 20:57:22.593770 !
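The .s() and .si() calls in process_email_file differ in whether the previous task's result is passed along. The toy model below illustrates that difference in plain Python. It is not Celery's implementation: Sig, TASKS and run_chain are invented names, and the only point is that a signature is plain data (a task name plus arguments) that can be serialized and run later.

```python
import json
from dataclasses import asdict, dataclass

# Callables looked up by name, standing in for a task registry.
TASKS = {
    "verified_check": lambda email: email,
    "send_email": lambda email, now: f"sent {email} at {now}",
    "success": lambda: "campaign complete",
}


@dataclass
class Sig:
    """Toy signature: a task name plus stored arguments, all plain data."""
    task: str
    args: tuple = ()
    immutable: bool = False


def run_chain(sigs):
    """Run signatures in order; mutable ones get the previous result first."""
    first, *rest = sigs
    result = TASKS[first.task](*first.args)
    for sig in rest:
        extra = () if sig.immutable else (result,)
        result = TASKS[sig.task](*extra, *sig.args)
    return result


# Mutable signature (like .s): the previous result becomes the first argument.
sent = run_chain([Sig("verified_check", ("a@example.com",)),
                  Sig("send_email", ("2020-10-09",))])

# Immutable signature (like .si): the previous result is dropped.
done = run_chain([Sig("verified_check", ("a@example.com",)),
                  Sig("success", immutable=True)])

# Because it is plain data, a signature can travel as JSON.
wire = json.dumps(asdict(Sig("send_email", ("2020-10-09",))))
print(sent, done, wire)
```

This is why success_on_complete_email, which takes no arguments, has to be added with .si(): a regular signature would be handed the results of the group and fail with an unexpected argument.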

Some notes:

  • The name passed to Celery() in the app declaration (tasks_email here) should match the module name given to celery -A. Celery uses it to name tasks when the file is run as a script, so that the script and the worker agree on task names.
  • Any function becomes a task once you add the Celery decorator to it.
  • To run a function as a Celery task rather than as a plain function call, make sure it has the decorator and call .delay() on it.
  • A signature is a serializable version of a task call, which means it can be sent over a broker as JSON.
  • There are two ways to build a signature. A regular signature, created with .s(), receives the result of the previous task as its first argument. An immutable signature, created with .si(), receives nothing from the previous tasks.
  • All the magic happens in process_email_file: we create a group of chains, then chain that group with success_on_complete_email. Remember, chains run in series and groups run in parallel.
  • Celery also has a really cool auto-retry feature: pass a few parameters to the decorator and it takes care of retries automatically, for example @app.task(autoretry_for=(Exception,), retry_backoff=2).
  • When deploying to AWS with Django and Celery on the same machine and auto-scaling enabled, there is a risk: AWS scales from 1 to 2 instances, the second machine picks up some tasks, and then AWS kills that instance while scaling down, taking the running tasks with it. Take care that an instance running Celery tasks is never killed mid-task.
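To get a feel for what retry_backoff=2 in the auto-retry note means, the sketch below computes the delay before each retry. It reflects my reading of the Celery documentation and should be treated as an assumption: the delay doubles on every retry starting from the factor, is capped (600 seconds by default via retry_backoff_max), and by default there are 3 retries. Celery also applies random jitter by default, which this sketch leaves out to stay deterministic. backoff_delays is a made-up helper, not a Celery function.

```python
def backoff_delays(factor: int, max_retries: int = 3, maximum: int = 600) -> list:
    """Delay in seconds before each retry: factor * 2**n, capped at maximum."""
    return [min(maximum, factor * 2 ** attempt) for attempt in range(max_retries)]


print(backoff_delays(2))        # default number of retries
print(backoff_delays(2, 10))    # more retries, to show the cap kicking in
```

With the default of three retries the task waits 2, 4 and then 8 seconds; with ten retries the last delay hits the 600 second cap instead of growing to 1024.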

That's it. That was us painting on a distributed canvas!
