Spark的Shuffle是怎么回事

Posted valjeanshaw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark的Shuffle是怎么回事相关的知识,希望对你有一定的参考价值。

Spark的Shuffle是怎么回事

? Shuffle的中文含义是混洗,官方定义是:一种让数据重新分布以使得某些数据被放在同一分区里的一种机制。Shuffle的过程中,存在着大量的网络消耗传输数据,会在磁盘上产生大量的中间文件,在平时的工作中了解shuffle的运行机制能帮助我们写出更优秀的代码。此篇文章从shuffle的含义开始讲起,按照spark中shuffle的几中不同运行机制进行了解析,并最终附上了一些shuffle调优的建议。

1. Shuffle的概述

? spark中对于stage,划分了两个种类,除了最后一个stage称为resultStage,其余stage均称为ShuffleMapStage。

? ResultStage 基本上对应代码中的 action 算子,即将一个函数应用在 RDD 的各个 partition 的数据集上,意味着一个 job 的运行结束。而ShuffleMapStage 的结束伴随着 shuffle 文件的写磁盘。

技术图片

? 在spark中,shuffle分为map阶段和reduce阶段,也可称之为shuffle read和shuffle write阶段,这里和hadoop的mapreduce不是一个东西。map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task会先执行,将上一个stage得到的最后结果写出,后执行的reduce task拉取上一个stage进行合并

? 对于一次shuffle,map过程和reduce过程都有若干个task来执行。对于task的个数,map端的task个数和RDD的partition个数相同,reduce 端的 stage 默认取spark.default.parallelism 这个配置项的值作为分区数,如果没有配置,则以 map 端的最后一个 RDD 的分区数作为其分区数,分区数就将决定 reduce 端的 task 的个数。

2. Shuffle机制详解

? 在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager。在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。

2.1 spark早期的HashShuffleManager

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。它的计算模式比较的简单粗暴,详细如下:

  • shuffle write阶段

这个阶段将stage中每个task处理的数据根据算子进行“划分”。比如reduceByKey,就是对相同的key执行hash算法,从而将相同都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

  • shuffle read阶段

stage的每一个task就需要将上一个stage的计算结果中的所有相同key,从各个节点上通过网络都拉取到自己所在的节点上,然后进行key的聚合或连接等操作。由于shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。

技术图片

? 那么针对这种简单粗暴的HashShuffleManager,有着一个非常严重的弊端:会产生大量的中间磁盘文件,这样大量的磁盘IO操作会很影响性能。磁盘文件的数量由下一个stage的task数量决定,即下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。比如下一个 stage 总共有 100 个 task,那么当前 stage 的每个 task 都要创建 100 份磁盘文件,如果当前stage有50个 task,那么总共会建立5000个磁盘文件。

2.2 优化后的 HashShuffleManager

? 由于原版的HashShuffleManager,HashShuffleManager后期进行了优化,这里说的优化是指可以设置一个参数,spark.shuffle.consolidateFiles=true。该参数默认值为false,通常来说如果我们使用HashShuffleManager,那么都建议开启这个选项。

? 开启consolidate机制之后,在shuffle write过程中,task不会为下游stage的每个task创建一个磁盘文件,此时会出现shuffleFileGroup的概念,每个 shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。而此时就会根据Executor数,并行执行task。第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

? 当Executor执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。即运行在同一个Executor的task会复用之前的磁盘文件。 这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

技术图片

2.3 当前默认的SortShuffleManager

在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager的运行机制主要分成两种,一种是普通运行机制,另一种是bypass运行机制。当shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数的值时(默认为200),就会启用bypass机制。

  • 普通运行模式

? 在普通模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可以选用不同的数据结构。如果是由聚合操作的shuffle算子,就是用map的数据结构(边聚合边写入内存),如果是join的算子,就使用array的数据结构(直接写入内存)。等到内存容量到了临界值就准备溢写到磁盘。
??在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序,排序之后,会分批将数据写入磁盘文件,每批次默认1万条数据。
??此时task往磁盘溢写,会产生多个临时文件,最后会将所有的临时文件都进行合并,合并成一个大文件。最终只剩下两个文件,一个是合并之后的数据文件,一个是索引文件,标识了下游各个task的数据在文件中的start offset与end offset。下游的task根据索引文件读取相应的数据文件。需要注意的是,此处所说的两个文件,是指上游一个task生成两个文件,而非所有的task最终只有两个文件。

技术图片

  • bypass运行模式

触发bypass机制的条件:

  1. shuffle map task的数量小于spark.shuffle.sort.bypassMergeThreshold参数的值(默认200)

  2. 不是聚合类的shuffle算子(比如groupByKey)

? 我们都知道,排序的时间复杂度最高不能优于O(nlogn),那么如果将排序的时间复杂度省下,那么shuffle性能将会提升很多。bypass机制与普通SortShuffleManager运行机制的不同在于,bypass机制就是利用了hash的O(1)时间复杂度取代了排序的操作开销,提升了这部分的性能。

? task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。如上,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

技术图片

附: 一些shuffle调优参数

spark.shuffle.file.buffer
参数说明该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小(默认是32K)。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
?
spark.reducer.maxSizeInFlight
参数说明该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
?
spark.shuffle.io.maxRetries & spark.shuffle.io.retryWait
spark.shuffle.io.retryWait:huffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。(默认是3次)
spark.shuffle.io.retryWait该参数代表了每次重试拉取数据的等待间隔。(默认为5s)
调优建议:一般的调优都是将重试次数调高,不调整时间间隔。
?
spark.shuffle.memoryFraction
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例。
?
spark.shuffle.manager
参数说明该参数用于设置shufflemanager的类型(默认为sort)。Spark1.5x以后有三个可选项:

Hash:spark1.x版本的默认值,HashShuffleManager
Sort:spark2.x版本的默认值,普通机制,当shuffle read task 的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数,自动开启bypass 机制
tungsten-sort:

spark.shuffle.sort.bypassMergeThreshold
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些
?
spark.shuffle.consolidateFiles
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,也就是开启优化后的HashShuffleManager。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%

以上是关于Spark的Shuffle是怎么回事的主要内容,如果未能解决你的问题,请参考以下文章

SPARK push-based shuffle mapTask是怎么获取ESS列表信息

Spark Shuffle 中 JVM 内存使用及配置内幕详情

spark性能调优 spark shuffle中JVM内存使用及配置内幕详情

[Spark性能调优] 第四章 : Spark Shuffle 中 JVM 内存使用及配置内幕详情

Spark参数调优

[spark] shuffle