从未知任务中检索芹菜中“task_id”的结果

Posted

技术标签:

【中文标题】从未知任务中检索芹菜中“task_id”的结果【英文标题】:Retrieve result from 'task_id' in Celery from unknown task 【发布时间】:2014-08-14 23:26:23 【问题描述】:

如果我以前不知道执行了哪个任务,如何提取任务的结果? 这是设置: 给定以下来源('tasks.py'):

from celery import Celery

app = Celery('tasks', backend="db+mysql://u:p@localhost/db", broker = 'amqp://guest:guest@localhost:5672//')

@app.task
def add(x,y):
   return x + y


@app.task
def mul(x,y):
   return x * y

在本地运行 RabbitMQ 3.3.2:

marcs-mbp:sbin marcstreeter$ ./rabbitmq-server

              RabbitMQ 3.3.2. Copyright (C) 2007-2014 GoPivotal, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
  ######  ##        /usr/local/var/log/rabbitmq/rabbit@localhost-sasl.log
  ##########
              Starting broker... completed with 10 plugins.

在本地运行 Celery 3.1.12:

 -------------- celery@Marcs-MacBook-Pro.local v3.1.12 (Cipater)
---- **** -----
--- * ***  * -- Darwin-13.2.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x105dea3d0
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

然后我可以导入方法并使用“task_id”检索结果:

from tasks import add, mul
from celery.result import AsyncResult

result = add.delay(2,2)
task_id = result.task_id
result.get() # 4

result = AsyncResult(id=task_id)
result.get() # 4

result = add.AsyncResult(id=task_id)
result.get() # 4

# and the same for the 'mul' task. Just imagine I put it here

在下一个示例中,我在进程之间拆分了这些步骤。在一个过程中,我像这样检索“task_id”:

from tasks import add

result = add.delay(5,5)
task_id = result.task_id

如果我在另一个进程中使用相同的“task_id”(复制并粘贴到另一个 REPL,或在不同的 HTTP 请求中),如下所示:

from celery.result import AsyncResult

result = AsyncResult(id="copied_task_id", backend="db+mysql://u:p@localhost/db")
result.get() # AttributeError: 'str' object has no attribute 'get_task_meta'
result.state # AttributeError: 'str' object has no attribute 'get_task_meta'
result.status # AttributeError: 'str' object has no attribute 'get_task_meta'

如果我这样做,在另一个过程中:

from task import add # in this instance I know that an add task was performed

result = add.AsyncResult(id="copied_task_id")
result.status # "SUCCESSFUL"
result.state # "SUCCESSFUL"
result.get() # 10

我希望能够在事先不知道生成结果的任务的情况下获得结果。在我的真实环境中,我计划将此 task_id 返回给客户端,并让他们通过 HTTP 请求查询其作业的状态。

【问题讨论】:

【参考方案1】:

好的-所以我一直在寻找解决方案很长时间,现在我终于正式发布了这个并查看了documentation我找到了this gem:

class celery.result.AsyncResult(id, backend=None, task_name=None, app=None, parent=None)

查询任务状态。

参数

id – 见id。

后端——见backend。

异常 超时错误

超时引发错误。

AsyncResult.app = 无

所以我没有提供后端参数,而是提供了“app”参数,如下所示:

from celery.result import AsyncResult
from task import app

# Assuming add.delay(10,10) was called in another process
# and that I'm using a 'task_id' I retrieved from that process

result = AsyncResult(id='copied_task_id', app=app)
result.state # 'SUCCESSFUL'
result.get() # 20

这对很多人来说可能是显而易见的。这不是我的。现在我只能说这个解决方案“行得通”,但如果我知道这是经过批准的做法,我会感觉更舒服。如果您知道文档中的某个部分使这一点更清楚,请将其发布在 cmets 中或作为答案,如果可以的话,我会选择它作为答案。

【讨论】:

正是我想要的;我同意您的观点,即文档中的内容非常不清楚,因此您的帖子对我有很大帮助。 你可能想在这个调用上设置一个小的超时,因为一些 Celery 'get' 调用可能在很长一段时间内不会返回,以防任务 ID 无效,或者任务不再是经纪人知道的。见***.com/a/10074280/992887 感谢您的修复。它让我拉了两个星期的头发。 许多年后,行为似乎是,如果您像使用 from task import app 一样导入正确配置的 Celery 实例,则会在 AsyncResult 上自动设置。因此,如果不显式设置参数,我可以检索 result.backendresult.app,因此状态也是正确的。【参考方案2】:

万一它对任何人有帮助,事实证明backend 参数并不需要一个字符串,而是一个后端对象:How do I override the backend for celery tasks

对我有用的是:

from celery.backends.rpc import RPCBackend
from myapp.workers.main import app as worker

@worker.task(backend=RPCBackend(app=worker))
def status_check():
    return "OK"

【讨论】:

以上是关于从未知任务中检索芹菜中“task_id”的结果的主要内容,如果未能解决你的问题,请参考以下文章

芹菜 - 无法获取任务结果

芹菜任务结果不与 rpc 保持一致

如何在任务中获取芹菜结果模型(使用 django-celery-results)

如何取消芹菜队列上的任务? [复制]

检索芹菜队列中的任务列表

如何禁用芹菜任务结果记录?