查看 celery 任务是不是存在

Posted

技术标签:

【中文标题】查看 celery 任务是不是存在【英文标题】:Find out whether celery task exists查看 celery 任务是否存在 【发布时间】:2012-04-07 03:01:25 【问题描述】:

是否有可能找出具有特定任务ID的任务是否存在?当我尝试获取状态时,我将始终处于待处理状态。

>>> AsyncResult('...').status
'PENDING'

我想知道给定的任务 id 是否是真正的 celery 任务 id 而不是随机字符串。我想要不同的结果,具体取决于某个 id 是否存在有效任务。

过去可能存在具有相同 id 的有效任务,但结果可能已从后端删除。

【问题讨论】:

【参考方案1】:

Celery 在发送任务时不会写入状态,这部分是一种优化(参见documentation)。

如果你真的需要,添加很简单:

from celery import current_app
# `after_task_publish` is available in celery 3.1+
# for older versions use the deprecated `task_sent` signal
from celery.signals import after_task_publish

# when using celery versions older than 4.0, use body instead of headers

@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
    # the task may not exist if sent using `send_task` which
    # sends tasks by name, so fall back to the default result backend
    # if that is the case.
    task = current_app.tasks.get(sender)
    backend = task.backend if task else current_app.backend
 
    backend.store_result(headers['id'], None, "SENT")

然后您可以测试 PENDING 状态以检测任务没有(看似) 已发送:

>>> result.state != "PENDING"

【讨论】:

值得一提的是,清除队列并不会删除任务元(至少在使用 Redis 作为后端时)。因此,这种方法不能可靠地用于确定任务是否仍然存在。 我是否只需将此代码 sn-p 添加到我现有的任务中?我将它们放在“tasks.py”模块中。此外,当我引用已发送任务的状态时,“result.state”仅适用于“AsyncResult('...').status”,还是? @sleepycal:那么你会推荐使用 RabbitMQ 而不是 Redis 吗? 目前,上述解决方案存在错误。要修复它,请将 body 更改为 headers。这可能会有所帮助docs.celeryproject.org/en/latest/internals/… 同时使用task_track_started 时可能会出现竞争条件。从我的测试来看,如果任务队列为空,它在执行此回调之前立即启动的任务(并且其状态设置为 STARTED),至少在store_result 实际将状态设置为'SENT' 之前。然后状态设置为"SENT""STARTED"信息丢失。 @Jérôme,你是救命稻草,我们复制粘贴了这个,但不明白为什么我们的某些任务永远停留在“SENT”上。在实践中,他们会收到“SUCCESS”的更新,然后是“SENT”。【参考方案2】:

如果任务 ID 未知,AsyncResult.state 返回 PENDING。

待处理

任务正在等待执行或未知。任何不是的任务 ID known 暗示处于待处理状态。

http://docs.celeryproject.org/en/latest/userguide/tasks.html#pending

如果您需要区分未知 ID 和现有 ID,您可以提供自定义任务 ID:

>>> from tasks import add
>>> from celery.utils import uuid
>>> r = add.apply_async(args=[1, 2], task_id="celery-task-id-"+uuid())
>>> id = r.task_id
>>> id
'celery-task-id-b774c3f9-5280-4ebe-a770-14a6977090cd'
>>> if not "blubb".startswith("celery-task-id-"): print "Unknown task id"
... 
Unknown task id
>>> if not id.startswith("celery-task-id-"): print "Unknown task id"
... 

【讨论】:

问题是我只有一个id。每个 id 曾经是一个有效的 id,但有些不再是因为结果已从后端删除。所以我总是有一个以celery-task-id- 开头的 id,但任务仍然可能无效。 在这种情况下,您应该在外部跟踪 id 历史记录。 celery 后端不能保证永远保留所有结果。例如 amqp 后端只能查询一次。 @0x00mh: 问题是有一个任务 id,我怎么知道任务是真的 PENDING 还是已经从后端删除(也许是因为我设置 celery 一段时间后忘记它) ?【参考方案3】:

现在我正在使用以下方案:

    获取任务 ID。 设置为 memcache 键,如 'task_%s' % task.id 消息 'Started'。 将任务 ID 传递给客户端。 现在我可以从客户端监控任务状态(从任务消息设置到内存缓存)。 从准备就绪的任务 - 设置为内存缓存键消息“准备就绪”。 从任务就绪的客户端开始 - 启动特殊任务,该任务将从内存缓存中删除密钥并执行必要的清理操作。

【讨论】:

这就是我想要的方式,但它似乎不是干净的方式。【参考方案4】:

您需要在您创建的 AsyncTask 对象上调用 .get() 以实际从后端获取结果。

请参阅Celery FAQ。


