MapReduce的WritableComparable 排序

Posted NC_NE

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce的WritableComparable 排序相关的知识,希望对你有一定的参考价值。

一、排序概述

排序是MapReduce框架中最重要的操作之一。

MapTask和ReduceTask阶段均会对数据按 照key进行排序,该操作属于Hadoop的默认行为。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

二、排序分类(mapreduce流程中的排序)

(1)部分排序:MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序(分区内排序,环形缓冲区spill前)。

(2)全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。

(3)辅助排序:(GroupingComparator分组) 在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

(4)二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序

三、MapTask阶段排序

        对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序(按照key的索引以字典顺序进行快速排序),并将这些有序数 据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序(将所有溢写文件归并排序成一个大文件)。

四、ReduceTask阶段排序

        对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序

五、自定义排序

原理:实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。

5.1、WritableComparable 排序案例实操(全排序)

(1)需求:对序列化案例产生的结果再次对总流量进行倒序排序

(2)需求分析:

(3)代码实现:

        1、FlowBean类实现WritableComparable接口,重写compareTo 方法

public class FlowBean implements WritableComparable<FlowBean> {

    private Long upFlow;
    private Long downFlow;
    private Long sumFlow;

    public Long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(Long upFlow) {
        this.upFlow = upFlow;
    }

    public Long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(Long downFlow) {
        this.downFlow = downFlow;
    }

    public Long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(Long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    //空参构造
    public FlowBean() {
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        upFlow = dataInput.readLong();
        downFlow = dataInput.readLong();
        sumFlow = dataInput.readLong();
    }
    
    @Override
    public String toString() {
        return  upFlow + "\\t" + downFlow + "\\t" + sumFlow ;
    }

    // 按照总流量大小,倒序排列
    @Override
    public int compareTo(FlowBean o) {
        if (this.sumFlow > o.sumFlow){
            return -1;
        }else if (this.sumFlow < o.sumFlow){
            return 1;
        }else{
            //二次排序
            if (this.upFlow > o.upFlow){
                return -1;
            }else if (this.upFlow < o.upFlow){
                return 1;
            }else{
                return 0;
            }
        }
    }
}

        2、编写 Mapper 类

/**
 * @author oyl
 * @create 2021-06-05 16:46
 * @Description
 * 需要对总流量进行排序,所以总量了必须是key
 */
public class FlowMapper extends Mapper<LongWritable, Text,FlowBean,Text> {

    private Text outv = new Text();
    private FlowBean outk = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //获取一行数据
        String line = value.toString();

        //切割
        String[] splits = line.split("\\t");

        String phone = splits[0];
        outv.set(phone);

        outk.setUpFlow(Long.parseLong(splits[1]));
        outk.setDownFlow(Long.parseLong(splits[2]));
        outk.setSumFlow();

        context.write(outk,outv);
    }
}

        3、编写 Reducer 类

/**
 * @author oyl
 * @create 2021-06-05 17:06
 * @Description
 */
public class FlowReducer extends Reducer<FlowBean,Text,Text, FlowBean> {


    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //遍历 values 集合,循环写出,避免总流量相同的情况
        for (Text value : values) {
            context.write(value,key);
        }
    }
}

        4、编写 Driver 类,注意输入的文件是序列化的数据总结果

public class FlowDriver {
    
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //1、获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2、设置job
        job.setJarByClass(FlowDriver.class);

        //3、关联mapper 和reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4、设置mapper输出key和value类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //5、设置最终结果输出的key和value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //6、设置数据的输入和输出路径
        //上一个MapReduce的输出
        FileInputFormat.setInputPaths(job,new Path("C:\\\\Users\\\\oyl\\\\Desktop\\\\HADOOP\\\\output3"));
        FileOutputFormat.setOutputPath(job,new Path("C:\\\\Users\\\\oyl\\\\Desktop\\\\HADOOP\\\\output6"));

        //7、提交job
        boolean resoult = job.waitForCompletion(true);
        System.exit(resoult ? 0 : 1);
    }
}

        3、运行结果

5.2WritableComparable 排序案例实操(区内排序)

(1)需求:要求每个省份手机号输出的文件中按照总流量内部排序。

(2)需求分析:基于5.1案例,增加自定义分区类,分区按照省份手机号设置

(3)代码实现:

        1、增加自定义分区类MyPartitioner2

public class MyPartitioner2extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {

        int partitions = 0;
        //获取手机号的前三位
        String prePheno = text.toString().substring(0,3);

        if ("136".equals(prePheno)){
            partitions = 0;
        }else if ("137".equals(prePheno)){
            partitions = 1;
        }else if ("138".equals(prePheno)){
            partitions = 2;
        }else if ("139".equals(prePheno)){
            partitions = 3;
        }else{
            partitions = 4;
        }

        return partitions;
    }
}

        2、 Dirver代码设置设置自定义MyPartitioner2类和分区个数

//8 指定自定义分区器
job.setPartitionerClass(MyPartitioner2.class);

//9 同时指定相应数量的 ReduceTask
job.setNumReduceTasks(5);

        3、运行结果

                五个文件妥妥的

        区内数据也都是有序的

以上是关于MapReduce的WritableComparable 排序的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce入门—— MapReduce概述 + WordCount案例实操

MapReduce mapreduce基础入门

MapReduce(6)特性

[hadoop]怎么把两个mapreduce工程合起来

什么是MapReduce?MapReduce整体架构搭建使用介绍

大数据之Hadoop(MapReduce): MapReduce概述