Hadoop学习--流量分区并排序

Posted 是渣渣呀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hadoop学习--流量分区并排序相关的知识,希望对你有一定的参考价值。

准备文件

列名: 手机号码 上行流量 下行流量 总流量

实验内容

统计给定文件中,每一个手机号耗费的总上行流量、下行流量、总流量,手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中,并按照流量倒序排列

程序思路

每个类的作用

一、 FlowBean实体类

1. 实现Hadoop的Writable接口,并用来实现序列化及反序列化的bean对象
2. 用于存放我们感兴趣的数据(手机号,流量等)
3. 实现WritableComparable接口,用于shuffle中对key进行排序(这里是按照总流量进行倒序排列)

二、FlowSortPartitionerDriver驱动类

1. 获取Job实例并设置资源所在的类
2. 设置自定义的Mapper类,Reducer类以及分区类。
3. 指定Map输出的key,value的类型以及Reduce输出的key,value类型和reducer并行个数
4. 指定要读入的文件的位置和MapReduce后输出文件的位置

三、FlowSortMapper类

1. 获取读入文件并且经过Reader之后的数据(这里是电话及各种流量信息)
2. 对获取的value数据进行处理,并得到新的kv作为mapper的输出
2.1 这里是默认按每行读,所以map方法里先每次获取一行的数据并根据空格符截取数据,并将感兴趣的数据封装到bean对象中
2.2 然后将bean对象作为key,电话号作为value输出

四、FlowSortPartitioner分区类

1. 分区类在map函数执行context.write()时被调用
2. 获取map输出的键值对中我们感兴趣的信息(这里用的是value中电话号码的前三位)
3. 根据获取的信息将map输出的kv(键值对)分配给不同的分区

五、FlowSortReducer类

1. reducer类对shuffle后的键值对进行处理,并得到新的kv作为reducer的输出
1.1. 这里是将电话号码作为key,(其它流量等信息)bean对象作为value

完整代码

按照上面介绍的顺序

package com.zhuge.demo.pojo;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 *  1. 实现Hadoop的Writable接口,并用来实现序列化及反序列化的bean对象
 *  2. 用于存放我们感兴趣的数据(手机号,流量等)
 *  3. 实现WritableComparable接口,用于shuffle中对key进行排序(这里是按照总流量进行倒序排列)
 */
public class FlowBean implements Writable , WritableComparable 
    private long upFlow;
    private long downFlow;
    private long sumFlow;


    public FlowBean()
        super();
    

    public FlowBean(long upFlow, long downFlow) 
        super();
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    
	//重写序列化方法
    @Override
    public void write(DataOutput dataOutput) throws IOException 
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    
	//重写反序列化方法
    @Override
    public void readFields(DataInput dataInput) throws IOException 
        this.upFlow  = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    

    @Override
    public String toString() 
        return upFlow + "\\t" + downFlow + "\\t" + sumFlow;
    


    public long getUpFlow() 
        return upFlow;
    

    public void setUpFlow(long upFlow) 
        this.upFlow = upFlow;
    

    public long getDownFlow() 
        return downFlow;
    

    public void setDownFlow(long downFlow) 
        this.downFlow = downFlow;
    

    public long getSumFlow() 
        return sumFlow;
    

    public void setSumFlow(long sumFlow) 
        this.sumFlow = sumFlow;
    
	
	// //用于对key进行排序的比较方法,sumFlow是当前对象的总流量,o是要比较的对象,若当前流量的更大,则返回负数
    @Override
    public int compareTo(Object o) 
        return Long.compare(((FlowBean) o).getSumFlow(), sumFlow);
    

package com.zhuge.demo.flowsortpartition;

import com.zhuge.demo.flowsort.FlowSortMapper;
import com.zhuge.demo.flowsort.FlowSortReducer;
import com.zhuge.demo.pojo.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author ZCH
 * @date 2021/5/13 0013 - 上午 11:37
 *
 * 1. 获取Job实例设置资源所在的类
 * 2. 设置自定义的Mapper类,Reducer类以及分区类。
 * 3. 指定Map输出的key,value的类型以及Reduce输出的key,value类型和reducer并行个数
 * 4. 指定要读入的文件的位置和MapReduce后输出文件的位置
 *
 */
