芹菜工人在当前任务完成后不会再接新任务

Posted

技术标签:

【中文标题】芹菜工人在当前任务完成后不会再接新任务【英文标题】:Celery worker will not pick up a new task after the current one is finished 【发布时间】:2015-04-29 10:51:53 【问题描述】:

我有三个任务:

@app.task(name='timey')
def timey():
    print "timey"
    while True:
        pass
    return 1

@app.task(name='endtimey')
def endtimey():
    for i in range(10):
        print "ENDTIMEY", time()
        sleep(3)
    return 1

@app.task(name='nexttask')
def nexttask(n):
    print "NEXT TASK"
    return 1

如果我唯一要做的就是将 endtimey 和 nexttask 链接在一起 -

chain(endtimey.s() | nexttask.s()).delay()

一切都按预期进行。我在芹菜日志中看到ENDTIMEY <current time> 打印十次,然后是NEXT TASK。但是,如果我用无限任务 timey 填满 7 个工人,然后将 endtimeynexttask 链接在一起 -

for i in range(7):
    timey.s().delay()
chain(endtimey.s() | nexttask.s()).delay()

所有timey的任务将被8个worker中的7个接走,endtimey会在第8个worker上运行,之后日志会显示nexttask已经收到,但是@987654332 @ 不会运行。

这是为什么?

另外,如果我杀死 celery 服务器然后重新启动它,nexttask 将首先运行。

这是一个人为的例子,但我在一个更复杂的情况下遇到了一个问题,即 celery 工作人员在完成当前任务后没有拿起排队的任务。如果我在这种情况下重新启动 celery,空闲的工作人员将再次开始接任务。

【问题讨论】:

这听起来像是一个错误,你有没有用 celery 提交过错误,或者你知道提交过的错误吗? 我不是 100% 确定这是一个错误。根据 Chris Ward 的回答,问题在于工作人员提前保留任务,因此如果运行无限任务的工作人员之一保留 nexttask.s(),那么它永远不会被拾取。这似乎是一个有意识的设计决定。 【参考方案1】:

听起来问题在于 celery 的默认预取行为。每个工作人员将在当前处于最大容量时提前保留一定数量的任务,这被称为Prefetch Multiplier。

这样做的原因是,当您有大量的短任务时,如果这些任务已经预取并准备好立即执行,那么您的整体吞吐量将会高得多。

问题在于,当您有很多长时间运行的任务或混合了长短任务时,即使其他工作人员可以处理任务,任务也会被忙碌的工作人员保留和阻塞。

因此,在您的情况下,您可能需要将预取乘数降低到 1。

【讨论】:

非常感谢@Chris 拯救了我的一天。理想情况下,celery 应该在文档中非常强调这种行为。

以上是关于芹菜工人在当前任务完成后不会再接新任务的主要内容,如果未能解决你的问题,请参考以下文章

从芹菜任务中获取芹菜工人的名字?

芹菜任务应该在工人迷路时排队

结束芹菜工人的任务、时间限制、工作阶段或客户的指示

Flower UI 不显示芹菜工人和任务

如何检查处理 Celery 任务的队列

芹菜工人的水平尺度导致相同的处理时间