Python mrjob mapreduce如何预处理输入文件

Posted

技术标签:

【中文标题】Python mrjob mapreduce如何预处理输入文件【英文标题】:Python mrjob mapreduce how to preprocess the input file 【发布时间】:2016-04-06 04:30:15 【问题描述】:

我正在尝试预处理 XML 文件以在放入 mapreduce 之前提取某些节点。我有以下代码:

from mrjob.compat import jobconf_from_env
from mrjob.job import MRJob
from mrjob.util import cmd_line, bash_wrap

class MRCountLinesByFile(MRJob):
    def configure_options(self):
        super(MRCountLinesByFile, self).configure_options()
        self.add_file_option('--filter')

    def mapper_cmd(self):
        cmd = cmd_line([self.options.filter, jobconf_from_env('mapreduce.map.input.file'])
        return cmd



if __name__ == '__main__':
    MRCountLinesByFile.run()

然后在命令行中输入:

python3 test_job_conf.py --filter ./filter.py -r local < test.txt

test.txt 是一个普通的 XML 文件,例如 here。而filter.py 是一个查找所有标题信息的脚本。

但是,我收到以下错误:

Creating temp directory /tmp/test_job_conf.vagrant.20160406.042648.689625
Running step 1 of 1...
Traceback (most recent call last):
  File "./filter.py", line 8, in <module>
    with open(filename) as f:
FileNotFoundError: [Errno 2] No such file or directory: 'None'
Step 1 of 1 failed: Command '['./filter.py', 'None']' returned non-zero exit status 1

在这种情况下,它看起来像 mapreduce.map.input.file 渲染 None。如何让mapper_cmd 函数读取mrjob 当前正在读取的文件?

【问题讨论】:

【参考方案1】:

根据我的理解,你的 self.add_file_option 应该有你文件的路径。

self.add_file_option('--items', help='Path to u.item')

我不太明白你的情况,但这是我的理解。 您使用配置选项来确保将给定文件发送到所有映射器进行处理,例如,当您想要对源以外的另一个文件中的数据进行辅助查找时。这个辅助查找文件由 self.add_file_option('--items', help='Path to u.item') 提供。

要在 reducer 或 mapper 阶段之前预处理某些内容,您可以使用 reducer_init 或 mapper_init。这些初始化或处理步骤也需要在您的步骤函数中提及,如下所示。

def steps(self):
        return [
            MRStep(mapper=self.mapper_get_name,
                   reducer_init=self.reducer_init,
                   reducer=self.reducer_count_name),
            MRStep(reducer = self.reducer_find_maxname)
        ]

在您的 init 函数中,您在发送到 mapper 或 reducer 之前对您需要做的事情进行实际的预处理。例如,打开一个文件 xyz 并将第一个字段中的值复制到另一个字段中,我将在我的 reducer 中使用并输出相同的值。

def reducer_init(self):
        self.movieNames =     
        with open("xyz") as f:
            for line in f:
                fields = line.split('|')
                self.myNames[fields[0]] = fields[1]

希望这会有所帮助!

【讨论】:

以上是关于Python mrjob mapreduce如何预处理输入文件的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce 实战

2.9 MRJob编写和运行MapReduce

Hadoop学习笔记:使用Mrjob框架编写MapReduce

MRJob在Python中排序

MRJob 极速入门,Python玩转Hadoop你会么?

2.10 MapReduce文件合并