Python Luigi中的事件处理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python Luigi中的事件处理相关的知识,希望对你有一定的参考价值。

我一直在尝试将Luigi集成为我们的工作流处理程序。目前我们正在使用大厅,但是我们尝试做的很多事情都是在大厅里闲逛的麻烦所以我们切换到Luigi作为我们的依赖管理器。到目前为止没有问题,工作流程会触发并正确执行。

当任务因任何原因失败时,问题就出现了。这种情况特别是需要阻止任务,但是所有情况都需要处理。截至目前,Luigi优雅地处理错误并将其写入STDOUT。它仍然会发出并退出代码0,这对于大厅意味着工作通过了。误报。

我一直试图通过事件处理来解决这个问题,但即使是非常简单的工作,我也无法触发它:

@luigi.Task.event_handler(luigi.Event.FAILURE)
def mourn_failure(task, exception):
    with open('/root/luigi', 'a') as f:
        f.write("we got the exception!") #testing in concourse image
    sys.exit(luigi.retcodes.retcode().unhandled_exception)

class Test(luigi.Task):
    def requires(self):
        raise Exception()
        return []

    def run(self):
        pass

    def output(self):
        return []

然后在python shell中运行该命令

luigi.run(main_task_cls=Test, local_scheduler=True)

提出异常,但事件不会触发或触发。文件未写入,退出代码仍为0。

另外,如果它有所不同,我在/etc/luigi/client.cfg中有我的luigi配置,其中包含

[retcode]
already_running=10
missing_data=20
not_run=25
task_failed=30
scheduling_error=35
unhandled_exception=40

我不知道为什么事件处理程序不会触发,但不知怎的,我需要进程在出错时失败。

答案

似乎问题在于您放置“提升异常”调用。如果将它放在requires函数中 - 它基本上在Test task run方法之前运行。所以它不是你的测试任务失败,而是它依赖的任务(现在,空......)。

例如,如果您将加注移动到运行,您的代码将按预期运行。

    def run(self):
       print('start')
       raise Exception()

要处理依赖项失败的情况(在这种情况下,在requires方法中引发异常),您可以添加另一种类型的luigi事件处理程序BROKEN_TASK:luigi.Event.BROKEN_TASK。这将确保luigi代码发出您期望的返回代码(不同于0)。

干杯!

另一答案

如果我理解正确,你只是希望luigi在任务失败时返回错误代码,我遇到了很多问题,但事实证明这很简单,你只需要在命令行上使用luigi运行它,不是用python。像这样:

luigi --module my_module MyTask

我不知道这是不是你的问题,但是我用python运行,然后luigi忽略了luigi.cfg上的retcodes。希望能帮助到你。

以上是关于Python Luigi中的事件处理的主要内容,如果未能解决你的问题,请参考以下文章

Python 任务调度器 Luigi 可以检测到间接依赖吗?

在 Luigi 的任务之间传递 Python 对象?

如何使用 Luigi 处理输出

如何从 Python Luigi 登录

如何使 Luigi 任务生成内存列表作为目标

python luigi localTarget 泡菜