Asynchronous tasks¶
Overview¶
We use celery as our task queue. Task queues manage background work that must be executed outside the usual HTTP request-response cycle. More information on the general principles of task queues.
To give an example of how we use them in laputa: when a conceptor clicks on 'refresh data', the data processing (extract, transform, load) is not executed synchronously in the gunicorn process that received the request. Instead, the gunicorn process just puts a message in a message broker (redis, in our case). This message is then fetched by celery workers, which do the real work.
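The hand-off described above can be sketched with the standard library alone. This is only an illustrative model, not laputa code: `queue.Queue` stands in for redis, a thread stands in for a celery worker, and `refresh_data` and `handle_request` are hypothetical names.

```python
import queue
import threading

# Stand-in for the redis message broker (assumption: in-memory queue).
broker = queue.Queue()
results = []


def refresh_data(small_app_id):
    # Hypothetical heavy job (extract, transform, load).
    return f"refreshed {small_app_id}"


def handle_request(small_app_id):
    # The web process only enqueues a message and returns immediately.
    broker.put(("refresh_data", small_app_id))
    return {"message": "Refresh scheduled"}


def worker():
    # The worker fetches messages and does the real work.
    while True:
        message = broker.get()
        if message is None:  # sentinel used to stop the worker
            break
        _task_name, small_app_id = message
        results.append(refresh_data(small_app_id))


thread = threading.Thread(target=worker)
thread.start()
response = handle_request("demo-app")
broker.put(None)
thread.join()
```

The request handler returns before the work is done; the result only exists once the worker has processed the message.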
Usage¶
Let's say you want to create an async task that performs a git push of the config files
directory for a given small app.
First step: add a function decorated with @operation in common/tasks/tasks.py:
@operation(
    queue=Queues.QUICK,
    tracked_args=['force'],
    lock='github',
    lock_from_args=['small_app_id'],
    lock_reject=False,
)
def push_changes(small_app_id, force=False):
    small_app = build_small_app(bucket.config, small_app_id)
    return small_app.sync_with_remote(SmallAppFolder.PUSH, force)
(the role of the operation arguments is explained in the sections further down)
Second step: use it!
from laputa.common.tasks import push_changes
[...]
result = push_changes.delay(g.small_app.id, force=True)
resp = {'message': 'Pushing changes to remote', 'operation_id': str(result)}
And that's all (as you can imagine, there's some magic happening under the hood).
Celery workers and pools¶
Celery can spawn multiple workers if several tasks have to be processed in parallel. This can be verified by looking at the processes running in the docker container:
root@container-back-worker:/app# ps faux | grep celery
USER PID %CPU %MEM START TIME COMMAND
toucan 2251 10.1 0.2 17:14 0:03 python -m laputa.common.tasks.celery_app …
toucan 2306 0.0 0.1 17:14 0:00 \_ python -m laputa.common.tasks.celery_app …
toucan 2252 10.4 0.2 17:14 0:03 python -m laputa.common.tasks.celery_app …
toucan 2307 0.0 0.1 17:14 0:00 \_ python -m laputa.common.tasks.celery_app …
toucan 2308 0.0 0.1 17:14 0:00 \_ python -m laputa.common.tasks.celery_app …
Here we see that we have 2 pools of processes, with 2 celery 'master' processes. The 1st pool has one child worker, the 2nd pool has two of them.
Question
Why 2 pools? What are their differences?
Not much, actually. The first pool is aimed at handling 'heavy' tasks (e.g. refresh data),
whereas the other one is aimed at handling 'light / quick' tasks (e.g. datasource upload).
This is because we don't want heavy tasks to prevent quick tasks from being executed, which would
affect the user experience
(e.g. we don't want file uploads to be blocked because a refresh is running).
Concretely, the difference between the 2 pools is that the 'quick' pool processes
messages sent to the 'quick' queue (because it has the env var WORKER_QUEUE="quick"),
whereas the other one processes the 'default' queue
(some tasks are mapped to the quick queue, cf. COMMON_QUICK_QUEUE_PARAMS).
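As a rough illustration of this routing, here is a minimal sketch. Everything in it is an assumption made for the example: the `TASK_QUEUES` mapping, the task names `upload_datasource` and `refresh_data`, and the helper functions are hypothetical, and the real mapping lives in the `queue` argument of @operation and in COMMON_QUICK_QUEUE_PARAMS.

```python
import os

# Hypothetical task-to-queue mapping, for illustration only.
TASK_QUEUES = {"push_changes": "quick", "upload_datasource": "quick"}
DEFAULT_QUEUE = "default"


def queue_for(task_name):
    # Tasks that are not explicitly mapped go to the default queue.
    return TASK_QUEUES.get(task_name, DEFAULT_QUEUE)


def worker_queue(environ=os.environ):
    # A pool reads WORKER_QUEUE to decide which queue it consumes.
    return environ.get("WORKER_QUEUE", DEFAULT_QUEUE)


def should_consume(task_name, environ=os.environ):
    # A pool only processes messages sent to its own queue.
    return queue_for(task_name) == worker_queue(environ)
```

With this model, a pool started with WORKER_QUEUE="quick" would pick up `push_changes` but never a task routed to the default queue, which is what keeps quick tasks from waiting behind heavy ones.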
We use the autoscale parameter to tell celery to spawn new workers to process incoming messages
in parallel, up to a limit of 10 parallel workers (configurable) for each pool.
A pool will always have at least one worker up, even if it's not doing anything
(it will be in charge of the next task incoming).
Note
In development mode (i.e. when you start laputa with toucan-ctl runserver),
what is described above won't apply (unless you set the env var PRODUCTION=true).
This is because of the task_always_eager option of CeleryConfig: when this
is set to True, tasks are processed synchronously in the current process,
instead of being sent to celery via redis.
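The effect of an "always eager" switch can be modelled with a toy class. This is a simplified sketch, not celery's implementation: `ToyTask` is a hypothetical name, a plain list stands in for redis, and the real `delay` returns a result object rather than the bare return value.

```python
class ToyTask:
    """Toy model of a task; this is not celery's Task class."""

    def __init__(self, func, broker, always_eager):
        self.func = func
        self.broker = broker  # stand-in for redis (a plain list)
        self.always_eager = always_eager

    def delay(self, *args, **kwargs):
        if self.always_eager:
            # Development mode: run right away, in the current process.
            return self.func(*args, **kwargs)
        # Production mode: only record a message for a worker to pick up.
        self.broker.append((self.func.__name__, args, kwargs))
        return None


def push_changes(small_app_id, force=False):
    # Placeholder body for the example.
    return f"pushed {small_app_id} (force={force})"


dev_broker, prod_broker = [], []
dev_task = ToyTask(push_changes, dev_broker, always_eager=True)
prod_task = ToyTask(push_changes, prod_broker, always_eager=False)
dev_result = dev_task.delay("demo-app", force=True)
prod_result = prod_task.delay("demo-app", force=True)
```

In eager mode the broker stays empty and the caller blocks until the task returns, which is convenient for debugging but hides concurrency issues such as locks and waitlists.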
Locks¶
By default, as long as celery has free workers available, it will run tasks in parallel. However, there are situations where we don't want 2 tasks to be executed in parallel. For example, we don't want two refreshes to be executed at the same time on the same small app, or a refresh and a publish (because it could create weird states).
To handle these cases, we can (optionally) put a lock on some operations. If the lock is already held, another operation that needs the same lock cannot be delayed. For example, try to launch a publish on an app where a refresh is already running: it will be rejected.
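A minimal sketch of such a lock, assuming the lock key is built from the lock name plus the values of the arguments listed in lock_from_args. The key format, the in-memory `held_locks` set (standing in for shared storage) and the `LockRejected` exception are all hypothetical.

```python
held_locks = set()  # stand-in for shared lock storage (assumption)


class LockRejected(Exception):
    """Raised when an operation needs a lock that is already held."""


def lock_key(lock, lock_from_args, call_kwargs):
    # Assumed key format: lock name followed by the selected argument values.
    parts = [lock] + [str(call_kwargs[name]) for name in lock_from_args]
    return ":".join(parts)


def acquire(key):
    if key in held_locks:
        raise LockRejected(key)
    held_locks.add(key)


def release(key):
    # Releasing a lock that is not held is a no-op.
    held_locks.discard(key)
```

Because the small app id is part of the key, two operations on different small apps never block each other, while two operations on the same small app do.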
Waitlist¶
In some other cases, we don't want to reject the 2nd operation, we just want it to run after the 1st one is done. The problem is, if you put a message in the redis queue, celery will execute it right away. The solution we chose is to put the message in a different redis queue (we call this one the waitlist). This way, celery is not aware that a message has been sent. Then, when an operation finishes, in the teardown process, if there are messages in the waitlist, we move the 1st message from the waitlist to the "normal" redis queue, so celery will get this message.
Tasks that should use a waitlist instead of rejecting new operations are marked
with lock_reject=False in the @operation decorator's args
(more details in PAT 109).
Async jobs life cycle¶
First, on the caller side (i.e. on the api side (in a gunicorn process) or in the scheduler process), the task is delayed, aka put in redis. Then, on the celery side, the celery master fetches the message from redis and gives it to one of its workers. Some laputa code runs when the worker initializes (before executing the task) and when it finishes (after).
The code executed on the caller side can be found in celery_app.Task.
It mainly consists of:
- acquire the lock
- mark the operation as "pending" in mongodb
- inject some information which will be deserialized on the worker side
The code executing on the worker side can be found in celery_app.operation.
It mainly consists of:
- deserialize injected params and populate bucket with some of them
- mark the operation as "running" in mongodb
- execute the actual task (result = func(*args, **kwargs))
- then call the operation_teardown util
The operation_teardown util is in charge of:
- release the lock if there is one (or unstack a message from the waitlist, if any)
- mark the operation as finished (cancelled, successful or failed) in mongo
- notify the clients connected via websocket that the task is finished
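The worker-side sequence above can be sketched as a decorator. This is a toy model, not the code in celery_app: a dict stands in for mongodb, a list stands in for websocket notifications, lock handling is left out, and the `operation_id` parameter is an assumption made for the example.

```python
import functools

operations = {}     # stand-in for the mongodb operations collection
notifications = []  # stand-in for websocket notifications


def operation_teardown(operation_id, status):
    # Toy version: record the final status, then notify clients.
    operations[operation_id] = status
    notifications.append((operation_id, status))


def operation(func):
    @functools.wraps(func)
    def wrapper(operation_id, *args, **kwargs):
        operations[operation_id] = "running"
        status = "failed"
        try:
            result = func(*args, **kwargs)
            status = "successful"
            return result
        finally:
            # Teardown runs whether the task succeeded or raised.
            operation_teardown(operation_id, status)

    return wrapper


@operation
def add(a, b):
    return a + b
```

Running the teardown in a `finally` block covers ordinary exceptions, but not a process that is killed from the outside, since no Python code runs in that case.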
OOM Kill¶
(Out Of Memory Kill)
When a worker is abruptly killed by the Linux kernel, it doesn't get the chance
to call operation_teardown.
To handle that, we implemented our own CustomRequest with a custom on_failure method,
which takes care of the cleanup by calling operation_teardown.
Websocket notifications¶
When an operation's state changes, the function broadcast_operation_updated is
called, in order to notify websocket clients.
This is done in three steps:
- a message is put in redis
- this message is then fetched by the websocket daemon
- the daemon forwards this message to all the concerned clients connected
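These three steps amount to a publish and fan-out pattern, sketched below with in-memory structures. The names are hypothetical (`publish_operation_update`, `websocket_daemon_step`), a list stands in for redis, and "concerned clients" is assumed here to mean clients subscribed to the same small app.

```python
channel = []        # stand-in for the redis channel
subscriptions = {}  # small_app_id -> list of client inboxes (assumption)


def subscribe(small_app_id):
    # Each connected client gets its own inbox.
    inbox = []
    subscriptions.setdefault(small_app_id, []).append(inbox)
    return inbox


def publish_operation_update(small_app_id, operation_id, status):
    # Step 1: put a message in the channel.
    channel.append(
        {"small_app_id": small_app_id, "operation_id": operation_id, "status": status}
    )


def websocket_daemon_step():
    # Steps 2 and 3: fetch pending messages and forward each one
    # to the clients subscribed to the matching small app.
    while channel:
        message = channel.pop(0)
        for inbox in subscriptions.get(message["small_app_id"], []):
            inbox.append(message)
```

Going through a shared channel means the worker does not need to know which clients are connected; only the daemon holds the connections.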