Commit dca8cf0
committed
rewrite celery background tasks docs
1 parent dd2423e

1 file changed: docs/patterns/celery.rst (200 additions, 70 deletions)
Background Tasks with Celery
============================

If your application has a long running task, such as processing some uploaded data or
sending email, you don't want to wait for it to finish during a request. Instead, use a
task queue to send the necessary data to another process that will run the task in the
background while the request returns immediately.

`Celery`_ is a powerful task queue that can be used for simple background tasks as well
as complex multi-stage programs and schedules. This guide will show you how to configure
Celery using Flask. Read Celery's `First Steps with Celery`_ guide to learn how to use
Celery itself.

.. _Celery: https://celery.readthedocs.io
.. _First Steps with Celery: https://celery.readthedocs.io/en/latest/getting-started/first-steps-with-celery.html


Install
-------

Install Celery from PyPI, for example using pip:

.. code-block:: text

    $ pip install celery


Integrate Celery with Flask
---------------------------

You can use Celery without any integration with Flask, but it's convenient to configure
it through Flask's config, and to let tasks access the Flask application.

Celery uses similar ideas to Flask, with a ``Celery`` app object that has configuration
and registers tasks. While creating a Flask app, use the following code to create and
configure a Celery app as well.

.. code-block:: python

    from celery import Celery, Task
    from flask import Flask

    def celery_init_app(app: Flask) -> Celery:
        class FlaskTask(Task):
            def __call__(self, *args: object, **kwargs: object) -> object:
                with app.app_context():
                    return self.run(*args, **kwargs)

        celery_app = Celery(app.name, task_cls=FlaskTask)
        celery_app.config_from_object(app.config["CELERY"])
        celery_app.set_default()
        app.extensions["celery"] = celery_app
        return celery_app

This creates and returns a ``Celery`` app object. Celery `configuration`_ is taken from
the ``CELERY`` key in the Flask configuration. The Celery app is set as the default, so
that it is seen during each request. The ``Task`` subclass automatically runs task
functions with a Flask app context active, so that services like your database
connections are available.

.. _configuration: https://celery.readthedocs.io/en/stable/userguide/configuration.html

Here's a basic ``example.py`` that configures Celery to use Redis for communication. We
enable a result backend, but ignore results by default. This allows us to store results
only for tasks where we care about the result.

.. code-block:: python

    from flask import Flask

    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    celery_app = celery_init_app(app)

Point the ``celery worker`` command at this and it will find the ``celery_app`` object.

.. code-block:: text

    $ celery -A example worker --loglevel INFO

You can also run the ``celery beat`` command to run tasks on a schedule. See Celery's
docs for more information about defining schedules.

.. code-block:: text

    $ celery -A example beat --loglevel INFO
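
As a sketch of how a schedule could look: ``beat_schedule`` is Celery's standard
configuration key, but the entry name, interval, and arguments below are illustrative
assumptions, not part of this guide's example app.

```python
# Illustrative only: "beat_schedule" is Celery's documented config key,
# but the entry name, interval, and arguments here are made up.
CELERY = dict(
    broker_url="redis://localhost",
    result_backend="redis://localhost",
    task_ignore_result=True,
    beat_schedule={
        "add-every-30-seconds": {
            "task": "example.add_together",  # module-qualified task name
            "schedule": 30.0,                # run every 30 seconds
            "args": (23, 42),
        },
    },
)
```

``celery beat`` reads this schedule and sends the task to the broker on each interval;
a separate worker still executes it.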

Application Factory
-------------------

When using the Flask application factory pattern, call the ``celery_init_app`` function
inside the factory. It sets ``app.extensions["celery"]`` to the Celery app object, which
can be used to get the Celery app from the Flask app returned by the factory.

.. code-block:: python

    def create_app() -> Flask:
        app = Flask(__name__)
        app.config.from_mapping(
            CELERY=dict(
                broker_url="redis://localhost",
                result_backend="redis://localhost",
                task_ignore_result=True,
            ),
        )
        app.config.from_prefixed_env()
        celery_init_app(app)
        return app

To use ``celery`` commands, Celery needs an app object, but that's no longer directly
available. Create a ``make_celery.py`` file that calls the Flask app factory and gets
the Celery app from the returned Flask app.

.. code-block:: python

    from example import create_app

    flask_app = create_app()
    celery_app = flask_app.extensions["celery"]

Point the ``celery`` command to this file.

.. code-block:: text

    $ celery -A make_celery worker --loglevel INFO
    $ celery -A make_celery beat --loglevel INFO


Defining Tasks
--------------

Using ``@celery_app.task`` to decorate task functions requires access to the
``celery_app`` object, which won't be available when using the factory pattern. It also
means that the decorated tasks are tied to the specific Flask and Celery app instances,
which could be an issue during testing if you change configuration for a test.

Instead, use Celery's ``@shared_task`` decorator. This creates task objects that will
access whatever the "current app" is, which is a similar concept to Flask's blueprints
and app context. This is why we called ``celery_app.set_default()`` above.

Here's an example task that adds two numbers together and returns the result.

.. code-block:: python

    from celery import shared_task

    @shared_task(ignore_result=False)
    def add_together(a: int, b: int) -> int:
        return a + b

Earlier, we configured Celery to ignore task results by default. Since we want to know
the return value of this task, we set ``ignore_result=False``. On the other hand, a task
that didn't need a result, such as sending an email, wouldn't set this.


Calling Tasks
-------------

The decorated function becomes a task object with methods to call it in the background.
The simplest way is to use the ``delay(*args, **kwargs)`` method. See Celery's docs for
more methods.

A Celery worker must be running to run the task. Starting a worker is shown in the
previous sections.

.. code-block:: python

    from flask import request

    @app.post("/add")
    def start_add() -> dict[str, object]:
        a = request.form.get("a", type=int)
        b = request.form.get("b", type=int)
        result = add_together.delay(a, b)
        return {"result_id": result.id}

The route doesn't get the task's result immediately. That would defeat the purpose by
blocking the response. Instead, we return the running task's result id, which we can use
later to get the result.


Getting Results
---------------

To fetch the result of the task we started above, we'll add another route that takes the
result id we returned before. We return whether the task is finished (ready), whether it
finished successfully, and what the return value (or error) was if it is finished.

.. code-block:: python

    from celery.result import AsyncResult

    @app.get("/result/<id>")
    def task_result(id: str) -> dict[str, object]:
        result = AsyncResult(id)
        return {
            "ready": result.ready(),
            "successful": result.successful(),
            "value": result.result if result.ready() else None,
        }

Now you can start the task using the first route, then poll for the result using the
second route. This keeps the Flask request workers from being blocked waiting for tasks
to finish.
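
The client side of this pattern can be sketched with nothing but the standard library.
The ``poll`` helper and the stubbed responses below are hypothetical, standing in for
repeated ``GET /result/<id>`` calls against the route above; they are not Flask or
Celery API.

```python
import time

def poll(get_status, interval: float = 0.5, timeout: float = 30.0) -> dict:
    """Call get_status() until it reports ready, or give up after timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status()
        if status["ready"]:
            return status
        time.sleep(interval)
    raise TimeoutError("task did not finish in time")

# Stub standing in for the /result/<id> route: pending once, then done.
responses = iter([
    {"ready": False, "successful": None, "value": None},
    {"ready": True, "successful": True, "value": 65},
])
print(poll(lambda: next(responses), interval=0.0))
# prints {'ready': True, 'successful': True, 'value': 65}
```

In a real client you would replace the stub with an HTTP request and pick an interval
and timeout that suit the task's expected duration.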


Passing Data to Tasks
---------------------

The "add" task above took two integers as arguments. To pass arguments to tasks, Celery
has to serialize them to a format that it can pass to other processes. Therefore,
passing complex objects is not recommended. For example, it would be impossible to pass
a SQLAlchemy model object, since that object is probably not serializable and is tied to
the session that queried it.

Pass the minimal amount of data necessary to fetch or recreate any complex data within
the task. Consider a task that will run when the logged in user asks for an archive of
their data. The Flask request knows the logged in user, and has the user object queried
from the database. It got that by querying the database for a given id, so the task can
do the same thing. Pass the user's id rather than the user object.

.. code-block:: python

    @shared_task
    def generate_user_archive(user_id: str) -> None:
        user = db.session.get(User, user_id)
        ...

    generate_user_archive.delay(current_user.id)
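
The serialization constraint can be seen with the standard library alone: Celery's
default serializer is JSON, so anything passed to ``delay`` must survive a JSON round
trip. The ``User`` class below is a stand-in for a model object, not Flask, Celery, or
SQLAlchemy API.

```python
import json

# A plain id round-trips through JSON, so it is safe to pass to delay():
payload = json.dumps({"args": ["user-123"]})
assert json.loads(payload)["args"] == ["user-123"]

# A rich object, standing in for a SQLAlchemy model, does not:
class User:
    def __init__(self, id: str) -> None:
        self.id = id

try:
    json.dumps({"args": [User("user-123")]})
except TypeError:
    print("not JSON serializable; pass user.id instead")
```

This is why the archive task above takes ``user_id`` and re-queries the row itself.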
