MapReduce进阶

Posted 杀智勇双全杀

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MapReduce进阶相关的知识,希望对你有一定的参考价值。

day09

MapReduce提升

Shuffle设计思想

分组排序问题

分布式中,多个MapTask只会在自己的分区内进行区内分组与排序,无法直接把全部数据都按照全局的方式分组和排序。

Shuffle要解决的问题

分布式中全局排序和全局分组的问题。

Shuffle的实现

所有Map的结果会全部写入磁盘,在分布式磁盘中通过特殊的排序机制来实现全局排序,再由Reduce读取全局排序后的数据进行处理。

由于Shuffle过程必须经过磁盘,故只要经过shuffle,性能就比较差。

Shuffle功能

分区:默认按照K2进行Hash分区,对Map输出的数据进行标记。
排序:按照K2进行排序。
分组:按照K2进行分组,相同K2的所有V2放在一起。

Shuffle过程

Input将文件切片后按照Split个数确定MapTask个数,同时数据被转换为K1,V1键值对的形式,每个MapTask都会实例化Mapper类并调用Map方法(每条KV键值对调用1次Map方法),之后给自己输出的小文件添加分区信息(此时为K2,V2键值对)。Map端Shuffle从中读取数据,处理后每个MapTask都会输出一个文件,Reduce端Shuffle从中拉取自己分区的数据,处理后获得合并、排序、分组后的数据,这些数据最后还会交给Reduce做合并运算(此时为K3,V3键值对)及Output输出保存。

Map端Shuffle实现由MapTask来运行,Reduce端Shuffle实现由ReduceTask来运行。

在这里插入图片描述

Map端Shuffle

Input和Map后执行。

Spill

Spill:将每个Task处理的所有数据进行局部排序,生成多个有序的小文件

MapTask会将当前处理分区好的数据和对应的索引信息写入一个环形缓冲区【内存:100M】,直到达到存储阈值(80%),达到80M(数据+索引),触发Spill。

Spill时先将当前缓冲区中的数据进行排序(排序规则:相同分区的数据放在一起,分区内部按照K2进行排序),排序是在内存快速排序

之后将排序好的数据写入磁盘,变成一个有序的小文件,写入磁盘,释放80M内存,另外20%持续写入,又会达到阈值,继续触发spill。会有很多有序的小文件。

Merge

每个MapTask都会将自己的所有小文件合并为一个大文件合并过程中会进行排序。(先按照分区排序,分区内部按照K2排序)。排序是在硬盘插入排序(基于有序小文件的合并排序)。

Reduce端Shuffle

当MapTask生成大文件以后,会通知AppMaster,当前MapTask已经结束,生成文件,AppMaster收到Map的通知后会通知ReduceTask到每个MapTask的大文件中取属于自己的数据。

执行Shuffle后再执行Reduce和Output。

拉取数据

每个Reduce到每个Map的结果中根据Partition编号拉取属于自己的数据。

Merge

每个ReduceTask将所有MapTask中属于自己的数据进行合并排序。(分组器和程序配置决定的ReduceTask个数决定了最后生成几个文件)。排序是在硬盘插入排序

Shuffle的优化

Combiner优化

Map端的聚合,利用MapTask的分布式提前在Map端Shuffle过程中实现Reduce的聚合逻辑。发生在Map端的shuffle中,每次排序以后会做判断 ,判断是否开启了Combiner。如果开启了Combiner,就会调用Combiner的类做分组聚合。

由于MapTask的个数一般远大于ReduceTask的个数,让每个MapTask对自己处理的数据先做部分聚合,最后由reduce来做所有MapTask的最终聚合,就可以降低Reduce的负载。这么做不仅降低了Reduce负载,还可以在一定程度上解决数据倾斜的问题。

只需要设置:

job.setCombinerClass(WCReducer.class);//处理逻辑与Reduce的逻辑是一致的

