Python / Celery:杀死父任务时如何杀死子任务?

Posted

技术标签:

【中文标题】Python / Celery:杀死父任务时如何杀死子任务?【英文标题】:Python / Celery : how can I kill subtasks when killing a parent task? 【发布时间】:2019-05-19 18:11:49 【问题描述】:

上下文

我创建了一个 Django 应用程序,该应用程序正在调用一个 celery 任务,该任务依次生成其他任务并等待它们完成。

这是工作流程:

1) 主python/django代码在后台启动一个celery任务

2) celery 任务处理一些代码,然后启动一组不同的 celery 任务并等待它们准备好

3) 该组的每个任务然后以相同的方式生成另一组子任务并等待它们完成

它工作得很好(虽然我是一个初学者并且可能实现它很糟糕)但是现在我希望能够终止每个子进程,如果我杀死在开始时开始的主要 celery 任务。

我目前所拥有的

我已经使用产生多个子任务的简单父任务重新创建了这种情况,并且我修改了 celery Task 类的“on_failure”方法以在它失败时杀死它的子任务。

Tasks.py

from celery import Celery, group,Task, result
from celery.signals import task_revoked
import time
from pprint import pprint
application = Celery('tasks',backend='amqp://',broker='amqp://guest@localhost//')


class MyTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print(self.AsyncResult(task_id).children[0].revoke(terminate=True,signal='SIGTERM'))
        print('0!r failed: 1!r'.format(task_id, exc))

@application.task(base=MyTask)
def childTask():
    while True:
        time.sleep(10)
        print("Message de la tache enfant")
        continue

@application.task(base=MyTask)
def parentTask(pra_id = None):
    child_tasks = []
    print("Lancement tache mère")
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    child_tasks.append(childTask.s())
    tasks = group(child_tasks)
    tasks.apply_async()

    time.sleep(15)
    raise KeyError

main.py

from tasks import parentTask

parent1 = parentTask.delay(pra_id = 10)
parent2 = parentTask.delay(pra_id = 20)

当代码引发错误时,父任务被成功杀死,其子任务也被杀死,这就是我想要的。

我需要什么

我需要能够从我的 django 应用程序中手动终止我的父任务。

这是通过检查 celery worker 并通过搜索它的参数找到我的任务来完成的,这已成功完成,但是,当我找到它后手动撤销 celery 任务时,它不会终止由此产生的子任务任务,这就是我需要的。

到目前为止我所做的尝试

我试图创建一个由“撤销”信号触发的函数

(http://docs.celeryproject.org/en/latest/userguide/signals.html#task-revoked)

当任务被撤销时执行。

捕获信号有效(我能够在撤销任务时执行一些代码)但我无法使用与上述“On_failure”方法相同的代码)来终止子任务。

问题

发送到函数的 Request 对象确实包含我的父任务,但是当它应该包含包含子任务的 GroupResult 对象时,该类的“children”属性为空。

【问题讨论】:

【参考方案1】:

默认情况下,Celery Task 对象有trail = True,这意味着它将存储它的孩子。因此,您将能够使用 request.children 或使用 (Async)Result 的 children 属性来获取它。一旦有了子 task_id 的列表,撤销这些任务就很简单了。

请记住,在某些情况下,即使使用 terminate=True,Celery 也无法撤销任务,因此您实际上可能需要通过调用 revoke(terminate=True, signal='SIGKILL') 向它发送 SIGKILL。这不是 Celery 中的错误,但它或多/少取决于任务的性质以及它的作用......

【讨论】:

【参考方案2】:

不确定这是否对您有帮助,但我发现有些可行的方法是将每个子任务 ID 在创建时存储在 redis 或某个数据库中,并将它们与 pipeline_id 相关联。然后,如果我需要终止父任务,我也可以终止存储在列表中的所有子任务。

result.revoke(terminate=True)

subtask_results = get_subtask_status(pipeline_id) #Custom Function

for subtask_result in subtask_results:
    subtask_result.revoke(terminate=True)

【讨论】:

以上是关于Python / Celery:杀死父任务时如何杀死子任务?的主要内容,如果未能解决你的问题,请参考以下文章

django-vuecelery延迟任务定时任务 django中使用celery 秒杀功能 双写一致性 首页轮播图定时更新 课程前端页面

如何在不杀死未完成的芹菜任务的情况下重新启动heroku应用程序

当父进程死亡时,如何杀死使用 subprocess.check_output() 创建的 python 子进程?

django celery 终止任务的子进程

celery介绍

如何杀死oracle死锁进程