MapReduce进阶
Posted 杀智勇双全杀
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce进阶相关的知识,希望对你有一定的参考价值。
MapReduce进阶
day09
Shuffle设计思想
分组排序问题
分布式中,多个MapTask只会在自己的分区内进行区内分组与排序,无法直接把全部数据都按照全局的方式分组和排序。
Shuffle要解决的问题
分布式中全局排序和全局分组的问题。
Shuffle的实现
所有Map的结果会全部写入磁盘,在分布式磁盘中通过特殊的排序机制来实现全局排序,再由Reduce读取全局排序后的数据进行处理。
由于Shuffle过程必须经过磁盘,故只要经过shuffle,性能就比较差。
Shuffle功能
分区:默认按照K2进行Hash分区,对Map输出的数据进行标记。
排序:按照K2进行排序。
分组:按照K2进行分组,相同K2的所有V2放在一起。
Shuffle过程
Input将文件切片后按照Split个数确定MapTask个数,同时数据被转换为K1,V1键值对的形式,每个MapTask都会实例化Mapper类并调用Map方法(每条KV键值对调用1次Map方法),之后给自己输出的小文件添加分区信息(此时为K2,V2键值对)。Map端Shuffle从中读取数据,处理后每个MapTask都会输出一个文件,Reduce端Shuffle从中拉取自己分区的数据,处理后获得合并、排序、分组后的数据,这些数据最后还会交给Reduce做合并运算(此时为K3,V3键值对)及Output输出保存。
Map端Shuffle实现由MapTask来运行,Reduce端Shuffle实现由ReduceTask来运行。
Map端Shuffle
Input和Map后执行。
Spill
Spill:将每个Task处理的所有数据进行局部排序,生成多个有序的小文件。
MapTask会将当前处理分区好的数据和对应的索引信息写入一个环形缓冲区【内存:100M】,直到达到存储阈值(80%),达到80M(数据+索引),触发Spill。
Spill时先将当前缓冲区中的数据进行排序(排序规则:相同分区的数据放在一起,分区内部按照K2进行排序),排序是在内存中快速排序。
之后将排序好的数据写入磁盘,变成一个有序的小文件,写入磁盘,释放80M内存,另外20%持续写入,又会达到阈值,继续触发spill。会有很多有序的小文件。
Merge
每个MapTask都会将自己的所有小文件合并为一个大文件合并过程中会进行排序。(先按照分区排序,分区内部按照K2排序)。排序是在硬盘中插入排序(基于有序小文件的合并排序)。
Reduce端Shuffle
当MapTask生成大文件以后,会通知AppMaster,当前MapTask已经结束,生成文件,AppMaster收到Map的通知后会通知ReduceTask到每个MapTask的大文件中取属于自己的数据。
执行Shuffle后再执行Reduce和Output。
拉取数据
每个Reduce到每个Map的结果中根据Partition编号拉取属于自己的数据。
Merge
每个ReduceTask将所有MapTask中属于自己的数据进行合并排序。(分组器和程序配置决定的ReduceTask个数决定了最后生成几个文件)。排序是在硬盘中插入排序。
Shuffle的优化
Combiner优化
Map端的聚合,利用MapTask的分布式提前在Map端Shuffle过程中实现Reduce的聚合逻辑。发生在Map端的shuffle中,每次排序以后会做判断 ,判断是否开启了Combiner。如果开启了Combiner,就会调用Combiner的类做分组聚合。
由于MapTask的个数一般远大于ReduceTask的个数,让每个MapTask对自己处理的数据先做部分聚合,最后由reduce来做所有MapTask的最终聚合,就可以降低Reduce的负载。这么做不仅降低了Reduce负载,还可以在一定程度上解决数据倾斜的问题。
只需要设置:
job.setCombinerClass(WCReducer.class);//处理逻辑与Reduce的逻辑是一致的
Combiner不是所有程序都能用的(例如:求中位数),要预先判断所有内容是否为平权的。
Compress优化
这货就是压缩。。。
通过牺牲CPU的压缩和解压的性能,来提高对磁盘以及网络IO的性能的提升。可以减小文件存储所占空间,加快文件传输效率,从而提高系统的处理速度,降低IO读写的次数。但是使用数据时需要先对文件解压,加重CPU负荷,压缩算法越复杂,解压时间越长。
常用的类型:Snappy、Lzo、Lz4。
使用:
hadoop checknative
可以查看当前已安装的Hadoop支持的压缩。
压缩配置
可以在配置文件中进行配置:mapred-site.xml。这样做,所有的程序都做压缩。
可以在代码中配置:conf.set。能够灵活管理压缩配置
也可以提交程序的时候指定配置。
Input阶段:MapReduce输入通过对文件后缀名的判断,自动识别读取压缩类型,不需要做任何配置。
Map阶段需要配置:
#开启Map输出结果压缩,默认为false不开启
mapreduce.map.output.compress=true
#配置Map输出结果的压缩类型
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
Reduce阶段需要配置:
#开启Reduce输出结果压缩,默认为false
mapreduce.output.fileoutputformat.compress=true
#配置Reduce输出结果的压缩类型
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
或者提交时配置:
yarn jar xxx.jar -Dkey=value mainclass
Shuffle分组
分布式的全局数据分组需要遵循规则:
优先调用分组比较器进行分组比较,判断K2是否为同一组。如果没有分组比较器,调用K2自带的compareTo方法实现比较,判断K2是否为同一组。
排序比较:大于、等于、小于。
分组比较:等于、不等于。
分片规则
InputFormat可以将读取到的所有输入数据划分为多个分片Split,并将每个分片的数据转换为KV。
TextInputFormat读取数据
createRecordReader:真正调用读取器读取数据的方法。
LineRecordReader:真正读取器的对象(JavaBean)。
nextKeyValue:将每一条数据转换为KV结构的方法。
TextInputFormat分片的规则
getSplits:用于将输入的所有数据划分为多个分片。
判断是否分片的条件:
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP)
文件大小 / splitSize > 1.1
计算splitSize大小:
long splitSize = computeSplitSize(blockSize, minSize, maxSize);
128M 1 Long.MAX_VALUE
minsize = 最小分片数 max(1,mapreduce.input.fileinputformat.split.minsize=1)
maxsize = 最大分片数 mapreduce.input.fileinputformat.split.maxsize
computeSplitSize计算逻辑:
Math.max(minSize, Math.min(maxSize, blockSize))
max(1,min(Long.MAX_VALUE,128M))
判断当前文件的大小是否大于128M的1.1倍。如果大于:将文件的128M作为一个分片,再次判断,直到所有分片构建。如果不大于:剩下的整体作为一个分片。例如:
130M:1个分片。145M:2个(split1:128M,split2:17M)。
如果要干预MapTask个数,可以调整minSplitSize和maxSplitSize大小。
Map Join
将小数据放入分布式缓存,每个Task从缓存中构建完整的小数据加载到Task内存中,完整的小数据与大数据的一个部分进行join。
在Map端实现,不需要经过shuffle。用于小数据join大大数据、小数据join小数据。
Reduce Join
利用Shuffle中的分组,将Join字段作为K2,所有join字段相关的数据放在同一个迭代器中。
必须经过shuffle,适用于大数据join大数据
以上是关于MapReduce进阶的主要内容,如果未能解决你的问题,请参考以下文章