celery使用group或者chord如何实时更新状态进度?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery使用group或者chord如何实时更新状态进度?相关的知识,希望对你有一定的参考价值。
参考技术A 在celery中单任务可以使用self.update_state方法来更新进度的,如下:最重要的一个属性就是taskid了,这里可以不写,不写的话默认就是self.request.id自动生成的是当前的id
那么如果是group,chord这样的批量任务产生的多个任务,就有多个任务id,这样就没办法更新了,也没有办法将task id 传到前端来更新进度条了
在搜索这样的解决方案后找到了一个方法.
这里重新继承了chord,并在body中的options字典中,将task_id 放入了, 这样当我们使用这个类作为默认的celery.chord的功能时候就可以获取到这个task id 了
header = [task.s(url = item['href'], page = item['page'], total =self.total, filename =self.filename)for itemin items]
callback = templink.s(1)
task = progress_chord(group(header))(callback) # callback 是一个回调的celery task任务
在task类中,使用self.request.chord['options']['task_id']来得到id
并使用
self.update_state(task_id = task_id, state = state, meta = meta)
来更新
那么思考下,group的操作可能与chord类似
获取使用 celery chord 创建的所有任务的 task_ids
【中文标题】获取使用 celery chord 创建的所有任务的 task_ids【英文标题】:Getting task_ids for all tasks created with celery chord 【发布时间】:2013-09-17 07:27:12 【问题描述】:我的目标是从 django celery chord 调用中检索所有 task_id,以便稍后我可以在需要时撤销任务。但是,我无法找出检索任务 ID 的正确方法。我执行和弦为:
c = chord((loadTask.s(i) for i in range(0, num_lines, CHUNK_SIZE)), finalizeTask.si())
task_result = c.delay()
# get task_ids
我检查了 task_result 的 children 变量,但它是 None。
我可以通过使用组和另一个任务来手动创建和弦语义,如下所示,并检索关联的 task_ids,但我不喜欢中断呼叫。 当此代码作为子任务在任务中运行时,如果在完成任务开始之前撤销组,则可能导致主任务挂起。
g = group((loadTask.s(i) for i in range(0, num_lines, CHUNK_SIZE)))
task_result = g.delay()
storeTaskIds(task_result.children)
task_result.get()
task_result2 = self.finalizeTask.delay()
storeTaskIds([task_result2.task_id])
任何想法将不胜感激!
【问题讨论】:
【参考方案1】:我正在尝试做类似的事情,我希望我可以通过一次调用来撤销和弦,并且其中的所有内容都会被递归地撤销。
您可以在群组和您的 finalizeTask
之间建立和弦,以免打断电话。
我意识到这将在您询问两个月后到来,但也许它会对某人有所帮助,也许我应该获取我小组中所有内容的任务 ID。
【讨论】:
以上是关于celery使用group或者chord如何实时更新状态进度?的主要内容,如果未能解决你的问题,请参考以下文章