打怪升级之小白的大数据之旅(五十)<MapReduce框架原理二:shuffle>
Posted GaryLea
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了打怪升级之小白的大数据之旅(五十)<MapReduce框架原理二:shuffle>相关的知识,希望对你有一定的参考价值。
打怪升级之小白的大数据之旅(五十)
MapReduce框架原理二:shuffle
上次回顾
上一章。我们学习了Hadoop中MapReduce框架原理中的工作流程以及InputFormat模块,本章我为大家带来shuffle
shuffle机制
shuffle工作机制
- 在上一章开篇MapReduce框架整体认知中,我们了解了MapReduce的模块构成,当时我只是提了一下shuffle,它的工作内容是map阶段的后半段以及reduce的前半段
- Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle
shuffle的工作流程如下:
- 上面这张图,就是shuffle的工作内容,当我们的数据通过map方法之后,数据会进入到环形缓冲区中
- 数据中包含了key,value和分区号,分区号=hashcode(key)%reduceTask的数量
- 环形缓冲区底层就是数组,它里面会存储数据和索引,并预留20%的位置用于写入数据,其他80%位置它会将数据进行分区,然后排序
- 分区就是在map进入环形缓冲区之前进行,会对数据分配一个分区号,环形缓冲区通过这个分区号进行分区,排序就是对该分区中的数据进行排序,排序是按照快速排序进行的
- 当数据写满了80%就会对数据进行排序,排序后会将数据以文件形式写入到磁盘上
- 接着会对磁盘上的文件进行合并,然后进行归并排序,因为前面排过序,所以速度会很快
- 归并排序后的数据会交给reduce
- resuce会将数据进行拷贝到内存中,如果内存不够就溢出写入到磁盘上,然后对每个map来的数据进行归并排序,按照相同的key进行分组
- 分组之后就会调用reduce方法做下一步处理
partition分区
- 在前面我们知道,数据通过map后,进入环形缓冲区时,会为数据分配分区号,进入环形缓冲区后,对数据进行分区
- 默认分区是根据key的hashcode值然后对reduceTask数量进行取模得到的,用户没有办法控制key存储到哪个分区
- 所以我们需要进行自定义分区
自定义分区
语法步骤:
-
自定义类继承自Partitioner,重写Partition方法
public class CustomPartitioner extends Partitioner<key,value>{ @Override public int getPartition(Text text, FlowBean flowBean, int i) { String phone = text.toString(); }
-
在Job驱动中,设置自定义的Partitioner
job.setPartitionerClass(MyFlowPartition.class);
-
自定义Partition后,还需要根据自定的分区逻辑设置相应数量的ReduceTask
job.setNumReduceTasks(5);
Partition分区案例实操
-
需求
- 还是前面序列化的需求,统计下面数据中的上行、下行总流量
- 然后将下面测试数据中的手机号按照136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中
-
测试数据
1 13736230513 192.196.100.1 www.company.com 2481 24681 200 2 13846544121 192.196.100.2 264 0 200 3 13956435636 192.196.100.3 132 1512 200 4 13966251146 192.168.100.1 240 0 404 5 18271575951 192.168.100.2 www.company.com 1527 2106 200 6 84188413 192.168.100.3 www.atguigu.com 4116 1432 200 7 13590439668 192.168.100.4 1116 954 200 8 15910133277 192.168.100.5 www.hao123.com 3156 2936 200 9 13729199489 192.168.100.6 240 0 200 10 13630577991 192.168.100.7 www.shouhu.com 6960 690 200 11 15043685818 192.168.100.8 www.baidu.com 3659 3538 200 12 15959002129 192.168.100.9 www.company.com 1938 180 500 13 13560439638 192.168.100.10 918 4938 200 14 13470253144 192.168.100.11 180 180 200 15 13682846555 192.168.100.12 www.qq.com 1938 2910 200 16 13992314666 192.168.100.13 www.gaga.com 3008 3720 200 17 13509468723 192.168.100.14 www.qinghua.com 7335 110349 404 18 18390173782 192.168.100.15 www.sogou.com 9531 2412 200 19 13975057813 192.168.100.16 www.baidu.com 11058 48243 200 20 13768778790 192.168.100.17 120 120 200 21 13568436656 192.168.100.18 www.alibaba.com 2481 24681 200 22 13568436656 192.168.100.19 1116 954 200
-
编写思路
- 还是上一章中序列化的案例实操,统计流量,这个流程就不再赘述了,这个案例多了一个按手机号的分区,我们直接定义一个Paritioner类,然后在Driver中注册一下,并定义好ReduceTask数量就好了
编写代码如下:
-
FlowBean.java
package com.company.partition1; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /* 自定义一个类并实现Hadoop的序列化框架 步骤: 1.自定义类并实现Writable接口 2.重写wrtie和readFileds方法 3.注意:读取数据时必须和写的顺序保持一致 */ public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; public FlowBean() { } public FlowBean(long upFlow, long downFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } /* 序列化时调用的方法 */ public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /* 反序列化时调用的方法 */ public void readFields(DataInput in) throws IOException { //注意:读取数据时必须和写的顺序保持一致 upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + " " + downFlow + " " + sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } }
-
FlowDriver.java
package com.company.partition; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Job job = Job.getInstance(new Configuration()); /* 默认情况下 : 分区数量 = ReduceTask的数量 (正常情况下) 分区数量 < ReduceTask的数量 : 会浪费资源--多出来的ReduceTask没有数据可以处理 分区数量 > ReduceTask的数量 :会报错 */ //设置ReduceTask的数量 job.setNumReduceTasks(5); //设置使用自定义的分区类 job.setPartitionerClass(MyPartitioner.class); job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job,new Path("D:\\\\io\\\\input2")); FileOutputFormat.setOutputPath(job,new Path("D:\\\\io\\\\output2")); job.waitForCompletion(true); } }
-
FlowMapper.java
package com.company.partition; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /* 作用 :用来实现需要在MapTask中实现的功能 Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: 两组: 第一组: KEYIN :读取数据的偏移量的类型 VALUEIN : 读取的一行一行的内容的类型 第二组: KEYOUT : 写出的Key的类型(在这指的是手机号) VALUEOUT :写出的Value的类型(在这指的是存储流量数据的对象) */ public class FlowMapper extends Mapper<LongWritable,Text, Text, FlowBean> { /** * 在map方法中去实现需要在MapTask中实现的功能 * 注意:map方法在被循环调用(MR框架-MapTask程序) * 每调用一次就会传入一行内容 * @param key :读取数据的偏移量 * @param value :读取的一行一行的内容 * @param context :上下文(在这用来通过上下文将K,V写出) * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //1.将内容进行切割 String[] line = value.toString().split("\\t"); //2.封装K,V Text outKey = new Text(line[1]); FlowBean outValue = new FlowBean(Long.parseLong(line[line.length - 3]), Long.parseLong(line[line.length - 2])); //3.写出K,V context.write(outKey,outValue); } }
-
FlowReducer.java
package com.company.partition; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /* 作用 :用来实现需要在RedcueTask中实现的功能 Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 两组: 第一组: KEYIN :读取的Key的类型(Mapper输出的Key的类型) VALUEIN :读取的Value的类型(Mapper输出的Value的类型) 第二组: KEYOUT :输出的Key的类型(在这指的是单词) VALUEOUT :输出的Value的类型(在这指的是单词的数量) */ public class FlowReducer extends Reducer<Text, FlowBean,Text, FlowBean> { /** * 在reduce方法中去实现需要在ReduceTask中实现的功能 * 注意::reduce方法在被循环调用(MR框架-ReduceTask程序) * 每调用一次就会传入一组数据(key值相同为一组) * @param key :读取的key值 * @param values :一组数据中所有的value * @param context : 上下文(在这用来将K,V写出去) * @throws IOException * @throws InterruptedException * * key value * 153211111111 100 200 300 * 153211111111 80 20 100 * 153211111111 80 20 100 */ @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { long sumUpFlow = 0;//总上行流量 long sumDowFlow = 0;//总下行流量 //遍历所有的value for (FlowBean value : values) { sumUpFlow += value.getUpFlow(); sumDowFlow += value.getDownFlow(); } //2.封装K,V FlowBean outValue = new FlowBean(sumUpFlow, sumDowFlow); //3.写出K,V context.write(key,outValue); } }
-
MyPartitioner.java
package com.company.partition; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; /* Partitioner<KEY, VALUE> KEY : map输出的key的类型 VALUE : map输出的value的类型 */ public class MyPartitioner extends Partitioner<Text,FlowBean> { /** * 返回分区号 * @param text map输出的key * @param flowBean map输出的value * @param numPartitions ReduceTask的数量 * @return */ public int getPartition(Text text, FlowBean flowBean, int numPartitions) { String phoneNumber = text.toString(); //判断手机号 if (phoneNumber.startsWith("136")){ return 0; }else if (phoneNumber.startsWith("137")){ return 1; }else if (phoneNumber.startsWith("138")){ return 2; }else if (phoneNumber.startsWith("139")){ return 3; }else{ return 4; } } }
WritableComparable排序
- 排序是MapReduce中最重要的操作之一
- 具体的排序流程我在本章的开篇讲过了,MapTask和ReduceTask都会对数据按照key进行排序,默认的排序是按照字典的顺序进行排序,且实现排序的方法是快排
- 排序的思想就是在Java学习map集合时,我们学习的comparable和comparator,忘了的小伙伴可以回看前面集合的内容哈
- 在我们的实际需求中,同样需要自定义排序,自定义排序需要实现WritableComparable接口,下面我们再次根据手机流量的案例进行排序的使用
WritableComparable案例实操
在前面Partition分区的基础上,对数据按照手机的总流量进行排序,我们只需要修改一下我们的FlowBean即可
- FlowBean.java
package com.company.comparable; import org.apache.hadoop.io.WritableComparable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /* 自定义类的对象可以作为key进行排序: 要求: 1.自定义的类实现WritableComparable 2.WritableComparable<T> 继承了 Writable接口和Comparable接口 3.重写compareTo方法,readFileds方法,write方法 */ public class FlowBean implements WritableComparable<FlowBean> { private long upFlow; private long downFlow; private long sumFlow; public FlowBean() { } public FlowBean(long upFlow, long downFlow,long sumFlow) { this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = sumFlow; } /* 指定按照哪个字段进行排序 */ public int compareTo(FlowBean o) { //按照sumFlow排序 return Long.compare(this.sumFlow,o.sumFlow); } /* 序列化时调用的方法 */ public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } /* 反序列化时调用的方法 */ public void readFields(DataInput in) throws IOException { //注意:读取数据时必须和写的顺序保持一致 upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } @Override public String toString() { return upFlow + " " + downFlow + " " + sumFlow; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } }
Combiner合并
-
Combiner是MR程序中Mapper和Reducer之外的一个组件
-
Combiner的父类就是Reduce
-
它的作用就是在Reduce之前进行数据的合并,它会对每一个MapTask的输出进行局部汇总,这样就可以减小网络传输的量
-
举个栗子
-
还是以我们前面的分芝麻案例举例,假设我们的芝麻有上千个分区,即
黑芝麻: 1 2 100 200 1000 50 60 ..... 一千个
-
此时我们的Reduce工作量就会比较大,它会将每一个分区进行挨个的合并,如果我们使用Combiner在MapTask输出时就对数据进行合并,那么Reduce的工作量就会少很多
-
当然了,我们要注意,不是所有的地方都可以使用Combiner,它的应用场景就是用在不会对最终结果产生变化的场景下使用,这个很好理解,因为Combiner是在Reduce之前调用的,如果Combiner对数据进行了局部汇总,那么Reduce再次进行数据汇总时就有可能出现数据的偏差,举个栗子:
Combiner合并实例
以前面我们的写的wordcount举例
Combiner的实现有两种方式:
-
第一种
- 增加一个自定义的Combiner类,继承自Reducer
- 在自定义Combiner类中进行数据的汇总,然后输出统计结果
// 指定需要使用combiner,以及用哪个类作为combiner的逻辑 job.setCombinerClass(WordcountCombiner.class);
-
第二种将Reducer类作为Combiner在Driver驱动类中指定
//设置Combiner的类 job.setCombinerClass(WordcountReducer.class);
我们继续通过一个实例来说明Combiner的用法,第二种直接就是将Reducer传入到job驱动中,我就不演示了,我就演示第一种,自定义的方法
编码步骤:
前面wordcount的代码基础上,修改一下Driver类,添加一个WcCombiner自定义合并类
-
WCCombiner.java
package com.company.combiner; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class WCCombiner extends Reducer<Text, LongWritable以上是关于打怪升级之小白的大数据之旅(五十)<MapReduce框架原理二:shuffle>的主要内容,如果未能解决你的问题,请参考以下文章
打怪升级之小白的大数据之旅(五十六)<Zookeeper内部原理>
打怪升级之小白的大数据之旅(五十)<MapReduce框架原理二:shuffle>
打怪升级之小白的大数据之旅(五十九)<Hadoop优化方案>
打怪升级之小白的大数据之旅(五十四)<Zookeeper概述与部署>