进一步澄清我的回答。

任何字符串在技术上都是有效的 ID,没有办法验证任务 ID。找出任务是否存在的唯一方法是询问后端是否知道它,并且您必须使用.get()

这引入了.get() 在后端没有关于您提供的任务 ID 的任何信息时阻塞的问题,这是设计为允许您启动任务然后等待其完成的问题。

在原始问题的情况下,我将假设 OP 想要获取先前完成的任务的状态。为此,您可以传递一个非常小的超时并捕获超时错误:

from celery.exceptions import TimeoutError
try:
    # fetch the result from the backend
    # your backend must be fast enough to return
    # results within 100ms (0.1 seconds)
    result = AsyncResult('blubb').get(timeout=0.1)
except TimeoutError:
    result = None

if result:
    print "Result exists; state=%s" % (result.state,)
else:
    print "Result does not exist"

不用说,这仅在您的后端存储结果时才有效,如果不是,则无法知道任务 ID 是否有效,因为没有记录它们。


进一步澄清。

使用 AMQP 后端无法完成您想做的事情,因为it does not store results, it forwards them。

我的建议是切换到数据库后端,以便结果在数据库中,您可以在现有 celery 模块之外查询。如果结果数据库中不存在任何任务,则可以假定 ID 无效。

【讨论】:

.get() 将阻塞直到系统收到结果。如果 ID 不存在,这只会锁定应用程序。您可以传递 timeout 参数,但您仍然无法确定任务 ID 是否错误 对,你需要传递一个超时值,并捕获超时错误。这是根据您的后端确定任务 ID 是否“有效”的唯一方法。任何 id 在技术上都是“有效的”,但只有后端知道的 ID 才会真正返回任何数据。 我的任务通常持续大约 30 秒。所以这是没有选择的,对吧? 您想在任务完成之前获取有关任务的信息,但要从创建任务的进程之外的另一个进程中获取。基本上这样你就可以检查是否有东西在运行?对吗? 这是一个有用的答案,因为它澄清了 .get() 在没有 timeout 参数的情况下有时永远不会返回。关于在 Celery 之外存储任务状态的其他答案更正确,因为代理不会永远存储数据。但是,切换到数据库作为代理并不是一个好主意(这样的后端仅用于测试,不支持某些 Celery 功能)。【参考方案5】:

所以我有这个想法:

import project.celery_tasks as tasks

def task_exist(task_id):
  found = False
  # tasks is my imported task module from celery
  # it is located under /project/project, where the settings.py file is located
  i = tasks.app.control.inspect()
  s = i.scheduled()
  for e in s:
    if task_id in s[e]:
      found = True
      break
  a = i.active()
  if not found:
    for e in a:
      if task_id in a[e]:
        found = True
        break
  r = i.reserved()
  if not found:
    for e in r:
      if task_id in r[e]:
        found = True
        break
  # if checking the status returns pending, yet we found it in any queues... it means it exists...
  # if it returns pending, yet we didn't find it on any of the queues... it doesn't exist
  return found

根据https://docs.celeryproject.org/en/stable/userguide/monitoring.html,不同类型的队列检查是: 积极的, 预定的, 预订的, 撤销, 挂号的, 统计数据, 查询任务,

请随意挑选。

而且可能有更好的方法来检查队列中的任务,但现在这对我来说应该有用。

【讨论】:

【参考方案6】:

试试

AsyncResult('blubb').state

这可能有效。

它应该返回不同的东西。

【讨论】:

我想根据任务 ID 是或曾经是真实任务 ID 获得不同的结果。问题是即使我使用像 blubb 这样的假 id,我也会一直处于 PENDING 状态。 .status 是属性 state 的弃用别名【参考方案7】:

如果我错了,请纠正我。

if built_in_status_check(task_id) == 'pending'
   if registry_exists(task_id) == true
      print 'Pending'
   else
      print 'Task does not exist'

【讨论】:

built_in_status_checkregistry_exists 是什么?你将如何实现它? 嗯,我知道有 6 种任务状态(PENDING、STARTED、SUCCESS、FAILURE、RETRY 和 REVOKED)。所以,我想我们可以有一个代码来检查任务是否处于“待处理”状态。如果它处于“PENDING”状态,那么我们可以使用注册表项检查该特定任务是否存在。 不,我知道该状态处于未决状态,但我不知道它处于未决状态的原因。我正在寻找一个聪明的registry_exists

以上是关于查看 celery 任务是不是存在的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Django 中运行和查看 celery 任务?

pythonCelery实现异步任务

django天天生鲜项目--------celery功能

django天天生鲜项目--------celery功能

Celery 周期任务运行一段时间后意外停止

Celery定时任务细讲