重试丢失或失败的任务(Celery、Django 和 RabbitMQ)

Posted

技术标签:

【中文标题】重试丢失或失败的任务(Celery、Django 和 RabbitMQ)【英文标题】:Retry Lost or Failed Tasks (Celery, Django and RabbitMQ) 【发布时间】:2011-07-17 05:41:37 【问题描述】:

有没有办法确定是否有任何任务丢失并重试?


我认为丢失的原因可能是调度程序错误或工作线程崩溃。

我打算重试它们,但我不确定如何确定哪些任务需要停用?

以及如何使这个过程自动进行?我可以使用自己的自定义调度程序来创建新任务吗?

编辑:我从文档中发现 RabbitMQ 从不丢失任务,但是当工作线程在任务执行过程中崩溃时会发生什么?

【问题讨论】:

【参考方案1】:

你需要的是设置

CELERY_ACKS_LATE = 真

Late ack 表示任务执行完毕后会确认任务消息, 不只是之前,这是默认行为。 这样如果worker崩溃了rabbit MQ还是会有消息的。

显然,在完全崩溃(Rabbit + workers)的同时,没有办法恢复任务,除非您在任务开始和任务结束时实现日志记录。 就我个人而言,我在 mongodb 中每次任务开始时写一行,任务完成时写另一行(独立形成结果),这样我可以通过分析 mongo 日志知道哪个任务被中断。

您可以通过覆盖 celery 基任务类的 __call__after_return 方法轻松地做到这一点。

接下来您会看到我的一段代码,它使用 taskLogger 类作为上下文管理器(带有入口和出口点)。 taskLogger 类只是在 mongodb 实例中写入包含任务信息的行。

def __call__(self, *args, **kwargs):
    """In celery task this function call the run method, here you can
    set some environment variable before the run of the task"""

    #Inizialize context managers    

    self.taskLogger = TaskLogger(args, kwargs)
    self.taskLogger.__enter__()

    return self.run(*args, **kwargs)

def after_return(self, status, retval, task_id, args, kwargs, einfo):
    #exit point for context managers
    self.taskLogger.__exit__(status, retval, task_id, args, kwargs, einfo)

希望对你有帮助

【讨论】:

2 关于CELERY_ACKS_LATE=True 的疑问 [1] 如何(如果有的话)Celery 确保相同的task 不会被多个workers 拾取? [2] 如果Celerytasks should ideally be idempotent,那么problem 是什么,它们运行了多次? (对于第二个问题,实际上here 他们说没关系,但我正在寻找一个明确的肯定) 即使使用 ack_late,broker 也知道消息已被选中,因此它永远不会从其他工作人员处被选中。 在项目中我应该在哪里写上面提到的代码块。有什么想法吗?

以上是关于重试丢失或失败的任务(Celery、Django 和 RabbitMQ)的主要内容,如果未能解决你的问题,请参考以下文章

使用 Django-Celery 重试任务 - Django/Celery

重试属于链的 celery 失败的任务

python celery 错误重试配置

python celery 错误重试配置

Django-Celery 进度条

基础入门_Python-模块和包.深入Celery之常用架构/方案选型/必知必会?