hadoop MapReduce之辅助排序

Posted 健哥说编程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop MapReduce之辅助排序相关的知识,希望对你有一定的参考价值。

辅助排序

也可以叫做二次排序或SecondarySort

前言:

1MapReduce框架在记录到达Reducer之前已经按键记录排序,但键所对应的值并没有排序。这些值来自不同的map任务。

2:一般来说,大多数MapReduce程序会避免让Reduce函数依赖于值的排序。

3:但有时,也需要按特性的方法对键进行排序和分组等以实现对值的排序。

 


示例:

假设存在以下气温记录

Datetemperature

2001/01/1123

2001/01/1215

2001/01/1318

2001/01/1430

2002/01/1134

2002/01/1212

2002/01/1318

...

 

现在需要查询出每年最高气温的那一天,即最终显示的结果为:

2001/01/1430

2002/01/1134

...

 

1:排序输出

由于是按年进行统计,所以,我们可以使用年(yyyy)做为key进行排序处理。我们只需要按气温value倒序显示,并在Reducer中只输出key,不处理Iterable<..>的记录即可以获取最高的记录。

为了可以保存日期和气温的所有信息,我们可以声明一个JavaBean用于保存上面解析出来的气温信息。以下是JavaBean的源代码,它实现了WritableComparable接口类:

类名:Temperature。源代码如下,以下代码省去了getterssetters方法:

package cn.wangjian.mapreduce;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * 开发封装每一天气温的JavaBean
 * @author wangjian
 * @version 1.0 2018/5/30 0030
 */
public class Temperature implements WritableComparable<Temperature> {
    private String year;//可以使用int类型,也可以使用String类型
    private String date;
    private int temperature;
    /**
     * 在比较的代码中,我们先按年进行正序的比较,再按气温进行倒序的比较,这样气温最高的就会在第一行
     * 如果这儿只按某一个值进行了比较,还可以通过setSortComparatorClass修改这儿的排序规则
     */
    @Override
    public int compareTo(Temperature o) {
        int comp = this.year.compareTo(o.year);
        if (comp == 0) {
            comp = -(this.temperature - o.temperature);//注意前面取了反值,即倒序
        }
        return comp;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(year);
        out.writeUTF(date);
        out.writeInt(temperature);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readUTF();
        date = in.readUTF();
        temperature = in.readInt();
    }
    @Override
    public String toString() { //通过重写toString输出一个格式化的数据
        return year+"\t"+date+"\t"+temperature;
    }
}

 

根据上面的代码,我们已经先根据年进行了排序,又根据气温降序进行了排序。现在我们先开发好MapperReducer并开发一个驱动,查看输出的结果:

开发Mapper,类名:TerperatureMapper

通过mapper类的源代码可知,只是对输入的字符串,进行分割并封装到Temperature类中,并没有什么特别的代码。

package cn.wangjian.mapreduce;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * 处理接收到的数据并进行每一行的分析,然后直接输出分析并封装好的对象
 * @author wangjian
 * @version 1.0 2018/5/30 0030
 */
public class TemperatureMapper extends Mapper<LongWritable,Text,Temperature,NullWritable> {
    private Temperature key2 = new Temperature();
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] strs = value.toString().split("\\s+");
        String year = strs[0].substring(0,4);//取出年
        int airTemp = Integer.parseInt(strs[1]);
        key2.setYear(year);
        key2.setTemperature(airTemp);
        key2.setDate(strs[0]);
        context.write(key2,NullWritable.get());
    }
}

 

开发Reducer,其源代码如下:

通过查看Reducer的源代码,可知,并没有处理value部分的数据,只是将key中的数据直接输出。

package cn.wangjian.mapreduce;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * @author wangjian
 * @version 1.0 2018/5/30 0030
 */
public class TemperatureReducer extends Reducer<Temperature,NullWritable,Temperature,NullWritable> {
    @Override
    public void reduce(Temperature key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //接收到数据以后直接输出
        context.write(key,NullWritable.get());
    }
}

 

现在开发驱动,先暂时输出查看输出的数据格式:

