芹菜停止执行链

Posted

技术标签:

【中文标题】芹菜停止执行链【英文标题】:Celery stop execution of a chain 【发布时间】:2013-07-01 22:23:35 【问题描述】:

我有一个定期执行的 check_orders 任务。它创建一组任务,以便我可以计算执行任务所花费的时间,并在它们全部完成后执行某些操作(这是 res.join [1] 和 grouped_subs 的目的)分组的任务是成对的链式任务。

我想要的是当第一个任务不满足条件(失败)时不执行链中的第二个任务。我一生都无法弄清楚这一点,我觉得这对于作业队列管理器来说是非常基本的功能。当我尝试我在 [2] 之后注释掉的东西时(引发异常,删除回调)......我们由于某种原因卡在 check_orders 中的 join() 上(它破坏了组)。我已经尝试将所有这些任务的 ignore_result 设置为 False,但它仍然不起作用。

@task(ignore_result=True)
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), 
                        notify.subtask((args_sub_2, ), immutable=True)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id))

@task(ignore_result=True)
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []

@task(ignore_result=True)
def notify(args_sub_2):
    # something else time consuming, only do this if the first part of the chain 
    # passed a test (the chained tasks before this were 'successful'
    notify_user(args_sub_2)

【问题讨论】:

对于 Celery 4.0,您可以使用这个答案 - ***.com/a/40579984/7355106 这能回答你的问题吗? Celery: clean way of revoking the entire chain from within a task 【参考方案1】:

首先,如果进入函数,似乎存在异常ignore_result对你没有帮助。

其次,你使用 immutable=True 这意味着下一个函数(在我们的例子中是 notify)不带额外的参数。如果适合您的决定,您当然应该使用notify.subtask((args_sub_2, ), immutable=False)

第三,你可以使用快捷键:

notify.si(args_sub_2) 而不是notify.subtask((args_sub_2, ), immutable=True)

is_room_open.s(args_sub_1) 而不是is_room_open.subtask((args_sub_1, ))

尝试使用它的代码:

@task
def check_orders():
    # check all the orders and send out appropriate notifications
    grouped_subs = []

    for thingy in things:
       ...

        grouped_subs.append(chain(is_room_open.s(args_sub_1), 
                                  notify.s(args_sub_2)))

    res = group(grouped_subs).apply_async()

    res.join()         #[1]
    logger.info('Done checking orders at %s' % current_task.request.id))

@task
def is_room_open(args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        # [2]
        # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how?
        # None of the following things work:
        # is_room_open.update_state(state='FAILURE')
        # raise celery.exceptions.Ignore()
        # raise Exception('spam', 'eggs')
        # current_task.request.callbacks[:] = []
        return False

@task
def notify(result, args_sub_2):
    if result:
        # something else time consuming, only do this if the first part of the chain 
        # passed a test (the chained tasks before this were 'successful'
        notify_user(args_sub_2)
        return True
    return False

如果你想捕获异常,你必须像这样使用回调

is_room_open.s(args_sub_1, link_error=log_error.s())

from proj.celery import celery

@celery.task
def log_error(task_id):
    result = celery.AsyncResult(task_id)
    result.get(propagate=False)  # make sure result written.
    with open(os.path.join('/var/errors', task_id), 'a') as fh:
        fh.write('--\n\n%s %s %s' % (
            task_id, result.result, result.traceback))

【讨论】:

感谢您提供有关快捷方式的提示。虽然这可行,但它并不能解决我的问题。如果第一个任务失败,我希望第二个任务永远不会执行。该解决方案仍然具有每次独立于第一个任务的结果启动第二个任务的开销。我想停止执行链。 我理解你。如果任务引发异常,则链的执行将停止。默认情况下它的行为。你不需要为它搜索特殊的决定。 @Alexander,引发异常无法正常工作。 “当我尝试我在 [2] 之后注释掉的事情时(引发异常,删除回调)......我们由于某种原因卡在 check_orders 中的 join() 上(它破坏了组)。”【参考方案2】:

在我看来,这是一个常见的用例,在文档中没有得到足够的关注。

假设您想在中途中止一个链,同时仍将 SUCCESS 作为已完成任务的状态报告,并且不发送任何错误日志或诸如此类的东西(否则您可以只引发异常),那么实现此目的的方法是:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks = None
        return
    #Other stuff to do if end_chain is False

所以在你的例子中:

@app.task(ignore_result=True, bind=True)
def is_room_open(self, args_sub_1):
    #something time consuming
    if http_req_and_parse(args_sub_1):
        # go on and do the notify task
        return True
    else:
        self.request.callbacks = None

会工作。请注意,您可以使用@abbasov-alexander 所述的快捷方式.si() 而不是ignore_result=Truesubtask()

按照@PhilipGarnero 在 cmets 中的建议,编辑为使用 EAGER 模式。

【讨论】:

如果您在 EAGER 模式下运行任务,上述将停止任务。我用self.request.callbacks = None 替换了self.request.callbacks[:] = [],现在它在这两种情况下都有效。 如果它在这两种情况下都有效,那么让我们建议吧。感谢您发表评论以改进答案:) 显然它不再适用于 Celery 4.0,但 self.request.chain = None 可以。 ***.com/questions/23793928/… self.request.callbacks = None 和 self.request.chain = None 对我不起作用。我通过返回一个布尔值来解决这个问题,指示它是否应该被中止,并在链的后续任务中接受它作为参数。【参考方案3】:

令人难以置信,因为任何官方文档都没有处理如此常见的情况。我不得不处理同样的问题(但是使用 shared_tasksbind 选项,所以我们可以看到 self 对象),所以我编写了一个自定义装饰器来自动处理撤销:

def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested, e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner

您可以按如下方式使用它:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)

查看完整说明here。 希望对您有所帮助!

【讨论】:

谢谢。很好的解决方案。 现在callbacks 变量似乎是一个元组,因此在尝试执行该操作时会返回错误:self.request.callbacks[:] = [] ''' line break ''' TypeError: 'tuple' object does not support item assignment

以上是关于芹菜停止执行链的主要内容,如果未能解决你的问题,请参考以下文章

有没有办法非暴力地停止芹菜工人的特定任务?

如何停止芹菜工人进程

Django 学习之Celery(芹菜)

打破承诺链 - 停止执行下一个“then”

如何从 django 模板暂停和停止 celery 任务

jQuery 学习02——效果:隐藏/显示淡入淡出滑动动画停止动画Callback链