如何在不重复的情况下重试芹菜任务 - SQS

Posted

技术标签:

【中文标题】如何在不重复的情况下重试芹菜任务 - SQS【英文标题】:How to retry a celery task without duplicating it - SQS 【发布时间】:2016-08-30 14:53:51 【问题描述】:

我有一个 Celery 任务,它从 SQS 队列中获取消息并尝试运行它。如果失败,它应该每 10 秒重试至少 144 次。我认为正在发生的事情是它失败并重新进入队列,同时它创建一个新的,将其复制到 2。这 2 再次失败并遵循相同的模式创建 2 个新的并成为 4 个消息全部的。所以如果我让它运行一段时间,队列就会被阻塞。

我没有得到的是在不重复的情况下重试的正确方法。以下是重试的代码。请看看是否有人可以在这里指导我。

from celery import shared_task
from celery.exceptions import MaxRetriesExceededError


@shared_task
def send_br_update(bgc_id, xref_id, user_id, event):
    from myapp.models.mappings import BGC

    try:
        bgc = BGC.objects.get(pk=bgc_id)
        return bgc.send_br_update(user_id, event)

    except BGC.DoesNotExist:
        pass

    except MaxRetriesExceededError:
        pass

    except Exception as exc:
        # retry every 10 minutes for at least 24 hours
        raise send_br_update.retry(exc=exc, countdown=600, max_retries=144)

更新: 问题的更多解释...

用户在我的数据库中创建了一个对象。其他用户对该对象采取行动,当他们更改该对象的状态时,我的代码会发出信号。然后信号处理程序启动一个 celery 任务,这意味着它连接到所需的 SQS 队列并将消息提交到队列。运行工作程序的 celery 服务器看到该新消息并尝试执行任务。这是它失败的地方,重试逻辑进来了。

根据retry a task 的 celery 文档,我们需要做的就是使用 countdown 和/或 max_retries 引发 self.retry() 调用。如果 celery 任务引发异常,则将其视为失败。我不确定 SQS 如何处理这个问题。我所知道的是一个任务失败,队列中有两个,这两个都失败了,然后队列中有 4 个,依此类推......

【问题讨论】:

任务看起来没问题,请添加运行任务的代码。 这是标准的 celery 任务。由 celery worker 在单独的服务器上运行。在我的例子中,代理是 Amazon SQS。 【参考方案1】:

这不是 celery 也不是 SQS 问题。 真正的问题是工作流,即您将消息发送到 MQ 服务并对其进行处理的方式会导致重复。使用任何其他 MQ 服务都会遇到同样的问题。

想象你的流程

    脚本:读取任务消息。 MQ 消息:锁定 30 秒 脚本:任务失败。 MQ 消息:锁定超时,现在可以再次免费获取消息 脚本:创建另一个任务消息 脚本:重复步骤 1。MQ 消息:2 条消息具有相同的任务,因此步骤 1 将启动 2 任务。

所以如果任务一直失败,它会继续相乘,2,4,8,16,32....

如果 celery 脚本意味着“重新创建失败的任务并发送到消息队列”,您需要确保这些消息只能读取一次。 **您必须在已阅读 1 次后丢弃任务消息,即使任务失败。 **

至少有两种方法可以做到这一点,请选择一种。

    在重新创建任务之前删除消息。或 在 SQS 中,您可以通过创建 DeadLetter Queue、配置 Redrive Policy、将 Maximum Receives 设置为 1 来强制执行此操作。这将确保消息 已读取的任务从不回收。

您可能更喜欢方法 2,因为方法 1 要求您将 celery 配置为“消费”(读取和删除)尽快读取消息,这不是很实用。 (并且您必须确保在为失败的任务创建新消息之前将其删除) 这个死信队列是一种让您检查 celery CRASH 的方法,即已读取一次但未消耗(删除)的消息是否意味着程序在某处停止。

【讨论】:

太棒了。我将在星期一尝试这第一件事。我该如何处理死信队列?我读到一次队列中只能有 一定 条消息。我应该定期清除它吗? @ImranS。 SQS 让您保留消息长达 14 天,不同的消息队列服务处理队列限制的方式不同。对于清除部分,这取决于您的工作流程和要求。【参考方案2】:

这可能有点晚了,我已经为 Celery + SQS 编写了一个退避策略作为补丁。

你可以在这个仓库中看到它是如何实现的

https://github.com/galCohen88/celery_sqs_retry_policy/blob/master/svc/celery.py

【讨论】:

以上是关于如何在不重复的情况下重试芹菜任务 - SQS的主要内容,如果未能解决你的问题,请参考以下文章

如何在不删除或移动mysql中的表的情况下重命名数据库? [复制]

Amazon SQS 如何控制重试次数

我们如何在不丢失数据的情况下重定向到相同的视图

如何在不丢失当前页面的情况下重绘数据表

如何在不破坏 Subversion 历史的情况下重命名 Java 包?

如何在不重新加载 HTML 页面的情况下重定向锚标记链接?