为什么芹菜会给rabbitmq添加数千个队列,这些队列在任务完成后似乎会持续很长时间?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了为什么芹菜会给rabbitmq添加数千个队列,这些队列在任务完成后似乎会持续很长时间?相关的知识,希望对你有一定的参考价值。

我正在使用带有rabbitmq后端的芹菜。在rabbitmq中生成数千个队列,其中包含0或1个项目,如下所示:

$ sudo rabbitmqctl list_queues
Listing queues ...
c2e9b4beefc7468ea7c9005009a57e1d        1
1162a89dd72840b19fbe9151c63a4eaa        0
07638a97896744a190f8131c3ba063de        0
b34f8d6d7402408c92c77ff93cdd7cf8        1
f388839917ff4afa9338ef81c28aad75        0
8b898d0c7c7e4be4aa8007b38ccc00ea        1
3fb4be51aaaa4ac097af535301084b01        1

这似乎是低效的,但我进一步观察到这些队列在处理完成后会持续很长时间。

我发现似乎这样做的任务:

@celery.task(ignore_result=True)
def write_pages(page_generator):  
    g = group(render_page.s(page) for page in page_generator)
    res = g.apply_async()

    for rendered_page in res:
        print rendered_page # TODO: print to file

似乎因为这些任务是在一个组中调用的,所以它们被抛入队列但从未被释放。但是,我显然正在消耗结果(因为我可以看到它们在我遍历res时被打印。所以,我不明白为什么这些任务会持续存在于队列中。

另外,我想知道正在创建的大量队列是否表明我做错了什么。

感谢您的帮助!

答案

具有AMQP后端的Celery将任务逻辑删除(结果)存储在以产生结果的任务ID命名的AMQP队列中。即使在结果耗尽后,这些队列也会持续存在。

一对建议:

另一答案

使用CELERY_TASK_RESULT_EXPIRES(或4.1 CELERY_RESULT_EXPIRES)定期清理任务从rabbitmq中删除旧数据。

http://docs.celeryproject.org/en/master/userguide/configuration.html#std:setting-result_expires

以上是关于为什么芹菜会给rabbitmq添加数千个队列,这些队列在任务完成后似乎会持续很长时间?的主要内容,如果未能解决你的问题,请参考以下文章

Celery + Flower + FastAPI + RabbitMQ ,Python实现异步消息队列和监控

芹菜不处理来自 RabbitMQ 的任务

什么可能会延迟我的芹菜任务?

检索芹菜队列中的任务列表

芹菜是如何工作的?

rabbitmq 最多能多少个queue