Skip to main content

Celery

What is the Celery? Celery is a simple, flexible, and reliable distributed system to process vast amounts of tasks. In practice,

  1. Celery can be used to execute long-running jobs behind web servers to let your servers handle requests/responses in non-blocking ways.
  2. Celery can do scheduled jobs.

Here are some production-ready use cases for Celery I have encountered.

Celery worker

Celery worker Mechanism:

To start a Celery worker will start a main process that will spawn child processes or threads(based on the --pool option): the main process will handle receiving task/sending task result the and these child processes/threads(a.k.a execution pool) execute the actual tasks.

To increase the number of child processes/threads(via --concurrency option) will increase the number of tasks the Celery worker can process in parallel. More processes are usually better.

However, in reality, there are some situations in following modes:

  1. Run N workers with M child processes each.
  2. Run 1 worker with N*M child processes.
  3. Run N workers with only 1 main process each.
  4. Run N workers with M child threads each.
  5. Run 1 worker with N*M child threads.

Whether to use processes or threads depends on what your tasks will actually do and whether they are GPU bound or IO bound.

Worker procedure

from celery import Celery

app = Celery(...)

@app.task()
def add(x, y):
return x + y

@app.task()
def mul(x, y):
return x * y

The @app.task decoration will use Task class in default if you don't specify explicitly.

When a worker start by celery -A tasks worker,

  1. Worker will spawn child Processes, the number of child Processes is based on CPU cores in default.
  2. Each child Process will initialize a Task instance for every decorated function. Here add() has its own Task instance and mul() also has its own Task instance respectively.

When a client call add.delay(1, 2),

  1. Worker receive a Task in Queue.
  2. Worker assign the Task to a child Process, which will determine to use which Task instance to execute. A Task instance is initialized in each decorated function and registered with a task name using function name in default(such as add, mul). Here is the Task instance with name add() should be picked up to run the task.
  3. When be decorated in add(), the Task instance run() method will be add() original function body. The child Process will use the Task instance's __call__() method to run task, and __call__() will invoke the run() within itself.

Option --pool=prefork

It spawns multiple processes.

When start a Celery worker via celery -A tasks worker --loglevel INFO --concurrency 3 --pool=prefork, what will happen underneath?

  1. Celery start a main process.
  2. The main process will then spawn 3 child processes. The default concurrency is based on the number of CPU available on the machine. The default pool is prefork which uses multiprocessing library from Python.
  3. These child processes will execute the tasks assigned from the main process.

Option --pool=eventlet or --pool=gevent

It creates multiple threads.

When start a Celery worker via celery -A tasks worker --loglevel INFO --concurrency 3 --pool=eventlet

Option --pool=solo

It will not create any child process or thread to run task. The tasks will be executed in main process, which causes the main process to be blocked.

It seems as: Run 1 worker with 1 process, however --concurrency will not take any effect when --pool=solo!

When coming to a microservices environment, this option becomes useful and practical especially running CPU intensive tasks. The container manager such as Docker can increase the task processing capabilities through managing the number of worker containers instead of managing the number of pool processes per worker.

When start a Celery worker via celery -A tasks worker --loglevel INFO --pool=solo

Celery Task

What's the lifecycle of a Celery task from the time it's created to the it's done?

Here we analyze a simple task with all Celery configuration in default and use Redis as broker and backend

@app.task(acks_late=True)
def wait(secs: float) -> str:
print(f"wait() - Start, secs[{secs}]s")
time.sleep(secs)
print(f"wait() - Done, secs[{secs}]s")
return f"wait() - Done, secs[{secs}]s"
  1. When a client call wait.delay(60), this task is added to a default queue named celery in Redis.
  2. Celery worker polls the queue and pulls the task, then it removes the task from the queue and moves it a special queue named unacked in Redis.
  3. The worker holds on to the task(prefetch), until it has abilities to process the task.
  4. Once after The worker successfully processes the task, it acks now (acks_late=True) that it removes the task from the unacked queue in Redis.
    • If acks_late=False, the worker acks before processing the task.

Let's get more concrete understanding in practices.

  1. First, let's enter a redis-cli interactive mode with the newly launched application,
127.0.0.1:6379> KEYS *
1) "_kombu.binding.email_service"
2) "_kombu.binding.ml_service"
3) "_kombu.binding.celery.pidbox"
4) "_kombu.binding.celeryev"
5) "_kombu.binding.celery"

At the beginning, you can see that the celery key and the unacked key do not exist in Redis.

  1. Then, let's call wait.delay(60) multiple times at the same time,
127.0.0.1:6379> KEYS *
1) "unacked_index"
2) "_kombu.binding.email_service"
3) "_kombu.binding.celery.pidbox"
4) "celery-task-meta-3d6b2028-6ee6-4e2c-85f1-cbeba644aca5"
5) "celery"
6) "_kombu.binding.celeryev"
7) "_kombu.binding.celery"
8) "_kombu.binding.ml_service"
9) "celery-task-meta-e5a1b7db-f1ad-4d3e-b2b9-3b7de8f8c87e"
10) "unacked"
127.0.0.1:6379> TYPE unacked
hash
127.0.0.1:6379> TYPE celery
list