package cn.wangjian.mapreduce;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 辅助排序
 * @author wangjian
 * @version 1.0 2018/5/30 0030
 */
public class TemperatureMR extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        if(args.length!=2){
            System.out.println("usage : input output");
            return -1;
        }
        Configuration config = getConf();
        FileSystem fs = FileSystem.get(config);
        Path dest = new Path(args[1]);
        if(fs.exists(dest)){
            fs.delete(dest,true);
        }
        Job job = Job.getInstance(config,"辅助排序");
        job.setJarByClass(getClass());
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.addInputPath(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,dest);
        job.setMapperClass(TemperatureMapper.class);
        job.setMapOutputKeyClass(Temperature.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setReducerClass(TemperatureReducer.class);
        job.setOutputKeyClass(Temperature.class);
        job.setOutputValueClass(NullWritable.class);
        int code = job.waitForCompletion(true)?0:1;
        return code;
    }
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new TemperatureMR(),args);
        System.exit(code);
    }
}

 

执行上面的MR驱动程序,输出以下结果:

通过结果,可以看出,已经按年进行了排序,且也已经根据气温的降序进行了排序。

20012001/01/1430

20012001/01/1123

20012001/01/1318

20012001/01/1215

20022002/01/1134

20022002/01/1318

20022002/01/1212

 

2:开发分区类

现在我们需要开发一个Partitioner,根据年,以便于将相同的年放到同一个分区中处理。这样每一个分区中的第一行,就是本年气温最高的记录。然后在自己开发的Reducerreduce方法中,只需要输出key就可以了。

现在发开发分区,源代码如下:

package cn.wangjian.mapreduce;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * @author wangjian
 * @version 1.0 2018/5/30 0030
 */
public class TemperaturePartitioner extends Partitioner<Temperature, NullWritable> {
    @Override
    public int getPartition(Temperature temperature, NullWritable nullWritable, int numPartitions) {
        return temperature.getYear().hashCode() % numPartitions;
    }
}

 

 

3:开发分组规则

在到达Reducerreduce方法之前,会使用以下方法进行数据的分组合并,现在修改这个分组的策略:

package cn.wangjian.mapreduce;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * 指定用于分组的类
 * @author wangjian
 * @version 1.0 2018/5/30 0030
 */
public class TemperatureGrouping extends WritableComparator {
    public TemperatureGrouping(){
        super(Temperature.class,true);
    }
    /**
     * 注意重写的方法接收的参数
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //根据名称来进行分组所以比较名称
        Temperature t1 = (Temperature) a;
        Temperature t2 = (Temperature) b;
        return t1.getYear().compareTo(t2.getYear());//比较年
    }
}

 

4:注册这两个类

job.setPartitionerClass(TemperaturePartitioner.class);
job.setGroupingComparatorClass(TemperatureGrouping.class);

 

5:然后启动MR测试

输出以下结果,即为每一年,气温最高的那一天的记录:

20012001/01/1430

20022002/01/1134

 

小结:

1:使用自定义WritableComparableJavaBean并在compareTo方法中实现自己比较的规则。

2:如果然后修改规则,可以再另开发一个WritableComparator的子类。并通过job.setSortComparatorClass修改javaBean中的比较的规则 。

3:开发类实现WritableComparator用于指定分组的规则。然后通过job.setGroupingComparatorClass进行注册 。

4:其他:setOutputKeyComparatorClass,0.20以后为:setSortComparatorClass<br>

 * setOutputValueGroupingComparator,0.20以后为:setGroupingComparatorClass<br>


以上是关于hadoop MapReduce之辅助排序的主要内容,如果未能解决你的问题,请参考以下文章

大数据之Hadoop(MapReduce):Partition之WritableComparable排序

hadoop之MapReduce的案例(排序最大值)

大数据之Hadoop(MapReduce):WritableComparable排序案例实操(全排序)

Hadoop MapReduce编程 API入门系列之二次排序

大数据之Hadoop(MapReduce):WritableComparable排序案例实操(区内排序)

hadoop mapreduce 分桶