芹菜预取任务卡在其他任务后面

Posted

技术标签:

【中文标题】芹菜预取任务卡在其他任务后面【英文标题】:Celery prefetched tasks stuck behind other tasks 【发布时间】:2021-12-27 10:17:25 【问题描述】:

当集群需要扩展时,我在包含多个 Celery worker 的 ECS 集群上遇到了问题。

一些背景:

我有一个可能要运行几个小时的任务。 ECS 集群上的 Celery 工作线程当前使用 Flower 根据队列深度进行扩展。每当队列深度大于 1 时,它就会扩大工作人员以接收更多任务。 使用的代理是 Redis。 我已经将worker_prefetch_multiplier设置为1,每个worker的并发数等于4。

问题定义: 由于这些设置,每个工作人员在填充队列深度之前预取 4 个任务。所以假设我们有一个工作人员正在运行,它需要在第 9 个任务的队列深度填充到 1 之前调用 8 个任务。 4 个任务将处于 STARTED 状态,4 个任务将处于 RECEIVED 状态。每当将工作节点的数量增加到 2 个时,只会将第 9 个任务发送给该工作节点。但是,这意味着处于 RECEIVED 状态的 4 个任务可能会“卡”在处于 STARTED 状态的 4 个任务之后几个小时,这是不可取的。

研究的解决方案:

在 Celery 的文档 (https://docs.celeryproject.org/en/stable/userguide/optimizing.html) 中搜索解决方案时,禁用预取的唯一方法是对任务使用 acks_late=True。它确实解决了没有预取任务的问题,但它也导致了其他问题,例如在新扩展的工作节点上复制任务,这绝对不是我想要的。 另外,在 worker 上设置 -O fair 也经常被认为是一种解决方案,但似乎它仍然会创建处于 RECEIVED 状态的任务。

目前,我正在考虑一个稍微复杂的解决方案来解决这个问题,所以我很高兴听到其他解决方案。当前提出的解决方案是将并发设置为-c 2(而不是-c 4)。这意味着将在第一个工作节点上预取 2 个任务并启动 2 个任务。所有其他任务将最终进入队列,需要扩展事件。一旦 ECS 扩展到两个工作节点,我会将第一个工作节点的并发量从 2 个扩展到 4 个,以释放预取的任务。

有什么想法/建议吗?

【问题讨论】:

另一个与此问题相关的帖子:github.com/celery/celery/issues/6500 【参考方案1】:

在@samdoolin 的帮助下,我找到了解决这个问题的方法(在这些帖子中:https://github.com/celery/celery/issues/6500)。我会在这里为和我有同样问题的人提供完整的答案。

解决方案: @samdoolin 提供的解决方案是对 Consumer 的 can_consume 功能进行monkeypatch,使其具有仅在保留请求少于工作人员可以处理的情况(工作人员的并发性)时才使用消息的功能。在我的情况下,这意味着如果已经有 4 个请求处于活动状态,它将不会消耗请求。相反,任何请求都会累积在队列中,从而导致预期的行为。然后,我可以根据队列深度轻松扩展容纳单个 worker 的 ECS 容器的数量。

实际上,这看起来像(再次感谢@samdoolin):

class SingleTaskLoader(AppLoader):

    def on_worker_init(self):
        # called when the worker starts, before logging setup
        super().on_worker_init()

        """
        STEP 1:
        monkey patch kombu.transport.virtual.base.QoS.can_consume()
        to prefer to run a delegate function,
        instead of the builtin implementation.
        """

        import kombu.transport.virtual

        builtin_can_consume = kombu.transport.virtual.QoS.can_consume

        def can_consume(self):
            """
            monkey patch for kombu.transport.virtual.QoS.can_consume

            if self.delegate_can_consume exists, run it instead
            """
            if delegate := getattr(self, 'delegate_can_consume', False):
                return delegate()
            else:
                return builtin_can_consume(self)

        kombu.transport.virtual.QoS.can_consume = can_consume

        """
        STEP 2:
        add a bootstep to the celery Consumer blueprint
        to supply the delegate function above.
        """

        from celery import bootsteps
        from celery.worker import state as worker_state

        class Set_QoS_Delegate(bootsteps.StartStopStep):

            requires = 'celery.worker.consumer.tasks:Tasks'

            def start(self, c):

                def can_consume():
                    """
                    delegate for QoS.can_consume

                    only fetch a message from the queue if the worker has
                    no other messages
                    """
                    # note: reserved_requests includes active_requests
                    return len(worker_state.reserved_requests) == 0

                # types...
                # c: celery.worker.consumer.consumer.Consumer
                # c.task_consumer: kombu.messaging.Consumer
                # c.task_consumer.channel: kombu.transport.virtual.Channel
                # c.task_consumer.channel.qos: kombu.transport.virtual.QoS
                c.task_consumer.channel.qos.delegate_can_consume = can_consume

        # add bootstep to Consumer blueprint
        self.app.steps['consumer'].add(Set_QoS_Delegate)


# Create a Celery application as normal with the custom loader and any required **kwargs
celery = Celery(loader=SingleTaskLoader, **kwargs)

然后我们通过下面这行启动worker:

celery -A proj worker -c 4 --prefetch-multiplier -1

请确保不要忘记 --prefetch-multiplier -1 选项,该选项完全禁用获取新请求。这将确保它使用can_consume monkeypatch。

现在,当 Celery 应用启动时,您请求 6 个任务,其中 4 个将按预期执行,2 个将在队列中结束,而不是被预取。这是未实际设置 acks_late=True 的预期行为。

那么我想最后一点。根据 Celery 的文档,在命令行中启动 worker 时也应该可以将路径传递给SingleTaskLoader。像这样:

celery -A proj --loader path.to.SingleTaskLoader worker -c 4 --prefetch-multiplier -1

不幸的是,这对我来说不起作用。但是可以通过实际传递给构造函数来解决。

【讨论】:

以上是关于芹菜预取任务卡在其他任务后面的主要内容,如果未能解决你的问题,请参考以下文章

Python Celery - 如何在其他任务中调用芹菜任务

用芹菜对特定任务设置时间限制

检索芹菜队列中的任务列表

姜戈芹菜。如何在准确的时间运行任务?

芹菜任务结果不与 rpc 保持一致

Celery 收下这捆芹菜!