mapreduce过程shuffle详解,相关优化
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了mapreduce过程shuffle详解,相关优化相关的知识,希望对你有一定的参考价值。
我们知道,在大数据计算中,MapReduce主要有如下三个流程:
- Map
- Shuffle
- Reduce
整个过程中的Shuffle包含 Map Shuffle和Reduce Shuffle两个阶段。
我们知道在大数据时代,大量数据以前的单台服务器是无法解决这些问题的,因此采用了集群、分布式解决方案,说白了,就是以前的数据量太大单台处理不完,现在通过集群分布式,拆分成很多块,每个节点处理一部分,并行处理,这样能够解决大数据量和处理速度问题,而这其中mapreduce就是基于这个理念来实现的。
整个大流程结构图如下:
Map端
Mapreduce首先将输入数据切片划分,然后record reader通过确定的分片来读取对应的分片数据,然后进行MapTask处理,数据被切分成多少个块,对应的就有多少个MapTask
一个MapTask运行的时候,其输出Key/Value的时候首先会被序列化字节流,然后并不是直接写入到文件中,而是写入到一个环形缓冲区中该缓冲区大小默认为100MB(mapreduce.task.io.sort.mb
控制),当缓冲区内的数据容量达到总容量的一个阈值时(默认是0.8,mapreduce.map.sort.spill.percent
控制),会有一个专门的溢写线程将缓冲区内的数据写入到文件中,这时候MapTask还是可以继续向环形缓冲区内写入内容的,如果某一个时刻缓冲区满了,这时候MapTask无法写入,需要等待溢写线程将数据都写入到磁盘中。
在将缓冲区中的内容写入到磁盘之前,首先会根据设置的分区规则对写入的数据进行分区处理(默认的分区是hash分区,规则为hash(Key) % R , R 为reduce端reduceTask数量),然后进行排序,根据分区排序,分区内根据key排序(使用QuickSort快排来进行排序),如果设置了Combiner,则会在排序结果的基础上在进行一次合并(可以理解为Combiner就是对排序结果进行了一次reduce)
每一次缓冲区写磁盘的时候都会溢写一次文件,产生一个对应的磁盘文件,文件中实际上是按照了分区进行了排序,为了能够快速的找到文件中对应的分区数据,还会记录对应的索引,默认是存放在内存中,大小1KB(mapreduce.task.index.cache.limit.bytes
控制),如果内存不够则写入到磁盘中。
当MapTask结束的时候,我们会发现由于多次溢写磁盘,此时存在很多的小文件,Map端最后会将这些文件进行多次Merge,成一个文件
(每次merge文件的个数由mapreduce.task.io.sort.factor
控制,默认为10个)。merge会按照一个分区一个分区的来merge,通过索引文件,能够在各个文件中很方便的找到各个分区在文件中对应数据的起始地址和结束地址。这样最终会形成一个数据文件和一个索引文件。
merge过程中还是会重复溢写的的流程,sort和combine(如果文件个数小于mapreduce.map.combine.minispills
则不会进行combine)
到这里Map端的shuffle就完成了
Reduce
reduce端的shuffle主要有两个阶段。
首先是Copy数据
reduceTash能够通过上面说的hash(key) % R ,来确定自己所在的数据分区,然后去Map端fetch数据,Map端通过暴露HTTP Server的方式,来让reduce端来读取数据,map端能够同时并发响应请求数量由mapreduce.shuffle.max.threads
来控制(这个是针对NM配置,不是针对每个task的配置)
reduce维护几个copier线程,并行地从map任务机器提取数据。默认情况下有5个copy线程,可以通过mapreduce.reduce.shuffle.parallelcopies
配置。
如果map输出的数据足够小,则会被拷贝到reduce任务的JVM内存中。mapreduce.reduce.shuffle.input.buffer.percent
配置JVM堆内存的多少比例可以用于存放map任务的输出结果。如果数据太大容不下,则被拷贝到reduce的机器磁盘上,同时会进行数据的排序,一边Copy一边Sort。
当内存中的数据到达一定容量的时候则会开启Merge Sort处理(mapreduce.reduce.shuffle.merge.percent
控制,默认0.66),即 内存到磁盘Merge
,
当属于该reduce的mapTask的数据都Copy完成之后,会形成多个文件,这时候会再次进行Merge,即磁盘到磁盘Merge
最终每个reduce处理一个merge的最终文件
相关调优参数说明
名称 | 默认值 | |
---|---|---|
mapreduce.task.io.sort.mb | 100MB | 每个MapTask环形缓冲区大小 |
mapreduce.map.sort.spill.percent | 0.8 | 当数据达到缓冲区占比超过该值时,溢写磁盘 |
mapreduce.task.io.sort.factor | 10 | MapTask生成众多小文件后进行合并时,每次同时合并的小文件的个数 |
mapreduce.reduce.shuffle.parallelcopies | 5 | reduce端Copy数据的并发线程数 |
mapreduce.reduce.shuffle.input.buffer.percent | 0.7 | reduce端copymap端数据的时候使用内存占java heap的占比 |
mapreduce.reduce.shuffle.merge.percent | 0.66 | Reduce端buffer中数据达到多少溢写磁盘 |
以上是关于mapreduce过程shuffle详解,相关优化的主要内容,如果未能解决你的问题,请参考以下文章