public class FlowSortPartitionerDriver 
    public static void main(String[] args) throws Exception
        Job job = Job.getInstance(new Configuration());
        //设置job中的资源所在的jar包
        job.setJarByClass(FlowSortPartitionerDriver.class);

        //设置自定义的Mapper类,Reducer类
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        //指定Map输出的key,value的类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        //指定Reduce输出的key,value类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        //加载自定义分区类
        job.setPartitionerClass(FlowSortPartitioner.class);

        //设置Reducetask个数
        job.setNumReduceTasks(5);

        //指定要读入的文件的位置
        FileInputFormat.setInputPaths(job,new Path("E:\\\\IDEA-Projects\\\\Writable\\\\src\\\\work\\\\part-r-00000"));
        //指定输出文件的存放位置
        FileOutputFormat.setOutputPath(job,new Path("E:\\\\IDEA-Projects\\\\Writable\\\\src\\\\output2"));

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    


无论我们以怎样的方式从分片中读取一条记录,每读取一条记录都会调用RecordReader类;
系统默认的RecordReader是LineRecordReader,TextInputFormat;
LineRecordReader是用每行的偏移量作为map的key,每行的内容作为map的value;(就相当于我们文件中的每行电话及流量的记录)

注:为何要将bean作为mapper输出的key?
因为:shuffle中是对mapper输出中的key进行排序,也就是这里bean对象的属性sumFlow,所以如果bean不是key的话就没法按照总流量排序了!!

package com.zhuge.demo.flowsort;

import com.zhuge.demo.pojo.FlowBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author ZCH
 * @date 2021/5/13 0013 - 上午 11:36
 *
 *   1. 获取读入文件并且经过Reader之后的数据(这里是电话及各种流量信息)
 *   2. 对获取的value数据进行处理,并得到新的kv作为mapper的输出
 *   3. 这里是默认按每行读,所以map方法里先每次获取一行的数据并根据空格符截取数据,并将感兴趣的数据封装到bean对象中
 *   4. 然后将bean对象作为key,电话号作为value输出
 */
public class FlowSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> 

    FlowBean bean = new FlowBean();
    Text v = new Text();

    /**
     *
     * 注: 这个mapper是对FlowDriver生成的文件进行Map的!!
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)	throws IOException, InterruptedException 

        // 1 获取一行
        String line = value.toString();

        // 2 截取
        String[] fields = line.split("\\t");

        // 3 封装对象
        String phoneNbr = fields[0];
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);

        bean.setUpFlow(upFlow);
        bean.setDownFlow(downFlow);
        bean.setSumFlow(upFlow+downFlow);
        v.set(phoneNbr);

        // 4 输出
        context.write(bean, v);
    

package com.zhuge.demo.flowsortpartition;

import com.zhuge.demo.pojo.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author ZCH
 * @date 2021/5/13 0013 - 下午 2:05
 *
 * 1. 分区类在map函数执行context.write()时被调用
 * 2. 获取map输出的键值对中的value的信息(这里是电话号码的前三位)
 * 3. 根据获取的信息将map输出的kv(键值对)分配给不同的分区
 */
public class FlowSortPartitioner extends Partitioner<FlowBean, Text> 
    @Override
    public int getPartition(FlowBean key, Text text, int i) 
        String preNum = text.toString().substring(0, 3);
        int partition = 4;
        // 2 判断是哪个省
        if ("136".equals(preNum)) 
            partition = 0;
        else if ("137".equals(preNum)) 
            partition = 1;
        else if ("138".equals(preNum)) 
            partition = 2;
        else if ("139".equals(preNum)) 
            partition = 3;
        
        return partition;
    


package com.zhuge.demo.flowsort;

import com.zhuge.demo.pojo.FlowBean;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author ZCH
 * @date 2021/5/13 0013 - 上午 11:37
 *
 * 1. reducer类对shuffle后的键值对进行处理,并得到新的kv作为reducer的输出
 * 2. 这里是将电话号码作为key,(其它流量等信息)bean对象作为value
 */
public class FlowSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> 

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context)	throws IOException, InterruptedException 
        // 循环输出,避免总流量相同情况
        for (Text text : values) 
            context.write(text, key);
        
    


运行结果


以上是关于Hadoop学习--流量分区并排序的主要内容,如果未能解决你的问题,请参考以下文章

Hadoop学习--流量分区并排序

Hadoop学习之路(十九)MapReduce框架排序

MapReduce分区和排序

大数据之Hadoop(MapReduce):WritableComparable排序案例实操(区内排序)

大数据学习之提交job流程,排序11

Hadoop之Hive的分区表