MapReduce二次排序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce二次排序相关的知识,希望对你有一定的参考价值。

本文主要介绍下二次排序的实现方式

 

我们知道MapReduce是按照key来进行排序的,那么如果有个需求就是先按照第一个字段排序,在第一个字段相等的情况下,按照第二个字段排序,这就是传说中的二次排序。

 

下面就具体说一下二次排序的实现方式

 

主要就是4点

 

1.自定义一个Key

为什么要自定义一个Key,我们知道MapReduce中排序就是按照Key来排序的,我们既然想要实现按照两个字段进行排序,默认的方式肯定是不行的,所以自定义一个新的Key,Key里面有两个属性,也就是我们要排序的两个字段。

首先,实现WritableComparable接口,因为Key是可序列化并且可以比较的。其次,重载相关的方法,例如序列化、反序列化相关的方法write()、readFields()。重载在分区的时候要用到的hashCode()方法,注意后面会说到一个Partitioner类,也是用来分区的,用hashCode()方法和Partitioner类进行分区都是可以的,使用其中的一个即可。重载排序用的compareTo()方法。

 

2.分区函数类

上面定义了一个新的Key,那么我现在做分发,到底按照什么样的规则进行分发是在分区函数中定义的,这个类要继承Partitioner类,重载其中的分区方法getPartition(),在main()函数里面给job添加上即可,例如:job.setPartitionerClass(XXX.class);

注:这个类的作用和新Key中的hashCode()方法作用一样,所以如果在新key的hashCode()方法中写了分区的实现,这个分区类是可以省略的。

 

3.比较函数类

这个类决定着Key的排序规则,是一个比较器,需要继承WritableComparator类,并且重载其中的compare()方法。在main()函数里给job添加上即可,例如:job.setSortComparatorClass(XXX.class);

注:这个类的作用跟自定义Key的compareTo()方法一样,如果在自定义的Key中重载了compareTo()方法,这个类是可以省略的。

 

4.分组函数类

通过分区类,我们重新定义了key的分区规则,但是多个key不同的也可以进入一个reduce中,这不是我们想要的,我们需要分区函数来定义什么样的key可以进入相应的reduce来执行,因为也涉及到比较,所以这个类也需要继承WritableComparator,也可以实现RawComparator,并且重载其中的compare()方法,在main()函数中给job加上即可,如:job.setGroupingComparatorClass(XXX.class)。

 

下面我们来重新简化一下上一篇文章中提到了例子:

#需求:第一列升序,第一列相同时,第二列升序,其第一列相同的放在一个分区中输出

 

[java] view plain copy
 
  1. sort1   1  
  2. sort2   3  
  3. sort2   88  
  4. sort2   54  
  5. sort1   2  
  6. sort6   22  
  7. sort6   888  
  8. sort6   58  

#预期输出结果

 

#part-r-00000文件

 

[java] view plain copy
 
  1. sort1   1  
  2. sort1   2  

#part-r-00001文件

 

 

[java] view plain copy
 
  1. sort2   3  
  2. sort2   54  
  3. sort2   88  


#part-r-00002

 

 

[java] view plain copy
 
  1. sort6   22  
  2. sort6   58  
  3. sort6   888  





 

1.自定义组合键

 