After we create tasks, the celery key of list type and the unacked key of hash type are both created in Redis.

127.0.0.1:6379> LRANGE celery 0 -1
1) "{\"body\": \"W1s2MC4wXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"app.celery_app.tasks.wait\", \"id\": \"da959152-1f45-4846-99e4-5205d30c1be7\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"da959152-1f45-4846-99e4-5205d30c1be7\", \"parent_id\": null, \"argsrepr\": \"(60.0,)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen11@a840cdd15b13\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"da959152-1f45-4846-99e4-5205d30c1be7\", \"reply_to\": \"4b0f2f2d-aee2-3349-81ab-e95a1f0e9f02\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"d657c66d-4e4b-483d-9fbe-fe4b5b9541e7\"}}"
2) "{\"body\": \"W1s2MC4wXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"app.celery_app.tasks.wait\", \"id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"parent_id\": null, \"argsrepr\": \"(60.0,)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen11@a840cdd15b13\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"reply_to\": \"4b0f2f2d-aee2-3349-81ab-e95a1f0e9f02\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"927d1ac0-3709-4e23-8c0f-037713c55217\"}}"
127.0.0.1:6379> HGETALL unacked
1) "927d1ac0-3709-4e23-8c0f-037713c55217"
2) "[{\"body\": \"W1s2MC4wXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"app.celery_app.tasks.wait\", \"id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"parent_id\": null, \"argsrepr\": \"(60.0,)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen11@a840cdd15b13\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"reply_to\": \"4b0f2f2d-aee2-3349-81ab-e95a1f0e9f02\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"927d1ac0-3709-4e23-8c0f-037713c55217\"}}, \"\", \"celery\"]"
  1. Wait for all these tasks to be done
127.0.0.1:6379> KEYS *
1) "_kombu.binding.email_service"
2) "celery-task-meta-da959152-1f45-4846-99e4-5205d30c1be7"
3) "celery-task-meta-815587f5-782d-454a-8498-b4ebbb91abd8"
4) "_kombu.binding.celery.pidbox"
5) "celery-task-meta-3d6b2028-6ee6-4e2c-85f1-cbeba644aca5"
6) "_kombu.binding.celeryev"
7) "_kombu.binding.celery"
8) "_kombu.binding.ml_service"
9) "celery-task-meta-1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8"
10) "celery-task-meta-e5a1b7db-f1ad-4d3e-b2b9-3b7de8f8c87e"

After all tasks are done successfully, both keys: celery and unacked are removed from Redis.

The result of a task is stored in celery-task-meta-{{uuid}} key.

127.0.0.1:6379> TYPE celery-task-meta-da959152-1f45-4846-99e4-5205d30c1be7
string
127.0.0.1:6379> GET celery-task-meta-da959152-1f45-4846-99e4-5205d30c1be7
"{\"status\": \"SUCCESS\", \"result\": \"wait() - Done, secs[60.0]s\", \"traceback\": null, \"children\": [], \"date_done\": \"2023-11-07T07:54:16.954872\", \"task_id\": \"da959152-1f45-4846-99e4-5205d30c1be7\"}"

Serve machine learning model

Properly running a machine learning model in task is different with running other jobs as we need avoiding loading ML model every time we run tasks. So it is stateful that we should keep something in worker.

Different workers for different tasks

Assuming a such situation: There is a worker x to only handle email tasks and a worker y to only handle machine learning related tasks.

These are configurations for project x:

#Celery routing.
app.conf.task_routes = {
'celery_app.email_tasks.*': {
'queue': 'email_service',
},
}

#Run celery.
celery -A celery_app.email_tasks:app worker -l info -E -Q email_service

These are configurations for project y:

#Celery routing.
app.conf.task_routes = {
'celery_app.ml_tasks.*': {
'queue': 'ml_service',
},
}

#Run celery.
celery -A celery_app.ml_tasks:app worker -l info -E -Q ml_service

Details in explanation:

  1. Different workers handle their own queues for separate tasks.

Look at https://github.com/liviaerxin/fastapi-celery-ml for see a complete Celery project.

Code Analysis

from celery import signature
sig = add.s(2, 2)
sig.freeze()

Known issues

Result state is always PENDING in windows

FIX: use --pool=solo instead of --pool=prefork in default. multiprocessing may cause this problem as its some defect in windows!

Long running jobs redelivering after broker visibility timeout with celery and redis · Issue #5935 · celery/celery · GitHub Long tasks are executed multiple times · Issue #3430 · celery/celery · GitHub

No Worker Heartbeat With Solo Pool · Issue #3768 · celery/celery · GitHub

Resources

Celery - Distributed Task Queue — Celery 5.3.4 documentation

Celery Execution Pools: What is it all about?

Celery Execution Pool: The worker and the pool - separation of concerns

Serving ML Models in Production with FastAPI and Celery | by Jonathan Readshaw | Towards Data Science

GitHub - liviaerxin/FastAPISpamDetection: Code for my Medium article: "How you can quickly deploy your ML models with FastAPI"

Celery ETA Tasks Demystified. At Instawork, we use Celery to queue… | by Oleg Pesok | Instawork Engineering