Celery:构建顺序任务并获得正确的状态

Posted

技术标签:

【中文标题】Celery:构建顺序任务并获得正确的状态【英文标题】:Celery: Structuring sequential tasks and getting the correct status 【发布时间】:2014-10-03 06:48:20 【问题描述】:

使用 Celery 构建一系列顺序任务的最佳方法是什么?我的代码有一堆独立的任务(所以它们都可以是不可变的签名),但是如果其中一个任务抛出异常,我想停止序列。

我一直在寻找解决此问题的方法,但我被困住了。我们正在使用 Celery 3.1.12 + RabbitMQ。

一开始,我们使用一个和弦来表示头部任务成功,以便回调发生。它工作得很好,除了我们需要在标题中添加更多任务。

所以我试着在和弦上做一个链条。这也可行,但弦会因 PENDING 而挂起,因为当子任务引发异常时链不会退出。

一个人为的例子:

@celery.task
def bite(food):
    if food == 'salad':
        raise TypeError('Throwing up. I hate '.format(food))
    print "bite ...".format(food)
    return True

@celery.task
def chew(food):
    print "chewing ...".format(food)
    return True

@celery.task
def swallow(food):
    print "swallowing ...".format(food)
    return True

@celery.task
def chain_in_chord(food):
    return chord(
        chain(
            bite.si(food), chew.si(food)
        ),
        swallow.si(food)
    ).delay()

如果 food=salad,bite 子任务会抛出异常。并且链的其余部分不会发生 - 这就是我想要的。但是整个和弦都卡在了 PENDING 状态,因为链卡在 PENDING 并且不会退出。

>>> res = foo.chain_in_chord('salad')
>>> res.status
'PENDING'

所以我需要:

    找出一种方法来中止链并重新引发异常,如果 链失败 或者,找出一种方法来指定多个子任务 和弦的标题(我似乎做不到)。

在线搜索,chain 显然表现如预期 - 所以你必须遍历 asyncResult 的每个父状态。我更喜欢一种机制,让整个事情中止并重新引发异常/跟踪......就像和弦一样,但可以选择添加多个子任务。

任何反馈都将不胜感激。谢谢。

【问题讨论】:

【参考方案1】:

最近有人指出Celery-tasktree。玩弄它,它看起来很有希望。

上面的任务(咬、咀嚼、吞咽)可以作为一个序列按顺序执行——一个接一个地连接一个任务。也可以用 vanilla Celery 完成,但这里的语法看起来更简洁。

def eat_tree(food):
    tree = TaskTree()
    bite_task = tree.add_task(bite, args=[food])
    chew_task = bite_task.add_task(chew, args=[food])
    swallow_task = chew_task.add_task(swallow, args=[food])
    return tree.apply_async()

结果是:

>>> res = foo.eat_tree('steak')
>>> res
<TaskSetResult: 62c392ce-5ac2-4bd7-89f9-c1e004af1e56 [1b971232-2341-474b-897a-e5caab609eeb]>
>>> res.get()
[True]
>>> dir(res)
['__class__', '__delattr__', '__dict__', '__doc__', '__eq__', '__format__', '__getattribute__', '__getitem__', '__hash__', '__init__', '__iter__', '__len__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_args__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_failed_join_report', 'add', 'app', 'as_tuple', 'backend', 'children', 'clear', 'completed_count', 'delete', 'discard', 'failed', 'forget', 'get', 'id', 'iter_native', 'iterate', 'itersubtasks', 'join', 'join_native', 'maybe_reraise', 'parent', 'ready', 'remove', 'restore', 'results', 'revoke', 'save', 'serializable', 'subtasks', 'successful', 'supports_native_join', 'taskset_id', 'total', 'update', 'waiting']
>>> res.results[0]
<AsyncResult: 1b971232-2341-474b-897a-e5caab609eeb>
>>> res.results[0].status
u'SUCCESS'
>>> res.successful()
True
>>> res.taskset_id
'62c392ce-5ac2-4bd7-89f9-c1e004af1e56'
>>>

如果其中一个子任务要抛出异常,Tasktree 会抛出异常并将树标记为 FAILURE(这是我想要的),而不是 PENDING(就像在常规 Celery 中一样)。

>>> res = foo.eat_tree('salad')
>>> res.get()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/paulchoi/env/isd-python/lib/python2.7/site-packages/celery/result.py", line 574, in get
    interval=interval, callback=callback, no_ack=no_ack)
  File "/Users/paulchoi/env/isd-python/lib/python2.7/site-packages/celery/result.py", line 687, in join_native
    raise value
celery.backends.base.Exception: Throwing up. I hate salad
>>> res.successful()
False
>>> res.results[0].status
u'FAILURE'

>>>

【讨论】:

以上是关于Celery:构建顺序任务并获得正确的状态的主要内容,如果未能解决你的问题,请参考以下文章

从数据库中选择数据并在分组后获得正确的顺序

Python 将任务排入队列并按顺序获取结果

在 Celery 任务中使用 GeoIP 进行地理定位的正确方法

浅谈 Celery 分布式队列

ngRx 状态更新和效果执行顺序

我是不是需要事务才能按顺序获得正确的最后插入 ID?