如何使用 Celery 和 Django 将任务路由到不同的队列
Posted
技术标签:
【中文标题】如何使用 Celery 和 Django 将任务路由到不同的队列【英文标题】:How to route tasks to different queues with Celery and Django 【发布时间】:2019-01-08 22:00:28 【问题描述】:我正在使用以下堆栈:
Python 3.6 Celery v4.2.1(代理:RabbitMQ v3.6.0) Django v2.0.4。根据Celery's documentation,在不同的队列上运行计划任务应该就像为CELERY_ROUTES上的任务定义相应的队列一样简单,但所有任务似乎都在Celery的默认队列上执行。
这是my_app/settings.py上的配置:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_ROUTES =
'app1.tasks.*': 'queue': 'queue1',
'app2.tasks.*': 'queue': 'queue2',
CELERY_BEAT_SCHEDULE =
'app1_test':
'task': 'app1.tasks.app1_test',
'schedule': 15,
,
'app2_test':
'task': 'app2.tasks.app2_test',
'schedule': 15,
,
这些任务只是用于测试路由的简单脚本:
文件app1/tasks.py:
from my_app.celery import app
import time
@app.task()
def app1_test():
print('I am app1_test task!')
time.sleep(10)
文件app2/tasks.py:
from my_app.celery import app
import time
@app.task()
def app2_test():
print('I am app2_test task!')
time.sleep(10)
当我使用所有必需的队列运行 Celery 时:
celery -A my_app worker -B -l info -Q celery,queue1,queue2
RabbitMQ 将显示只有默认队列“celery”正在运行任务:
sudo rabbitmqctl list_queues
# Tasks executed by each queue:
# - celery 2
# - queue1 0
# - queue2 0
有人知道如何解决这种意外行为吗?
问候,
【问题讨论】:
【参考方案1】:我已经成功了,这里有几件事需要注意:
根据Celery's 4.2.0 documentation,CELERY_ROUTES
应该是定义队列路由的变量,但它只适用于我使用CELERY_TASK_ROUTES
代替。任务路由似乎独立于 Celery Beat,因此这只适用于手动安排的任务:
app1_test.delay()
app2_test.delay()
或
app1_test.apply_async()
app2_test.apply_async()
要使其与 Celery Beat 一起使用,我们只需要在 CELERY_BEAT_SCHEDULE
变量中明确定义队列。 my_app/settings.py
文件的最终设置如下:
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"
CELERY_TASK_ROUTES =
'app1.tasks.*': 'queue': 'queue1',
'app2.tasks.*': 'queue': 'queue2',
CELERY_BEAT_SCHEDULE =
'app1_test':
'task': 'app1.tasks.app1_test',
'schedule': 15,
'options': 'queue': 'queue1'
,
'app2_test':
'task': 'app2.tasks.app2_test',
'schedule': 15,
'options': 'queue': 'queue2'
,
并在这两个队列上运行 Celery:
celery -A my_app worker -B -l INFO -Q queue1,queue2
在哪里
-A
:项目或应用的名称。
-B
:启动任务调度器Celery beat。
-l
:定义日志记录级别。
-Q
:定义了这个worker处理的队列。
我希望这可以为其他开发人员节省一些时间。
【讨论】:
CELERY_ROUTES
与 CELERY_TASK_ROUTES
混淆的解释:CELERY_ROUTES
是旧的 celery 设置名称,现在已替换为 task_routes
。但是 django 设置文件中的 celery 设置必须是大写的(例如TASK_ROUTES
)。为避免与其他 django 设置冲突,建议在 celery 设置前加上 CELERY_
前缀,结果为 CELERY_TASK_ROUTES
。这是通过执行以下操作加载的:app.config_from_object('django.conf:settings', namespace='CELERY')
。所以CELERY_TASK_ROUTES
只是新设置名称的大写和前缀更改。
对于那些想知道 celery beat 的人来说,它并不独立于任务路由,它应该也可以工作。
不,我认为我们不需要在设置文件中定义“选项”参数,如果您只是从命令中删除其他队列名称之前的芹菜,那么它就可以正常工作,因为您有已经在 settings.py 文件中定义了 CELERY_ROUTES
这也适用于 celery v5.0 ,谢谢
喜欢这个答案!在 2022 年 1 月的 v4.4.0 中对我来说工作得很好,@sparrowt 评论和解释使用 CELERY_TASK_ROUTES 而不是 CELERY_ROUTES 是 :chefskiss: :D【参考方案2】:
在装饰器中添加queue
参数可能会对您有所帮助,
@app.task(queue='queue1')
def app1_test():
print('I am app1_test task!')
time.sleep(10)
【讨论】:
感谢@JPG,这将是一个有效的替代方案,但我更喜欢在 Django 设置文件中定义队列以获得更大的灵活性。这样我可以根据环境使用不同的队列名称:测试、暂存、生产 我放弃了配置选项“celery_task_default_queue”、“task_default_queue”、“task_routes”。这些都没有做任何事情。唯一有用的是装饰器,谢谢 JPG。【参考方案3】:好的,因为我尝试了与您运行工作程序相同的命令,所以我发现您只需要删除 -Q 参数后的“celery”就可以了。
所以旧命令是
celery -A my_app worker -B -l info -Q celery,queue1,queue2
而新命令是
celery -A my_app worker -B -l info -Q queue1,queue2
【讨论】:
应始终指定队列名称和工作人员名称。例如。 "celery -A my_app worker -Q my_queue,my_other_queue -P 线程 --task-events -c 40 -l INFO -B --scheduler django_celery_beat.scheduler:DatabaseScheduler -n celery@%h""以上是关于如何使用 Celery 和 Django 将任务路由到不同的队列的主要内容,如果未能解决你的问题,请参考以下文章
如何限制 django 网站的 redis/celery 任务?
如何在任务中获取芹菜结果模型(使用 django-celery-results)