Hadoop中的MapReduce框架原理WritableComparable排序排序分类WritableComparable排序案例实操(全排序)(二次排序)

Posted Redamancy_06

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop中的MapReduce框架原理WritableComparable排序排序分类WritableComparable排序案例实操(全排序)(二次排序)相关的知识,希望对你有一定的参考价值。

文章目录

13.MapReduce框架原理

13.3 Shuffle机制

13.3.4 WritableComparable排序

13.3.4.1 排序概述

  排序是MapReduce框架中最重要的操作之一。
  MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
  默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
  对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
  对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

13.3.4.2 排序分类

(1)部分排序
  MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。
(2)全排序
  最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组)
  在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序
  在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

13.3.4.3 自定义排序 WritableComparable 原理分析

bean 对象做为 key 传输,需要实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。

@Override
public int compareTo(FlowBean bean) 

	int result;
		
	// 按照总流量大小,倒序排列
	if (this.sumFlow > bean.getSumFlow()) 
		result = -1;
	else if (this.sumFlow < bean.getSumFlow()) 
		result = 1;
	else 
		result = 0;
	

	return result;

13.3.5 WritableComparable排序案例实操(全排序)

13.3.5.1 需求

根据案例https://blog.csdn.net/Redamancy06/article/details/126479101序列化案例产生的结果再次对总流量进行倒序排序。

13.3.5.1.1 输入数据

1	13736230513	192.196.100.1	www.baidu.com	2481	24681	200
2	13846544121	192.196.100.2			264	0	200
3 	13956435636	192.196.100.3			132	1512	200
4 	13966251146	192.168.100.1			240	0	404
5 	18271575951	192.168.100.2	www.baidu.com	1527	2106	200
6 	84188413	192.168.100.3	www.baidu.com	4116	1432	200
7 	13590439668	192.168.100.4			1116	954	200
8 	15910133277	192.168.100.5	www.hao123.com	3156	2936	200
9 	13729199489	192.168.100.6			240	0	200
10 	13630577991	192.168.100.7	www.shouhu.com	6960	690	200
11 	15043685818	192.168.100.8	www.baidu.com	3659	3538	200
12 	15959002129	192.168.100.9	www.baidu.com	1938	180	500
13 	13560439638	192.168.100.10			918	4938	200
14 	13470253144	192.168.100.11			180	180	200
15 	13682846555	192.168.100.12	www.qq.com	1938	2910	200
16 	13992314666	192.168.100.13	www.gaga.com	3008	3720	200
17 	13509468723	192.168.100.14	www.qinghua.com	7335	110349	404
18 	18390173782	192.168.100.15	www.sogou.com	9531	2412	200
19 	13975057813	192.168.100.16	www.baidu.com	11058	48243	200
20 	13768778790	192.168.100.17			120	120	200
21 	13568436656	192.168.100.18	www.alibaba.com	2481	24681	200
22 	13568436656	192.168.100.19			1116	954	200

第一次处理后的数据

13470253144	180	180	360
13509468723	7335	110349	117684
13560439638	918	4938	5856
13568436656	3597	25635	29232
13590439668	1116	954	2070
13630577991	6960	690	7650
13682846555	1938	2910	4848
13729199489	240	0	240
13736230513	2481	24681	27162
13768778790	120	120	240
13846544121	264	0	264
13956435636	132	1512	1644
13966251146	240	0	240
13975057813	11058	48243	59301
13992314666	3008	3720	6728
15043685818	3659	3538	7197
15910133277	3156	2936	6092
15959002129	1938	180	2118
18271575951	1527	2106	3633
18390173782	9531	2412	11943
84188413	4116	1432	5548
13.3.5.1.2 期望输出数据

13509468723 7335 110349 117684
13736230513 2481 24681 27162
13956435636 132 1512 1644
13846544121 264 0 264
。。。 。。。

13.3.5.2 需求分析

13.3.5.3 代码实现

创建一个writableComparable的文件夹,将writable里面4个java代码同时复制到writableComparable里面

13.3.5.3.1 FlowBean对象在在需求1基础上增加了比较功能

package com.summer.mapreduce.writableComparable;

/**
 * @author Redamancy
 * @create 2022-08-23 11:26
 */

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1、定义类实现writable接口
 * 2、重写序列化和反序列化
 * 3、重写空参构造
 * 4、toString方法
 */
