将 n 个任务添加到 celery 队列并等待结果

Posted

技术标签:

【中文标题】将 n 个任务添加到 celery 队列并等待结果【英文标题】:Add n tasks to celery queue and wait for the results 【发布时间】:2014-12-28 11:49:03 【问题描述】:

我会在 celery 队列中添加几个作业并等待结果。我对如何使用某种类型的共享存储(memcached、redis、数据库等)来实现这一点有很多想法,但我认为这是 Celery 可以自动处理的东西,但我在网上找不到任何资源。

代码示例

def do_tasks(b):
    for a in b:
        c.delay(a)

    return c.all_results_some_how()

【问题讨论】:

【参考方案1】:

对于 Celery >= 3.0,TaskSet 是 deprecated 支持 group。

from celery import group
from tasks import add

job = group([
             add.s(2, 2),
             add.s(4, 4),
             add.s(8, 8),
             add.s(16, 16),
             add.s(32, 32),
])

在后台启动组:

result = job.apply_async()

等待:

result.join()

【讨论】:

关于.s()的信息可以在in the celery.Signature docs, here找到【参考方案2】:

Task.delay 返回AsyncResult。使用AsyncResult.get 获取每个任务的结果。

为此,您需要保留对任务的引用。

def do_tasks(b):
    tasks = []
    for a in b:
        tasks.append(c.delay(a))
    return [t.get() for t in tasks]

或者你可以使用ResultSet:

更新ResultSet 已弃用,请参阅@laffuste 's answer。

def do_tasks(b):
    rs = ResultSet([])
    for a in b:
        rs.add(c.delay(a))
    return rs.get()

【讨论】:

工作就像一个魅力保存ResultSet 需要在其构造函数中的结果列表(或空列表)。我对帖子进行了修改以进行更正。 @Prydie,感谢您的反馈和更正。【参考方案3】:

我有一种预感,你不是真的想要延迟,而是 Celery 的异步功能。

我想你真的想要一个TaskSet:

from celery.task.sets import TaskSet
from someapp.tasks import sometask

def do_tasks(b):
    job = TaskSet([sometask.subtask((a,)) for a in b])
    result = job.apply_async()
    # might want to handle result.successful() == False
    return result.join()

【讨论】:

以上是关于将 n 个任务添加到 celery 队列并等待结果的主要内容,如果未能解决你的问题,请参考以下文章

celery定时任务

如何将定期任务发送到 Celery 中的特定队列

Python 异步任务队列Celery 使用

芹菜任务结果不与 rpc 保持一致

php怎么调用celery任务

Celery分布式任务队列快速入门