报告长期运行的 Celery 任务的结果

Posted

技术标签:

【中文标题】报告长期运行的 Celery 任务的结果【英文标题】:Reporting yielded results of long-running Celery task 【发布时间】:2013-06-07 18:51:32 【问题描述】:

问题

我已将长时间运行的任务分割成逻辑子任务,因此我可以在每个子任务完成时报告其结果。但是,我正在尝试报告一项实际上永远不会完成的任务的结果(而是在执行过程中产生值),并且我正在努力使用我现有的解决方案来做到这一点。

背景

我正在为我编写的一些 Python 程序构建一个 Web 界面。用户可以通过网络表单提交作业,然后回来查看作业的进度。

假设我有两个函数,每个函数都通过单独的表单访问:

med_func:执行大约需要 1 分钟,结果将传递给 render(),这会产生额外的数据。 long_func:返回一个生成器。每个yield 大约需要 30 分钟,并且应该报告给用户。有这么多产量,我们可以认为这个迭代器是无限的(仅在revoked 时终止)。

代码,当前实现

使用med_func,我报告结果如下:

在提交表单时,我将AsyncResult 保存到Django session:

    task_result = med_func.apply_async([form], link=render.s())
    request.session["task_result"] = task_result

结果页面的 Django 视图访问此AsyncResult。任务完成后,结果将保存到一个对象中,该对象作为上下文传递给 Django 模板。

def results(request):
    """ Serve (possibly incomplete) results of a session's latest run. """
    session = request.session

    try:  # Load most recent task
        task_result = session["task_result"]
    except KeyError:  # Already cleared, or doesn't exist
        if "results" not in session:
            session["status"] = "No job submitted"
    else:  # Extract data from Asynchronous Tasks
        session["status"] = task_result.status
        if task_result.ready():
            session["results"] = task_result.get()
            render_task = task_result.children[0]

            # Decorate with rendering results
            session["render_status"] = render_task.status
            if render_task.ready():
                session["results"].render_output = render_task.get()
                del(request.session["task_result"])  # Don't need any more

    return render_to_response('results.html', request.session)

此解决方案仅在函数实际终止时有效。我无法将long_func 的逻辑子任务链接在一起,因为yields 的数量未知(long_func 的循环的每次迭代可能不会产生结果)。

问题

有没有什么明智的方法可以从一个极其长时间运行的 Celery 任务中访问产生的对象,以便在生成器耗尽之前显示它们?

【问题讨论】:

这个问题含糊不清,cmets 提出了完全不同的问题。 您能否详细说明实际问题“是否有任何明智的方法可以从极其长时间运行的 Celery 任务中渲染产生的对象?”?使用“渲染”,您的意思是您希望以某种方式能够在视图函数中获得未结束任务的结果?这些结果看起来如何(模型实例?)? 公平点,@Brent 和 @Bernhard;我在这里要做的实际上并不是渲染结果本身,而是能够在生成器完成之前访问产生的值(以便它们可以在页面上呈现)。所以,是的,我本质上是在任务终止之前,在视图函数中尽可能多地显示任务的进度。 和@Bernhard,“结果”只是标准的 Python 对象,完全独立于 Celery 和 Django(我可以将该程序用作独立的 Python 应用程序)。 【参考方案1】:

为了让 Celery 知道任务的当前状态是什么,它会在您拥有的任何结果后端设置一些元数据。您可以搭载它来存储其他类型的元数据。

def yielder():
    for i in range(2**100):
        yield i

@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.backend.mark_as_started(
            report_progress.request.id,
            progress=progress)

def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress

我不会在其中抛出 数据,但它可以很好地跟踪长时间运行的任务的进度。

【讨论】:

我收到以下错误:celery.backends.amqp.BacklogLimitExceeded: 483bb075-a27c-4298-8227-3bbebb13074f。有解决办法吗?【参考方案2】:

保罗的回答很棒。作为使用mark_as_started 的替代方法,您可以使用Taskupdate_state 方法。他们最终会做同样的事情,但名称“update_state”更适合您尝试做的事情。您可以选择定义一个 custom state 来表示您的任务正在进行中(我已将自定义状态命名为“PROGRESS”):

def yielder():
    for i in range(2**100):
        yield i

@task
def report_progress():
    for progress in yielder():
        # set current progress on the task
        report_progress.update_state(state='PROGRESS', meta='progress': progress)

def view_function(request):
    task_id = request.session['task_id']
    task = AsyncResult(task_id)
    progress = task.info['progress']
    # do something with your current progress

【讨论】:

【参考方案3】:

芹菜部分:

def long_func(*args, **kwargs):
    i = 0
    while True:
        yield i
        do_something_here(*args, **kwargs)
        i += 1


@task()
def test_yield_task(task_id=None, **kwargs):
    the_progress = 0        
    for the_progress in long_func(**kwargs):
        cache.set('celery-task-%s' % task_id, the_progress)

Webclient端,启动任务:

r = test_yield_task.apply_async()
request.session['task_id'] = r.task_id

测试最后产生的值:

   v = cache.get('celery-task-%s' % session.get('task_id'))
   if v:
        do_someting()

如果你不喜欢使用缓存,或者不可能,你可以使用 db、file 或任何其他 celery worker 和服务器端都可以访问的地方。使用缓存是最简单的解决方案,但工作人员和服务器必须使用相同的缓存。

【讨论】:

【参考方案4】:

需要考虑的几个选项:

1 -- 任务组。如果您可以从调用时枚举所有子任务,则可以将组作为一个整体应用——它返回一个 TaskSetResult 对象,您可以使用它来监视整个组的结果,或者组中单个任务的结果-- 当你需要检查状态时,根据需要查询。

2 -- 回调。如果您无法枚举所有子任务(或者即使您可以!),您可以定义一个网络挂钩/回调,这是任务的最后一步——在任务的其余部分完成时调用。该钩子将针对您应用中的 URI,该 URI 提取结果并通过 DB 或应用内部 API 使其可用。

这些的一些组合可以解决您的挑战。

【讨论】:

【参考方案5】:

另请参阅来自 Instagram 工程师的这个很棒的 PyCon preso。

http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html

在视频标记 16:00,他讨论了他们如何构建长长的子任务列表。

【讨论】:

以上是关于报告长期运行的 Celery 任务的结果的主要内容,如果未能解决你的问题,请参考以下文章

管理 Celery 任务结果

芹菜:访问上次运行任务的时间?

如何在任务中获取芹菜结果模型(使用 django-celery-results)

在 Celery 中通过 id 检索任务结果

防止 Celery Beat 运行相同的任务

celery - 需要优先运行的任务