芹菜 - 如何使用多个队列?

Posted

技术标签:

【中文标题】芹菜 - 如何使用多个队列?【英文标题】:Celery - how to use multiple queues? 【发布时间】:2019-09-03 09:48:58 【问题描述】:

我想为不同的任务创建多个队列。例如emailqueue 发送电子邮件或pipedrive 队列以与pipedrive API 同步任务,因此email 不必等到所有pipedrives 都同步后,反之亦然。

我是路由新手,我尝试了两种创建队列的方法,但似乎都不起作用。

    这是首选方法。我试图在 @task 装饰器中定义队列

    @task(bind=True,  queue='pipedrivequeue')
    

    def backsync_lead(self,lead_id):

    settings.py

    CELERY_ROUTES =  # tried CELERY_TASK_ROUTES too
        'pipedrive.tasks.*': 'queue': 'pipedrivequeue',
       ...
    
    

在这两种情况下,当我手动运行celery worker 时,我只看到一个默认的celery 队列。

(project) milano@milano-PC:~/PycharmProjects/project$ celery -A project.celery worker -l info

 -------------- celery@milano-PC v4.2.2 (windowlicker)
---- **** ----- 
--- * ***  * -- Linux-4.15.0-47-generic-x86_64-with-Ubuntu-18.04-bionic 2019-04-12 17:17:05
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         project:0x7f3b47f66cf8
- ** ---------- .> transport:   redis://localhost:6379//
- ** ---------- .> results:     redis://localhost/
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery


[tasks]
  . project.apps.apis.pipedrive.tasks.backsync_all_stages
  . project.apps.apis.pipedrive.tasks.backsync_lead

正如你在这一行中看到的:

 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

可能只有一个队列。我只想将此队列用于未指定队列的任务。

你知道问题出在哪里吗?

编辑

   (project) milano@milano-PC:~/PycharmProjects/peoject$ celery inspect active_queues
Error: No nodes replied within time constraint.

【问题讨论】:

【参考方案1】:

您需要使用明确命名的队列运行工作程序,然后 django 将能够向该队列提供数据;

celery worker -A project.celery -l info  # Default queue worker
celery worker -A project.celery -l info -Q pipedrivequeue  # pipedrivequeue worker
celery worker -A project.celery -l info -Q testqueue  # testqueue worker

【讨论】:

我不能只运行一个工人(并发在我的情况下是 12)?我知道我可以在一个工作人员中指定多个队列。 它只是部分正确,默认情况下 celery 将适用于所有配置的队列。他的问题是他没有在设置中定义队列(CELERY_QUEUES @Milano) @marxin 好吧,公平点,这与我配置将作业放入单独队列的方式不同。 @marxin 设置CELERY_QUEUES = ( Queue('celery'), Queue('testqueue'), ) 好像没有效果。输出还是一样的。 显然它在最近的版本中已重命名为 CELERY_TASK_QUEUES 顺便说一句,您可以使用逗号运行多个队列,-Q a,b,c

以上是关于芹菜 - 如何使用多个队列?的主要内容,如果未能解决你的问题,请参考以下文章

如何取消芹菜队列上的任务? [复制]

芹菜是如何工作的?

如何在芹菜中将任务从一个队列移动到另一个队列

如何在不同的机器上设置芹菜工人?

检索芹菜队列中的任务列表

如何在不重复的情况下重试芹菜任务 - SQS