Celery
What is Celery? Celery is a simple, flexible, and reliable distributed system to process vast numbers of tasks. In practice,
- Celery can be used to execute long-running jobs behind web servers, so your servers can handle requests/responses in a non-blocking way.
- Celery can run scheduled jobs.
Here are some production-ready use cases for Celery that I have encountered.
Celery worker
Celery worker mechanism:
Starting a Celery worker launches a main process that spawns child processes or threads (based on the `--pool` option): the main process handles receiving tasks and sending back task results, while the child processes/threads (a.k.a. the execution pool) execute the actual tasks.
Increasing the number of child processes/threads (via the `--concurrency` option) increases the number of tasks the Celery worker can process in parallel, and more processes are usually better. In reality, however, deployments fall into one of the following modes:
- Run N workers with M child processes each.
- Run 1 worker with N*M child processes.
- Run N workers with only 1 main process each.
- Run N workers with M child threads each.
- Run 1 worker with N*M child threads.
Whether to use processes or threads depends on what your tasks actually do and whether they are CPU/GPU-bound or I/O-bound.
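For example (a hypothetical sketch; `hash_passwords` and `fetch_status` are made-up tasks, and `app` is the Celery app defined in the next section): a CPU-bound task benefits from a process pool, while an I/O-bound task spends most of its time waiting and suits a thread/greenlet pool.

```python
import hashlib
import urllib.request

@app.task
def hash_passwords(passwords):
    # CPU-bound: each call burns CPU, so prefer --pool=prefork.
    return [hashlib.sha256(p.encode()).hexdigest() for p in passwords]

@app.task
def fetch_status(url):
    # I/O-bound: mostly waiting on the network, so --pool=eventlet/gevent
    # (or threads) can run many of these concurrently.
    with urllib.request.urlopen(url) as resp:
        return resp.status
```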
Worker procedure
```python
from celery import Celery

app = Celery(...)

@app.task()
def add(x, y):
    return x + y

@app.task()
def mul(x, y):
    return x * y
```
The `@app.task` decorator uses the `Task` class by default if you don't specify one explicitly.
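To use a custom base class instead, pass it explicitly; a minimal sketch (the `LoggingTask` name is made up for illustration):

```python
from celery import Task

class LoggingTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # Hook that Celery invokes when the task raises an exception.
        print(f"Task {task_id} failed: {exc}")

@app.task(base=LoggingTask)
def div(x, y):
    return x / y
```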
When a worker is started by `celery -A tasks worker`,
- The worker spawns child processes; by default, the number of child processes equals the number of CPU cores.
- Each child process initializes a `Task` instance for every decorated function. Here `add()` and `mul()` each have their own `Task` instance.
When a client calls `add.delay(1, 2)`,
- The worker receives the task from the queue.
- The worker assigns the task to a child process, which determines which `Task` instance to use for execution. A `Task` instance is initialized for each decorated function and registered under a task name that defaults to the function name (such as `add`, `mul`). Here the `Task` instance named `add` is picked to run the task.
- When `add()` is decorated, the `Task` instance's `run()` method becomes the original `add()` function body. The child process runs the task via the `Task` instance's `__call__()` method, and `__call__()` invokes `run()` internally.
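You can observe this from a Python shell (a small sketch; assumes the `tasks` module above):

```python
from celery import Task
from tasks import add

assert isinstance(add, Task)  # the decorated function is a Task instance
print(add.name)       # "tasks.add" - registered as module.function by default
print(add.run(1, 2))  # run() holds the original function body -> 3
print(add(1, 2))      # __call__() invokes run() internally -> 3
```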
Option --pool=prefork
It spawns multiple processes.
When you start a Celery worker via `celery -A tasks worker --loglevel INFO --concurrency 3 --pool=prefork`, what happens underneath?
- Celery starts a main process.
- The main process then spawns 3 child processes. The default `concurrency` is the number of CPUs available on the machine. The default `pool` is `prefork`, which uses Python's `multiprocessing` library.
- These child processes execute the tasks assigned by the main process.
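A rough mental model (an analogy only, not Celery's actual implementation): the prefork pool behaves like a `multiprocessing` pool dispatching work to child processes.

```python
from multiprocessing import Pool

def run_task(args):
    # Stand-in for a task body such as add().
    x, y = args
    return x + y

if __name__ == "__main__":
    # Analogous to --concurrency 3 with --pool=prefork.
    with Pool(processes=3) as pool:
        print(pool.map(run_task, [(1, 2), (3, 4), (5, 6)]))  # [3, 7, 11]
```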
Option --pool=eventlet or --pool=gevent
It creates multiple green threads (greenlets) rather than OS processes or threads.
When you start a Celery worker via `celery -A tasks worker --loglevel INFO --concurrency 3 --pool=eventlet`, the main process runs tasks on 3 greenlets instead of spawning child processes.
Option --pool=solo
It does not create any child process or thread to run tasks. Tasks are executed in the main process, so the main process blocks while a task runs.
It amounts to: run 1 worker with 1 process; however, `--concurrency` has no effect when `--pool=solo`!
In a microservices environment, this option becomes useful and practical, especially for CPU-intensive tasks: a container platform such as Docker can scale task-processing capacity by managing the number of worker containers instead of the number of pool processes per worker.
Start such a Celery worker via `celery -A tasks worker --loglevel INFO --pool=solo`.
Celery Task
What's the lifecycle of a Celery task from the time it's created to the time it's done?
Here we analyze a simple task with all Celery configuration at its defaults, using Redis as both broker and result backend:
```python
import time

@app.task(acks_late=True)
def wait(secs: float) -> str:
    print(f"wait() - Start, secs[{secs}]s")
    time.sleep(secs)
    print(f"wait() - Done, secs[{secs}]s")
    return f"wait() - Done, secs[{secs}]s"
```
- When a client calls `wait.delay(60)`, the task is added to a default queue named `celery` in Redis.
- A Celery worker polls the queue and pulls the task, then removes it from the queue and moves it to a special queue named `unacked` in Redis.
- The worker holds on to the task (`prefetch`) until it has the capacity to process it.
- Once the worker successfully processes the task, it `acks` only now (because `acks_late=True`), removing the task from the `unacked` queue in Redis.
  - If `acks_late=False` (the default), the worker `acks` before processing the task.
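On the client side, the same lifecycle looks like this (a sketch using the `wait` task above):

```python
result = wait.delay(60)        # pushes a message onto the "celery" list in Redis
print(result.id)               # the uuid used in the celery-task-meta-{uuid} key
print(result.status)           # PENDING until the result is stored
print(result.get(timeout=120)) # blocks until "wait() - Done, secs[60.0]s"
```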
Let's get a more concrete understanding in practice.
- First, let's enter `redis-cli` interactive mode with the newly launched application,
```
127.0.0.1:6379> KEYS *
1) "_kombu.binding.email_service"
2) "_kombu.binding.ml_service"
3) "_kombu.binding.celery.pidbox"
4) "_kombu.binding.celeryev"
5) "_kombu.binding.celery"
```
At the beginning, you can see that neither the `celery` key nor the `unacked` key exists in Redis.
- Then, let's call `wait.delay(60)` multiple times at the same time,
```
127.0.0.1:6379> KEYS *
1) "unacked_index"
2) "_kombu.binding.email_service"
3) "_kombu.binding.celery.pidbox"
4) "celery-task-meta-3d6b2028-6ee6-4e2c-85f1-cbeba644aca5"
5) "celery"
6) "_kombu.binding.celeryev"
7) "_kombu.binding.celery"
8) "_kombu.binding.ml_service"
9) "celery-task-meta-e5a1b7db-f1ad-4d3e-b2b9-3b7de8f8c87e"
10) "unacked"
127.0.0.1:6379> TYPE unacked
hash
127.0.0.1:6379> TYPE celery
list
```
After we create tasks, the `celery` key (of `list` type) and the `unacked` key (of `hash` type) are both created in Redis.
```
127.0.0.1:6379> LRANGE celery 0 -1
1) "{\"body\": \"W1s2MC4wXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"app.celery_app.tasks.wait\", \"id\": \"da959152-1f45-4846-99e4-5205d30c1be7\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"da959152-1f45-4846-99e4-5205d30c1be7\", \"parent_id\": null, \"argsrepr\": \"(60.0,)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen11@a840cdd15b13\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"da959152-1f45-4846-99e4-5205d30c1be7\", \"reply_to\": \"4b0f2f2d-aee2-3349-81ab-e95a1f0e9f02\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"d657c66d-4e4b-483d-9fbe-fe4b5b9541e7\"}}"
2) "{\"body\": \"W1s2MC4wXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"app.celery_app.tasks.wait\", \"id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"parent_id\": null, \"argsrepr\": \"(60.0,)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen11@a840cdd15b13\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"reply_to\": \"4b0f2f2d-aee2-3349-81ab-e95a1f0e9f02\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"927d1ac0-3709-4e23-8c0f-037713c55217\"}}"
127.0.0.1:6379> HGETALL unacked
1) "927d1ac0-3709-4e23-8c0f-037713c55217"
2) "[{\"body\": \"W1s2MC4wXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"headers\": {\"lang\": \"py\", \"task\": \"app.celery_app.tasks.wait\", \"id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"shadow\": null, \"eta\": null, \"expires\": null, \"group\": null, \"group_index\": null, \"retries\": 0, \"timelimit\": [null, null], \"root_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"parent_id\": null, \"argsrepr\": \"(60.0,)\", \"kwargsrepr\": \"{}\", \"origin\": \"gen11@a840cdd15b13\", \"ignore_result\": false}, \"properties\": {\"correlation_id\": \"1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8\", \"reply_to\": \"4b0f2f2d-aee2-3349-81ab-e95a1f0e9f02\", \"delivery_mode\": 2, \"delivery_info\": {\"exchange\": \"\", \"routing_key\": \"celery\"}, \"priority\": 0, \"body_encoding\": \"base64\", \"delivery_tag\": \"927d1ac0-3709-4e23-8c0f-037713c55217\"}}, \"\", \"celery\"]"
```
- Wait for all these tasks to be done,
```
127.0.0.1:6379> KEYS *
1) "_kombu.binding.email_service"
2) "celery-task-meta-da959152-1f45-4846-99e4-5205d30c1be7"
3) "celery-task-meta-815587f5-782d-454a-8498-b4ebbb91abd8"
4) "_kombu.binding.celery.pidbox"
5) "celery-task-meta-3d6b2028-6ee6-4e2c-85f1-cbeba644aca5"
6) "_kombu.binding.celeryev"
7) "_kombu.binding.celery"
8) "_kombu.binding.ml_service"
9) "celery-task-meta-1ddc3c5e-fa33-4d12-aa3f-c3d13581a4c8"
10) "celery-task-meta-e5a1b7db-f1ad-4d3e-b2b9-3b7de8f8c87e"
```
After all tasks are done successfully, both the `celery` and `unacked` keys are removed from Redis.
The result of each task is stored in a `celery-task-meta-{{uuid}}` key.
```
127.0.0.1:6379> TYPE celery-task-meta-da959152-1f45-4846-99e4-5205d30c1be7
string
127.0.0.1:6379> GET celery-task-meta-da959152-1f45-4846-99e4-5205d30c1be7
"{\"status\": \"SUCCESS\", \"result\": \"wait() - Done, secs[60.0]s\", \"traceback\": null, \"children\": [], \"date_done\": \"2023-11-07T07:54:16.954872\", \"task_id\": \"da959152-1f45-4846-99e4-5205d30c1be7\"}"
```
Serve machine learning model
Properly running a machine learning model in a task is different from running other jobs, because we must avoid loading the ML model every time a task runs. The worker is therefore stateful: it should keep the model in memory between tasks.
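A minimal sketch of this pattern caches the model on a custom `Task` base class, so it is loaded once per worker process (`load_model` and `MODEL_PATH` are hypothetical placeholders):

```python
from celery import Task

class PredictTask(Task):
    _model = None

    @property
    def model(self):
        if self._model is None:
            # Loaded lazily, once per worker process, then reused.
            # load_model and MODEL_PATH are hypothetical placeholders.
            self._model = load_model(MODEL_PATH)
        return self._model

@app.task(base=PredictTask, bind=True)
def predict(self, features):
    # self is the Task instance, so the cached model is reused across calls.
    return self.model.predict(features)
```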
Different workers for different tasks
Assume the following situation: there is a worker `x` that only handles email tasks and a worker `y` that only handles machine-learning-related tasks.
These are the configurations for project `x`:

```python
# Celery routing.
app.conf.task_routes = {
    'celery_app.email_tasks.*': {
        'queue': 'email_service',
    },
}
```

```sh
# Run celery.
celery -A celery_app.email_tasks:app worker -l info -E -Q email_service
```
These are the configurations for project `y`:

```python
# Celery routing.
app.conf.task_routes = {
    'celery_app.ml_tasks.*': {
        'queue': 'ml_service',
    },
}
```

```sh
# Run celery.
celery -A celery_app.ml_tasks:app worker -l info -E -Q ml_service
```
Explanation: different workers consume their own queues for separate tasks, as the sketch below shows.
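From the client's point of view, routing is transparent: calling a task sends it to whatever queue `task_routes` maps it to (a sketch; `send_email` is an assumed task name in `celery_app.email_tasks`):

```python
from celery_app.email_tasks import send_email

# Matched by the 'celery_app.email_tasks.*' route, so this message is
# enqueued on the email_service queue and picked up only by worker x.
send_email.delay("user@example.com", subject="Hello")
```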
See https://github.com/liviaerxin/fastapi-celery-ml for a complete Celery project.
Code Analysis
A task signature wraps the arguments and execution options of a single task invocation so that it can be passed around, serialized, or composed into workflows.

```python
from celery import signature

# add.s(2, 2) is shorthand for signature(add, args=(2, 2)).
sig = add.s(2, 2)
# freeze() finalizes the signature, assigning it a task id before it is sent.
sig.freeze()
```
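Signatures become useful when composed into workflows; for example, a `chain` pipes each result into the next task (a sketch using the `add` and `mul` tasks above):

```python
from celery import chain

# (2 + 2) is computed first, then passed as mul's first argument: mul(4, 10).
workflow = chain(add.s(2, 2), mul.s(10))
res = workflow.apply_async()
print(res.get())  # 40
```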
Known issues
Result state is always PENDING on Windows
FIX: use `--pool=solo` instead of the default `--pool=prefork`. The `multiprocessing`-based prefork pool may cause this problem due to a defect on Windows!
- Long running jobs redelivering after broker visibility timeout with celery and redis · Issue #5935 · celery/celery · GitHub
- Long tasks are executed multiple times · Issue #3430 · celery/celery · GitHub
- No Worker Heartbeat With Solo Pool · Issue #3768 · celery/celery · GitHub
Resources
- Celery - Distributed Task Queue — Celery 5.3.4 documentation
- Celery Execution Pools: What is it all about?
- Celery Execution Pool: The worker and the pool - separation of concerns