MapReduce排序之 二次排序
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce排序之 二次排序相关的知识,希望对你有一定的参考价值。
一:背景
Hadoop中虽然有自动排序和分组,由于自带的排序是按照Key进行排序的,有些时候,我们希望同时对Key和Value进行排序。自带的排序功能就无法满足我们了,还好Hadoop提供了一些组件可以让开发人员进行二次排序。
二:技术实现
我们先来看案例需求
#需求1: 首先按照第一列数字升序排列,当第一列数字相同时,第二列数字也升序排列(列之间用制表符\t隔开)
- 3 3
- 3 2
- 3 1
- 2 2
- 2 1
- 1 1
MapReduce计算之后的结果应该是:
- 1 1
- 2 1
- 2 2
- 3 1
- 3 2
- 3 3
#需求2:第一列不相等时,第一列按降序排列,当第一列相等时,第二列按升序排列
- 3 3
- 3 2
- 3 1
- 2 2
- 2 1
- 1 1
MapReduce计算之后的结果应该是:
- 3 1
- 3 2
- 3 3
- 2 1
- 2 2
- 1 1
下面是实现代码,实现两种需求的关键是compareTo()方法的实现不同:
- public class SecondSortTest {
- // 定义输入路径
- private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/data";
- // 定义输出路径
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
- public static void main(String[] args) {
- try {
- // 创建配置信息
- Configuration conf = new Configuration();
- /**********************************************/
- //对Map端输出进行压缩
- //conf.setBoolean("mapred.compress.map.output", true);
- //设置map端输出使用的压缩类
- //conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
- //对reduce端输出进行压缩
- //conf.setBoolean("mapred.output.compress", true);
- //设置reduce端输出使用的压缩类
- //conf.setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
- // 添加配置文件(我们可以在编程的时候动态配置信息,而不需要手动去改变集群)
- /*
- * conf.addResource("classpath://hadoop/core-site.xml");
- * conf.addResource("classpath://hadoop/hdfs-site.xml");
- * conf.addResource("classpath://hadoop/hdfs-site.xml");
- */
- // 创建文件系统
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 如果输出目录存在,我们就删除
- if (fileSystem.exists(new Path(OUT_PATH))) {
- fileSystem.delete(new Path(OUT_PATH), true);
- }
- // 创建任务
- Job job = new Job(conf, SecondSortTest.class.getName());
- //1.1 设置输入目录和设置输入数据格式化的类
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- job.setInputFormatClass(TextInputFormat.class);
- //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
- job.setMapperClass(MySecondSortMapper.class);
- job.setMapOutputKeyClass(CombineKey.class);
- job.setMapOutputValueClass(LongWritable.class);
- //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
- //1.4 排序、分组
- //1.5 归约
- //2.1 Shuffle把数据从Map端拷贝到Reduce端。
- //2.2 指定Reducer类和输出key和value的类型
- job.setReducerClass(MySecondSortReducer.class);
- job.setOutputKeyClass(LongWritable.class);
- job.setOutputValueClass(LongWritable.class);
- //2.3 指定输出的路径和设置输出的格式化类
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
- // 提交作业 退出
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static class MySecondSortMapper extends Mapper<LongWritable, Text, CombineKey, LongWritable>{
- //定义联合的key
- private CombineKey combineKey = new CombineKey();
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, CombineKey, LongWritable>.Context context) throws IOException,
- InterruptedException {
- //对输入的value进行切分
- String[] splits = value.toString().split("\t");
- //设置联合的key
- combineKey.setComKey(Long.parseLong(splits[0]));
- combineKey.setComVal(Long.parseLong(splits[1]));
- //通过context写出去
- context.write(combineKey, new LongWritable(Long.parseLong(splits[1])));
- }
- }
- public static class MySecondSortReducer extends Reducer<CombineKey, LongWritable, LongWritable, LongWritable>{
- @Override
- protected void reduce(CombineKey combineKey, Iterable<LongWritable> values, Reducer<CombineKey, LongWritable, LongWritable, LongWritable>.Context context)
- throws IOException, InterruptedException {
- //因为输入的CombineKey已经排好序了,所有我们只要获取其中的两个成员变量写出去就可以了。values在这个例子中没有什么作用
- context.write(new LongWritable(combineKey.getComKey()), new LongWritable(combineKey.getComVal()));
- }
- }
- }
- /**
- * 重新组合成一个key,实现二次排序
- * @author 廖*民
- * time : 2015年1月18日下午7:27:52
- * @version
- */
- class CombineKey implements WritableComparable<CombineKey>{
- public long comKey;
- public long comVal;
- //必须提供无参构造函数,否则hadoop反射机制会出错
- public CombineKey() {
- }
- //有参构造函数
- public CombineKey(long comKey, long comVal) {
- this.comKey = comKey;
- this.comVal = comVal;
- }
- public long getComKey() {
- return comKey;
- }
- public void setComKey(long comKey) {
- this.comKey = comKey;
- }
- public long getComVal() {
- return comVal;
- }
- public void setComVal(long comVal) {
- this.comVal = comVal;
- }
- public void write(DataOutput out) throws IOException {
- out.writeLong(comKey);
- out.writeLong(comVal);
- }
- public void readFields(DataInput in) throws IOException {
- this.comKey = in.readLong();
- this.comVal = in.readLong();
- }
- /**
- * 这个方法一定要实现
- * java里面排序默认是小的放在前面,即返回负数的放在前面,这样就是所谓的升序排列
- * 我们在下面的方法中直接返回一个差值,也就相当于会升序排列。
- * 如果我们要实现降序排列,那么我们就可以返回一个正数
- */
- /*public int compareTo(CombineKey o) {
- //第一列不相同时按升序排列,当第一列相同时第二列按升序排列
- long minus = this.comKey - o.comKey;
- //如果第一个值不相等时,我们就先对第一列进行排序
- if (minus != 0){
- return (int) minus;
- }
- //如果第一列相等时,我们就对第二列进行排序
- return (int) (this.comVal - o.comVal);
- }*/
- /**
- * 为了实现第一列不同时按降序排序,第一列相同时第二列按升序排列
- * 第一列:降序,当第一列相同时,第二列:升序
- * 为了实现降序,
- */
- public int compareTo(CombineKey o) {
- //如果a-b<0即,a小于b,按这样 的思路应该是升序排列,我们可以返回一个相反数使其降序
- long tmp = this.comKey - o.comKey;
- //如果第一个值不相等时,我们就先对第一列进行排序
- if (tmp != 0){
- return (int) (-tmp);
- }
- //如果第一列相等时,我们就对第二列进行升序排列
- return (int) (this.comVal - o.comVal);
- }
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + (int) (comKey ^ (comKey >>> 32));
- return result;
- }
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- CombineKey other = (CombineKey) obj;
- if (comKey != other.comKey)
- return false;
- return true;
- }
- }
以上是关于MapReduce排序之 二次排序的主要内容,如果未能解决你的问题,请参考以下文章