[java] view plain copy
 
  1. public class CombinationKey implements WritableComparable<CombinationKey>{  
  2.   
  3.     private Text firstKey;  
  4.     private IntWritable secondKey;  
  5.       
  6.     //无参构造函数  
  7.     public CombinationKey() {  
  8.         this.firstKey = new Text();  
  9.         this.secondKey = new IntWritable();  
  10.     }  
  11.       
  12.     //有参构造函数  
  13.     public CombinationKey(Text firstKey, IntWritable secondKey) {  
  14.         this.firstKey = firstKey;  
  15.         this.secondKey = secondKey;  
  16.     }  
  17.   
  18.     public Text getFirstKey() {  
  19.         return firstKey;  
  20.     }  
  21.   
  22.     public void setFirstKey(Text firstKey) {  
  23.         this.firstKey = firstKey;  
  24.     }  
  25.   
  26.     public IntWritable getSecondKey() {  
  27.         return secondKey;  
  28.     }  
  29.   
  30.     public void setSecondKey(IntWritable secondKey) {  
  31.         this.secondKey = secondKey;  
  32.     }  
  33.   
  34.     public void write(DataOutput out) throws IOException {  
  35.         this.firstKey.write(out);  
  36.         this.secondKey.write(out);  
  37.     }  
  38.   
  39.     public void readFields(DataInput in) throws IOException {  
  40.         this.firstKey.readFields(in);  
  41.         this.secondKey.readFields(in);  
  42.     }  
  43.   
  44.     /** 
  45.      * 自定义比较策略 
  46.      * 注意:该比较策略用于MapReduce的第一次默认排序 
  47.      * 也就是发生在Map端的sort阶段 
  48.      * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整) 
  49.      */  
  50.     public int compareTo(CombinationKey combinationKey) {  
  51.         int minus = this.getFirstKey().compareTo(combinationKey.getFirstKey());  
  52.         if (minus != 0){  
  53.             return minus;  
  54.         }  
  55.         return this.getSecondKey().get() - combinationKey.getSecondKey().get();  
  56.     }  
  57.       
  58.     /*  public int compareTo(CombinationKey combinationKey) { 
  59.         System.out.println("------------------------CombineKey flag-------------------"); 
  60.         return this.firstKey.compareTo(combinationKey.getFirstKey()); 
  61.     }*/  
  62.   
  63.     @Override  
  64.     public int hashCode() {  
  65.         final int prime = 31;  
  66.         int result = 1;  
  67.         result = prime * result + ((firstKey == null) ? 0 : firstKey.hashCode());  
  68.         return result;  
  69.     }  
  70.   
  71.     @Override  
  72.     public boolean equals(Object obj) {  
  73.         if (this == obj)  
  74.             return true;  
  75.         if (obj == null)  
  76.             return false;  
  77.         if (getClass() != obj.getClass())  
  78.             return false;  
  79.         CombinationKey other = (CombinationKey) obj;  
  80.         if (firstKey == null) {  
  81.             if (other.firstKey != null)  
  82.                 return false;  
  83.         } else if (!firstKey.equals(other.firstKey))  
  84.             return false;  
  85.         return true;  
  86.     }  
  87.   
  88.       
  89. }  


2.自定义分组

 

 

[java] view plain copy
 
  1. /** 
  2.  * 自定义分组有中方式,一种是继承WritableComparator 
  3.  * 另外一种是实现RawComparator接口 
  4.  * @author 廖钟民 
  5.  * time : 2015年1月19日下午3:30:11 
  6.  * @version 
  7.  */  
  8. public class DefinedGroupSort extends WritableComparator{  
  9.   
  10.   
  11.     protected DefinedGroupSort() {  
  12.         super(CombinationKey.class,true);  
  13.     }  
  14.   
  15.     @Override  
  16.     public int compare(WritableComparable a, WritableComparable b) {  
  17.         System.out.println("---------------------进入自定义分组---------------------");  
  18.         CombinationKey combinationKey1 = (CombinationKey) a;  
  19.         CombinationKey combinationKey2 = (CombinationKey) b;  
  20.         System.out.println("---------------------分组结果:" + combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey()));  
  21.         System.out.println("---------------------结束自定义分组---------------------");  
  22.         //自定义按原始数据中第一个key分组  
  23.         return combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey());  
  24.     }  
  25.   
  26.   
  27. }  


3.自定义分区

 

 

