1w 字详解 Spark Shuffle (建议收藏反复看)
Posted 大数据羊说
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了1w 字详解 Spark Shuffle (建议收藏反复看)相关的知识,希望对你有一定的参考价值。
那为什么需要 shuffle 呢?
这往往和大数据的很多计算场景相关,比如我们一直在说的 WordCount ,把相同的单词进行计数。因此,需要对数据进行聚合操作,聚合自然要对相同的 key 进行聚合,所以需要通过 shuffle 把各个节点相同的 key 拉到一起,可以结合下图理解一下。
当然,WordCount 是我举的一个小栗子。其实,在 Spark 中,聚合、排序是一个通用的计算场景(都有相应的算子),而且很多时候数据量会很大,因此,必须得设计 shuffle 来支持这些不同类型的计算。
那 shuffle 到底解决了什么问题?
我们已经知道 Shuffle 指的是集群范围内跨节点、跨进程数据分发,其实分发并不是上游 Stage 直接把数据分发给下游,而是先输出一个中间文件,然后下游 Stage 去读取中间文件。
所以,shuffle 要解决的问题就是如何将这些中间文件数据重新组织,使其能够在上下游 Task 之间进行数据传递和计算。
分为Shuffle Write
和 Shuffle Read
两个阶段,前者主要解决上游 stage 输出数据的分区问题,后者主要解决下游 stage 从上游 stage 获取数据、重新组织、并为后续操作提供数据源的问题。干说有点晦涩,为了你方便理解,我还是用 WordCount
这个例子来说明。在 WordCount
中,要实现相同的单词进行分组计数,需要引入 shuffle,其流程图如下:
WordCount
中主要是 reduceByKey
产生了 Shuffle ,从上图可知,以 Shuffle 为边界把计算切分成 2 个 Stage, Shuffle 的上游叫做 Map 阶段,Shuffle 的下游叫做 Reduce 阶段。
Map 阶段,每个分区内先把自己的数据做一个初步的 combine,又称为 Map 端聚合;然后不同的单词会根据分区函数计算出来的分区号,被分发到下游对应的分区
最后 Reduce
阶段以单词为 key 执行聚合函数做第二次聚合,这里称为全局聚合,从而实现了单词计数的功能。
上述流程值得注意的是,在 Shuffle 环节,数据不是直接分发到下游,而是先写到中间文件中,然后 Reduce
阶段会去读取这个中间文件的数据。
总的来说,Map 和 Reduce 类似于一个生产者-消费者模型,Map 阶段生产 shuffle 中间文件,reduce 消费 shuffle 中间文件,从而衔接了 shuffle 的上下游,实现了数据的交换。
那么,疑问又来了,什么是 shuffle 中间文件?shuffle 中间文件又是怎样生成的?
在 第五篇:Spark 调度系统 中,小林已经说过,DAGScheduler
会从数据依赖图后往前回溯,以 shuffle 为边界切割 job ,生成 Stage,并会为每个 Stage 创建任务集合 TaskSet
,每一个 TaskSet
又都包含多个分布式任务 Task。
在 Map 阶段,每个 Task 都会生成 2 个文件:一个 data 文件,一个 index 文件。
这两个文统称为 Shuffle 中间文件,并且以 Map 端的 Task 粒度生成,有多少个并行的 Task ,
就会生成多少份 Shuffle 中间文件(最后小文件个数:2 x Map Task
个数)。
shuffle 中间文件到底长啥样?我们已经说了,它是 data 文件和 index 文件的一个统称。我们以 WordCount
为例来说明:
从上图可知,data 文件记录的就是原始的 <key,value> 数据集,index 文件则记录了哪些数据属于下游哪个 Reduce
分区,
标识了目标分区所属数据的起始索引。在图中,为了简洁描述,我只用了 2 条数据示意,实际上更复杂。
我们知道了 shuffle 文件的样子,下面看看它是如何产生的?
主要承担的功能有:数据分区、聚合和排序 3 个功能。关于数据分区,主要考虑 2 个问题。
第一个问题,如何确定分区个数?
一般来说,分区个数与下游 Stage 的 Reduc Task 个数一致。在 shuffle 时,用户通过指定分区个数:numPartitions
相应的会有同样个数的 reduce task 来处理相应的数据;如果用户没有自定义,则分区个数默认为 parent RDD 中分区个数的最大值。
第二个问题,对于 map task 输出的 record 如何分区?
公式:partitionId = Hash(key) % numpartitions
。即每计算出一条 record,计算其 key 的哈希值,并与分区个数取模,则得到该条数据的分区 ID
关于数据聚合,本来聚合应该是 shuffle read
完数据之后要做的,之所以在 shuffle write
时做 combine ,是考虑到数据量大的时候,提前聚合以减少数据量,从而减少网络 IO。
这里要注意的是,shuffle read
聚合是属于全局聚合,shuffle write
聚合只针对当前分区做聚合。
关于排序,Spark 采取的是先聚合,再排序。实现主要是设计了一个特殊的数据结构,类似 HashMap+Array
,完成先聚合,再排序的功能。
针对上述三个功能,Spark 设计了一个通用的 Shuffle Write 框架,该框架的执行顺序为:map 输出 record ---> 数据聚合 ---> 排序 ---> 分区。其中数据聚合、排序是可选项。
这里还是使用 WordCount 例子给大家做说明:
如图所示,Spark 采用的实现方法是建立一个类似 HashMap + Array
的内存数据结构,对 map()
输出的 record 进行聚合。
类 HashMap
结构中的 Key 由 “partitionId+Key”
组成, Value 是经过相同 Key combine 的结果。
在图中,map 端聚合是 sum()
函数,那么 Value 中存放的便是多个 record 对应的 Value 相加的结果。聚合完成后,Spark 对类似 HashMap
中的 record 进行排序。
如果需要按 Key 进行排序,那么按 partitionId+Key
进行排序。最后,将排序后的 record 写入一个分区文件中。
如果该数据结构存放不下,则会先扩容为 2 倍大小,如果还存放不下,就将类 HashMap
中的 record 排序后 spill 到磁盘上。
此时,HashMap
被清空,可以继续对 map()
输出的 record 进行聚合,如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。
当 map()
输出完成以后,将此时 HashMap
中的 reocrd 与磁盘上已排序的 record 进行再次聚合(merge),得到最终的 record
,并输出到相应的分区文件中,该分区文件,便是上文中提到的 Shuffle 中间文件。
好了,我们知道 shuffle 中间文件是如何产生的,接下来我们继续看 reduce task 如何读取 shuffle 中间文件?
输出了 shuffle 中间文件后,就到了Shuffle Read
阶段。Shuffle Read
主要需要实现 3 个功能:跨节点拉取数据,聚合和排序。同样的,Spark 为了支持这三个功能,设计了一个通用的 Shuffle Read 框架,它的计算顺序:跨节点数据拉取 ---> 数据聚合 ---> 排序 ---> 输出。
这里值得注意的点是,对于每一个 Map Task
生成的中间文件,其中生成的分区数量由下游的 Reduce 阶段的 Task 个数决定,并且 index 文件标识了目标分区所属数据的起始索引。
为了讲清楚 Shuffle Read
的细节,小林还是使用 WordCount
例子,给大家揭开 Shuffle Read
的神秘面纱。
如图所示,Reduce Task 不断的从各个 shuffle
中间文件拉取数据,并将数据输出到一个 buffer 中。
获取 record 后,Spark 建立一个类似 HashMap
的内存数据结构(ExternalAppendOnlyMap
),对 buffer 中的 record 进行聚合,类 HashMap
中的 Key 是 record 中的 Key,类 HashMap
中的 Value 是经过相同聚合函数(func()
)计算后的结果。
在图中,聚合函数是 sum()
函数,那么 Value 中存放的是多个 record 对应 Value 相加后的结果。之后,按照 Key进行排序,如图所示,则建立一个Array 结构,读取类 HashMap
中的 record,并对 record 按 Key (图中是按 key 的首字母)进行排序,排序完成后,将结果输出或者传递给下一步操作。
如果类 HashMap
存放不下,则会先扩容为两倍大小,如果还存放不下,就将 HashMap
中的 record 排序后 spill 到磁盘上。此时,HashMap
被清空,可以继续对 buffer
中的 record 进行聚合。如果内存再次不够用,那么继续 spill 到磁盘上,此过程可以重复多次。
当聚合完成以后,将此时 HashMap
中的 reocrd 与磁盘上已排序的 record 进行再次聚合,得到最终的 record,输出到分区文件中,供其它数据操作。
好了,至此小林已经把 Shuffle 的整个全貌都讲清楚了。
的含义和shuffle
的工作原理,并解释了为什么一定需要 shuffle
;最后从 Shuffle Write
和 Shuffle Read
两个角度,深度剖析了 Shuffle
设计原理和 shuffle
流程细节。好了,今天的文章就到这里!如果可以帮我转发,是对我最大的鼓励。
最后给大家几个思考题:
1.Spark 的 shuffle 、MR 的 shuffle 、Flink 的 shuffle 有什么区别?
2.文中说了聚合、排序在 shuffle 是可选项,是如何实现的?
欢迎留言区与我讨论交流呀~
欢迎关注 「小林玩大数据」
拉你进小林的高质量大数据交流群
推荐阅读:
第一篇:RDD 编程模型:延迟计算是怎么回事?
第二篇:Spark 数据依赖图是啥?
第三篇:Spark 进程模型与分布式部署:什么是分布式计算?
第四篇:你管这破玩意儿叫 RDD 的持久化?
第五篇:Spark 调度系统,内部原来是这样搬砖的!
第六篇:Executor 和 Driver 资源调度总结
万字详解 Spark开发调优(建议收藏)
1一、前言
在大数据计算领域,Spark 已经成为了越来越流行、越来越受欢迎的计算平台之一。Spark 的功能涵盖了大数据领域的离线批处理、SQL类处理、流式/实时计算、机器学习、图计算等各种不同类型的计算操作,应用范围与前景非常广泛。大多数同学(包括笔者在内),最初开始尝试使用Spark的原因很简单,主要就是为了让大数据计算作业的执行速度更快、性能更高。
然而,通过Spark开发出高性能的大数据计算作业,并不是那么简单的。如果没有对Spark作业进行合理的调优,Spark作业的执行速度可能会很慢,这样就完全体现不出Spark作为一种快速大数据计算引擎的优势来。因此,想要用好Spark,就必须对其进行合理的性能优化。
Spark的性能调优实际上是由很多部分组成的,不是调节几个参数就可以立竿见影提升作业性能的。我们需要根据不同的业务场景以及数据情况,对Spark作业进行综合性的分析,然后进行多个方面的调节和优化,才能获得最佳性能。
本文作为Spark性能优化指南的基础,主要讲解开发调优以及资源调优。
2二、开发调优
3三、调优概述
Spark性能优化的第一步,就是要在开发Spark作业的过程中注意和应用一些性能优化的基本原则。开发调优,就是要让大家了解以下一些Spark基本开发原则,包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。
4原则一:避免创建重复的RDD
通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD;接着对这个RDD执行某个算子操作,然后得到下一个RDD;以此类推,循环往复,直到计算出最终我们需要的结果。在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。
我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。
一些Spark初学者在刚开始开发Spark作业时,或者是有经验的工程师在开发RDD lineage极其冗长的Spark作业时,可能会忘了自己之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD。这就意味着,我们的Spark作业会进行多次重复计算来创建多个代表相同数据的RDD,进而增加了作业的性能开销。
一个简单的例子
// 需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。
// 错误的做法:对于同一份数据执行多次算子操作时,创建多个RDD。
// 这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。
// 这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd2.reduce(...)
// 正确的用法:对于一份数据执行多次算子操作时,只使用一个RDD。
// 这种写法很明显比上一种写法要好多了,因为我们对于同一份数据只创建了一个RDD,然后对这一个RDD执行了多次算子操作。
// 但是要注意到这里为止优化还没有结束,由于rdd1被执行了两次算子操作,第二次执行reduce操作的时候,还会再次从源头处重新计算一次rdd1的数据,因此还是会有重复计算的性能开销。
// 要彻底解决这个问题,必须结合“原则三:对多次使用的RDD进行持久化”,才能保证一个RDD被多次使用时只被计算一次。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt")
rdd1.map(...)
rdd1.reduce(...)
5原则二:尽可能复用同一个RDD
除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外,在对不同的数据执行算子操作时还要尽可能地复用一个RDD。比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。
一个简单的例子
// 错误的做法。
// 有一个<Long, String>格式的RDD,即rdd1。
// 接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<Long, String> rdd1 = ...
JavaRDD<String> rdd2 = rdd1.map(...)
// 分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)
// 正确的做法。
// 上面这个case中,其实rdd1和rdd2的区别无非就是数据格式不同而已,rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。
// 此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。
// 其实在这种情况下完全可以复用同一个RDD。
// 我们可以使用rdd1,既做reduceByKey操作,也做map操作。
// 在进行第二个map操作时,只使用每个数据的tuple._2,也就是rdd1中的value值,即可。
JavaPairRDD<Long, String> rdd1 = ...
rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
// 第二种方式相较于第一种方式而言,很明显减少了一次rdd2的计算开销。
// 但是到这里为止,优化还没有结束,对rdd1我们还是执行了两次算子操作,rdd1实际上还是会被计算两次。
// 因此还需要配合“原则三:对多次使用的RDD进行持久化”进行使用,才能保证一个RDD被多次使用时只被计算一次。
6原则三:对多次使用的RDD进行持久化
当你在Spark代码中多次对一个RDD做了算子操作后,恭喜,你已经实现Spark作业第一步的优化了,也就是尽可能复用RDD。此时就该在这个基础之上,进行第二步优化了,也就是要保证对一个RDD执行多次算子操作时,这个RDD本身仅仅被计算一次。
Spark中对于一个RDD执行多次算子的默认原理是这样的:每次你对一个RDD执行一个算子操作时,都会重新从源头处计算一遍,计算出那个RDD来,然后再对这个RDD执行你的算子操作。这种方式的性能是很差的。
因此对于这种情况,我们的建议是:对多次使用的RDD进行持久化。此时Spark就会根据你的持久化策略,将RDD中的数据保存到内存或者磁盘中。以后每次对这个RDD进行算子操作时,都会直接从内存或磁盘中提取持久化的RDD数据,然后执行算子,而不会从源头处重新计算一遍这个RDD,再执行算子操作。
对多次使用的RDD进行持久化的代码示例
// 如果要对一个RDD进行持久化,只要对这个RDD调用cache()和persist()即可。
// 正确的做法。
// cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
// 此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
// 第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
// persist()方法表示:手动选择持久化级别,并使用指定的方式进行持久化。
// 比如说,StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。
// 而且其中的_SER后缀表示,使用序列化的方式来保存RDD数据,此时RDD中的每个partition都会序列化成一个大的字节数组,然后再持久化到内存或磁盘中。
// 序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
对于persist()方法而言,我们可以根据不同的业务场景选择不同的持久化级别。
Spark的持久化级别
持久化级别 | 含义解释 |
---|---|
MEMORY_ONLY | 使用未序列化的Java对象格式,将数据保存在内存中。如果内存不够存放所有的数据,则数据可能就不会进行持久化。那么下次对这个RDD执行算子操作时,那些没有被持久化的数据,需要从源头处重新计算一遍。这是默认的持久化策略,使用cache()方法时,实际就是使用的这种持久化策略。 |
MEMORY_AND_DISK | 使用未序列化的Java对象格式,优先尝试将数据保存在内存中。如果内存不够存放所有的数据,会将数据写入磁盘文件中,下次对这个RDD执行算子时,持久化在磁盘文件中的数据会被读取出来使用。 |
MEMORY_ONLY_SER | 基本含义同MEMORY_ONLY。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
MEMORY_AND_DISK_SER | 基本含义同MEMORY_AND_DISK。唯一的区别是,会将RDD中的数据进行序列化,RDD的每个partition会被序列化成一个字节数组。这种方式更加节省内存,从而可以避免持久化的数据占用过多内存导致频繁GC。 |
DISK_ONLY | 使用未序列化的Java对象格式,将数据全部写入磁盘文件中。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等. | 对于上述任意一种持久化策略,如果加上后缀_2,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。假如某个节点挂掉,节点的内存或磁盘中的持久化数据丢失了,那么后续对RDD计算时还可以使用该数据在其他节点上的副本。如果没有副本的话,就只能将这些数据从源头处重新计算一遍了。 |
如何选择一种最合适的持久化策略
默认情况下,性能最高的当然是MEMORY_ONLY,但前提是你的内存必须足够足够大,可以绰绰有余地存放下整个RDD的所有数据。因为不进行序列化与反序列化操作,就避免了这部分的性能开销;对这个RDD的后续算子操作,都是基于纯内存中的数据的操作,不需要从磁盘文件中读取数据,性能也很高;而且不需要复制一份数据副本,并远程传送到其他节点上。但是这里必须要注意的是,在实际的生产环境中,恐怕能够直接用这种策略的场景还是有限的,如果RDD中数据比较多时(比如几十亿),直接用这种持久化级别,会导致JVM的OOM内存溢出异常。
如果使用MEMORY_ONLY级别时发生了内存溢出,那么建议尝试使用MEMORY_ONLY_SER级别。该级别会将RDD数据序列化后再保存在内存中,此时每个partition仅仅是一个字节数组而已,大大减少了对象数量,并降低了内存占用。这种级别比MEMORY_ONLY多出来的性能开销,主要就是序列化与反序列化的开销。但是后续算子可以基于纯内存进行操作,因此性能总体还是比较高的。此外,可能发生的问题同上,如果RDD中的数据量过多的话,还是可能会导致OOM内存溢出的异常。
如果纯内存的级别都无法使用,那么建议使用MEMORY_AND_DISK_SER策略,而不是MEMORY_AND_DISK策略。因为既然到了这一步,就说明RDD的数据量很大,内存无法完全放下。序列化后的数据比较少,可以节省内存和磁盘的空间开销。同时该策略会优先尽量尝试将数据缓存在内存中,内存缓存不下才会写入磁盘。
通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,有时还不如重新计算一次所有RDD。后缀为_2的级别,必须将所有数据都复制一份副本,并发送到其他节点上,数据复制以及网络传输会导致较大的性能开销,除非是要求作业的高可用性,否则不建议使用。
7原则四:尽量避免使用shuffle类算子
如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。
shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。
因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。
Broadcast与map进行join代码示例
// 传统的join操作会导致shuffle操作。
// 因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
// Broadcast+map的join操作,不会导致shuffle操作。
// 使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)
// 在rdd1.map算子中,可以从rdd2DataBroadcast中,获取rdd2的所有数据。
// 然后进行遍历,如果发现rdd2中某条数据的key与rdd1的当前数据的key是相同的,那么就判定可以进行join。
// 此时就可以根据自己需要的方式,将rdd1当前数据与rdd2中可以连接的数据,拼接在一起(String或Tuple)。
val rdd3 = rdd1.map(rdd2DataBroadcast...)
// 注意,以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。
// 因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
8原则五:使用map-side预聚合的shuffle操作
如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子。
所谓的map-side预聚合,说的是在每个节点本地对相同的key进行一次聚合操作,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。因为reduceByKey和aggregateByKey算子都会使用用户自定义的函数对每个节点本地的相同key进行预聚合。而groupByKey算子是不会进行预聚合的,全量的数据会在集群的各个节点之间分发和传输,性能相对来说比较差。
比如如下两幅图,就是典型的例子,分别基于reduceByKey和groupByKey进行单词计数。其中第一张图是groupByKey的原理图,可以看到,没有进行任何本地聚合时,所有数据都会在集群节点之间传输;第二张图是reduceByKey的原理图,可以看到,每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。
9原则六:使用高性能的算子
除了shuffle相关的算子有优化原则之外,其他的算子也都有着相应的优化原则。
使用reduceByKey/aggregateByKey替代groupByKey
详情见“原则五:使用map-side预聚合的shuffle操作”。
使用mapPartitions替代普通map
mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!
使用foreachPartitions替代foreach
原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。
使用filter之后进行coalesce操作
通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。
10原则七:广播大变量
有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。
在算子函数中使用到外部变量时,默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。
因此对于上述情况,如果使用的外部变量比较大,建议使用Spark的广播功能,对该变量进行广播。广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,从而减少网络传输的性能开销,并减少对Executor内存的占用开销,降低GC的频率。
广播大变量的代码示例
// 以下代码在算子函数中,使用了外部的变量。
// 此时没有做任何特殊操作,每个task都会有一份list1的副本。
val list1 = ...
rdd1.map(list1...)
// 以下代码将list1封装成了Broadcast类型的广播变量。
// 在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。
// 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。
// 每个Executor内存中,就只会驻留一份广播变量副本。
val list1 = ...
val list1Broadcast = sc.broadcast(list1)
rdd1.map(list1Broadcast...)
11原则八:使用Kryo优化序列化性能
在Spark中,主要有三个地方涉及到了序列化:
在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。
以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):
// 创建SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
12原则九:优化数据结构
Java中,有三种类型比较耗费内存:
对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry。
因此Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。
但是在笔者的编码实践中发现,要做到该原则其实并不容易。因为我们同时要考虑到代码的可维护性,如果一个代码中,完全没有任何对象抽象,全部是字符串拼接的方式,那么对于后续的代码维护和修改,无疑是一场巨大的灾难。同理,如果所有操作都基于数组实现,而不使用HashMap、LinkedList等集合类型,那么对于我们的编码难度以及代码可维护性,也是一个极大的挑战。因此笔者建议,在可能以及合适的情况下,使用占用内存较少的数据结构,但是前提是要保证代码的可维护性。
13原则十:Data Locality本地化级别
PROCESS_LOCAL:进程本地化,代码和数据在同一个进程中,也就是在同一个executor中;计算数据的task由executor执行,数据在executor的BlockManager中;性能最好
NODE_LOCAL:节点本地化,代码和数据在同一个节点中;比如说,数据作为一个HDFS block块,就在节点上,而task在节点上某个executor中运行;或者是,数据和task在一个节点上的不同executor中;数据需要在进程间进行传输NO_PREF:对于task来说,数据从哪里获取都一样,没有好坏之分RACK_LOCAL:机架本地化,数据和task在一个机架的两个节点上;数据需要通过网络在节点之间进行传输ANY:数据和task可能在集群中的任何地方,而且不在一个机架中,性能最差
spark.locality.wait,默认是3s
Spark在Driver上,对Application的每一个stage的task,进行分配之前,都会计算出每个task要计算的是哪个分片数据,RDD的某个partition;Spark的task分配算法,优先,会希望每个task正好分配到它要计算的数据所在的节点,这样的话,就不用在网络间传输数据;
但是可能task没有机会分配到它的数据所在的节点,因为可能那个节点的计算资源和计算能力都满了;所以呢,这种时候,通常来说,Spark会等待一段时间,默认情况下是3s钟(不是绝对的,还有很多种情况,对不同的本地化级别,都会去等待),到最后,实在是等待不了了,就会选择一个比较差的本地化级别,比如说,将task分配到靠它要计算的数据所在节点,比较近的一个节点,然后进行计算。
但是对于第二种情况,通常来说,肯定是要发生数据传输,task会通过其所在节点的BlockManager来获取数据,BlockManager发现自己本地没有数据,会通过一个getRemote()方法,通过TransferService(网络数据传输组件)从数据所在节点的BlockManager中,获取数据,通过网络传输回task所在节点。
对于我们来说,当然不希望是类似于第二种情况的了。最好的,当然是task和数据在一个节点上,直接从本地executor的BlockManager中获取数据,纯内存,或者带一点磁盘IO;如果要通过网络传输数据的话,那么实在是,性能肯定会下降的,大量网络传输,以及磁盘IO,都是性能的杀手。
什么时候要调节这个参数?
观察日志,spark作业的运行日志,推荐大家在测试的时候,先用client模式,在本地就直接可以看到比较全的日志。日志里面会显示,starting task,PROCESS LOCAL、NODE LOCAL,观察大部分task的数据本地化级别。
如果大多都是PROCESS_LOCAL,那就不用调节了,如果是发现,好多的级别都是NODE_LOCAL、ANY,那么最好就去调节一下数据本地化的等待时长调节完,应该是要反复调节,每次调节完以后,再来运行,观察日志 看看大部分的task的本地化级别有没有提升;看看,整个spark作业的运行时间有没有缩短
但是注意别本末倒置,本地化级别倒是提升了,但是因为大量的等待时长,spark作业的运行时间反而增加了,那就还是不要调节了。
spark.locality.wait,默认是3s;可以改成6s,10s
默认情况下,下面3个的等待时长,都是跟上面那个是一样的,都是3s。
spark.locality.wait.process//建议60s
spark.locality.wait.node//建议30s
spark.locality.wait.rack//建议20s
end
Flink 从入门到精通 系列文章
基于 Apache Flink 的实时监控告警系统关于数据中台的深度思考与总结(干干货)日志收集Agent,阴暗潮湿的地底世界
公众号(zhisheng)里回复 面经、ClickHouse、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
点个赞+在看,少个 bug ????
以上是关于1w 字详解 Spark Shuffle (建议收藏反复看)的主要内容,如果未能解决你的问题,请参考以下文章