如何让 Celery 工作人员使用“外部”RabbitMQ 队列?
Posted
技术标签:
【中文标题】如何让 Celery 工作人员使用“外部”RabbitMQ 队列?【英文标题】:How do I get a Celery worker to consume an 'outside' RabbitMQ queue? 【发布时间】:2019-02-22 13:52:40 【问题描述】:我有以下脚本:
celery_tasks.py
from celery import Celery
app = Celery(broker='amqp://guest:guest@localhost:5672//')
app.conf.task_default_queue = 'test_queue'
@app.task(acks_late=True)
def test(a):
return a
发布.py
from celery_tasks import test
test.delay('abc')
当我运行 publish.py 并启动 worker (celery -A celery_tasks worker --loglevel=DEBUG) 时,'abc' 内容会发布在 'test_queue' 中并被 worker 使用。
有没有办法让工作人员从不是由 Celery 发布的队列中消费一些东西?例如,当我直接通过 RabbitMQ 将某些内容放入 test_queue 中,而不通过 Celery 发布者,并运行 Celery 工作者时,它给了我以下警告:
WARNING/MainProcess] 收到并删除未知消息。目的地错误?!?
邮件正文的完整内容是:body: 'abc' (3b)
content_type:无 content_encoding:无 delivery_info:'exchange': '', 'redelivered': False, 'delivery_tag': 1, 'consumer_tag': 'None2', 'routing_key': 'test_queue' headers=
有没有办法解决这个问题?
【问题讨论】:
你是如何手动将消息发布到rabbitmq的?从文档中查看,看起来 content_type 和 content_encoding 是必需的,并且可能还有其他必填字段。 【参考方案1】:Celery 具有特定的格式和一组需要维护以符合它的标头。因此,您必须对其进行逆向工程以使芹菜兼容的消息不是由芹菜产生的。 请记住,celery 并不是真正用于通过代理发送消息,而是发送任务,这是增强的消息,因此在 amqp 消息的标头部分中有额外内容
【讨论】:
【参考方案2】:这是一个较晚的答案,但自定义消费者可能会对您有所帮助。我用它来消费来自rabbitmq的消息。这些消息是从另一个使用 pika 的应用程序填充的。
http://docs.celeryproject.org/en/latest/userguide/extending.html#custom-message-consumers
【讨论】:
以上是关于如何让 Celery 工作人员使用“外部”RabbitMQ 队列?的主要内容,如果未能解决你的问题,请参考以下文章