如何判断任务是不是已经在 django-celery 中排队?

Posted

技术标签:

【中文标题】如何判断任务是不是已经在 django-celery 中排队?【英文标题】:How to tell if a task has already been queued in django-celery?如何判断任务是否已经在 django-celery 中排队? 【发布时间】:2011-08-18 19:46:38 【问题描述】:

这是我的设置:

django 1.3 芹菜 2.2.6 django-celery 2.2.4 djkombu 0.9.2

在我的 settings.py 文件中

BROKER_BACKEND = "djkombu.transport.DatabaseTransport"

即我只是使用数据库来排队任务。

现在谈谈我的问题:我有一个用户启动的任务,可能需要几分钟才能完成。我希望每个用户只运行一次任务,并且我会将任务的结果缓存在一个临时文件中,因此如果用户再次启动任务,我只返回缓存的文件。我的视图函数中有如下代码:

task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)

if result.state == celery.states.PENDING:
    # The next line makes a duplicate task if the user rapidly refreshes the page
    tasks.some_long_task.apply_async(task_id=task_id)
    return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
    return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
    if cached_file_still_exists():
        return get_cached_file()
    else:
        result.forget()
        tasks.some_long_task.apply_async(task_id=task_id)
        return HttpResponse("Task started...")

这段代码几乎可以工作。但是当用户快速重新加载页面时,我遇到了问题。从任务排队到任务最终从队列中拉出并交给工作人员之间有 1-3 秒的延迟。在此期间,任务的状态保持为 PENDING,这会导致视图逻辑启动重复任务。

我需要的是某种方式来判断任务是否已经提交到队列中,这样我就不会最终提交两次。在芹菜中有这样做的标准方法吗?

【问题讨论】:

kick_off_the_long_task_again() 可以检查以确保任务已移出待处理吗?如果是这样,这可能是一个足够的延迟来防止用户和 celery 之间的竞争条件。 kick_off_the_long_task_again() 不会导致重复任务。我更新了我的示例以显示代码将在何处执行重复任务。 这不是我的问题。 kick_off_the_long_task_again() 可以检查并等待以确保任务在完成之前移出待处理吗? 好吧,当然,但这似乎没有任何作用。 result.forget() 删除结果并将任务放回 PENDING,因此我们已经“知道”状态,除非出现另一个不太可能的竞争条件。在考虑较小的边缘情况之前,我想先解决我最初的问题。 如果看不到Pending状态(因为你一直等到它通过了),那么你的问题就解决了,对吧?还是有其他事情发生? 【参考方案1】:

我不认为(正如 Tomek 和其他人所建议的那样)使用数据库是执行此锁定的方法。 django 有内置的缓存框架,它应该足以完成这种锁定,并且必须更快。见:

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial

Django 可以配置为使用memcached 作为其缓存后端,并且可以分布在多台机器上……这对我来说似乎更好。想法?

【讨论】:

一个漂亮的解决方案,正是我想要的。感谢您的链接!【参考方案2】:

我用 Redis 解决了这个问题。只需在 redis 中为每个任务设置一个键,然后在任务的 after_return 方法中从 redis 中删除该键。 Redis 轻巧且快速。

【讨论】:

【参考方案3】:

您可以通过手动将结果存储在数据库中来作弊。让我解释一下这会有什么帮助。

例如,如果使用 RDBMS(包含列的表 - task_id、state、result):

查看部分:

    使用事务管理。 使用 SELECT FOR UPDATE 获取 task_id == "long-task-%d" % user_id 的行。 SELECT FOR UPDATE 将阻止其他请求,直到这个 COMMIT 或 ROLLBACK。 如果不存在 - 将状态设置为 PENDING 并启动“some_long_task”,结束请求。 如果状态为 PENDING - 通知用户。 如果状态为成功 - 将状态设置为 PENDING,启动任务,返回 'result' 列指向的文件。我基于这样的假设,即您希望在获得结果后重新运行任务。提交 如果状态为 ERROR - 将状态设置为 PENDING,启动任务,通知用户。提交

任务部分:

    准备文件,包裹在try,catch块中。 成功时 - 使用 state = SUCCESS, result 更新正确的行。 失败时 - 使用 state = ERROR 更新正确的行。

【讨论】:

以上是关于如何判断任务是不是已经在 django-celery 中排队?的主要内容,如果未能解决你的问题,请参考以下文章

Java如何判断线程池所有任务是不是执行完毕

Java如何判断线程池所有任务是不是执行完毕

面试突击35:如何判断线程池已经执行完所有任务了?

如何判断我的程序的另一个实例是不是已经在运行?

怎样判断是不是已经安装了

如何判断用户名是不是在数据库中已经存在呢JSP