如何检查 Celery 中的任务状态?

Posted

技术标签:

【中文标题】如何检查 Celery 中的任务状态?【英文标题】:How to check task status in Celery? 【发布时间】:2012-02-20 11:44:32 【问题描述】:

如何检查一项任务是否在 celery 中运行(具体来说,我使用的是 celery-django)?

我已经阅读了文档,并在 Google 上搜索过,但我看不到类似以下的调用:

my_example_task.state() == RUNNING

我的用例是我有一个用于转码的外部 (java) 服务。当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它。

我正在使用当前的稳定版本 - 2.4,我相信。

【问题讨论】:

就我而言,this part 提供了帮助。 【参考方案1】:
res = method.delay()
    
print(f"id=res.id, state=res.state, status=res.status ")

print(res.get())

【讨论】:

请不要只发布代码作为答案,还要解释您的代码的作用以及它如何解决问题的问题。带有解释的答案通常更有帮助、质量更好,并且更有可能吸引投票。【参考方案2】:

2020 年的答案:

#### tasks.py
@celery.task()
def mytask(arg1):
    print(arg1)

#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
    process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
    state = process.state
    return f"Thanks for your patience, your job process.task_id \
             is being processed. Status state"

【讨论】:

【参考方案3】: 首先,在你的celery APP中:

vi my_celery_apps/app1.py

app = Celery(worker_name)
接下来,切换到任务文件,从你的 celery 应用模块中导入应用。

vi 任务/task1.py

from my_celery_apps.app1 import app

app.AsyncResult(taskid)

try:
   if task.state.lower() != "success":
        return
except:
    """ do something """

【讨论】:

【参考方案4】:

只需使用来自celery FAQ的这个API

result = app.AsyncResult(task_id)

这很好用。

【讨论】:

【参考方案5】:

除了上述程序化方法 使用Flower Task状态一目了然。

使用 Celery Events 进行实时监控。 Flower 是一个基于 Web 的工具,用于监控和管理 Celery 集群。

    任务进度和历史记录 能够显示任务详细信息(参数、开始时间、运行时间等) 图表和统计数据

官方文档: Flower - Celery monitoring tool

安装:

$ pip install flower

用法:

http://localhost:5555

【讨论】:

【参考方案6】:

从任务 id 创建一个AsyncResult 对象在FAQ 中推荐的方式来获取任务状态,当你唯一拥有的是任务 id 时。

但是,从 Celery 3.x 开始,如果人们不注意它们,有一些重要的警告可能会咬人。这真的取决于具体的用例场景。

默认情况下,Celery 不记录“运行”状态。

为了让 Celery 记录任务正在运行,您必须将 task_track_started 设置为 True。这是一个测试这个的简单任务:

@app.task(bind=True)
def test(self):
    print self.AsyncResult(self.request.id).state

task_track_startedFalse(默认)时,即使任务已启动,状态显示为PENDING。如果将task_track_started 设置为True,则状态将为STARTED

PENDING 状态表示“我不知道”。

状态为PENDINGAsyncResult 并不意味着Celery 不知道任务的状态。这可能是由于多种原因。

一方面,AsyncResult 可以使用无效的任务 ID 构造。这样的“任务”将被 Celery 视为未决:

>>> task.AsyncResult("invalid").status
'PENDING'

好的,所以没有人会将显然无效的 id 提供给AsyncResult。很公平,但它也有效果,AsyncResult 还将考虑一个已成功运行但 Celery 忘记为 PENDING 的任务。再次,在某些用例中场景 这可能是个问题。部分问题取决于如何配置 Celery 以保留任务结果,因为它取决于结果后端中“墓碑”的可用性。 (“墓碑”是 Celery 文档中用于记录任务如何结束的数据块的术语。)如果 task_ignore_resultTrue,则使用 AsyncResult 根本不起作用。一个更令人烦恼的问题是 Celery 默认会过期墓碑。 result_expires 默认设置为 24 小时。因此,如果您启动一个任务,并将 id 记录在长期存储中,并在 24 小时后,您使用它创建一个AsyncResult,状态将为PENDING

所有“真正的任务”都以PENDING 状态开始。因此,在一项任务上获得PENDING 可能意味着该任务已被请求但从未比这更进一步(无论出于何种原因)。或者这可能意味着任务运行但 Celery 忘记了它的状态。

哎哟! AsyncResult 对我不起作用。我还能做什么?

我更喜欢跟踪目标,而不是跟踪任务本身。我确实保留了一些任务信息,但它对于跟踪目标确实是次要的。目标存储在独立于 Celery 的存储中。当一个请求需要执行计算取决于某个目标是否已经实现时,它会检查该目标是否已经实现,如果是,则使用此缓存的目标,否则启动将影响该目标的任务,并发送到发出 HTTP 请求的客户端响应指示它应该等待结果。


上面的变量名和超链接是针对 Celery 4.x 的。在 3.x 中对应的变量和超链接为:CELERY_TRACK_STARTEDCELERY_IGNORE_RESULTCELERY_TASK_RESULT_EXPIRES

【讨论】:

所以如果我想稍后检查结果(甚至可能在另一个进程中),我最好自己实现?手动将结果存入数据库? 是的,我会将跟踪“目标”与跟踪“任务”分开。我写了“执行取决于某个目标的计算”。通常,“目标”也是一个计算。例如,如果我想向用户展示文章 X,我必须将其从 XML 转换为 html,但在此之前,我必须解析所有参考书目。 (X 就像一篇期刊文章。)我检查目标“解决了所有参考书目的文章 X”是否存在并使用它,而不是尝试检查可以计算出我想要的目标的 Celery 任务的任务状态。跨度> 并且信息“文章 X 已解决所有参考书目”存储在内存缓存中并存储在 eXist-db 数据库中。【参考方案7】:

