task_revoked 处理程序中的芹菜任务 ID

Posted

技术标签:

【中文标题】task_revoked 处理程序中的芹菜任务 ID【英文标题】:Celery task id in task_revoked handler 【发布时间】:2013-03-16 00:18:00 【问题描述】:

所以我有一个任务,它创建一个工作目录并在那里完成所有工作。 该任务从 server A 调用并在 worker 服务器 上执行。

我需要确保在任务完成/取消后删除工作目录。

我添加了一个任务撤销处理程序,它看起来像这样:

@task
def my_task(value):

    task_id  = current_task.request.id
    work_dir = os.path.join(BASE_WORK_DIR, task_id)
    os.makedirs(work_dir)

    try:
        # Do work...
    finally:
        shutil.rmtree(work_dir)


@task_revoked.connect(sender=my_task)
def my_task_revoked_handler(*args, **kwargs):
    # FIXME: delete work_dir

    print args
    # ()

    print kwargs
    # 'terminated': True, 'signal': <Signal: Signal>, 'expired': False, 'sender': <@task: myapp.core.tasks.my_task>, 'signum': '15'

我的问题是,当 server A 取消任务时,我无法在撤销的处理程序中清理工作目录,因为它没有 task_id。

有没有办法从这个特定的信号处理程序中获取任务 ID? 一些other Signals 有它们,我查看了它们的发布源,由于某种原因,这个 Signal 没有提供 task_id。

提供的sender 任务包含一个trace_task 函数:'__trace__': &lt;function trace_task at 0x3ee8230&gt;,但我不知道如何使用它,因为该函数本身需要一个task_id。

欢迎任何其他想法。

【问题讨论】:

【参考方案1】:

我认为这里发生的情况是您使用的是旧版本的 celery,它不支持第一个“请求”参数。

添加此内容的上游问题是 [1];在此之前,我认为你运气不好,不幸的是没有办法获得task_id。

[1]https://github.com/celery/celery/issues/1555

【讨论】:

迟到总比没有好我猜=)

以上是关于task_revoked 处理程序中的芹菜任务 ID的主要内容,如果未能解决你的问题,请参考以下文章

芹菜任务未处理

检索芹菜队列中的任务列表

带有芹菜的Django:计划任务(ETA)并行执行多次

芹菜:列出所有任务,计划的,活动的*和*完成

芹菜任务设置与视频帧的内存缓存作为python中的循环缓冲区策略

如何在芹菜中将任务从一个队列移动到另一个队列