如何检查 Celery 中的任务状态?
Posted
技术标签:
【中文标题】如何检查 Celery 中的任务状态?【英文标题】:How to check task status in Celery? 【发布时间】:2012-02-20 11:44:32 【问题描述】:如何检查一项任务是否在 celery 中运行(具体来说,我使用的是 celery-django)?
我已经阅读了文档,并在 Google 上搜索过,但我看不到类似以下的调用:
my_example_task.state() == RUNNING
我的用例是我有一个用于转码的外部 (java) 服务。当我发送要转码的文档时,我想检查运行该服务的任务是否正在运行,如果没有,则(重新)启动它。
我正在使用当前的稳定版本 - 2.4,我相信。
【问题讨论】:
就我而言,this part 提供了帮助。 【参考方案1】:res = method.delay()
print(f"id=res.id, state=res.state, status=res.status ")
print(res.get())
【讨论】:
请不要只发布代码作为答案,还要解释您的代码的作用以及它如何解决问题的问题。带有解释的答案通常更有帮助、质量更好,并且更有可能吸引投票。【参考方案2】:2020 年的答案:
#### tasks.py
@celery.task()
def mytask(arg1):
print(arg1)
#### blueprint.py
@bp.route("/args/arg1=<arg1>")
def sleeper(arg1):
process = mytask.apply_async(args=(arg1,)) #mytask.delay(arg1)
state = process.state
return f"Thanks for your patience, your job process.task_id \
is being processed. Status state"
【讨论】:
【参考方案3】: 首先,在你的celery APP中:vi my_celery_apps/app1.py
app = Celery(worker_name)
接下来,切换到任务文件,从你的 celery 应用模块中导入应用。
vi 任务/task1.py
from my_celery_apps.app1 import app
app.AsyncResult(taskid)
try:
if task.state.lower() != "success":
return
except:
""" do something """
【讨论】:
【参考方案4】:只需使用来自celery FAQ的这个API
result = app.AsyncResult(task_id)
这很好用。
【讨论】:
【参考方案5】:除了上述程序化方法 使用Flower Task状态一目了然。
使用 Celery Events 进行实时监控。 Flower 是一个基于 Web 的工具,用于监控和管理 Celery 集群。
-
任务进度和历史记录
能够显示任务详细信息(参数、开始时间、运行时间等)
图表和统计数据
官方文档: Flower - Celery monitoring tool
安装:
$ pip install flower
用法:
http://localhost:5555
【讨论】:
【参考方案6】:从任务 id 创建一个AsyncResult
对象是在FAQ 中推荐的方式来获取任务状态,当你唯一拥有的是任务 id 时。
但是,从 Celery 3.x 开始,如果人们不注意它们,有一些重要的警告可能会咬人。这真的取决于具体的用例场景。
默认情况下,Celery 不记录“运行”状态。
为了让 Celery 记录任务正在运行,您必须将 task_track_started
设置为 True
。这是一个测试这个的简单任务:
@app.task(bind=True)
def test(self):
print self.AsyncResult(self.request.id).state
当task_track_started
为False
(默认)时,即使任务已启动,状态显示为PENDING
。如果将task_track_started
设置为True
,则状态将为STARTED
。
PENDING
状态表示“我不知道”。
状态为PENDING
的AsyncResult
并不意味着Celery 不知道任务的状态。这可能是由于多种原因。
一方面,AsyncResult
可以使用无效的任务 ID 构造。这样的“任务”将被 Celery 视为未决:
>>> task.AsyncResult("invalid").status
'PENDING'
好的,所以没有人会将显然无效的 id 提供给AsyncResult
。很公平,但它也有效果,AsyncResult
还将考虑一个已成功运行但 Celery 忘记为 PENDING
的任务。再次,在某些用例中场景 这可能是个问题。部分问题取决于如何配置 Celery 以保留任务结果,因为它取决于结果后端中“墓碑”的可用性。 (“墓碑”是 Celery 文档中用于记录任务如何结束的数据块的术语。)如果 task_ignore_result
是 True
,则使用 AsyncResult
根本不起作用。一个更令人烦恼的问题是 Celery 默认会过期墓碑。 result_expires
默认设置为 24 小时。因此,如果您启动一个任务,并将 id 记录在长期存储中,并在 24 小时后,您使用它创建一个AsyncResult
,状态将为PENDING
。
所有“真正的任务”都以PENDING
状态开始。因此,在一项任务上获得PENDING
可能意味着该任务已被请求但从未比这更进一步(无论出于何种原因)。或者这可能意味着任务运行但 Celery 忘记了它的状态。
哎哟! AsyncResult
对我不起作用。我还能做什么?
我更喜欢跟踪目标,而不是跟踪任务本身。我确实保留了一些任务信息,但它对于跟踪目标确实是次要的。目标存储在独立于 Celery 的存储中。当一个请求需要执行计算取决于某个目标是否已经实现时,它会检查该目标是否已经实现,如果是,则使用此缓存的目标,否则启动将影响该目标的任务,并发送到发出 HTTP 请求的客户端响应指示它应该等待结果。
上面的变量名和超链接是针对 Celery 4.x 的。在 3.x 中对应的变量和超链接为:CELERY_TRACK_STARTED
、CELERY_IGNORE_RESULT
、CELERY_TASK_RESULT_EXPIRES
。
【讨论】:
所以如果我想稍后检查结果(甚至可能在另一个进程中),我最好自己实现?手动将结果存入数据库? 是的,我会将跟踪“目标”与跟踪“任务”分开。我写了“执行取决于某个目标的计算”。通常,“目标”也是一个计算。例如,如果我想向用户展示文章 X,我必须将其从 XML 转换为 html,但在此之前,我必须解析所有参考书目。 (X 就像一篇期刊文章。)我检查目标“解决了所有参考书目的文章 X”是否存在并使用它,而不是尝试检查可以计算出我想要的目标的 Celery 任务的任务状态。跨度> 并且信息“文章 X 已解决所有参考书目”存储在内存缓存中并存储在 eXist-db 数据库中。【参考方案7】:我在
中找到了有用的信息Celery Project Workers Guide inspecting-workers
就我而言,我正在检查 Celery 是否正在运行。
inspect_workers = task.app.control.inspect()
if inspect_workers.registered() is None:
state = 'FAILURE'
else:
state = str(task.state)
您可以使用检查来满足您的需求。
【讨论】:
【参考方案8】:对于简单的任务,我们可以使用http://flower.readthedocs.io/en/latest/screenshots.html 和http://policystat.github.io/jobtastic/ 来做监控。
对于复杂的任务,比如说一个处理很多其他模块的任务。我们建议手动记录特定任务单元的进度和消息。
【讨论】:
【参考方案9】:试试:
task.AsyncResult(task.request.id).state
这将提供 Celery 任务状态。如果 Celery Task 已经处于 FAILURE 状态,它将抛出异常:
raised unexpected: KeyError('exc_type',)
【讨论】:
【参考方案10】:老问题,但我最近遇到了这个问题。
如果你想获取 task_id,你可以这样做:
import celery
from celery_app import add
from celery import uuid
task_id = uuid()
result = add.apply_async((2, 2), task_id=task_id)
现在您确切知道 task_id 是什么,现在可以使用它来获取 AsyncResult:
# grab the AsyncResult
result = celery.result.AsyncResult(task_id)
# print the task id
print result.task_id
09dad9cf-c9fa-4aee-933f-ff54dae39bdf
# print the AsyncResult's status
print result.status
SUCCESS
# print the result returned
print result.result
4
【讨论】:
绝对不需要创建自己的任务 ID 并将其传递给apply_async
。 apply_async
返回的对象是一个AsyncResult
对象,它确实有 Celery 生成的任务的 id。
如果我错了,请纠正我,但是根据某些输入生成 UUID 有时不是很有用,以便所有获得相同输入的调用都获得相同的 UUID? IOW,也许有时指定你的 task_id 很有用。
@dstromberg OP 提出的问题是“我如何检查任务状态”,这里的答案是“如果你试图获取 task_id ...”。检查任务状态和获取task_id
都不需要您自己生成 任务ID。在您的评论中,您想象了一个超越“我如何检查任务状态”和“如果您尝试获取 task_id...”的原因需要,但这里不是这样。(此外,使用 uuid()
生成任务 ID 绝对没有超出 Celery 默认所做的。)
我同意 OP 没有专门询问如何获得可预测的任务 ID,但 OP 问题的答案目前是“跟踪任务 ID 并执行 x”。在我看来,在各种情况下跟踪任务 ID 是不切实际的,因此答案实际上可能并不令人满意。这个答案可以帮助我解决我的用例(如果我可以克服其他指出的限制),原因与@dstromberg 指出的相同——无论它是否出于这个原因。【参考方案11】:
您还可以创建自定义状态并在任务执行期间更新其值。 此示例来自文档:
@app.task(bind=True)
def upload_files(self, filenames):
for i, file in enumerate(filenames):
if not self.request.called_directly:
self.update_state(state='PROGRESS',
meta='current': i, 'total': len(filenames))
http://celery.readthedocs.org/en/latest/userguide/tasks.html#custom-states
【讨论】:
【参考方案12】:每个Task
对象都有一个.request
属性,其中包含AsyncRequest
对象。因此,以下行给出了任务task
的状态:
task.AsyncResult(task.request.id).state
【讨论】:
有没有办法存储任务的进度百分比? 当我这样做时,我得到一个永久的 PENDING AsyncResult,即使我等待了足够长的时间来完成任务。有没有办法让这个看到状态变化?我相信我的后端已配置,我尝试设置 CELERY_TRACK_STARTED=True 无济于事。 @dstromberg 不幸的是,这对我来说已经是 4 年了,所以我无能为力。你几乎肯定需要配置 celery 来跟踪状态。 进一步补充@dstromberg 的观察,为了确认起见,我拿起了一个我知道肯定成功成功的 celery 任务并检查了它的state
属性,它仍然返回PENDING
。这似乎不是从终端跟踪芹菜任务状态的可靠方法。此外,我正在运行 Celery Flower(Celery 监控工具),由于某种原因,它没有在它已执行的任务列表中显示我正在寻找的任务。我可能需要查看 Flower 设置,看看是否有任何内容显示仅在过去的特定小时内显示。【参考方案13】:
返回 task_id(由 .delay() 给出),然后向 celery 实例询问状态:
x = method.delay(1,2)
print x.task_id
询问时,使用此 task_id 获取新的 AsyncResult:
from celery.result import AsyncResult
res = AsyncResult("your-task-id")
res.ready()
【讨论】:
谢谢,但是如果我无法访问x
怎么办?
您在哪里将您的工作排入 celery?在那里你必须返回 task_id 来跟踪未来的工作。
与@Marcin 的不同,此答案不使用静态方法 Task.AsyncResult() 作为 AsyncResult 的工厂,这有助于重用后端配置,否则在尝试获取结果时会引发错误.
@Chris 与@gregor 代码的争议在于async_result
的实例化。在您的用例中,您已经拥有实例,您可以开始了。但是,如果您只有任务 ID,并且需要实例化一个 async_result
实例才能调用 async_result.get()
,会发生什么?这是AsyncResult
类的一个实例,但是你不能使用原始类celery.result.AsyncResult
,你需要从app.task()
包装的函数中获取类。在你的情况下,你会做async_result = run_instance.AsyncResult('task-id')
but you cannot use the raw class celery.result.AsyncResult, you need to get the class from the function wrapped by app.task().
- 我认为这就是它的实际使用方式。阅读代码:github.com/celery/celery/blob/…以上是关于如何检查 Celery 中的任务状态?的主要内容,如果未能解决你的问题,请参考以下文章