我在

中找到了有用的信息

Celery Project Workers Guide inspecting-workers

就我而言,我正在检查 Celery 是否正在运行。

inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
    state = 'FAILURE'
else:
    state = str(task.state) 

您可以使用检查来满足您的需求。

【讨论】:

【参考方案8】:

对于简单的任务,我们可以使用http://flower.readthedocs.io/en/latest/screenshots.html 和http://policystat.github.io/jobtastic/ 来做监控。

对于复杂的任务,比如说一个处理很多其他模块的任务。我们建议手动记录特定任务单元的进度和消息。

【讨论】:

【参考方案9】:

试试:

task.AsyncResult(task.request.id).state

这将提供 Celery 任务状态。如果 Celery Task 已经处于 FAILURE 状态,它将抛出异常:

raised unexpected: KeyError('exc_type',)

【讨论】:

【参考方案10】:

老问题,但我最近遇到了这个问题。

如果你想获取 task_id,你可以这样做:

import celery
from celery_app import add
from celery import uuid

task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)

现在您确切知道 task_id 是什么,现在可以使用它来获取 AsyncResult:

# grab the AsyncResult 
result = celery.result.AsyncResult(task_id)

# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf

# print the AsyncResult's status
print result.status
SUCCESS

# print the result returned 
print result.result
4

【讨论】:

绝对不需要创建自己的任务 ID 并将其传递给apply_asyncapply_async 返回的对象是一个AsyncResult 对象,它确实有 Celery 生成的任务的 id。 如果我错了,请纠正我,但是根据某些输入生成 UUID 有时不是很有用,以便所有获得相同输入的调用都获得相同的 UUID? IOW,也许有时指定你的 task_id 很有用。 @dstromberg OP 提出的问题是“我如何检查任务状态”,这里的答案是“如果你试图获取 task_id ...”。检查任务状态和获取task_id 都不需要您自己生成 任务ID。在您的评论中,您想象了一个超越“我如何检查任务状态”和“如果您尝试获取 task_id...”的原因需要,但这里不是这样。(此外,使用 uuid() 生成任务 ID 绝对没有超出 Celery 默认所做的。) 我同意 OP 没有专门询问如何获得可预测的任务 ID,但 OP 问题的答案目前是“跟踪任务 ID 并执行 x”。在我看来,在各种情况下跟踪任务 ID 是不切实际的,因此答案实际上可能并不令人满意。这个答案可以帮助我解决我的用例(如果我可以克服其他指出的限制),原因与@dstromberg 指出的相同——无论它是否出于这个原因。【参考方案11】:

您还可以创建自定义状态并在任务执行期间更新其值。 此示例来自文档:

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta='current': i, 'total': len(filenames))

http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states

【讨论】:

【参考方案12】:

每个Task 对象都有一个.request 属性,其中包含AsyncRequest 对象。因此,以下行给出了任务task 的状态:

task.AsyncResult(task.request.id).state

【讨论】:

有没有办法存储任务的进度百分比? 当我这样做时,我得到一个永久的 PENDING AsyncResult,即使我等待了足够长的时间来完成任务。有没有办法让这个看到状态变化?我相信我的后端已配置,我尝试设置 CELERY_TRACK_STARTED=True 无济于事。 @dstromberg 不幸的是,这对我来说已经是 4 年了,所以我无能为力。你几乎肯定需要配置 celery 来跟踪状态。 进一步补充@dstromberg 的观察,为了确认起见,我拿起了一个我知道肯定成功成功的 celery 任务并检查了它的state 属性,它仍然返回PENDING。这似乎不是从终端跟踪芹菜任务状态的可靠方法。此外,我正在运行 Celery Flower(Celery 监控工具),由于某种原因,它没有在它已执行的任务列表中显示我正在寻找的任务。我可能需要查看 Flower 设置,看看是否有任何内容显示仅在过去的特定小时内显示。【参考方案13】:

返回 task_id(由 .delay() 给出),然后向 celery 实例询问状态:

x = method.delay(1,2)
print x.task_id

询问时,使用此 task_id 获取新的 AsyncResult:

from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()

【讨论】:

谢谢,但是如果我无法访问 x 怎么办? 您在哪里将您的工作排入 celery?在那里你必须返回 task_id 来跟踪未来的工作。 与@Marcin 的不同,此答案不使用静态方法 Task.AsyncResult() 作为 AsyncResult 的工厂,这有助于重用后端配置,否则在尝试获取结果时会引发错误. @Chris 与@gregor 代码的争议在于async_result 的实例化。在您的用例中,您已经拥有实例,您可以开始了。但是,如果您只有任务 ID,并且需要实例化一个 async_result 实例才能调用 async_result.get(),会发生什么?这是AsyncResult类的一个实例,但是你不能使用原始类celery.result.AsyncResult,你需要从app.task()包装的函数中获取类。在你的情况下,你会做async_result = run_instance.AsyncResult('task-id') but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task(). - 我认为这就是它的实际使用方式。阅读代码:github.com/celery/celery/blob/…

以上是关于如何检查 Celery 中的任务状态?的主要内容,如果未能解决你的问题,请参考以下文章

Celery:构建顺序任务并获得正确的状态

如何在任务中获取芹菜结果模型(使用 django-celery-results)

如何获取 celery broker 和后端的状态?

如何按任务名称检查和取消 Celery 任务

查看 celery 任务是不是存在

如何检查处理 Celery 任务的队列