MapReduce的WritableComparable 排序
Posted NC_NE
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce的WritableComparable 排序相关的知识,希望对你有一定的参考价值。
一、排序概述
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask阶段均会对数据按 照key进行排序,该操作属于Hadoop的默认行为。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
二、排序分类(mapreduce流程中的排序)
(1)部分排序:MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序(分区内排序,环形缓冲区spill前)。
(2)全排序:最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构。
(3)辅助排序:(GroupingComparator分组) 在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
(4)二次排序:在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
三、MapTask阶段排序
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序(按照key的索引以字典顺序进行快速排序),并将这些有序数 据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序(将所有溢写文件归并排序成一个大文件)。
四、ReduceTask阶段排序
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
五、自定义排序
原理:实现 WritableComparable 接口重写 compareTo 方法,就可以实现排序。
5.1、WritableComparable 排序案例实操(全排序)
(1)需求:对序列化案例产生的结果再次对总流量进行倒序排序
(2)需求分析:
(3)代码实现:
1、FlowBean类实现WritableComparable接口,重写compareTo 方法
public class FlowBean implements WritableComparable<FlowBean> {
private Long upFlow;
private Long downFlow;
private Long sumFlow;
public Long getUpFlow() {
return upFlow;
}
public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}
public Long getDownFlow() {
return downFlow;
}
public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}
public Long getSumFlow() {
return sumFlow;
}
public void setSumFlow(Long sumFlow) {
this.sumFlow = sumFlow;
}
public void setSumFlow() {
this.sumFlow = this.upFlow + this.downFlow;
}
//空参构造
public FlowBean() {
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(upFlow);
dataOutput.writeLong(downFlow);
dataOutput.writeLong(sumFlow);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
upFlow = dataInput.readLong();
downFlow = dataInput.readLong();
sumFlow = dataInput.readLong();
}
@Override
public String toString() {
return upFlow + "\\t" + downFlow + "\\t" + sumFlow ;
}
// 按照总流量大小,倒序排列
@Override
public int compareTo(FlowBean o) {
if (this.sumFlow > o.sumFlow){
return -1;
}else if (this.sumFlow < o.sumFlow){
return 1;
}else{
//二次排序
if (this.upFlow > o.upFlow){
return -1;
}else if (this.upFlow < o.upFlow){
return 1;
}else{
return 0;
}
}
}
}
2、编写 Mapper 类
/**
* @author oyl
* @create 2021-06-05 16:46
* @Description
* 需要对总流量进行排序,所以总量了必须是key
*/
public class FlowMapper extends Mapper<LongWritable, Text,FlowBean,Text> {
private Text outv = new Text();
private FlowBean outk = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取一行数据
String line = value.toString();
//切割
String[] splits = line.split("\\t");
String phone = splits[0];
outv.set(phone);
outk.setUpFlow(Long.parseLong(splits[1]));
outk.setDownFlow(Long.parseLong(splits[2]));
outk.setSumFlow();
context.write(outk,outv);
}
}
3、编写 Reducer 类
/**
* @author oyl
* @create 2021-06-05 17:06
* @Description
*/
public class FlowReducer extends Reducer<FlowBean,Text,Text, FlowBean> {
@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//遍历 values 集合,循环写出,避免总流量相同的情况
for (Text value : values) {
context.write(value,key);
}
}
}
4、编写 Driver 类,注意输入的文件是序列化的数据总结果
public class FlowDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1、获取job
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
//2、设置job
job.setJarByClass(FlowDriver.class);
//3、关联mapper 和reducer
job.setMapperClass(FlowMapper.class);
job.setReducerClass(FlowReducer.class);
//4、设置mapper输出key和value类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);
//5、设置最终结果输出的key和value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
//6、设置数据的输入和输出路径
//上一个MapReduce的输出
FileInputFormat.setInputPaths(job,new Path("C:\\\\Users\\\\oyl\\\\Desktop\\\\HADOOP\\\\output3"));
FileOutputFormat.setOutputPath(job,new Path("C:\\\\Users\\\\oyl\\\\Desktop\\\\HADOOP\\\\output6"));
//7、提交job
boolean resoult = job.waitForCompletion(true);
System.exit(resoult ? 0 : 1);
}
}
3、运行结果
5.2WritableComparable 排序案例实操(区内排序)
(1)需求:要求每个省份手机号输出的文件中按照总流量内部排序。
(2)需求分析:基于5.1案例,增加自定义分区类,分区按照省份手机号设置
(3)代码实现:
1、增加自定义分区类MyPartitioner2
public class MyPartitioner2extends Partitioner<FlowBean, Text> {
@Override
public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
int partitions = 0;
//获取手机号的前三位
String prePheno = text.toString().substring(0,3);
if ("136".equals(prePheno)){
partitions = 0;
}else if ("137".equals(prePheno)){
partitions = 1;
}else if ("138".equals(prePheno)){
partitions = 2;
}else if ("139".equals(prePheno)){
partitions = 3;
}else{
partitions = 4;
}
return partitions;
}
}
2、 Dirver代码设置设置自定义MyPartitioner2类和分区个数
//8 指定自定义分区器
job.setPartitionerClass(MyPartitioner2.class);
//9 同时指定相应数量的 ReduceTask
job.setNumReduceTasks(5);
3、运行结果
五个文件妥妥的
区内数据也都是有序的
以上是关于MapReduce的WritableComparable 排序的主要内容,如果未能解决你的问题,请参考以下文章
MapReduce入门—— MapReduce概述 + WordCount案例实操