In this post, I’ll show how to work with multiple queues, scheduled tasks, and retries when something goes wrong.
If you don’t know how to use celery, read this post first: https://fernandofreitasalves.com/executing-time-consuming-tasks-asynchronously-with-django-and-celery/
Retrying a task
Let’s say your task depends on an external API or connects to another web service and, for some reason, it raises a ConnectionError. It’s plausible to think that after a few seconds the API, web service, or whatever you are using may be back on track and working again. In these cases, you may want to catch the exception and retry your task.
```python
from celery import shared_task


@shared_task(bind=True, max_retries=3)  # you can determine the max_retries here
def access_awful_system(self, my_obj_id):
    from core.models import Object
    from requests import ConnectionError

    o = Object.objects.get(pk=my_obj_id)

    # If ConnectionError, try again in 180 seconds
    try:
        o.access_awful_system()
    except ConnectionError as exc:
        self.retry(exc=exc, countdown=180)  # the task goes back to the queue
```
The self.retry inside the function is what’s interesting here. It’s possible thanks to bind=True on the shared_task decorator, which turns our function access_awful_system into a method of the Task class. That’s also what forces us to use self as the first argument of the function.
Another nice way to retry a function is using exponential backoff:
```python
self.retry(exc=exc, countdown=2 ** self.request.retries)
```
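Since self.request.retries counts from zero, the wait doubles on every attempt. A quick sketch of the countdown values, using a hypothetical backoff helper (not part of Celery) that mirrors the expression above:

```python
# Hypothetical helper (not Celery API) mirroring
# countdown=2 ** self.request.retries:
def backoff(retries):
    return 2 ** retries

# retries counts from 0, so the waits double each attempt
delays = [backoff(n) for n in range(5)]
print(delays)  # [1, 2, 4, 8, 16]
```

So the first retry waits 1 second, the second 2 seconds, and so on, which keeps you from hammering a service that is already struggling.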
ETA — Scheduling a task for later
Now, imagine that your application has to call an asynchronous task, but needs to wait one hour before running it.
In this case, we just need to call the task using the ETA (estimated time of arrival) property, which means your task will be executed some time after the ETA. To be precise, not exactly at the ETA time, because that depends on whether workers are available at that moment. If you want to schedule tasks exactly as you do in crontab, you may want to take a look at CeleryBeat.
```python
from django.utils import timezone
from datetime import timedelta

now = timezone.now()

# later is one hour from now
later = now + timedelta(hours=1)

# note the trailing comma: apply_async expects a tuple of arguments
access_awful_system.apply_async((object_id,), eta=later)
```
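As an aside, apply_async also accepts a countdown argument (a number of seconds to wait), which Celery turns into an ETA internally. A minimal sketch of that equivalence, using a hypothetical eta_from_countdown helper:

```python
from datetime import datetime, timedelta

def eta_from_countdown(now, countdown_seconds):
    # countdown=N is just shorthand for eta = now + N seconds
    return now + timedelta(seconds=countdown_seconds)

now = datetime(2024, 1, 1, 12, 0, 0)
print(eta_from_countdown(now, 3600))  # 2024-01-01 13:00:00
```

So for the one-hour example above, access_awful_system.apply_async((object_id,), countdown=3600) would behave the same way.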
Using more queues
When you run Celery, it creates a queue on your broker (in the last blog post it was RabbitMQ). If you have a few asynchronous tasks and you use just the default Celery queue, all tasks will go to the same queue.
Suppose that we have another task called too_long_task and one more called quick_task and imagine that we have one single queue and four workers.
In that scenario, imagine the producer sends ten messages to the queue to be executed by too_long_task and, right after that, ten more messages for quick_task. What is going to happen? All your workers may be busy executing too_long_task, which entered the queue first, leaving no workers for quick_task.
The solution for this is routing each task using named queues.
```python
# CELERY ROUTES
CELERY_ROUTES = {
    'core.tasks.too_long_task': {'queue': 'too_long_queue'},
    'core.tasks.quick_task': {'queue': 'quick_queue'},
}
```
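To see how this mapping behaves, here is a toy lookup (not Celery’s actual router) that resolves a task name to its queue, falling back to the default "celery" queue when no route matches:

```python
CELERY_ROUTES = {
    'core.tasks.too_long_task': {'queue': 'too_long_queue'},
    'core.tasks.quick_task': {'queue': 'quick_queue'},
}

def queue_for(task_name, default='celery'):
    # Toy lookup: unrouted tasks fall back to the default queue
    return CELERY_ROUTES.get(task_name, {}).get('queue', default)

print(queue_for('core.tasks.quick_task'))  # quick_queue
print(queue_for('core.tasks.other_task'))  # celery
```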
Now we can split the workers, determining which queue they will be consuming.
```shell
# For too_long_queue
celery --app=proj_name worker -Q too_long_queue -c 2

# For quick_queue
celery --app=proj_name worker -Q quick_queue -c 2
```
I’m using a concurrency of 2 (-c 2) for each queue’s worker, but the right number depends on your system.
As in the last post, you may want to run these workers under Supervisord.
There are a lot of interesting things you can do with your workers here.
Calling Sequential Tasks
Another common issue is having to call two asynchronous tasks one after the other. This can happen in a lot of scenarios, e.g. when the second task uses the result of the first one as a parameter.
You can use chain to do that:

```python
from celery import chain
from tasks import first_task, second_task

chain(first_task.s(meu_objeto_id) | second_task.s())
```
The chain is a task too, so you can pass parameters to apply_async, for instance an ETA:
```python
chain(salvar_dados.s(meu_objeto_id) | trabalhar_dados.s()).apply_async(eta=depois)
```
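To build intuition for what the chain does, here is a toy synchronous analogue (plain functions standing in for the tasks): first_task.s(...) | second_task.s() pipes the first result into the second call, just like ordinary function composition.

```python
def first_task(x):
    # stand-in for an async task that returns a value
    return x + 1

def second_task(x):
    # receives the previous task's result as its first argument
    return x * 2

# equivalent of chain(first_task.s(3) | second_task.s())
result = second_task(first_task(3))
print(result)  # 8
```

The real chain does the same thing, except each step runs asynchronously on a worker and the intermediate result travels through the result backend.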
Ignoring the results from ResultBackend
If you use tasks only to execute something whose return value you don’t need, you can ignore the results and improve performance.
If you’re just saving something to your models, you may want to add this to your settings.py:

```python
CELERY_IGNORE_RESULT = True
```
Sources:
- Tasks - Celery 4.1.0 documentation
- Optimizing - Celery 4.1.0 documentation
- Deni Bertovic :: Celery - Best Practices
- Celery - Best Practices | Hacker News
- Workers Guide - Celery 4.1.0 documentation
- Canvas: Designing Work-flows - Celery 4.1.0 documentation
Super Bonus
Celery Messaging at Scale at Instagram — Pycon 2013
Originally published at Fernando Alves.
Using celery with multiple queues, retries, and scheduled tasks was originally published in Hacker Noon on Medium.