MapReduce自定义分组Group
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce自定义分组Group相关的知识,希望对你有一定的参考价值。
一:背景
在上一篇文章中我们可以对两列数据进行排序,即完成了当第一列相同时第二列数据升序排列的功能,现在我们需要进一步完善一个功能,那就是当第一列相同时求出第二列的最小值或最大值,Hadoop提供了自定义分组的功能,可以满足我们的需求。
二:技术实现
我们先来看看需求
#当第一列不相等时,第一列按升序排列,当第一列相等时,求出对应第二列的最小值
- 3 3
- 3 2
- 3 1
- 2 2
- 2 1
- 1 1
输出结果应该是:
- 1 1
- 2 1
- 3 1
实现:
(1).自定义分组比较器继承RawComparator,实现compare()方法。
(2).在设置作业是设置job.setGroupingComparatorClass()。
代码如下:
- public class MyGroupTest {
- // 定义输入路径
- private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
- // 定义输出路径
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
- public static void main(String[] args) {
- try {
- // 创建配置信息
- Configuration conf = new Configuration();
- // 创建文件系统
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 如果输出目录存在,我们就删除
- if (fileSystem.exists(new Path(OUT_PATH))) {
- fileSystem.delete(new Path(OUT_PATH), true);
- }
- // 创建任务
- Job job = new Job(conf, MyGroupTest.class.getName());
- // 天龙八部1.1 设置输入目录和设置输入数据格式化的类
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- job.setInputFormatClass(TextInputFormat.class);
- //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
- job.setMapperClass(MyGroupMapper.class);
- job.setMapOutputKeyClass(CombineKey.class);
- job.setMapOutputValueClass(LongWritable.class);
- //一定不要忘记设置自定义分组比较器的类(这一步是关键)
- job.setGroupingComparatorClass(MyGroupComparator.class);
- //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
- //1.4 排序、分组
- //1.5 归约
- //2.1 Shuffle把数据从Map端拷贝到Reduce端。
- //2.2 指定Reducer类和输出key和value的类型
- job.setReducerClass(MyGroupReducer.class);
- job.setOutputKeyClass(LongWritable.class);
- job.setOutputValueClass(LongWritable.class);
- //2.3 指定输出的路径和设置输出的格式化类
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
- // 提交作业 退出
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static class MyGroupMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable> {
- // 创建联合的key
- private CombineKey combineKey = new CombineKey();
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
- InterruptedException {
- // 对输入value进行分割
- String[] splits = value.toString().split("\t");
- // 设置联合的Key
- combineKey.setComKey(Long.parseLong(splits[0]));
- combineKey.setComVal(Long.parseLong(splits[1]));
- // 写出去
- context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
- }
- }
- public static class MyGroupReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable> {
- @Override
- protected void reduce(CombineKey combineKey, Iterable<LongWritable> values,
- Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context) throws IOException, InterruptedException {
- long min = Long.MAX_VALUE;
- // 遍历比较求出每个组中的最小值
- for (LongWritable val : values) {
- if (val.get() < min) {
- min = val.get();
- }
- }
- // 把原始数据中的第一列中的元素分组后的组号作为key,所求的最小值为value将结果写出去
- context.write(new LongWritable(combineKey.getComKey()), new LongWritable(min));
- }
- }
- }
- /**
- * 二次排序构造一个新的Key
- * @author 廖*民
- * time : 2015年1月18日下午9:15:55
- * @version
- */
- class CombineKey implements WritableComparable<CombineKey> {
- private Long comKey;
- private Long comVal;
- // 无参构造函数必须提供,否则Hadoop的反射机制会报错
- public CombineKey() {
- }
- // 有参构造函数
- public CombineKey(Long comKey, Long comVal) {
- this.comKey = comKey;
- this.comVal = comVal;
- }
- public Long getComKey() {
- return comKey;
- }
- public void setComKey(Long comKey) {
- this.comKey = comKey;
- }
- public Long getComVal() {
- return comVal;
- }
- public void setComVal(Long comVal) {
- this.comVal = comVal;
- }
- public void write(DataOutput out) throws IOException {
- out.writeLong(this.comKey);
- out.writeLong(this.comVal);
- }
- public void readFields(DataInput in) throws IOException {
- this.comKey = in.readLong();
- this.comVal = in.readLong();
- }
- /**
- * 第一列按升序排列,第一列相同时,第二列也按升序排列
- */
- public int compareTo(CombineKey o) {
- long minus = this.comKey - o.comVal;
- if (minus != 0) {
- return (int) minus;
- }
- return (int) (this.comVal - o.comVal);
- }
- }
- /**
- * 自定义分组比较器
- * @author 廖*民
- * time : 2015年1月18日下午9:15:26
- * @version
- */
- class MyGroupComparator implements RawComparator<CombineKey> {
- // 分组策略中,这个方法不是重点
- public int compare(CombineKey o1, CombineKey o2) {
- // TODO Auto-generated method stub
- return 0;
- }
- /**
- * b1 表示第一个参与比较的字节数组
- * s1 表示第一个字节数组中开始比较的位置
- * l1 表示第一个字节数组中参与比较的字节长度
- * b2 表示第二个参与比较的字节数组
- * s2 表示第二个字节数组中开始比较的位置
- * l2 表示第二个字节数组参与比较的字节长度
- */
- public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- // 这里是按第CombineKey中的第一个元素进行分组,因为是long类型,所以是8个字节
- return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);
- }
- }
以上是关于MapReduce自定义分组Group的主要内容,如果未能解决你的问题,请参考以下文章
一脸懵逼学习Hadoop中的MapReduce程序中自定义分组的实现