重试属于链的 celery 失败的任务

Posted

技术标签:

【中文标题】重试属于链的 celery 失败的任务【英文标题】:Retrying celery failed tasks that are part of a chain 【发布时间】:2012-07-15 12:36:19 【问题描述】:

我有一个运行一些任务的芹菜链。每个任务都可能失败并重试。请参阅下面的示例:

from celery import task

@task(ignore_result=True)
def add(x, y, fail=True):
    try:
        if fail:
            raise Exception('Ugly exception.')
        print '%d + %d = %d' % (x, y, x+y)
    except Exception as e:
        raise add.retry(args=(x, y, False), exc=e, countdown=10)

@task(ignore_result=True)
def mul(x, y):
    print '%d * %d = %d' % (x, y, x*y)

还有链条:

from celery.canvas import chain
chain(add.si(1, 2), mul.si(3, 4)).apply_async()

运行这两个任务(并假设没有失败),你会得到/看到打印:

1 + 2 = 3
3 * 4 = 12

但是,当添加任务第一次失败并在后续的重试调用中成功时,链中的其余任务都不会运行,即添加任务失败,链中的所有其他任务都不会运行并且经过一个几秒钟后,添加任务再次运行并成功,链中的其余任务(在本例中为 mul.si(3, 4))不运行。

celery 是否提供了一种方法来从失败的任务继续失败的链?如果不是,那么完成此任务并确保链的任务按照指定的顺序运行并且仅在前一个任务成功执行之后(即使该任务重试几次)的最佳方法是什么?

注意1:问题可以解决

add.delay(1, 2).get()
mul.delay(3, 4).get()

但我有兴趣了解为什么链不能处理失败的任务。

【问题讨论】:

【参考方案1】:

你发现了一个错误:)

固定在https://github.com/celery/celery/commit/b2b9d922fdaed5571cf685249bdc46f28acacde3 将成为 3.0.4 的一部分。

【讨论】:

嗨,即使使用新版本,这仍然无法正常工作。我在这里遇到了同样的问题,我正在尝试寻找解决方法..如果可以的话,请提供帮助。【参考方案2】:

我也有兴趣了解为什么链不能处理失败的任务。

我挖掘了一些芹菜代码,到目前为止我发现的是:

实现发生在 app.builtins.py

@shared_task
def add_chain_task(app):
    from celery.canvas import chord, group, maybe_subtask
    _app = app

    class Chain(app.Task):
        app = _app
        name = 'celery.chain'
        accept_magic_kwargs = False

        def prepare_steps(self, args, tasks):
            steps = deque(tasks)
            next_step = prev_task = prev_res = None
            tasks, results = [], []
            i = 0
            while steps:
                # First task get partial args from chain.
                task = maybe_subtask(steps.popleft())
                task = task.clone() if i else task.clone(args)
                i += 1
                tid = task.options.get('task_id')
                if tid is None:
                    tid = task.options['task_id'] = uuid()
                res = task.type.AsyncResult(tid)

                # automatically upgrade group(..) | s to chord(group, s)
                if isinstance(task, group):
                    try:
                        next_step = steps.popleft()
                    except IndexError:
                        next_step = None
                if next_step is not None:
                    task = chord(task, body=next_step, task_id=tid)
                if prev_task:
                    # link previous task to this task.
                    prev_task.link(task)
                    # set the results parent attribute.
                    res.parent = prev_res

                results.append(res)
                tasks.append(task)
                prev_task, prev_res = task, res

            return tasks, results

        def apply_async(self, args=(), kwargs=, group_id=None, chord=None,
                task_id=None, **options):
            if self.app.conf.CELERY_ALWAYS_EAGER:
                return self.apply(args, kwargs, **options)
            options.pop('publisher', None)
            tasks, results = self.prepare_steps(args, kwargs['tasks'])
            result = results[-1]
            if group_id:
                tasks[-1].set(group_id=group_id)
            if chord:
                tasks[-1].set(chord=chord)
            if task_id:
                tasks[-1].set(task_id=task_id)
                result = tasks[-1].type.AsyncResult(task_id)
            tasks[0].apply_async()
            return result

        def apply(self, args=(), kwargs=, **options):
            tasks = [maybe_subtask(task).clone() for task in kwargs['tasks']]
            res = prev = None
            for task in tasks:
                res = task.apply((prev.get(), ) if prev else ())
                res.parent, prev = prev, res
            return res
    return Chain

你可以看到最后prepare_stepsprev_task链接到下一个任务。 当 prev_task 失败时,不会调用下一个任务。

我正在测试将上一个任务中的链接错误添加到下一个任务:

if prev_task:
    # link and link_error previous task to this task.
    prev_task.link(task)
    prev_task.link_error(task)
    # set the results parent attribute.
    res.parent = prev_res

但是,下一个任务必须同时处理这两种情况(也许,除非它被配置为不可变,例如不接受更多参数)。

我认为链可以通过允许一些这样的语法来支持它:

c = chain(t1, (t2, t1e), (t3, t2e))

意思是:

t1 linkt2link_errort1e

t2 linkt3link_errort2e

【讨论】:

我决定使用类似链的任务来运行链中的所有任务,但在启动另一个任务之前等待一个任务完成,例如:task1.delay([params]).get(); task2.delay([params]).get(); task3.delay([params]).get()。链式任务可以捕获任何任务引发的异常并自行重试。 所以从你的例子来看,t1e 和 t2e 必须分别调用 t2 和 t3,对吗? 这个例子只是我对链的可能语法的想法。这意味着现在每个下一个任务确实是一对任务,如果上一步没有发生异常/错误,则将调用该对中的第一个元素,第二个元素是上一步失败的异常/错误处理程序。 t1e 意思是 t1 error handler 链是一项新功能,因此我们可能会遇到以前没有人考虑过的事情。我会说这是一个错误:重试任务也应该转发任何链接的任务,我会找到一种方法来修复它以供下一个错误修复版本 现在已修复:github.com/celery/celery/commit/… 将成为 celery 3.0.4 的一部分

以上是关于重试属于链的 celery 失败的任务的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Django-Celery 失败的情况下设置重试任务

python celery 错误重试配置

python celery 错误重试配置

使用 Django-Celery 重试任务 - Django/Celery

使用指数回退重试 Celery 任务

如何在不重复的情况下重试芹菜任务 - SQS