public class FlowBean implements WritableComparable<FlowBean> 
    private long upFlow;//上行流量
    private long downFlow;//下行流量
    private long sumFlow;//总流量

    //空参构造
    public FlowBean() 
    

    public long getUpFlow() 
        return upFlow;
    

    public void setUpFlow(long upFlow) 
        this.upFlow = upFlow;
    

    public long getDownFlow() 
        return downFlow;
    

    public void setDownFlow(long downFlow) 
        this.downFlow = downFlow;
    

    public long getSumFlow() 
        return sumFlow;
    

    public void setSumFlow(long sumFlow) 
        this.sumFlow = sumFlow;
    

    public void setSumFlow() 
        this.sumFlow = this.downFlow + this.upFlow;
    

    @Override
    public void write(DataOutput dataOutput) throws IOException 

        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);

    

    @Override
    public void readFields(DataInput dataInput) throws IOException 

        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();

    

    @Override
    public String toString() 
        return  upFlow + "\\t" + downFlow + "\\t" + sumFlow;
    

    @Override
    public int compareTo(FlowBean o) 

        //按照总流量比较,倒序排序
        if(this.sumFlow > o.sumFlow)
            return -1;
        else if (this.sumFlow < o.sumFlow)
            return 1;
        else 
            return 0;
        
    


13.3.5.3.2 编写Mapper类
package com.summer.mapreduce.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author Redamancy
 * @create 2022-08-23 22:40
 */
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> 

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context) throws IOException, InterruptedException 
        //1 获取一行数据
        String line = value.toString();

        //2 按照 "\\t" ,切割数据
        String[] split = line.split("\\t");

        //3 封装outK outV
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();
        outV.set(split[0]);


        //4 写出outK outV
        context.write(outK, outV);
    


13.3.5.3.3 编写Reducer类

`package com.summer.mapreduce.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**

  • @author Redamancy

  • @create 2022-08-23 22:58
    */
    public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean>
    @Override
    protected void reduce(FlowBean key, Iterable values, Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException

     //遍历values集合,循环写出,避免总流量相同的情况
     for (Text value : values) 
         //调换KV位置,反向写出
         context.write(value, key);
    
     
    



    `

13.3.5.3.4 编写Driver类


用序列化案例产生的结果当输入路径

package com.summer.mapreduce.writableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author Redamancy
 * @create 2022-08-23 23:05
 */
public class FlowDriver 
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException 
        //1 获取job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        //2 设置jar包路径
        job.setJarByClass(FlowDriver.class);

        //3 关联mapper和reduccer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        //4 设置map输出的kv类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //5 设置最终输出的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //6 设置输入路径和输出路径
        FileInputFormat.setInputPaths(job, new Path("D:\\\\Acode\\\\Hadoop\\\\output\\\\outputFlowphonedata"));
        //输出的路径为空,要是有该文件,则会报错
        FileOutputFormat.setOutputPath(job, new Path("D:\\\\Acode\\\\Hadoop\\\\output\\\\outputFlowphonedata1"));

        //7 提交job
        boolean result = job.waitForCompletion(true);

        System.exit(result ? 0 : 1);
    


13.3.5.3.5 运行结果

13.3.6 WritableComparable排序案例实操(二次排序)

将上面的13.3.5WritableComparable排序案例实操(全排序)二次排序,因为他结果的最后三行总流量是相同的,所以再按上行流量正序排序

创建一个writableComparable2的文件夹,将writableComparable里面4个java代码同时复制到writableComparable里面

在上面代码的基础上加上这几行代码即可

运行结果和我预想的是一样的

以上是关于Hadoop中的MapReduce框架原理WritableComparable排序排序分类WritableComparable排序案例实操(全排序)(二次排序)的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop中的MapReduce框架原理自定义Partitioner步骤在Job驱动中,设置自定义PartitionerPartition 分区案例

Hadoop中的MapReduce框架原理切片源码断点在哪断并且介绍相关源码FileInputFormat切片源码解析总结,那些可以证明你看过切片的源码

大数据之Hadoop(MapReduce): MapReduce框架原理

Hadoop框架:MapReduce基本原理和入门案例

大数据技术之Hadoop(MapReduce)框架原理数据压缩

大数据技术之Hadoop(MapReduce)框架原理数据压缩