0008 - MapReduce中Shuffle和排序机制解析

Posted 小左先森

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了0008 - MapReduce中Shuffle和排序机制解析相关的知识,希望对你有一定的参考价值。

大数据梦工厂(0008 - MapReduce中Shuffle和排序机制解析)


1 - Shuffle

MapReduce 确保每个 Reducer 的输入都是按 key 排序。系统执行排序,将 Map 的输出作为输入传给 Reducer 的过程称为 Shuffle。

2 - Map 端

Map 函数输出时,并不是直接写到磁盘,而是利用缓冲的方式写到内存并进行预排序。

其过程如下:
1、 每个 Map 任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲的大小为 100MB(mapreduce.task.io.sort.mb),一旦缓冲区的内容达到 80%(mapreduce.map.sort.spill.percent),便有一个后台线程将写入缓冲区的内容按照轮询的方式溢出(spill)写到磁盘指定目录(mapreduce.cluster.local.dir)。溢出的过程中 Map 任务一边溢出一边继续写入缓冲区,如果缓冲区被写满,Map 任务就会阻塞直到后台线程写磁盘过程结束。

2、 在写磁盘之前,线程根据数据最终要传的 Reducer 把数据划分到相应的分区(Partition)。每个分区按 key 在内存中排序。

3、 如果有 Combiner 函数,则在分区排序后的输出上运行局部聚合操作,以减轻 Shuffle 过程中网络负载的压力。

4、 每次内存缓冲区达到溢出阈值,就会新建一个溢出文件(spill file),因此在 Map 任务写完其最后一个输出后,会有多个溢出文件。最终会被合并成一个已分区且已排序的输出文件。如果 Map 输出的数据比较多,产生的小文件会很多,影响系统性能,因此需要进行合并,通过(mapreduce.task.io.sort.factor,默认为 10)设置一次最多可以合并的文件个数。

5、 Map 输出到磁盘的过程中,可以设置压缩,默认关闭 Map 输出压缩。为加快写磁盘速度,节约磁盘空间,并且减少传给 Reducer 的数据量,通过设置(mapreduce.map.output.compress)为 true 开启压缩,使用的压缩库由(org.apache.hadoop.io.compress.DefaultCodec)指定(Hadoop 的压缩 codec 格式见下文)。

以上便是 Map 任务输出过程的主要步骤,输出到磁盘后,Reducer 通过 HTTP 服务获取输出文件的分区数据。用于文件分区的工作线程的数据由(mapreduce.shuffle.max.threads)控制,该设置针对每一个 NodeManager,而不是每个 Map 任务。默认值为 0 表示最大线程设置为机器中可用处理器数的 2 倍。

3 - Reduce 端

Reduce 端在 Shuffle 阶段主要分为:Copy、Merge、Sort 以及 Reduce 阶段。

3.1 - Copy 阶段

ReduceTask 从各个 MapTask 上远程复制数据,因此只要有 MapTask 完成,ReduceTask 就开始复制其输出。复制的过程可以使多线程并发进行,并发数由(mapreduce.reduce.shuffle.parallelcopies,默认为 5)设置。

Map 任务成功完成后,通过心跳机制通知 ApplicationMaster,Reducer 中的一个线程定期查询 ApplicationMaster,以获取完成的 MapTask 输出的主机位置,从而去对应的主机复制数据,直到获得所有的输出位置。

由于第一个 Reducer 可能会失败,因此主机并没有在第一个 Reducer 检索到 Map 输出时就立即从磁盘上删除它们。相反,主机会等待作业完成,直到 ApplicationMaster 告知它删除 Map 输出。

如果 Map 输出相对较小,会被复制到 ReduceTask 的 JVM 内存(缓冲区大小比例由 mapreduce.reduce.shuffle.input.buffer.percent 控制,默认为 70%),否则,Map 输出被复制到磁盘。一旦内存缓冲区达到阈值(mapreduce.reduce.shuffle.merge.percent,默认为 66%)或达到 Map 输出文件数阈值(mapreduce.reduce.merge.inmem.threshold,默认 1000),则将内存的数据合并后溢出写到磁盘。如果设置了 Combiner 函数,则在写入磁盘前调用 Combiner 函数以减少写入磁盘的数据量。

3.2 - Merge 阶段

在远程复制数据的同时,随着磁盘上溢写文件的增多,ReduceTask 启动两个后台线程对内存和磁盘上的文件进行合并(可能需要多次合并),每次合并的文件数由(mapreduce.task.io.sort.factor,默认为 10)控制。为了合并,压缩的 Map 输出都必须在内存中被解压缩。

3.3 - Sort 阶段

用户编写 reduce() 函数输入数据是按 key 聚集的一组数据,进行快速排序(字典顺序排序)。由于各个 MapTask 已经实现对其输出处理结果进行了局部排序,因此,ReduceTask 只需要对所有数据进行一次归并排序即可。

