MapReduce自定义分组Group

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce自定义分组Group相关的知识,希望对你有一定的参考价值。

一:背景

在上一篇文章中我们可以对两列数据进行排序,即完成了当第一列相同时第二列数据升序排列的功能,现在我们需要进一步完善一个功能,那就是当第一列相同时求出第二列的最小值或最大值,Hadoop提供了自定义分组的功能,可以满足我们的需求。

 

二:技术实现

我们先来看看需求

#当第一列不相等时,第一列按升序排列,当第一列相等时,求出对应第二列的最小值

 

[java] view plain copy
 
  1. 3   3  
  2. 3   2  
  3. 3   1  
  4. 2   2  
  5. 2   1  
  6. 1   1  


输出结果应该是:

 

 

[java] view plain copy
 
  1. 1   1  
  2. 2   1  
  3. 3   1  



 

实现:

(1).自定义分组比较器继承RawComparator,实现compare()方法。

(2).在设置作业是设置job.setGroupingComparatorClass()。

 

代码如下:

 

[java] view plain copy
 
    1. public class MyGroupTest {  
    2.     // 定义输入路径  
    3.     private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";  
    4.     // 定义输出路径  
    5.     private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";  
    6.   
    7.     public static void main(String[] args) {  
    8.   
    9.         try {  
    10.             // 创建配置信息  
    11.             Configuration conf = new Configuration();  
    12.   
    13.   
    14.             // 创建文件系统  
    15.             FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);  
    16.             // 如果输出目录存在,我们就删除  
    17.             if (fileSystem.exists(new Path(OUT_PATH))) {  
    18.                 fileSystem.delete(new Path(OUT_PATH), true);  
    19.             }  
    20.   
    21.             // 创建任务  
    22.             Job job = new Job(conf, MyGroupTest.class.getName());  
    23.   
    24.             // 天龙八部1.1 设置输入目录和设置输入数据格式化的类  
    25.             FileInputFormat.setInputPaths(job, INPUT_PATH);  
    26.             job.setInputFormatClass(TextInputFormat.class);  
    27.   
    28.             //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型  
    29.             job.setMapperClass(MyGroupMapper.class);  
    30.             job.setMapOutputKeyClass(CombineKey.class);  
    31.             job.setMapOutputValueClass(LongWritable.class);  
    32.               
    33.             //一定不要忘记设置自定义分组比较器的类(这一步是关键)  
    34.             job.setGroupingComparatorClass(MyGroupComparator.class);  
    35.               
    36.             //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)  
    37.             job.setPartitionerClass(HashPartitioner.class);  
    38.             job.setNumReduceTasks(1);  
    39.   
    40.             //1.4 排序、分组  
    41.             //1.5 归约  
    42.             //2.1 Shuffle把数据从Map端拷贝到Reduce端。  
    43.             //2.2 指定Reducer类和输出key和value的类型  
    44.             job.setReducerClass(MyGroupReducer.class);  
    45.             job.setOutputKeyClass(LongWritable.class);  
    46.             job.setOutputValueClass(LongWritable.class);  
    47.   
    48.             //2.3 指定输出的路径和设置输出的格式化类  
    49.             FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  
    50.             job.setOutputFormatClass(TextOutputFormat.class);  
    51.   
    52.             // 提交作业 退出  
    53.             System.exit(job.waitForCompletion(true) ? 0 : 1);  
    54.   
    55.         } catch (Exception e) {  
    56.             e.printStackTrace();  
    57.         }  
    58.     }  
    59.   
    60.     public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {  
    61.         // 创建联合的key  
    62.         private CombineKey combineKey = new CombineKey();  
    63.   
    64.         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,  
    65.                 InterruptedException {  
    66.             // 对输入value进行分割  
    67.             String[] splits = value.toString().split("\t");  
    68.             // 设置联合的Key  
    69.             combineKey.setComKey(Long.parseLong(splits[0]));  
    70.             combineKey.setComVal(Long.parseLong(splits[1]));  
    71.   
    72.             // 写出去  
    73.             context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));  
    74.         }  
    75.     }  
    76.   
    77.     public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {  
    78.         @Override  
    79.         protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,  
    80.                 Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {  
    81.   
    82.             long min = Long.MAX_VALUE;  
    83.             // 遍历比较求出每个组中的最小值  
    84.             for (LongWritable val : values) {  
    85.   
    86.                 if (val.get() < min) {  
    87.                     min = val.get();  
    88.                 }  
    89.             }  
    90.   
    91.             // 把原始数据中的第一列中的元素分组后的组号作为key,所求的最小值为value将结果写出去  
    92.             context.write(new LongWritable(combineKey.getComKey()), new LongWritable(min));  
    93.         }  
    94.     }  
    95. }  
    96.   
    97. /** 
    98.  * 二次排序构造一个新的Key 
    99.  * @author 廖*民 
    100.  * time : 2015年1月18日下午9:15:55 
    101.  * @version 
    102.  */  
    103. class CombineKey implements WritableComparable<CombineKey> {  
    104.   
    105.     private Long comKey;  
    106.     private Long comVal;  
    107.   
    108.     // 无参构造函数必须提供,否则Hadoop的反射机制会报错  
    109.     public CombineKey() {  
    110.     }  
    111.   
    112.     // 有参构造函数  
    113.     public CombineKey(Long comKey, Long comVal) {  
    114.         this.comKey = comKey;  
    115.         this.comVal = comVal;  
    116.     }  
    117.   
    118.     public Long getComKey() {  
    119.         return comKey;  
    120.     }  
    121.   
    122.     public void setComKey(Long comKey) {  
    123.         this.comKey = comKey;  
    124.     }  
    125.   
    126.     public Long getComVal() {  
    127.         return comVal;  
    128.     }  
    129.   
    130.     public void setComVal(Long comVal) {  
    131.         this.comVal = comVal;  
    132.     }  
    133.   
    134.     public void write(DataOutput out) throws IOException {  
    135.         out.writeLong(this.comKey);  
    136.         out.writeLong(this.comVal);  
    137.     }  
    138.   
    139.     public void readFields(DataInput in) throws IOException {  
    140.         this.comKey = in.readLong();  
    141.         this.comVal = in.readLong();  
    142.     }  
    143.   
    144.     /** 
    145.      * 第一列按升序排列,第一列相同时,第二列也按升序排列 
    146.      */  
    147.     public int compareTo(CombineKey o) {  
    148.         long minus = this.comKey - o.comVal;  
    149.         if (minus != 0) {  
    150.             return (int) minus;  
    151.         }  
    152.   
    153.         return (int) (this.comVal - o.comVal);  
    154.     }  
    155.   
    156. }  
    157.   
    158. /** 
    159.  * 自定义分组比较器 
    160.  * @author 廖*民 
    161.  * time : 2015年1月18日下午9:15:26 
    162.  * @version 
    163.  */  
    164. class MyGroupComparator implements RawComparator<CombineKey> {  
    165.   
    166.     // 分组策略中,这个方法不是重点  
    167.     public int compare(CombineKey o1, CombineKey o2) {  
    168.         // TODO Auto-generated method stub  
    169.         return 0;  
    170.     }  
    171.   
    172.     /** 
    173.      * b1 表示第一个参与比较的字节数组 
    174.      * s1 表示第一个字节数组中开始比较的位置  
    175.      * l1 表示第一个字节数组中参与比较的字节长度  
    176.      * b2 表示第二个参与比较的字节数组  
    177.      * s2 表示第二个字节数组中开始比较的位置  
    178.      * l2 表示第二个字节数组参与比较的字节长度 
    179.      */  
    180.     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
    181.   
    182.         // 这里是按第CombineKey中的第一个元素进行分组,因为是long类型,所以是8个字节  
    183.         return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);  
    184.     }  
    185.   
    186. }  

以上是关于MapReduce自定义分组Group的主要内容,如果未能解决你的问题,请参考以下文章

一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现

Hadoop学习之路MapReduce自定义排序

MapReduce之自定义Combiner

分组 UITableView 上的自定义背景图像

为分组的 UITableViewCell 自定义选择的背景颜色

分组 UITableView 自定义绘制单元格,而不更改背景颜色?