在 Luigi 的任务之间传递 Python 对象?

Posted

技术标签:

【中文标题】在 Luigi 的任务之间传递 Python 对象?【英文标题】:Passing Python objects between Tasks in Luigi? 【发布时间】:2017-07-19 19:41:09 【问题描述】:

我正在使用Spotify's Luigi 在 Python 3.6 中编写我的第一个项目,以在管道中安排一些自然语言处理任务。

我注意到Task 类的output() 函数总是返回某种Target 对象,它只是某个地方的某个文件,无论是本地的还是远程的。因为我的任务会产生更复杂的数据结构,比如解析树,所以我将它们作为字符串写入文件并在之后再次读取它们是非常尴尬的。

因此我想问一下是否有可能在管道内的任务之间传递 Python 对象?

【问题讨论】:

【参考方案1】:

简答:没有。

Luigi 参数仅限于日期/日期时间对象、字符串、整数和浮点数。见docs for reference。

这意味着您需要将复杂的数据结构序列化为字符串(使用 json、msgpack、任何您喜欢的序列化器,甚至压缩它)并将其作为字符串参数传递。

当然,你可以编写一个自定义的 Parameter 子类,但基本上你需要实现 serialize and parse methods。

但请注意:如果您使用参数而不是将计算的数据保存到目标,您将失去使用 Luigi 的一个关键优势:如果树中的父任务失败次数超过您指定的重试次数,那么您将需要再次运行计算该复杂数据结构的任务。如果您的任务计算复杂数据或花费大量时间或消耗大量资源,则应将输出保存为目标,以免再次进行所有昂贵的计算。

展望未来:另一个任务可能也需要这些数据,那么为什么不保存它呢?

另外,请注意目标不仅仅是文件:您可以将数据保存到数据库表、Redis、Hadoop、弹性搜索索引等等:http://luigi.readthedocs.io/en/stable/api/luigi.contrib.html#submodules

【讨论】:

非常感谢! A+ 答案。【参考方案2】:

还有其他——仍然有点老套——的方法来实现你想要用目标而不是参数做的事情。

luigi.mock 中有一个特殊的MockFile 目标,可让您将其“文件”存储在内存中。

它的 api 与其他 Target 继承类类似,所以你必须 open, readwrite 来访问它。突然它只支持string 输入,所以你仍然需要序列化你的对象(这是由于通过进程之间的管道发送这些数据)。看下面的例子(yaml 序列化):

import yaml
from luigi import Task

class TaskA(Task):
    def output(self):
        return MockFile('whatever')

    def run(self):
        object_to_send = yaml.dump("example": "dict")

        _out = self.output().open('r')
        _out.write(object_to_send)
        _out.close()


class TaskB(Task):
    def requires(self):
        return TaskA()

    def run(self):
        _in = self.input().read('r')
        serialised = _in.read()
        deserialised = yaml.load(serialised)
        print(deserialised)

请注意,序列化大对象可能需要很长时间。

【讨论】:

以上是关于在 Luigi 的任务之间传递 Python 对象?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Luigi 中启用动态需求?

Python Luigi - 满意时继续执行外部任务

在luigi中使用multiprocessing.Queue

Python Luigi中的事件处理

从 cmd 运行 Luigi 任务 - “没有名为任务的模块”

如何在 Python Luigi 中使用参数