Hadoop Combiner组件

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop Combiner组件相关的知识,希望对你有一定的参考价值。

一:背景

在MapReduce模型中,reduce的功能大多是统计分类类型的总量、求最大值最小值等,对于这些操作可以考虑在Map输出后进行Combiner操作,这样可以减少网络传输负载,同时减轻reduce任务的负担。Combiner操作是运行在每个节点上的,只会影响本地Map的输出结果,Combiner的输入为本地map的输出结果,很多时候Combiner的逻辑和reduce的逻辑是相同的,因此两者可以共用reducer体。

 

二:什么时候运行Combiner

(1):当job设置了Combiner,并且spill的个数达到了min.num.spill.for.combine(默认是3)的时候,那么Combiner就会在merge之前执行。

(2):但是有的情况下,merge开始执行,但spill文件的个数没有达到需求,这个时候Combiner可能会在merge之后运行。

(3):Combiner也有可能不运行,Combiner会考虑当时集群的一个负载情况。

 

 

三:程序代码

 

[java] view plain copy
 
  1. public class WordCountTest {  
  2.   
  3.     // 定义输入路径  
  4.     private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/hello";  
  5.     // 定义输出路径  
  6.     private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
  7.   
  8.     public static void main(String[] args) {  
  9.   
  10.         try {  
  11.             // 创建配置信息  
  12.             Configuration conf = new Configuration();  
  13.               
  14.             /**********************************************/  
  15.             //对Map端输出进行压缩  
  16.             //conf.setBoolean("mapred.compress.map.output", true);  
  17.             //设置map端输出使用的压缩类  
  18.             //conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);  
  19.             //对reduce端输出进行压缩  
  20.             //conf.setBoolean("mapred.output.compress", true);  
  21.             //设置reduce端输出使用的压缩类  
  22.             //conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);  
  23.             // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)  
  24.             /* 
  25.              * conf.addResource("classpath://hadoop/core-site.xml");  
  26.              * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
  27.              * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
  28.              */  
  29.   
  30.             // 创建文件系统  
  31.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  32.             // 如果输出目录存在,我们就删除  
  33.             if (fileSystem.exists(new Path(OUT_PATH))) {  
  34.                 fileSystem.delete(new Path(OUT_PATH), true);  
  35.             }  
  36.   
  37.             // 创建任务  
  38.             Job job = new Job(conf, WordCountTest.class.getName());  
  39.   
  40.             //1.1   设置输入目录和设置输入数据格式化的类  
  41.             FileInputFormat.setInputPaths(job, INPUT_PATH);  
  42.             job.setInputFormatClass(TextInputFormat.class);  
  43.   
  44.             //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  45.             job.setMapperClass(MyMapper.class);  
  46.               
  47.             job.setMapOutputKeyClass(Text.class);  
  48.             job.setMapOutputValueClass(LongWritable.class);  
  49.   
  50.             //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
  51.             job.setPartitionerClass(HashPartitioner.class);  
  52.             job.setNumReduceTasks(1);  
  53.   
  54.             // 1.4  排序、分组  
  55.             //1.5   归约(Combiner可以和Reducer共用)  
  56.             job.setCombinerClass(MyReducer.class);  
  57.             //2.1   Shuffle把数据从Map端拷贝到Reduce端。  
  58.             //2.2   指定Reducer类和输出key和value的类型  
  59.             job.setReducerClass(MyReducer.class);  
  60.             job.setOutputKeyClass(Text.class);  
  61.             job.setOutputValueClass(LongWritable.class);  
  62.   
  63.             //2.3   指定输出的路径和设置输出的格式化类  
  64.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  65.             job.setOutputFormatClass(TextOutputFormat.class);  
  66.   
  67.   
  68.             // 提交作业 退出  
  69.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  70.           
  71.         } catch (Exception e) {  
  72.             e.printStackTrace();  
  73.         }  
  74.     }  
  75.   
  76.     public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {  
  77.   
  78.         // 定义一个LongWritable对象作为map输出的value类型  
  79.         LongWritable oneTime = new LongWritable(1);  
  80.         // 定义一个Text对象作为map输出的key类型  
  81.         Text word = new Text();  
  82.   
  83.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) throws IOException,  
  84.                 InterruptedException {  
  85.   
  86.             // 对每一行记录采用制表符(\t)进行分割  
  87.             String[] splits = value.toString().split("\t");  
  88.   
  89.             // 遍历字符串数组输出每一个单词  
  90.             for (String str : splits) {  
  91.   
  92.                 // 设置word  
  93.                 word.set(str);  
  94.                 // 把结果写出去  
  95.                 context.write(word, oneTime);  
  96.             }  
  97.         }  
  98.     }  
  99.   
  100.     public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {  
  101.   
  102.         // 定义LongWritable对象最为Reduce输出的value类型  
  103.         LongWritable result = new LongWritable();  
  104.   
  105.         protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException,  
  106.                 InterruptedException {  
  107.   
  108.             int sum = 0;  
  109.   
  110.             // 遍历集合,计算每个单词出现的和  
  111.             for (LongWritable s : values) {  
  112.   
  113.                 sum += s.get();  
  114.             }  
  115.             // 设置result  
  116.             result.set(sum);  
  117.             // 把结果写出去  
  118.             context.write(key, result);  
  119.         }  
  120.     }  
  121. }  



 

程序使用的数据:

技术分享

程序运行流程:

技术分享

(1):Map的输入记录为3即<0,hello you> <10,hello me> <19,you me love>

(2):Map的输出记录为7即<hello,1> <you,1> <hello,1> <me,1> <you,1> <me,1> <love,1>

(3):排序分组后的记录为<hello,{1,1}> <love,{1}> <me,{1,1}>  <you,{1}> 分了4组,但是记录数依然是7,只不过是分了组而已。

(4):进入Combiner的记录为7,经过Combiner之后的结果为<hello,2> <love,1> <me,2> <you,1>即Combiner的输出为4条记录

 

四:使用Combiner的限制

并不是所有情况都能使用Combiner,Combiner适用的场景是汇总求和、求最值的场景,但是对于求平均数的场景就不适用了。因为如果在求平均数的程序中使用了Combiner即在每个Map后都使用Combiner进行求平均,每个map计算出的平均值到了reduce端再进行平均,结果和正真的平均数就有出入了。

以上是关于Hadoop Combiner组件的主要内容,如果未能解决你的问题,请参考以下文章

为啥这个使用Combiner 类的Hadoop 示例不能正常工作? (不要执行Combiner提供的“局部缩减”)

combiner hadoop

Hadoop学习之路(十八)MapReduce框架Combiner分区

大数据-Hadoop生态(19)-MapReduce框架原理-Combiner合并

hadoop-Combiner作用用法

自定义combiner实现文件倒排索引