[java] view plain copy
 
  1. /** 
  2.  * 自定义分区 
  3.  *  
  4.  * @author 廖钟*民 time : 2015年1月19日下午12:13:54 
  5.  * @version 
  6.  */  
  7. public class DefinedPartition extends Partitioner<CombinationKey, IntWritable> {  
  8.   
  9.     /** 
  10.      * 数据输入来源:map输出 我们这里根据组合键的第一个值作为分区 如果不自定义分区的话,MapReduce会根据默认的Hash分区方法 将整个组合键相等的分到一个分区中,这样的话显然不是我们要的效果 
  11.      *  
  12.      * @param key 
  13.      *            map输出键值 
  14.      * @param value 
  15.      *            map输出value值 
  16.      * @param numPartitions 
  17.      *            分区总数,即reduce task个数 
  18.      */  
  19.     public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {  
  20.           
  21.         if (key.getFirstKey().toString().endsWith("1")){  
  22.             return 0;  
  23.         } else if (key.getFirstKey().toString().endsWith("2")){  
  24.             return 1;  
  25.         } else {  
  26.             return 2;  
  27.         }  
  28.     }  
  29.   
  30. }  


4.主类

 

 

[java] view plain copy
 
  1. public class SecondSortMapReduce extends Configured implements Tool{  
  2.   
  3.     // 定义输入路径  
  4.         private String INPUT_PATH = "";  
  5.         // 定义输出路径  
  6.         private String OUT_PATH = "";  
  7.   
  8.         public static void main(String[] args) {  
  9.   
  10.             try {  
  11.                 ToolRunner.run(new SecondSortMapReduce(), args);  
  12.             } catch (Exception e) {  
  13.                 e.printStackTrace();  
  14.             }  
  15.         }  
  16.   
  17.           
  18.     public static class SecondSortMapper extends Mapper<Text, Text, CombinationKey, IntWritable>{  
  19.         /** 
  20.          * 这里要特殊说明一下,为什么要将这些变量写在map函数外边 
  21.          * 对于分布式的程序,我们一定要注意到内存的使用情况,对于MapReduce框架 
  22.          * 每一行的原始记录的处理都要调用一次map()函数,假设,这个map()函数要处理1一亿 
  23.          * 条输入记录,如果将这些变量都定义在map函数里面则会导致这4个变量的对象句柄 
  24.          * 非常的多(极端情况下将产生4*1亿个句柄,当然java也是有自动的GC机制的,一定不会达到这么多) 
  25.          * 导致栈内存被浪费掉,我们将其写在map函数外面,顶多就只有4个对象句柄 
  26.          */  
  27.         private CombinationKey combinationKey = new CombinationKey();  
  28.         Text sortName = new Text();  
  29.         IntWritable score = new IntWritable();  
  30.         String[] splits = null;  
  31.         protected void map(Text key, Text value, Mapper<Text, Text, CombinationKey, IntWritable>.Context context) throws IOException, InterruptedException {  
  32.             System.out.println("---------------------进入map()函数---------------------");  
  33.             //过滤非法记录(这里用计数器比较好)  
  34.             if (key == null || value == null || key.toString().equals("")){  
  35.                 return;  
  36.             }  
  37.             //构造相关属性  
  38.             sortName.set(key.toString());  
  39.             score.set(Integer.parseInt(value.toString()));  
  40.             //设置联合key  
  41.             combinationKey.setFirstKey(sortName);  
  42.             combinationKey.setSecondKey(score);  
  43.               
  44.             //通过context把map处理后的结果输出  
  45.             context.write(combinationKey, score);  
  46.             System.out.println("---------------------结束map()函数---------------------");  
  47.         }  
  48.           
  49.     }  
  50.       
  51.       
  52.     public static class SecondSortReducer extends Reducer<CombinationKey, IntWritable, Text, Text>{  
  53.           
  54.         StringBuffer sb = new StringBuffer();  
  55.         Text score = new Text();  
  56.         /** 
  57.          * 这里要注意一下reduce的调用时机和次数: 
  58.          * reduce每次处理一个分组的时候会调用一次reduce函数。 
  59.          * 所谓的分组就是将相同的key对应的value放在一个集合中 
  60.          * 例如:<sort1,1> <sort1,2> 
  61.          * 分组后的结果就是 
  62.          * <sort1,{1,2}>这个分组会调用一次reduce函数 
  63.          */  
  64.         protected void reduce(CombinationKey key, Iterable<IntWritable> values, Reducer<CombinationKey, IntWritable, Text, Text>.Context context)  
  65.                 throws IOException, InterruptedException {  
  66.               
  67.             //将联合Key的第一个元素作为新的key,遍历values将结果写出去  
  68.             for (IntWritable val : values){  
  69.                 context.write(key.getFirstKey(), new Text(String.valueOf(val.get())));  
  70.             }  
  71.               
  72.               
  73.         }  
  74.     }  
  75.   
  76.   
  77.     public int run(String[] args) throws Exception {  
  78.         // 给路径赋值  
  79.         INPUT_PATH = args[0];  
  80.         OUT_PATH = args[1];  
  81.         try {  
  82.             // 创建配置信息  
  83.             Configuration conf = new Configuration();  
  84.             conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");  
  85.   
  86.             // 创建文件系统  
  87.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
  88.             // 如果输出目录存在,我们就删除  
  89.             if (fileSystem.exists(new Path(OUT_PATH))) {  
  90.                 fileSystem.delete(new Path(OUT_PATH), true);  
  91.             }  
  92.   
  93.             // 创建任务  
  94.             Job job = new Job(conf, SecondSortMapReduce.class.getName());  
  95.             //设置成jar运行型  
  96.             job.setJarByClass(SecondSortMapReduce.class);  
  97.             // 1.1  设置输入目录和设置输入数据格式化的类  
  98.             FileInputFormat.setInputPaths(job, INPUT_PATH);  
  99.             job.setInputFormatClass(KeyValueTextInputFormat.class);  
  100.   
  101.             // 部1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型  
  102.             job.setMapperClass(SecondSortMapper.class);  
  103.             job.setMapOutputKeyClass(CombinationKey.class);  
  104.             job.setMapOutputValueClass(IntWritable.class);  
  105.   
  106.             // 1.3  设置分区和reduce数量(reduce的数量,和分区的数量对应)  
  107.             job.setPartitionerClass(DefinedPartition.class);  
  108.             job.setNumReduceTasks(3);  
  109.               
  110.             //设置自定义分组策略  
  111.             job.setGroupingComparatorClass(DefinedGroupSort.class);  
  112.             //设置自定义比较策略(因为我的CombineKey重写了compareTo方法,所以这个可以省略)  
  113.             //job.setSortComparatorClass(DefinedComparator.class);  
  114.               
  115.             // 1.4  排序  
  116.             //1.5   归约  
  117.             // 2.1  Shuffle把数据从Map端拷贝到Reduce端。  
  118.             // 2.2  指定Reducer类和输出key和value的类型  
  119.             job.setReducerClass(SecondSortReducer.class);  
  120.             job.setOutputKeyClass(Text.class);  
  121.             job.setOutputValueClass(Text.class);  
  122.   
  123.             //2.3   指定输出的路径和设置输出的格式化类  
  124.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
  125.             job.setOutputFormatClass(TextOutputFormat.class);  
  126.   
  127.   
  128.             // 提交作业 退出  
  129.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
  130.           
  131.         } catch (Exception e) {  
  132.             e.printStackTrace();  
  133.         }  
  134.         return 0;  
  135.     }  
  136. }  

注:使用jar包的方式运行程序一定不要忘记很重要的一句:job.setJarByClass(XXX);

 

程序运行的结果:

技术分享

以上是关于MapReduce二次排序的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce排序之 二次排序

Hadoop---mapreduce排序和二次排序以及全排序

完成功能--MapReduce驱动两个job二次排序求Url的topN

MapReduce二次排序

MapReduce程序之二次排序与多次排序

mapreduce编程-二次排序