芹菜预取任务卡在其他任务后面
Posted
技术标签:
【中文标题】芹菜预取任务卡在其他任务后面【英文标题】:Celery prefetched tasks stuck behind other tasks 【发布时间】:2021-12-27 10:17:25 【问题描述】:当集群需要扩展时,我在包含多个 Celery worker 的 ECS 集群上遇到了问题。
一些背景:
我有一个可能要运行几个小时的任务。 ECS 集群上的 Celery 工作线程当前使用 Flower 根据队列深度进行扩展。每当队列深度大于 1 时,它就会扩大工作人员以接收更多任务。 使用的代理是 Redis。 我已经将worker_prefetch_multiplier
设置为1,每个worker的并发数等于4。
问题定义: 由于这些设置,每个工作人员在填充队列深度之前预取 4 个任务。所以假设我们有一个工作人员正在运行,它需要在第 9 个任务的队列深度填充到 1 之前调用 8 个任务。 4 个任务将处于 STARTED 状态,4 个任务将处于 RECEIVED 状态。每当将工作节点的数量增加到 2 个时,只会将第 9 个任务发送给该工作节点。但是,这意味着处于 RECEIVED 状态的 4 个任务可能会“卡”在处于 STARTED 状态的 4 个任务之后几个小时,这是不可取的。
研究的解决方案:
在 Celery 的文档 (https://docs.celeryproject.org/en/stable/userguide/optimizing.html) 中搜索解决方案时,禁用预取的唯一方法是对任务使用acks_late=True
。它确实解决了没有预取任务的问题,但它也导致了其他问题,例如在新扩展的工作节点上复制任务,这绝对不是我想要的。
另外,在 worker 上设置 -O fair
也经常被认为是一种解决方案,但似乎它仍然会创建处于 RECEIVED 状态的任务。
目前,我正在考虑一个稍微复杂的解决方案来解决这个问题,所以我很高兴听到其他解决方案。当前提出的解决方案是将并发设置为-c 2
(而不是-c 4
)。这意味着将在第一个工作节点上预取 2 个任务并启动 2 个任务。所有其他任务将最终进入队列,需要扩展事件。一旦 ECS 扩展到两个工作节点,我会将第一个工作节点的并发量从 2 个扩展到 4 个,以释放预取的任务。
有什么想法/建议吗?
【问题讨论】:
另一个与此问题相关的帖子:github.com/celery/celery/issues/6500 【参考方案1】:在@samdoolin 的帮助下,我找到了解决这个问题的方法(在这些帖子中:https://github.com/celery/celery/issues/6500)。我会在这里为和我有同样问题的人提供完整的答案。
解决方案:
@samdoolin 提供的解决方案是对 Consumer 的 can_consume
功能进行monkeypatch,使其具有仅在保留请求少于工作人员可以处理的情况(工作人员的并发性)时才使用消息的功能。在我的情况下,这意味着如果已经有 4 个请求处于活动状态,它将不会消耗请求。相反,任何请求都会累积在队列中,从而导致预期的行为。然后,我可以根据队列深度轻松扩展容纳单个 worker 的 ECS 容器的数量。
实际上,这看起来像(再次感谢@samdoolin):
class SingleTaskLoader(AppLoader):
def on_worker_init(self):
# called when the worker starts, before logging setup
super().on_worker_init()
"""
STEP 1:
monkey patch kombu.transport.virtual.base.QoS.can_consume()
to prefer to run a delegate function,
instead of the builtin implementation.
"""
import kombu.transport.virtual
builtin_can_consume = kombu.transport.virtual.QoS.can_consume
def can_consume(self):
"""
monkey patch for kombu.transport.virtual.QoS.can_consume
if self.delegate_can_consume exists, run it instead
"""
if delegate := getattr(self, 'delegate_can_consume', False):
return delegate()
else:
return builtin_can_consume(self)
kombu.transport.virtual.QoS.can_consume = can_consume
"""
STEP 2:
add a bootstep to the celery Consumer blueprint
to supply the delegate function above.
"""
from celery import bootsteps
from celery.worker import state as worker_state
class Set_QoS_Delegate(bootsteps.StartStopStep):
requires = 'celery.worker.consumer.tasks:Tasks'
def start(self, c):
def can_consume():
"""
delegate for QoS.can_consume
only fetch a message from the queue if the worker has
no other messages
"""
# note: reserved_requests includes active_requests
return len(worker_state.reserved_requests) == 0
# types...
# c: celery.worker.consumer.consumer.Consumer
# c.task_consumer: kombu.messaging.Consumer
# c.task_consumer.channel: kombu.transport.virtual.Channel
# c.task_consumer.channel.qos: kombu.transport.virtual.QoS
c.task_consumer.channel.qos.delegate_can_consume = can_consume
# add bootstep to Consumer blueprint
self.app.steps['consumer'].add(Set_QoS_Delegate)
# Create a Celery application as normal with the custom loader and any required **kwargs
celery = Celery(loader=SingleTaskLoader, **kwargs)
然后我们通过下面这行启动worker:
celery -A proj worker -c 4 --prefetch-multiplier -1
请确保不要忘记 --prefetch-multiplier -1
选项,该选项完全禁用获取新请求。这将确保它使用can_consume
monkeypatch。
现在,当 Celery 应用启动时,您请求 6 个任务,其中 4 个将按预期执行,2 个将在队列中结束,而不是被预取。这是未实际设置 acks_late=True
的预期行为。
那么我想最后一点。根据 Celery 的文档,在命令行中启动 worker 时也应该可以将路径传递给SingleTaskLoader
。像这样:
celery -A proj --loader path.to.SingleTaskLoader worker -c 4 --prefetch-multiplier -1
不幸的是,这对我来说不起作用。但是可以通过实际传递给构造函数来解决。
【讨论】:
以上是关于芹菜预取任务卡在其他任务后面的主要内容,如果未能解决你的问题,请参考以下文章