在减少阶段后合并输出文件

Posted

技术标签:

【中文标题】在减少阶段后合并输出文件【英文标题】:merge output files after reduce phase 【发布时间】:2011-08-07 16:32:35 【问题描述】:

在 mapreduce 中,每个 reduce 任务将其输出写入名为 part-r-nnnnn 的文件,其中 nnnnn 是与 reduce 任务关联的分区 ID。 map/reduce 是否合并这些文件?如果是,怎么做?

【问题讨论】:

【参考方案1】:

您可以委托reduce输出文件的整个合并,而不是自己进行文件合并:

hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt

注意 这会在本地组合 HDFS 文件。运行前确保有足够的磁盘空间

【讨论】:

有没有办法做到这一点,但在 dfs 上?我的意思是我想将它们合并到 dfs 上的单个文件中? 它似乎不适用于 dfs,合并后的文件被写入本地文件系统。当然你也可以直接写回去,但是好像很浪费。 注意:这对于非文本文件是不安全的。 getMerge 对文件进行简单的连接,使用 SequenceFile 之类的文件不会给出合理的输出。 这不适用于 HDFS 作为预期的目标。 getmerge 将数据从 hdfs 带到本地。【参考方案2】:

。 map/reduce 是否合并这些文件?

没有。它不合并。

您可以使用IdentityReducer 来实现您的目标。

不执行归约,将所有输入值直接写入输出。

public void reduce(K key,
                   Iterator<V> values,
                   OutputCollector<K,V> output,
                   Reporter reporter)
            throws IOException

将所有键和值直接写入输出。

查看相关的 SE 帖子:

hadoop: difference between 0 reducer and identity reducer?

【讨论】:

【参考方案3】:

如果文件有标题,你可以通过这样做摆脱它:

hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv

然后手动为 output.csv 添加标头

【讨论】:

【参考方案4】:

除了我之前的答案之外,我还有一个几分钟前尝试过的答案。 您可以使用 CustomOutputFormat,它看起来像下面给出的代码

public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> 

    @Override
    public RecordWriter<StudentKey,PassValue> getRecordWriter(
            TaskAttemptContext tac) throws IOException, InterruptedException 
        //step 1: GET THE CURRENT PATH
        Path currPath=FileOutputFormat.getOutputPath(tac);

        //Create the full path
        Path fullPath=new Path(currPath,"Aniruddha.txt");

        //create the file in the file system
        FileSystem fs=currPath.getFileSystem(tac.getConfiguration());
        FSDataOutputStream fileOut=fs.create(fullPath,tac);
        return new VictorRecordWriter(fileOut);
    


只是,看看倒数第四行。我使用了我自己的名字作为输出文件名,并且我已经用 15 个 reducer 测试了这个程序。文件仍然保持不变。所以得到一个单独的输出文件而不是两个或更多是可能的但很清楚输出文件的大小不能超过主内存的大小,即输出文件必须适合商用机器的内存,否则可能会有输出文件拆分问题。 谢谢!!

【讨论】:

getmerge 可以解决您的目的,但这是另一种选择。但这很有用【参考方案5】:

part-r-nnnnn 文件是在中间由 'r' 指定的缩减阶段之后生成的。现在的事实是,如果你有一个 reducer 正在运行,你将有一个像 part-r-00000 这样的输出文件。如果减速器的数量为 2,那么您将拥有 part-r-00000 和 part-r-00001 等等。看,如果输出文件太大而无法放入机器内存,因为 hadoop 框架被设计为在 Commodity Machines 上运行,那么文件就会被拆分。根据 MRv1,你有 20 个 reducer 的限制来处理你的逻辑。您可能需要在配置文件 ma​​pred-site.xml 中自定义更多但相同的需求。 谈论你的问题;您可以使用 getmerge,也可以通过将以下语句嵌入到驱动程序代码中来将 reducer 的数量设置为 1

job.setNumReduceTasks(1);

希望这能回答你的问题。

【讨论】:

【参考方案6】:

仅对于文本文件和 HDFS 作为源和目标,使用以下命令:

hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file

这将连接input_hdfs_dir 中的所有文件,并将输出写回到output_hdfs_file 的HDFS。请记住,所有数据都将被带回本地系统,然后再次上传到 hdfs,尽管不会创建临时文件,而且这是使用 UNIX pe 即时发生的。

此外,这不适用于 Avro、ORC 等非文本文件。

对于二进制文件,您可以执行以下操作(如果您在目录上映射了 Hive 表):

insert overwrite table tbl select * from tbl

根据您的配置,这也可能创建多个文件。要创建单个文件,请使用 mapreduce.job.reduces=1 将 reducer 的数量显式设置为 1,或将 hive 属性设置为 hive.merge.mapredfiles=true

【讨论】:

使用此解决方案还应注意从标准输入进入最终目的地的可能输入。也就是说,我遇到了一种情况,在启用 HA 的集群中,当其中一个节点处于待机模式时会出现警告消息。在那种情况下,我的输出包含那些原本无辜的警告消息。 link【参考方案7】:

这就是你可以用来在HDFS中合并文件的功能

public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException 
    FileSystem fs = FileSystem.get(config);
    Path srcPath = new Path(src);
    Path dstPath = new Path(dest);

    // Check if the path already exists
    if (!(fs.exists(srcPath))) 
        logger.info("Path " + src + " does not exists!");
        return false;
    

    if (!(fs.exists(dstPath))) 
        logger.info("Path " + dest + " does not exists!");
        return false;
    
    return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null);

【讨论】:

【参考方案8】:

为什么不使用像这样的猪脚本来合并分区文件:

stuff = load "/path/to/dir/*"

store stuff into "/path/to/mergedir"

【讨论】:

【参考方案9】:

不,这些文件不会被 Hadoop 合并。你得到的文件数量和reduce任务的数量是一样的。

如果您需要将其作为下一份工作的输入,那么不必担心有单独的文件。只需将整个目录指定为下一个作业的输入即可。

如果您确实需要集群外部的数据,那么我通常在将数据从集群中拉出时在接收端合并它们。

即像这样:

hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt

【讨论】:

感谢您的回答 buf 在 map/reduce (mapred-default.xml) 的配置文件中有一个名为 io.sort.factor的属性>,它是干什么用的??? io.sort.factor 与 map 和 reduce 步骤之间的处理有关。不是 reduce 的输出。 你怎么知道part-r-*文件合并的顺序是对的? @Razvan:顺序无关紧要。如果它确实重要,那么您有一个无法扩展的算法,并且您显然对哪个 Reducer 完成了哪部分工作有假设。因此,如果发生这种情况,您将遇到不同类型的问题。 @NielsBasjes:最好使用“hadoop fs -getmerge”而不是“hadoop fs -cat”【参考方案10】:

您可以运行额外的 map/reduce 任务,其中 map 和 reduce 不会更改数据,partitioner 将所有数据分配给单个 reducer。

【讨论】:

如果您需要合并的数据超出本地机器的处理能力,则不需要

以上是关于在减少阶段后合并输出文件的主要内容,如果未能解决你的问题,请参考以下文章

大数据 : Hadoop reduce阶段

大数据 : Hadoop reduce阶段

光栅化阶段:三角形设置、三角形遍历、像素着色、合并

.explain() 输出中的阶段是啥

web加载优化

大数据之Hadoop(MapReduce):MapReduce Shuffle的优化