celery使用group或者chord如何实时更新状态进度?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了celery使用group或者chord如何实时更新状态进度?相关的知识,希望对你有一定的参考价值。

参考技术A 在celery中单任务可以使用self.update_state方法来更新进度的,如下:

最重要的一个属性就是taskid了,这里可以不写,不写的话默认就是self.request.id自动生成的是当前的id

那么如果是group,chord这样的批量任务产生的多个任务,就有多个任务id,这样就没办法更新了,也没有办法将task id 传到前端来更新进度条了

在搜索这样的解决方案后找到了一个方法.

这里重新继承了chord,并在body中的options字典中,将task_id 放入了, 这样当我们使用这个类作为默认的celery.chord的功能时候就可以获取到这个task id 了

header = [task.s(url = item['href'], page = item['page'], total =self.total, filename =self.filename)for itemin items]

callback = templink.s(1)

task = progress_chord(group(header))(callback) # callback 是一个回调的celery task任务

在task类中,使用self.request.chord['options']['task_id']来得到id

并使用

self.update_state(task_id = task_id, state = state, meta = meta)

来更新

那么思考下,group的操作可能与chord类似

获取使用 celery chord 创建的所有任务的 task_ids

【中文标题】获取使用 celery chord 创建的所有任务的 task_ids【英文标题】:Getting task_ids for all tasks created with celery chord 【发布时间】:2013-09-17 07:27:12 【问题描述】:

我的目标是从 django celery chord 调用中检索所有 task_id,以便稍后我可以在需要时撤销任务。但是,我无法找出检索任务 ID 的正确方法。我执行和弦为:

c = chord((loadTask.s(i) for i in range(0, num_lines, CHUNK_SIZE)), finalizeTask.si())
task_result = c.delay()
# get task_ids

我检查了 task_result 的 children 变量,但它是 None。

我可以通过使用组和另一个任务来手动创建和弦语义,如下所示,并检索关联的 task_ids,但我不喜欢中断呼叫。 当此代码作为子任务在任务中运行时,如果在完成任务开始之前撤销组,则可能导致主任务挂起

g = group((loadTask.s(i) for i in range(0, num_lines, CHUNK_SIZE))) 
task_result = g.delay()
storeTaskIds(task_result.children)
task_result.get()

task_result2 = self.finalizeTask.delay()
storeTaskIds([task_result2.task_id])

任何想法将不胜感激!

【问题讨论】:

【参考方案1】:

我正在尝试做类似的事情,我希望我可以通过一次调用来撤销和弦,并且其中的所有内容都会被递归地撤销。

您可以在群组和您的 finalizeTask 之间建立和弦,以免打断电话。

我意识到这将在您询问两个月后到来,但也许它会对某人有所帮助,也许我应该获取我小组中所有内容的任务 ID。

【讨论】:

以上是关于celery使用group或者chord如何实时更新状态进度?的主要内容,如果未能解决你的问题,请参考以下文章

Django Celery Chord 不执行

Celery 使用简介

Celery基本使用

Celery的使用

使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询

celery 任务的实时进度跟踪