如何使用 Luigi 处理输出

Posted

技术标签:

【中文标题】如何使用 Luigi 处理输出【英文标题】:How to handle output with Luigi 【发布时间】:2017-01-22 13:46:09 【问题描述】:

我正在尝试了解 luigi 的工作原理,我明白了这个想法,但实际实现有点困难;)这就是我所拥有的:

class MyTask(luigi.Task):

    x = luigi.IntParameter()

    def requires(self):
        return OtherTask(self.x)

    def run(self):
        print(self.x)

class OtherTask(luigi.Task):

    x = luigi.IntParameter()

    def run(self):
        y = self.x + 1
        print(y)

这会因RuntimeError: Unfulfilled dependency at run time: OtherTask_3_5862334ee2 而失败。我认为我需要使用def output(self): 生成输出来解决此问题\功能。而且我无法理解如何在不写入文件的情况下产生合理的输出,比如:

def output(self):
    return luigi.LocalTarget('words.txt')

def run(self):

    words = [
            'apple',
            'banana',
            'grapefruit'
            ]

    with self.output().open('w') as f:
        for word in words:
            f.write('word\n'.format(word=word))

我已经尝试阅读文档,但我根本无法理解输出背后的概念。如果我只需要输出到屏幕怎么办。如果我需要将一个对象输出到另一个任务怎么办?谢谢!

【问题讨论】:

这里混杂了一堆问题,但只有一个带有问号。 【参考方案1】:

如果我需要将一个对象输出到另一个任务怎么办?

Luigi 任务可以在不同的进程中运行。因此,如果您想要交换一个结果为一个任务。

与编写需要目标的 output() 方法相反,您还可以重写 complete() 方法,您可以在其中编写任何允许将任务视为完成的自定义逻辑。

【讨论】:

好的,这听起来很公平,你能指点我一些例子吗?我开始怀疑我可以覆盖完整的方法,但我无法找到与该主题相关的任何内容。谢谢! 是的,当事情完成时返回 true 是你需要做的所有事情来覆盖 complete()。 complete() 的基本行为是检查 output() 返回的 Target 是否存在

以上是关于如何使用 Luigi 处理输出的主要内容,如果未能解决你的问题,请参考以下文章

如何使用luigi将输出写入带有orc格式的分区表?

如何从 Python Luigi 登录

让 Airflow 表现得像 Luigi:如果任务的输出只需要获得一次,如何防止任务在 DAG 的未来运行中重新运行?

如何在 Python Luigi 中使用参数

如何使参数可用于所有 Luigi 任务?

如何在 Luigi 中启用动态需求?