如何使用 Celery 和 Django 将任务路由到不同的队列

Posted

技术标签:

【中文标题】如何使用 Celery 和 Django 将任务路由到不同的队列【英文标题】:How to route tasks to different queues with Celery and Django 【发布时间】:2019-01-08 22:00:28 【问题描述】:

我正在使用以下堆栈:

Python 3.6 Celery v4.2.1(代理:RabbitMQ v3.6.0Django v2.0.4

根据Celery's documentation,在不同的队列上运行计划任务应该就像为CELERY_ROUTES上的任务定义相应的队列一样简单,但所有任务似乎都在Celery的默认队列上执行。

这是my_app/settings.py上的配置:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES = 
 'app1.tasks.*': 'queue': 'queue1',
 'app2.tasks.*': 'queue': 'queue2',

CELERY_BEAT_SCHEDULE = 
    'app1_test': 
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
    ,
    'app2_test': 
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
    ,


这些任务只是用于测试路由的简单脚本:

文件app1/tasks.py

from my_app.celery import app
import time


@app.task()
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

文件app2/tasks.py

from my_app.celery import app
import time


@app.task()
def app2_test():
    print('I am app2_test task!')
    time.sleep(10)

当我使用所有必需的队列运行 Celery 时:

celery -A my_app worker -B -l info -Q celery,queue1,queue2

RabbitMQ 将显示只有默认队列“celery”正在运行任务:

sudo rabbitmqctl list_queues
# Tasks executed by each queue:
#  - celery 2
#  - queue1 0
#  - queue2 0

有人知道如何解决这种意外行为吗?

问候,

【问题讨论】:

【参考方案1】:

我已经成功了,这里有几件事需要注意:

根据Celery's 4.2.0 documentation,CELERY_ROUTES 应该是定义队列路由的变量,但它只适用于我使用CELERY_TASK_ROUTES 代替。任务路由似乎独立于 Celery Beat,因此这只适用于手动安排的任务:

app1_test.delay()
app2_test.delay()

app1_test.apply_async()
app2_test.apply_async()

要使其与 Celery Beat 一起使用,我们只需要在 CELERY_BEAT_SCHEDULE 变量中明确定义队列。 my_app/settings.py 文件的最终设置如下:

CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES = 
 'app1.tasks.*': 'queue': 'queue1',
 'app2.tasks.*': 'queue': 'queue2',

CELERY_BEAT_SCHEDULE = 
    'app1_test': 
        'task': 'app1.tasks.app1_test',
        'schedule': 15,
        'options': 'queue': 'queue1'
    ,
    'app2_test': 
        'task': 'app2.tasks.app2_test',
        'schedule': 15,
        'options': 'queue': 'queue2'
    ,


并在这两个队列上运行 Celery:

celery -A my_app worker -B -l INFO -Q queue1,queue2

在哪里

-A:项目或应用的名称。 -B:启动任务调度器Celery beat。 -l:定义日志记录级别。 -Q:定义了这个worker处理的队列。

我希望这可以为其他开发人员节省一些时间。

【讨论】:

CELERY_ROUTESCELERY_TASK_ROUTES 混淆的解释:CELERY_ROUTES 是旧的 celery 设置名称,现在已替换为 task_routes。但是 django 设置文件中的 celery 设置必须是大写的(例如TASK_ROUTES)。为避免与其他 django 设置冲突,建议在 celery 设置前加上 CELERY_ 前缀,结果为 CELERY_TASK_ROUTES。这是通过执行以下操作加载的:app.config_from_object('django.conf:settings', namespace='CELERY')。所以CELERY_TASK_ROUTES 只是新设置名称的大写和前缀更改。 对于那些想知道 celery beat 的人来说,它并不独立于任务路由,它应该也可以工作。 不,我认为我们不需要在设置文件中定义“选项”参数,如果您只是从命令中删除其他队列名称之前的芹菜,那么它就可以正常工作,因为您有已经在 settings.py 文件中定义了 CELERY_ROUTES 这也适用于 celery v5.0 ,谢谢 喜欢这个答案!在 2022 年 1 月的 v4.4.0 中对我来说工作得很好,@sparrowt 评论和解释使用 CELERY_TASK_ROUTES 而不是 CELERY_ROUTES 是 :chefskiss: :D【参考方案2】:

在装饰器中添加queue 参数可能会对您有所帮助,

@app.task(queue='queue1')
def app1_test():
    print('I am app1_test task!')
    time.sleep(10)

【讨论】:

感谢@JPG,这将是一个有效的替代方案,但我更喜欢在 Django 设置文件中定义队列以获得更大的灵活性。这样我可以根据环境使用不同的队列名称:测试、暂存、生产 我放弃了配置选项“celery_task_default_queue”、“task_default_queue”、“task_routes”。这些都没有做任何事情。唯一有用的是装饰器,谢谢 JPG。【参考方案3】:

好的,因为我尝试了与您运行工作程序相同的命令,所以我发现您只需要删除 -Q 参数后的“celery”就可以了。

所以旧命令是

celery -A my_app worker -B -l info -Q celery,queue1,queue2

而新命令是

celery -A my_app worker -B -l info -Q queue1,queue2

【讨论】:

应始终指定队列名称和工作人员名称。例如。 "celery -A my_app worker -Q my_queue,my_other_queue -P 线程 --task-events -c 40 -l INFO -B --scheduler django_celery_beat.scheduler:DatabaseScheduler -n celery@%h""

以上是关于如何使用 Celery 和 Django 将任务路由到不同的队列的主要内容,如果未能解决你的问题,请参考以下文章

如何将 django 对象发送到 celery 任务?

如何从 django 模板暂停和停止 celery 任务

如何限制 django 网站的 redis/celery 任务?

如何在任务中获取芹菜结果模型(使用 django-celery-results)

如何在 Django-Celery 失败的情况下设置重试任务

Django - 如何在 celery 和 redis 中使用异步任务队列