Combiner不是所有程序都能用的(例如:求中位数),要预先判断所有内容是否为平权的。

Compress优化

这货就是压缩。。。

通过牺牲CPU的压缩和解压的性能,来提高对磁盘以及网络IO的性能的提升。可以减小文件存储所占空间,加快文件传输效率,从而提高系统的处理速度,降低IO读写的次数。但是使用数据时需要先对文件解压,加重CPU负荷,压缩算法越复杂,解压时间越长。

常用的类型:Snappy、Lzo、Lz4。

使用:

hadoop checknative

可以查看当前已安装的Hadoop支持的压缩。

压缩配置

可以在配置文件中进行配置:mapred-site.xml。这样做,所有的程序都做压缩。

可以在代码中配置:conf.set。能够灵活管理压缩配置

也可以提交程序的时候指定配置。

Input阶段:MapReduce输入通过对文件后缀名的判断,自动识别读取压缩类型,不需要做任何配置。

Map阶段需要配置:

#开启Map输出结果压缩,默认为false不开启
mapreduce.map.output.compress=true
#配置Map输出结果的压缩类型
mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

Reduce阶段需要配置:

#开启Reduce输出结果压缩,默认为false
mapreduce.output.fileoutputformat.compress=true
#配置Reduce输出结果的压缩类型
mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec

或者提交时配置:

yarn jar xxx.jar  -Dkey=value mainclass  

Shuffle分组

分布式的全局数据分组需要遵循规则:
优先调用分组比较器进行分组比较,判断K2是否为同一组。如果没有分组比较器,调用K2自带的compareTo方法实现比较,判断K2是否为同一组。

排序比较:大于、等于、小于。
分组比较:等于、不等于。

分片规则

InputFormat可以将读取到的所有输入数据划分为多个分片Split,并将每个分片的数据转换为KV。

TextInputFormat读取数据

createRecordReader:真正调用读取器读取数据的方法。
LineRecordReader:真正读取器的对象(JavaBean)。
nextKeyValue:将每一条数据转换为KV结构的方法。

TextInputFormat分片的规则

getSplits:用于将输入的所有数据划分为多个分片。

判断是否分片的条件:

while (((double) bytesRemaining)/splitSize > SPLIT_SLOP)
    	文件大小 /  splitSize >  1.1

计算splitSize大小:

long splitSize = computeSplitSize(blockSize, minSize, maxSize);
									128M		1		Long.MAX_VALUE
                                        
minsize = 最小分片数 max(1,mapreduce.input.fileinputformat.split.minsize=1)
maxsize = 最大分片数 mapreduce.input.fileinputformat.split.maxsize                      	

computeSplitSize计算逻辑:

Math.max(minSize, Math.min(maxSize, blockSize))
max(1,min(Long.MAX_VALUE,128M))

判断当前文件的大小是否大于128M的1.1倍。如果大于:将文件的128M作为一个分片,再次判断,直到所有分片构建。如果不大于:剩下的整体作为一个分片。例如:
130M:1个分片。145M:2个(split1:128M,split2:17M)。

如果要干预MapTask个数,可以调整minSplitSize和maxSplitSize大小。

Map Join

将小数据放入分布式缓存,每个Task从缓存中构建完整的小数据加载到Task内存中,完整的小数据与大数据的一个部分进行join。

在Map端实现,不需要经过shuffle。用于小数据join大大数据、小数据join小数据。

Reduce Join

利用Shuffle中的分组,将Join字段作为K2,所有join字段相关的数据放在同一个迭代器中。

必须经过shuffle,适用于大数据join大数据

以上是关于MapReduce进阶的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce进阶:多MapReduce的链式模式

学习笔记Hadoop—— MapReduce编程进阶

MapReduce进阶:多路径输入输出

学习笔记Hadoop(十五)—— MapReduce编程进阶

Atom编辑器入门到精通 Atom使用进阶

MapReduce 进阶:Partitioner 组件