Python 分布式任务队列 Celery 入门与使用

最近工作的项目使我接触到了 Celery 这个任务队列。看了一下官方的文档,感觉设计得还挺 Pythonic,理念也非常简单易懂——类似生产者与消费者。在这里稍微总(fan)结(yi)一下 Celery 的使用方法。

简介

Celery 是一个分布式任务队列,网上也有说是分布式任务调度框架,这里我以官方文档的“Distributed Task Queue”为准。它简单、灵活、可靠,可以处理大量的大量的任务,其主要专注于实时处理,同时也支持计划任务。

为什么要用任务队列?我的理解是,首先方便了任务的分发调度与管理,另外也使调用的过程变得异步(非常适合 web 请求)。

名词解释

  • 任务队列(task queue):一种分发任务到不同的线程或机器的方法,其输入为一个任务(task)。
  • Worker:实际执行任务的进程,它不断检查任务队列中的新任务并执行。
  • Broker:客户端与 worker 通信的中介。客户端发送任务的消息到队列中,broker 把这条消息传递给一个 worker。

入门

如果不考虑进阶用法,5 分钟入门。

安装

首先安装 Celery 并选择 broker。其中 broker 主要支持 RabbitMQ 和 Redis。RabbitMQ 不需要额外依赖,直接pip install -U celery安装。 Redis 需要在安装 Celery 时安装附加的依赖:pip install -U "celery[redis]"

RabbitMQ 更为适合生产环境,也稍微大型;Redis 更轻量级,但突然退出时可能丢失数据。为了稍微简单轻量,本文都用 Redis。(如何安装 broker 不在此讨论,docker 启动一个最为简单)

新建 Celery 应用

新建一个mytasks.py

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

接下来就可以启动 worker 了:(生产环境当然不会这样手动运行,而会把它作为后台程序运行)

$ celery -A mytasks worker --loglevel=info

# 如果不了解上面的命令用法,可查看命令帮助
# celery help
# celery worker --help

调用 task

在当前的目录,运行

>>> from mytasks import add
>>> add.delay(1, 2)  # 使用 delay() 来调用这个 task

可以得到类似<AsyncResult: fd9cdbe3-bcb3-432a-8d46-67b41243cfed>的返回值,而不会返回 3;这个 3 在 worker 的控制台里可以看到:Task mytasks.add[fd9cdbe3-bcb3-432a-8d46-67b41243cfed] succeeded in 0.0003419s: 3

保存结果

默认情况下,结果是不保存的。如果想保存结果,需要指定 result backend,支持 SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP) 等。例如app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0'),调用之后就可以查询任务的状态及结果。

>>> result = add.delay(1, 2)
>>> result.ready()
True
>>> result.get(timeout=1)
3

参数配置

简单的参数配置可以直接修改app.conf,例如:

app.conf.task_serializer = 'json'

对于大型一点的项目,最好专门做一个配置模块。先新建一个 celeryconfig.py:

broker_url = 'pyamqp://xxxx'
result_backend = 'rpc://xxxx'

task_serializer = 'json'
timezone = 'Europe/Oslo'
enable_utc = True

task_annotations = {
    'mytasks.add': {'rate_limit': '10/m'}
}

然后通过app.config_from_object('celeryconfig')导入。

稍微深入

Task

Task 有很多选项可以填入,例如@app.task(bind=True, default_retry_delay=30 * 60)。具体及其余内容见Tasks文档

调用 task

前面用到的 delay() 方法是 apply_async() 的简化,但前者不支持传递执行的参数。举例来说,

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')
# 等价于
task.apply_async(args=(arg1, arg2), kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

可见简化了许多。

Countdown 参数可以设置任务至少(可能受 worker busy 或其他原因有所推迟)多少秒后执行;而 eta (estimated time of arrival) 参数可以设置任务至少(原因相同)在具体时刻之后执行:

>>> result = add.apply_async((1, 2), countdown=5)  # 至少5秒后执行
>>> result.get()  # 阻塞至任务完成

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((1, 2), eta=tomorrow)

一个任务由于种种原因,延迟太久了,我们可以把它设置为过期,支持输入秒数或一个 datetime:

add.apply_async((10, 10), expires=60)  # 如果任务延迟超过60s,将不会被执行

对于一个任务,还可以指定这个任务放到哪个队列中(routing),例如

add.apply_async(queue='priority.high')

使用 -Q 来给 worker 指定监听的队列:

$ celery -A mytasks worker -l info -Q celery,priority.high

像上面这样硬编码 add 的对应 queue 不是太好,更佳的方法是使用 configuration routers

其他调用 task 的文档,见 Calling Tasks

函数签名(signature)

对于简单的 task 调用,使用 .delay() 或 .apply_async() 方法一般就已足够。但有时我们需要更高级的调用,例如把任务的返回值用作下一个任务的输入,如果把一系列任务写成串行,就很不推荐了。为此,可以通过函数签名来调用 tasks。

下面给 add() 函数创建一个签名(signature):

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)
>>> add.s(2, 2)  # 简化,但不能传入task的option,例如countdown
tasks.add(2, 2)
>>> sig = add.signature((2, 2), {'debug': True}, countdown=10)  # 完全版

定义了签名后,就可以用sig.delay()来调用这个任务。

签名的一个很重要的功能是它可以定义偏函数,类似 Python 的 functools.partial:

>>> partial = add.s(2)          # 不完整的 signature
>>> partial.delay(1)            # 1 + 2  注意这个1是填在前面的

偏函数主要的应用场合是各种的原语(Primitives)。这些 primitives 主要包括 group、chain、chord、map、starmap、chunks 等。下面介绍其中几个的用法。

group

group 可以实现任务的并行:

>>> from celery import group
>>> res = group(add.s(i, i) for i in range(10))()
>>> res.get(timeout=1)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
chain

chain 可以按顺序执行任务,把前一个任务的结果作为接下来的任务的输入。注意偏函数的使用:

>>> from celery import chain
>>> result = chain(add.s('h', 'e'), add.s('llo'), add.s(' world'))()
>>> result.get()
'hello world'
>>> (add.s('h', 'e') | add.s('llo') | add.s(' world'))().get()  # 也可以用 | 连接

有关这一部分的更详细内容,见 Canvas: Designing Work-flows

后台启动

在实际环境中,celery 肯定是以后台服务的方式运行的。文档给出了 systemd、init.d、supervisor 等启动的方式。具体见 Daemonization

定时任务

定时运行任务的功能由 celery beat 完成。它按设定的周期/时间把任务发送到队列中,并被 worker 执行。对于一个集群的 worker,celery beat 应只有一个,否则任务会重复。

mytasks.py改成下面所示:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每10s执行一次
    sender.add_periodic_task(10.0, add.s('hello', ' world'), name='every 10s')

    # 按crontab的格式定期执行
    sender.add_periodic_task(
        crontab(hour='*', minute=5),
        add.s('it is', ' xx:05')
    )

然后启动 beat:

$ celery -A mytasks beat

可以在 worker 看到每 10s 输出了一次 “hello world”。每个小时的 5 分,都会输出 “it is xx:05”

关于定时任务,具体见 Periodic Tasks

相关参考

上面只是比较基本的用法。对于更多深入使用中遇到的问题,还是应该参考官网文档


发表评论

解决 : *
34 + 27 =