MapReduce排序之 二次排序

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce排序之 二次排序相关的知识,希望对你有一定的参考价值。

一:背景

Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序。自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序。

 

二:技术实现

我们先来看案例需求

#需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开)

 

[java] view plain copy
 
  1. 3   3  
  2. 3   2  
  3. 3   1  
  4. 2   2  
  5. 2   1  
  6. 1   1  

MapReduce计算之后的结果应该是:

 

 

[java] view plain copy
 
  1. 1   1  
  2. 2   1  
  3. 2   2  
  4. 3   1  
  5. 3   2  
  6. 3   3  


#需求2:第一列不相等时,第一列按降序排列,当第一列相等时,第二列按升序排列

 

 

[java] view plain copy
 
  1. 3   3  
  2. 3   2  
  3. 3   1  
  4. 2   2  
  5. 2   1  
  6. 1   1  

MapReduce计算之后的结果应该是:

[java] view plain copy
 
  1. 3   1  
  2. 3   2  
  3. 3   3  
  4. 2   1  
  5. 2   2  
  6. 1   1  



下面是实现代码,实现两种需求的关键是compareTo()方法的实现不同:

 

[java] view plain copy
 
    1. public class SecondSortTest {  
    2.   
    3.     // 定义输入路径  
    4.         private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";  
    5.         // 定义输出路径  
    6.         private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
    7.   
    8.         public static void main(String[] args) {  
    9.   
    10.             try {  
    11.                 // 创建配置信息  
    12.                 Configuration conf = new Configuration();  
    13.                   
    14.                 /**********************************************/  
    15.                 //对Map端输出进行压缩  
    16.                 //conf.setBoolean("mapred.compress.map.output", true);  
    17.                 //设置map端输出使用的压缩类  
    18.                 //conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);  
    19.                 //对reduce端输出进行压缩  
    20.                 //conf.setBoolean("mapred.output.compress", true);  
    21.                 //设置reduce端输出使用的压缩类  
    22.                 //conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);  
    23.                 // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)  
    24.                 /* 
    25.                  * conf.addResource("classpath://hadoop/core-site.xml");  
    26.                  * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
    27.                  * conf.addResource("classpath://hadoop/hdfs-site.xml"); 
    28.                  */  
    29.   
    30.                 // 创建文件系统  
    31.                 FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
    32.                 // 如果输出目录存在,我们就删除  
    33.                 if (fileSystem.exists(new Path(OUT_PATH))) {  
    34.                     fileSystem.delete(new Path(OUT_PATH), true);  
    35.                 }  
    36.   
    37.                 // 创建任务  
    38.                 Job job = new Job(conf, SecondSortTest.class.getName());  
    39.   
    40.                 //1.1   设置输入目录和设置输入数据格式化的类  
    41.                 FileInputFormat.setInputPaths(job, INPUT_PATH);  
    42.                 job.setInputFormatClass(TextInputFormat.class);  
    43.   
    44.                 //1.2   设置自定义Mapper类和设置map函数输出数据的key和value的类型  
    45.                 job.setMapperClass(MySecondSortMapper.class);  
    46.                 job.setMapOutputKeyClass(CombineKey.class);  
    47.                 job.setMapOutputValueClass(LongWritable.class);  
    48.   
    49.                 //1.3   设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
    50.                 job.setPartitionerClass(HashPartitioner.class);  
    51.                 job.setNumReduceTasks(1);  
    52.   
    53.                 //1.4   排序、分组  
    54.                 //1.5   归约  
    55.                 //2.1   Shuffle把数据从Map端拷贝到Reduce端。  
    56.                 //2.2   指定Reducer类和输出key和value的类型  
    57.                 job.setReducerClass(MySecondSortReducer.class);  
    58.                 job.setOutputKeyClass(LongWritable.class);  
    59.                 job.setOutputValueClass(LongWritable.class);  
    60.   
    61.                 //2.3   指定输出的路径和设置输出的格式化类  
    62.                 FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
    63.                 job.setOutputFormatClass(TextOutputFormat.class);  
    64.   
    65.   
    66.                 // 提交作业 退出  
    67.                 System.exit(job.waitForCompletion(true) ? 0 : 1);  
    68.               
    69.             } catch (Exception e) {  
    70.                 e.printStackTrace();  
    71.             }  
    72.         }  
    73.       
    74.     public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{  
    75.           
    76.         //定义联合的key  
    77.         private CombineKey combineKey = new CombineKey();  
    78.           
    79.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,  
    80.                 InterruptedException {  
    81.             //对输入的value进行切分  
    82.             String[] splits = value.toString().split("\t");  
    83.             //设置联合的key  
    84.             combineKey.setComKey(Long.parseLong(splits[0]));  
    85.             combineKey.setComVal(Long.parseLong(splits[1]));  
    86.               
    87.             //通过context写出去  
    88.             context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));  
    89.         }  
    90.     }  
    91.       
    92.       
    93.     public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{  
    94.         @Override  
    95.         protected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context)  
    96.                 throws IOException, InterruptedException {  
    97.             //因为输入的CombineKey已经排好序了,所有我们只要获取其中的两个成员变量写出去就可以了。values在这个例子中没有什么作用  
    98.             context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal()));  
    99.         }  
    100.     }  
    101.   
    102. }  
    103.   
    104. /** 
    105.  * 重新组合成一个key,实现二次排序 
    106.  * @author 廖*民 
    107.  * time : 2015年1月18日下午7:27:52 
    108.  * @version 
    109.  */  
    110. class CombineKey implements WritableComparable<CombineKey>{  
    111.   
    112.     public long comKey;  
    113.     public long comVal;  
    114.       
    115.     //必须提供无参构造函数,否则hadoop反射机制会出错  
    116.     public CombineKey() {  
    117.           
    118.     }  
    119.     //有参构造函数  
    120.     public CombineKey(long comKey, long comVal) {  
    121.         this.comKey = comKey;  
    122.         this.comVal = comVal;  
    123.     }  
    124.   
    125.       
    126.       
    127.     public long getComKey() {  
    128.         return comKey;  
    129.     }  
    130.     public void setComKey(long comKey) {  
    131.         this.comKey = comKey;  
    132.     }  
    133.     public long getComVal() {  
    134.         return comVal;  
    135.     }  
    136.     public void setComVal(long comVal) {  
    137.         this.comVal = comVal;  
    138.     }  
    139.       
    140.     public void write(DataOutput out) throws IOException {  
    141.         out.writeLong(comKey);  
    142.         out.writeLong(comVal);  
    143.     }  
    144.   
    145.     public void readFields(DataInput in) throws IOException {  
    146.         this.comKey = in.readLong();  
    147.         this.comVal = in.readLong();  
    148.     }  
    149.   
    150.     /** 
    151.      * 这个方法一定要实现 
    152.      * java里面排序默认是小的放在前面,即返回负数的放在前面,这样就是所谓的升序排列 
    153.      * 我们在下面的方法中直接返回一个差值,也就相当于会升序排列。 
    154.      * 如果我们要实现降序排列,那么我们就可以返回一个正数 
    155.      */  
    156.     /*public int compareTo(CombineKey o) { 
    157.         //第一列不相同时按升序排列,当第一列相同时第二列按升序排列 
    158.         long minus = this.comKey - o.comKey; 
    159.         //如果第一个值不相等时,我们就先对第一列进行排序 
    160.         if (minus != 0){ 
    161.             return (int) minus; 
    162.         } 
    163.         //如果第一列相等时,我们就对第二列进行排序 
    164.         return (int) (this.comVal - o.comVal); 
    165.     }*/  
    166.       
    167.     /** 
    168.      * 为了实现第一列不同时按降序排序,第一列相同时第二列按升序排列 
    169.      * 第一列:降序,当第一列相同时,第二列:升序 
    170.      * 为了实现降序, 
    171.      */  
    172.     public int compareTo(CombineKey o) {  
    173.         //如果a-b<0即,a小于b,按这样 的思路应该是升序排列,我们可以返回一个相反数使其降序  
    174.         long tmp = this.comKey - o.comKey;  
    175.         //如果第一个值不相等时,我们就先对第一列进行排序  
    176.         if (tmp != 0){  
    177.             return (int) (-tmp);  
    178.         }  
    179.         //如果第一列相等时,我们就对第二列进行升序排列  
    180.         return (int) (this.comVal - o.comVal);  
    181.     }  
    182.       
    183.       
    184.     @Override  
    185.     public int hashCode() {  
    186.         final int prime = 31;  
    187.         int result = 1;  
    188.         result = prime * result + (int) (comKey ^ (comKey >>> 32));  
    189.         return result;  
    190.     }  
    191.     @Override  
    192.     public boolean equals(Object obj) {  
    193.         if (this == obj)  
    194.             return true;  
    195.         if (obj == null)  
    196.             return false;  
    197.         if (getClass() != obj.getClass())  
    198.             return false;  
    199.         CombineKey other = (CombineKey) obj;  
    200.         if (comKey != other.comKey)  
    201.             return false;  
    202.         return true;  
    203.     }  
    204.       
    205. }  

以上是关于MapReduce排序之 二次排序的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce二次排序

mapreduce 的二次排序

Hadoop MapReduce编程 API入门系列之二次排序

hadoop MapReduce之辅助排序

MapReduce二次排序

MapReduce二次排序