python luigi localTarget 泡菜

Posted

技术标签:

【中文标题】python luigi localTarget 泡菜【英文标题】:python luigi localTarget pickle 【发布时间】:2017-11-09 01:59:36 【问题描述】:

我通过 Anaconda 4.3.17、Luigi 2.4.0、Pandas 0.18、sklearn 版本 0.18 在 Windows 7、Python 2.7 上运行。根据下面,我试图让一个 luigi.LocalTarget 输出成为一个泡菜来存储一些不同的对象(使用 firstJob),然后在依赖作业(secondJob)中从该泡菜中读取。如果我从命令行运行以下命令,则 firstJob 成功完成:

"python -m luigi --module luigiPickle firstJob --date 2017-06-07 --local-scheduler"

但是,如果我尝试运行 secondJob,即

"python -m luigi --module luigiPickle secondJob --date 2017-06-07 --local-scheduler"

我明白了

Traceback (most recent call last):
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", l
ine 191, in run
    new_deps = self._run_get_new_deps()
  File "C:\Anaconda2\lib\site-packages\luigi-2.4.0-py2.7.egg\luigi\worker.py", l
ine 129, in _run_get_new_deps
    task_gen = self.task.run()
  File "luigiPickle.py", line 41, in run
    ret2 = pickle.load(inFile)
  File "C:\Anaconda2\lib\pickle.py", line 1384, in load
    return Unpickler(file).load()
  File "C:\Anaconda2\lib\pickle.py", line 864, in load
    dispatch[key](self)
  File "C:\Anaconda2\lib\pickle.py", line 1096, in load_global
    klass = self.find_class(module, name)
  File "C:\Anaconda2\lib\pickle.py", line 1130, in find_class
    __import__(module)
ImportError: No module named frame

由于无法识别 pandas.DataFrame() 对象(可能是范围问题?),luigi 似乎无法读取 pickle。

import luigi
import pandas as pd
import pickle
from sklearn.linear_model import LinearRegression

class firstJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return None

    def output(self):
        return luigi.LocalTarget('%s_first.pickle' % self.date)

    def run(self):
        ret = 
        ret['a'] = pd.DataFrame('a': [1, 2], 'b': [3, 4])
        ret['b'] = pd.DataFrame('a': [3, 4], 'd': [0, 0])
        ret['c'] = LinearRegression()
        outFile = self.output().open('wb')
        pickle.dump(ret, outFile, protocol=pickle.HIGHEST_PROTOCOL)
        outFile.close()

class secondJob(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def output(self):
        return luigi.LocalTarget('%s_second.pickle' % self.date)

    def run(self):
        inFile = self.input().open('rb')
        ret2 = pickle.load(inFile)
        inFile.close()

if __name__ == '__main__':
    luigi.run()

【问题讨论】:

【参考方案1】:

luigi open 命令不适用于二进制的 b 标志 - 它会将其从选项字符串中剥离出来。 (不知道为什么)。最好只使用带有路径属性的标准打开:

open(self.input().path, 'rb')open(self.output().path, 'wb')

【讨论】:

谢谢。我将outFile = self.output().open('wb') 更改为outFile = open(self.output().path, 'w'),将inFile = self.input().open('rb') 更改为inFile=open(self.input().path, 'r')。现在运行 secondJob 会得到File "C:\Anaconda2\lib\pickle.py", line 1384, in load return Unpickler(file).load() File "C:\Anaconda2\lib\pickle.py", line 864, in load dispatch[key](self) File "C:\Anaconda2\lib\pickle.py", line 1175, in load_binput i = ord(self.read(1)) TypeError: ord() expected a character, but string of length 0 found 我认为你实际上需要输出文件中的'b',所以请执行类似 `outFile = open(self.output().path, 'wb') 啊哈,这行得通。谢谢!为了向其他人澄清,将outFile = self.output().open('wb') 替换为outFile = open(self.output().path, 'wb')inFile = self.input().open('rb')inFile=open(self.input().path, 'rb') 解决了这个问题。【参考方案2】:

d6tflow 解决了这个问题,请参阅回答这个问题的example for sklearn model pickle。另外,您不需要编写所有样板代码。

import d6tflow

class firstJob(d6tflow.tasks.TaskPickle):
    def run(self):
        # your code
        self.save(ret)

class secondJob(TaskClass):
    date = luigi.DateParameter()

    def requires(self):
        return firstJob(self.date)

    def run(self):
        inFile = self.input().load()
        # use inFile

d6tflow.run([secondJob])

【讨论】:

以上是关于python luigi localTarget 泡菜的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Python Luigi 登录

luigi框架--关于python运行spark程序

Python Luigi中的事件处理

如何在 Python Luigi 中使用参数

Python Luigi - 满意时继续执行外部任务

Python 任务调度器 Luigi 可以检测到间接依赖吗?