使用指数回退重试 Celery 任务
Posted
技术标签:
【中文标题】使用指数回退重试 Celery 任务【英文标题】:Retry Celery tasks with exponential back off 【发布时间】:2012-04-01 15:38:46 【问题描述】:对于这样的任务:
from celery.decorators import task
@task()
def add(x, y):
if not x or not y:
raise Exception("test error")
return self.wait_until_server_responds(
如果它引发异常并且我想从守护程序端重试它,如何应用指数退避算法,即在2^2, 2^3,2^4
等秒之后?
也是从服务器端维护重试,这样如果工人碰巧被杀死,那么下一个产生的工人将接受重试任务?
【问题讨论】:
【参考方案1】:task.request.retries
属性包含到目前为止的尝试次数,
所以你可以用它来实现指数退避:
from celery.task import task
@task(bind=True, max_retries=3)
def update_status(self, auth, status):
try:
Twitter(auth).update_status(status)
except Twitter.WhaleFail as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
为防止出现Thundering Herd Problem,您可以考虑在指数退避中添加随机抖动:
import random
self.retry(exc=exc, countdown=int(random.uniform(2, 4) ** self.request.retries))
【讨论】:
你知道这是服务器端重试还是客户端一直等待?如果客户一直等待,那就不好了。 据我所知,倒计时属性为 MQ 后端的任务设置了一个 eta(例如 RabbitMQ)。所以不是在客户端设置的。 客户端不会一直等待,除非您执行result.get()
,这是一个等待结果准备好的明确请求,但还有一个超时参数和一个重试状态,因此您可以检查是否正在重试任务(以及重试的原因是什么)
对于 celery 3.1,您应该使用 @task(bind=True)
并且 celery 会将 self
作为第一个参数传递给函数,因此您可以将 args 更改为 def update_status(self, auth, status):
,然后您可以访问self.retries
感谢@robbyt!只是一个小的修正 - retries
is an attribute of request
,所以 self.request.retries
是正确的调用。【参考方案2】:
从 Celery 4.2 开始,您可以将任务配置为自动使用指数退避:http://docs.celeryproject.org/en/master/userguide/tasks.html#automatic-retry-for-known-exceptions
@app.task(autoretry_for=(Exception,), retry_backoff=2)
def add(x, y):
...
(这已经在 Celery 4.1 的文档中,但实际上当时还没有发布,请参阅 merge request)
【讨论】:
很好,在 4.1.0 中摸不着头脑,为什么我的“retry_backoff”参数不被尊重。 @kororo 它似乎不适用于self.retry
,只有其他异常类型
通过这种方法,您还可以从内置的retry_jitter
(默认为True
)中受益,它可以避免asksol 的回答中提到的Thundering Herd Problem
这是正确答案,因为它是内置的,不需要手动处理倒计时【参考方案3】:
仅供参考,celery 有一个 util 函数可以用 jitter here 计算指数退避时间,所以你不需要自己编写。
【讨论】:
以上是关于使用指数回退重试 Celery 任务的主要内容,如果未能解决你的问题,请参考以下文章
使用 Django-Celery 重试任务 - Django/Celery
重试丢失或失败的任务(Celery、Django 和 RabbitMQ)