如何在 Django-Celery 失败的情况下设置重试任务

Posted

技术标签:

【中文标题】如何在 Django-Celery 失败的情况下设置重试任务【英文标题】:How to set retry tasks in case of failure in Django-Celery 【发布时间】:2022-01-20 06:00:47 【问题描述】:

我正在尝试使用 celery 运行任务。 我需要在用户按下发送按钮时将发布请求发送到远程服务器,所以我尝试在此处使用带有 Redis 的 celery 并在设置文件中使用此配置:

BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'

根据apply_async 的文档,我可以定义重试选项,如下面的代码:

__task_expiration = 60
__interval_start = 1 * 60

api_generator.apply_async(args=(*args),
                                group=user_key,
                                expires=__task_expiration,
                                retry=True,
                                retry_policy=
                                  "max_retries": 3,
                                  "interval_start": __interval_start
                                )

在文档中我找到了 apply_async 的定义:

apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)

按照文档,我可以使用 retry 和 retry_policy 进行设置

以及如何定义重试选项的示例代码

add.apply_async((2, 2), retry=True, retry_policy=
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
)

我希望我的任务运行 3 次以在任何失败的情况下运行,并且每次重试之间的间隔为 60 秒。 我的任务定义如下所示:

@shared_task
def api_generator(*args):
    import requests
    import json
    url = os.environ.get("API_URL_CALL")
    api_access_key = os.environ.get("API_ACCESS_KEY")

    headers = 
        "Authorization": api_access_key,
        "Content-Type": "application/json"
    

    json_schema = generate_json(*args)

    response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)

    if response.status_code != 200:
        raise NameError("API Response error")

    return response.status_code

但是当我的代码失败时,我在 celery 日志中看不到任何重试机制,这是什么问题?使用 apply_async 方法调用我的任务时如何定义重试?我正在提高 NameError("Exception") 告诉工人发生了错误。

【问题讨论】:

【参考方案1】:

[编辑 1:添加 acks_late]

将任务发送给 Celery 工作人员时可能会出现两件事:

    代理和消息队列的连接问题。 工作人员引发异常。

第一个问题可以通过定义retryretry_policy 来解决。

第二种(也就是你要解决的问题),可以通过在任务失败时调用self.retry()来解决。

根据您的问题类型,设置CELERY_ACKS_LATE = True 可能会有所帮助。

查看这些链接了解更多信息:

Retry Lost or Failed Tasks (Celery, Django and RabbitMQ)

https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/

【讨论】:

感谢这帮助了很多,你是对的。我尝试使用装饰器在发生异常时设置重试。

以上是关于如何在 Django-Celery 失败的情况下设置重试任务的主要内容,如果未能解决你的问题,请参考以下文章

如何判断任务是不是已经在 django-celery 中排队?

如何在heroku服务器中配置django-celery

django-celery配置

使用 django-celery 时如何创建单个类对象?

如何修改django-celery web界面进行周期性调度

django+celery实现异步任务