在reduce阶段之后合并输出文件
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了在reduce阶段之后合并输出文件相关的知识,希望对你有一定的参考价值。
在mapreduce中,每个reduce任务将其输出写入名为part-r-nnnnn的文件,其中nnnnn是与reduce任务关联的分区ID。 map / reduce是否合并这些文件?如果有,怎么样?
您可以通过调用以下命令来委托reduce输出文件的整个合并,而不是自己合并文件。
hadoop fs -getmerge /output/dir/on/hdfs/ /desired/local/output/file.txt
注意这会在本地组合HDFS文件。确保在运行之前有足够的磁盘空间
。 map / reduce是否合并这些文件?
不,它没有合并。
您可以使用IdentityReducer来实现您的目标。
不执行缩减,将所有输入值直接写入输出。
public void reduce(K key,
Iterator<V> values,
OutputCollector<K,V> output,
Reporter reporter)
throws IOException
将所有键和值直接写入输出。
看看相关的SE帖子:
hadoop: difference between 0 reducer and identity reducer?
不,这些文件不会被Hadoop合并。您获得的文件数与reduce任务数相同。
如果您需要它作为下一个作业的输入,那么不要担心有单独的文件。只需将整个目录指定为下一个作业的输入。
如果确实需要群集外的数据,那么我通常会在从群集中提取数据时将它们合并到接收端。
即这样的事情:
hadoop fs -cat /some/where/on/hdfs/job-output/part-r-* > TheCombinedResultOfTheJob.txt
这是您可以用于在HDFS中合并文件的功能
public boolean getMergeInHdfs(String src, String dest) throws IllegalArgumentException, IOException {
FileSystem fs = FileSystem.get(config);
Path srcPath = new Path(src);
Path dstPath = new Path(dest);
// Check if the path already exists
if (!(fs.exists(srcPath))) {
logger.info("Path " + src + " does not exists!");
return false;
}
if (!(fs.exists(dstPath))) {
logger.info("Path " + dest + " does not exists!");
return false;
}
return FileUtil.copyMerge(fs, srcPath, fs, dstPath, false, config, null);
}
仅对于文本文件和HDFS作为源和目标,请使用以下命令:
hadoop fs -cat /input_hdfs_dir/* | hadoop fs -put - /output_hdfs_file
这将连接input_hdfs_dir
中的所有文件,并将输出写回output_hdfs_file
的HDFS。请记住,所有数据都将被带回本地系统,然后再次上传到hdfs,尽管没有创建临时文件,这使用UNIX pe即时发生。
此外,这不适用于非文本文件,如Avro,ORC等。
对于二进制文件,您可以执行类似的操作(如果您在目录上映射了Hive表):
insert overwrite table tbl select * from tbl
根据您的配置,这可能还会创建多个文件。要创建单个文件,请使用mapreduce.job.reduces=1
将reducers的数量设置为1,或将hive属性设置为hive.merge.mapredfiles=true
。
part-r-nnnnn文件是在'r'之间指定的reduce阶段之后生成的。现在的事实是,如果你有一个减速器运行,你将有一个输出文件,如part-r-00000。如果减速器的数量是2,那么你将得到part-r-00000和part-r-00001,依此类推。看,如果输出文件太大而无法放入机器内存中,因为hadoop框架设计为在商品机器上运行,那么文件会被分割。根据MRv1,您可以使用20个减速器来限制逻辑。您可能需要在配置文件mapred-site.xml中自定义更多但相同的需求。谈论你的问题;您可以使用getmerge,也可以通过将以下语句嵌入到驱动程序代码中来将reducers的数量设置为1
job.setNumReduceTasks(1);
希望这能回答你的问题。
您可以运行其他map / reduce任务,其中map和reduce不会更改数据,分区程序会将所有数据分配给单个reducer。
除了我之前的回答,我还有一个答案,我几分钟前就试过了。您可以使用CustomOutputFormat,它看起来像下面给出的代码
public class VictorOutputFormat extends FileOutputFormat<StudentKey,PassValue> {
@Override
public RecordWriter<StudentKey,PassValue> getRecordWriter(
TaskAttemptContext tac) throws IOException, InterruptedException {
//step 1: GET THE CURRENT PATH
Path currPath=FileOutputFormat.getOutputPath(tac);
//Create the full path
Path fullPath=new Path(currPath,"Aniruddha.txt");
//create the file in the file system
FileSystem fs=currPath.getFileSystem(tac.getConfiguration());
FSDataOutputStream fileOut=fs.create(fullPath,tac);
return new VictorRecordWriter(fileOut);
}
}
只是,看看最后一行的第四行。我使用自己的名字作为输出文件名,我用15个reducer测试了程序。文件仍然保持不变。因此,可以获得单个输出文件而不是两个或更多文件,但是非常清楚输出文件的大小不得超过主存储器的大小,即输出文件必须适合商品机器的内存,否则可能存在输出文件拆分出现问题。谢谢!!
为什么不使用像这样的猪脚本来合并分区文件:
stuff = load "/path/to/dir/*"
store stuff into "/path/to/mergedir"
如果文件有标题,你可以通过这样做摆脱它:
hadoop fs -cat /path/to/hdfs/job-output/part-* | grep -v "header" > output.csv
然后手动为output.csv添加标题
以上是关于在reduce阶段之后合并输出文件的主要内容,如果未能解决你的问题,请参考以下文章
Hadoop:如何将 reducer 输出合并到单个文件中? [复制]
将不同文件夹的 map-reduce 输出合并到单个文件夹中