将 n 个任务添加到 celery 队列并等待结果
Posted
技术标签:
【中文标题】将 n 个任务添加到 celery 队列并等待结果【英文标题】:Add n tasks to celery queue and wait for the results 【发布时间】:2014-12-28 11:49:03 【问题描述】:我会在 celery 队列中添加几个作业并等待结果。我对如何使用某种类型的共享存储(memcached、redis、数据库等)来实现这一点有很多想法,但我认为这是 Celery 可以自动处理的东西,但我在网上找不到任何资源。
代码示例
def do_tasks(b):
for a in b:
c.delay(a)
return c.all_results_some_how()
【问题讨论】:
【参考方案1】:对于 Celery >= 3.0,TaskSet 是 deprecated 支持 group。
from celery import group
from tasks import add
job = group([
add.s(2, 2),
add.s(4, 4),
add.s(8, 8),
add.s(16, 16),
add.s(32, 32),
])
在后台启动组:
result = job.apply_async()
等待:
result.join()
【讨论】:
关于.s()
的信息可以在in the celery.Signature
docs, here找到【参考方案2】:
Task.delay
返回AsyncResult
。使用AsyncResult.get
获取每个任务的结果。
为此,您需要保留对任务的引用。
def do_tasks(b):
tasks = []
for a in b:
tasks.append(c.delay(a))
return [t.get() for t in tasks]
或者你可以使用ResultSet
:
更新:ResultSet
已弃用,请参阅@laffuste 's answer。
def do_tasks(b):
rs = ResultSet([])
for a in b:
rs.add(c.delay(a))
return rs.get()
【讨论】:
工作就像一个魅力保存ResultSet
需要在其构造函数中的结果列表(或空列表)。我对帖子进行了修改以进行更正。
@Prydie,感谢您的反馈和更正。【参考方案3】:
我有一种预感,你不是真的想要延迟,而是 Celery 的异步功能。
我想你真的想要一个TaskSet:
from celery.task.sets import TaskSet
from someapp.tasks import sometask
def do_tasks(b):
job = TaskSet([sometask.subtask((a,)) for a in b])
result = job.apply_async()
# might want to handle result.successful() == False
return result.join()
【讨论】:
以上是关于将 n 个任务添加到 celery 队列并等待结果的主要内容,如果未能解决你的问题,请参考以下文章