如何按名称限制运行 Celery 任务的最大数量

Posted

技术标签:

【中文标题】如何按名称限制运行 Celery 任务的最大数量【英文标题】:How to limit the maximum number of running Celery tasks by name 【发布时间】:2016-04-22 05:18:44 【问题描述】:

如何限制可以同时运行的特定 Celery 任务的实例数?

我有一个处理大文件的任务。我遇到了一个问题,用户可能会启动多个任务,导致服务器在尝试一次处理太多文件时耗尽 CPU 和内存。我想确保在任何给定时间只运行此类任务的 N 个实例,并且其他任务将在调度程序中排队等待其他任务完成。

我看到任务装饰器中有一个rate_limit 选项,但我认为这不是我想要的。如果我正确理解文档,这只会限制任务启动的速度,但不会限制正在运行的任务总数,因此这将使我的服务器崩溃得更慢......但它仍然会还是崩溃了。

【问题讨论】:

【参考方案1】:

您必须设置额外的队列并为其设置所需的并发级别。来自Routing Tasks:

# Old config style    
CELERY_ROUTES = 
                'app.tasks.limited_task': 'queue': 'limited_queue'
             

from kombu import Exchange, Queue
celery.conf.task_queues = (
        Queue('default', default_exchange, routing_key='default'),
        Queue('limited_queue', default_exchange, routing_key='limited_queue')
    ) 

并启动额外的工作人员,仅服务 limited_queue:

$ celery -A celery_app worker -Q limited_queue --loglevel=info -c 1 -n limited_queue

然后您可以使用Flower 或inspect 命令检查一切是否顺利运行:

$ celery -A celery_app worker inspect --help

【讨论】:

【参考方案2】:

您可以做的是将这些任务推送到特定队列并让 X 数量的工作人员处理它们。让两个工作人员在一个包含 100 个项目的队列中将确保同时处理两个任务。

【讨论】:

【参考方案3】:

我不确定您是否可以在 Celery 中执行此操作,您可以做的是检查请求到达时当前正在运行的该名称的任务数量,如果超过最大值,则返回错误或添加定期检查的机制如果任务有空位并运行它(如果添加这样的机制,则无需仔细检查,只需在每次请求时将其添加到队列中即可。

为了检查正在运行的任务,您可以使用inspect 命令。

简而言之:

app = Celery(...)
i = app.control.inspect()
i.active()

【讨论】:

以上是关于如何按名称限制运行 Celery 任务的最大数量的主要内容,如果未能解决你的问题,请参考以下文章

Celery:当排队太多时阻止添加更多任务

Celery 为每个任务创建一个新连接

AFNetworking 2:限制并发下载任务的最大数量

如何限制 django 网站的 redis/celery 任务?

Celery Beat:一次限制为单个任务实例

如何限制运行的并行任务数量? [关闭]