如何让 Celery 工作人员使用“外部”RabbitMQ 队列?

Posted

技术标签:

【中文标题】如何让 Celery 工作人员使用“外部”RabbitMQ 队列?【英文标题】:How do I get a Celery worker to consume an 'outside' RabbitMQ queue? 【发布时间】:2019-02-22 13:52:40 【问题描述】:

我有以下脚本:

celery_tasks.py

from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'

@app.task(acks_late=True)
def test(a):
   return a

发布.py

from celery_tasks import test
test.delay('abc')

当我运行 publish.py 并启动 worker (celery -A celery_tasks worker --loglevel=DEBUG) 时,'abc' 内容会发布在 'test_queue' 中并被 worker 使用。

有没有办法让工作人员从不是由 Celery 发布的队列中消费一些东西?例如,当我直接通过 RabbitMQ 将某些内容放入 test_queue 中,而不通过 Celery 发布者,并运行 Celery 工作者时,它给了我以下警告:

WARNING/MainProcess] 收到并删除未知消息。目的地错误?!?

邮件正文的完整内容是:body: 'abc' (3b)

content_type:无 content_encoding:无 delivery_info:'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue' headers=

有没有办法解决这个问题?

【问题讨论】:

你是如何手动将消息发布到rabbitmq的?从文档中查看,看起来 content_type 和 content_encoding 是必需的,并且可能还有其他必填字段。 【参考方案1】:

Celery 具有特定的格式和一组需要维护以符合它的标头。因此,您必须对其进行逆向工程以使芹菜兼容的消息不是由芹菜产生的。 请记住,celery 并不是真正用于通过代理发送消息,而是发送任务,这是增强的消息,因此在 amqp 消息的标头部分中有额外内容

【讨论】:

【参考方案2】:

这是一个较晚的答案,但自定义消费者可能会对您有所帮助。我用它来消费来自rabbitmq的消息。这些消息是从另一个使用 pika 的应用程序填充的。

http://docs.celeryproject.org/en/latest/userguide/extending.html#custom-message-consumers

【讨论】:

以上是关于如何让 Celery 工作人员使用“外部”RabbitMQ 队列?的主要内容,如果未能解决你的问题,请参考以下文章

如何让celery接受定制的参数

无法让 Celery 使用 prod 设置处理数字海洋液滴

使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询

将所有 celery 任务的日志消息发送到单个文件

如何使用 celery 配置不同的工作池?

如何检查 Celery 中的任务状态?