3.4 - Reduce 阶段

最后一次合并排序(可以来自内存和磁盘片段)的时候,直接把数据写入到 reduce() 函数,从而省略了一次磁盘的往返读写过程。最后输出直接写到文件系统(如:HDFS)上。

注意事项:

  • ReduceTask=0,表示没有 Reduce 阶段,输出文件个数 Map 个数一致。
  • ReduceTask=1,默认值,所以输出文件个数为 1 个。
  • 具体有多少个 ReduceTask,需要根据集群性能而定。
  • 如果数据分布不均匀,可能会在 Reduce 阶段产生数据倾斜。

4 - 参数调优

在上面介绍 Shuffle 过程时,已经提过相关参数来提高 MapReduce 的性能。在 mapred-default.xml 配置文件修改,如下进行统一整理说明:

4.1 - Map 端参数调优

属性名称 默认值 描述
mapreduce.task.io.sort.mb 100 Map 输出时所使用的内存缓冲区大小,单位:MB
mapreduce.map.sort.spill.percent 0.80 Map 输出溢写到磁盘的内存阈值百分比(0.8 或 80%)
mapreduce.cluster.local.dir ${hadoop.tmp.dir}/mapred/local 溢出文件指定本地磁盘的目录
mapreduce.task.io.sort.factor 10 排序文件一次最多合并的流数量
mapreduce.map.output.compress false 是否压缩 Map 输出
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec Map 输出的压缩编解码器
mapreduce.shuffle.max.threads 0 Map 输出到 Reducer 的每个 NM 的工作线程。0 表示使用机器中可用处理器数的 2 倍
Hadoop 的压缩 codec 格式如下:
压缩格式 HadoopCompressionCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec

1、减少溢写(Spill)次数
通过评估 Map 输出大小,增加 mapreduce.task.io.sort.mb 的值来减少溢写磁盘的次数,从而减少磁盘 I/O。可参考 MapReduce 计数器(SPILLED_RECORDS)计算在作业运行整个阶段中溢写磁盘的记录数。

2、减少合并(Merge)次数
通过调整 mapreduce.task.io.sort.factor 的值,增大 Merge 的文件数目,减少 Merge 的次数,从而缩短 MapReduce 的处理时间。

3、合理修改运行任务内存
运行 Map 任务和 Reduce 任务的 JVM 内存大小由 mapreduce.map.java.optsmapreduce.reduce.java.opts 设置。

4.2 - Reduce 端参数调优

属性名称 默认值 描述
mapreduce.reduce.shuffle.parallelcopies 5 把 Map 输出复制到 Reduce 的线程数
mapreduce.task.io.sort.factor 10 排序文件一次最多合并的流数量
mapreduce.reduce.shuffle.input.buffer.percent 0.70 Shuffle 的复制阶段,分配给 Map 输出的缓冲区占堆空间的百分比
mapreduce.reduce.shuffle.merge.percent 0.66 Map 输出缓冲区的阈值使用百分比,超过将进行合并输出和溢写磁盘
mapreduce.reduce.merge.inmem.threshold 1000 当 Map 输出文件数超过该阈值,进行合并输出和溢写磁盘,0 或更小的数表示没有阈值限制
mapreduce.reduce.input.buffer.percent 0.0 在 reduce 过程中,内存保存 Map 输出的空间占整个堆空间的比例。默认情况下,Reduce 任务开始前,所有的 Map 输出合并到磁盘,以便为 reduce 提供尽可能多的内存。

1、合理设置 Map 和 Reduce 个数
如果都设置较小,会导致 Task 等待,延长处理时间;如果设置太多,会导致 Map 和 Reduce 任务间竞争资源,造成处理超时等错误。

2、合理设置 Map 和 Reduce 共存
通过调整 mapreduce.job.reduce.slowstart.completedmaps (默认0.05),表示至少 Map 任务完成 5% 时,Reduce 任务才会开始运行,以减少 Reduce 任务的等待时间。

3、合理设置 Reduce 端的缓冲区
当数据达到一个阈值时,缓冲区中的数据就会写入磁盘,Reduce 从磁盘中读取所有数据,增加网络宽带负载。通过调整 mapreduce.reduce.input.buffer.percent 参数为 1.0(或一个更低的值,但要大于 0.0),Reduce 会直接读取保留指定比例的缓冲区中的数据,以提升性能。


::: hljs-center
扫一扫,我们的故事就开始了。
:::

以上是关于0008 - MapReduce中Shuffle和排序机制解析的主要内容,如果未能解决你的问题,请参考以下文章

MapReduce的Shuffle机制

MapReduce shuffle阶段详解

mapreduce的shuffle机制

大数据技术专题篇MapReduce shuffle过程详解

MapReduce中shuffle过程

spark 4种 shuffle机制与mapreduce shuffle机制对比