spark shuffle过程详解,相关优化
Posted Leo Han
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark shuffle过程详解,相关优化相关的知识,希望对你有一定的参考价值。
Spark中的shuffle过程与Mapreduce的Shuffle过程很多概念都很类似。在spakr中,如果发生了宽依赖,前一个stage 的 ShuffleMapTask 进行 shuffle write, 把数据存储在 blockManager 上面, 并且把数据位置元信息上报到 driver 的 mapOutTrack 组件中, 下一个 stage 根据数据位置元信息, 进行 shuffle read, 拉取上个stage 的输出数据。
在spark目前的版本中,主要有两种shuffle:
- Hash Based Shuffle
- Sort Based Shuffle
Hash Based Shuffle
这中shuffle模式下,每个MapTask会为下游的每个reduceTask都创建一个文件,如果下游有R 个reduce任务,每个MapTask会创建R个文件,如果一共有M个MapTask,则一共会创建 M * R个文件
MapTask的ShuffleWrite会根据下游的reduceTask和分区将数据写入对应的文件中,
下游reduceTask的ShuffleRead根据自身对应的分区去上游各个MapTask拉取到对应的分区的文件的数据,然后根据各种不同算子进行相应的聚合等操作,ShuffleRead是一边拉取,一边进行聚合的,每个ShuffleReadTask都会有一个buffler缓冲区,每次只能拉取与buffer缓冲区大小相同的数据,然后通过内存中的一个Map进行聚合等操作,聚合完一批数据在拉取下一批数据,并放到缓冲中进行聚合操作,一次类推,直到最后将所有的数据都拉取完,并得到最终的结果。
Hash Based Shuffle时,并不会像Mapreduce那样进行排序,reduce端去各个map里面fetch数据的时候,将fetch回来的数据先放在内存RDD中,然后在写入到磁盘文件
- 容易导致OOM,reduce端fetch数据先都存放在了内存中
- Map端过多的小文件,会导致大量IO以及GC
Hash Based Shuffle优化 consolidateFiles
为了解决上面的这个问题,spark引入了consolidateFiles机制,实现如下:
map端的shufflewrite不在和之前一样每个reduceTask一个文件,而是对应到一个executor-core一个文件,每个executor-core在执行第一个MapTask的时候根据ReduceTask(数量为R ),生成R个任务,后续再该节点上执行的MapTask不在生成新的文件,而是在之前对应的分区文件上追加内容,这样一来,生成的文件的总数量为: C * R (其中C为executor-core的个数,R 为reduceTask的数量)
但是上面这种机制在reduceTask任务很大的时候还是无法避免和众多的小文件
开启该优化指定参数spark.shuffle.consolidateFiles=true
Sort Based Shuffle
Sort Shuffle有两种模式,普通模式和byPass模式,我们先说普通模式。
- 普通模式下,数据会首先写入内存缓冲中,一边聚合一边写入缓冲区中,如果到达了一个阈值,则会写文件,写入文件前先会对内存中的数据根据key进行排序,然后分批写入磁盘,默认是10000条,
每次溢写磁盘都会生成一个磁盘文件,最后会将这些磁盘文件进行合并,生成一个文件,同时会生成一个索引文件,记录各个不同的partition分区对应的数据在文件的哪个位置,最终会生成2 * M个文件,M为map任务数量 - byPass模式,触发的条件是map的数量小于
spark.shuffle.sort.bypassMergeThreshold(默认是200)
。该机制下,每个MapTask会为下游每个ReduceTask生成一个文件,只不过在最后会合并成一个文件并创建一个对应的索引文件。该机制与hash based shuffle基本一致,只是在最后阶段合并成了个文件。而与普通机制不同在于:1. 磁盘写机制不同(一个不区分reduce,只是在缓冲区满了写),2. 不会进行排序
Shuffle阶段需要将数据写入磁盘,这其中涉及大量的读写文件操作和文件传输操作,因此对节点的系统IO有比较大的影响,因此可通过调整参数,减少shuffle阶段的文件数和IO读写次数来提高性能,具体参数主要有以下几个:
1)spark.shuffle.manager
设置Spark任务的shuffleManage模式,1.2以上版本的默认方式是sort,即shuffle write阶段会进行排序,每个executor上生成的文件会合并成两个文件(包含一个索引文件)。
2)spark.shuffle.sort.bypassMergeThreshold
设置启用bypass机制的阈值(默认为200),若Shuffle Read阶段的task数小于等于该值,则Shuffle Write阶段启用bypass机制。如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
3)spark.shuffle.file.buffer (默认32K)
该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
4)spark.shuffle.io.maxRetries (默认3次)
设置Shuffle Read阶段fetches数据时的重试次数,若shuffle阶段的数据量很大,可以适当调大一些。
5)spark.reducer.maxSizeInFlight(默认48m)
该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。可以适当调大,如96m
6)spark.shuffle.memoryFraction(默认0.2)
该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认0.2。当MapTask结束后,reduceTask会拉取各个MapTask中对应的数据并进行聚合操作,拉取数据首先是放在内存中进行相关
7)spark.shuffle.consolidateFiles(默认值false)
如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
8)spark.storage.memoryFraction(默认0.6)
该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
以上是关于spark shuffle过程详解,相关优化的主要内容,如果未能解决你的问题,请参考以下文章