Spark 执行模型与性能调优,文末留言免费获取《Spark:大数据集群计算的生产实践》

Posted 过往记忆大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark 执行模型与性能调优,文末留言免费获取《Spark:大数据集群计算的生产实践》相关的知识,希望对你有一定的参考价值。


在深入探讨Spark应用的性能改善之前,有必要先了解Spark在集群上分布式执行程序的基础知识。当运行一个Spark应用时,driver进程会随着集群worker节点上的一系列executor进程一起启动。driver负责运行用户的应用程序,当有action被触发时driver负责管理所需执行的所有工作。另一方面,executor进程以任务(task)的形式执行实际的工作以及保存结果。但是,这些任务是如何分配给executor的呢?


对于Spark应用内部触发的每个action,DAG调度器都会创建一个执行计划来完成它。执行计划就是将尽可能多的窄依赖(narrow dependency)转换(transformation)装配到各步骤(stage)中。RDD间的窄依赖是指父RDD的每一个分区最多能被一个子RDD的分区使用。当有一些宽依赖需要做shuffle操作时,stage就受限制了。当多个子RDD的分区使用同一个父RDD的分区时,RDD间就会产生宽依赖(见图3-1)。


图3-1  RDD宽依赖和窄依赖

让我们看一个例子。考虑下面的代码片段:


val numbers = sc.parallelize(nrCollection)

val multiplied = number.filter(_%2 == 0).map(_ * 3).collect()


代码筛选出偶数,然后乘以3并通过collect操作将结果返回。由于它们输入分区的数据没有分发到多个输出分区,因而都是窄转换,所以它们都将在同一个stage中执行。


另外,下面的代码对文件里的单词进行计数,筛选出现过10次的单词,然后对这些单词中的每个字符出现的次数进行计数。最后,通过collect action操作触发job的执行。这些转换中有两个是stage边界(它们有宽依赖)。代码中的两个reduceByKey转换是生成3个stage的原因:


val words = sc.textFile("textFilePath").flatMap(_.split(' '))

val wordCounts = words.Map((_, 1)).reduceByKey(_ + _)

val filteredWords = wordCounts.filter(_._2 == 10)

val characters = filteredWords.flatmap(_._1.toCharArray)

                          .map((_, 1)).reduceByKey(_ + _)

characters.collect()


在定义3个stage之后,调度器将启动一个task,计算出最终RDD对应的各个分区。因此,事实上stage是一组任务,对数据的不同子集执行相同转换(transformation)。任务调度器将基于可用资源及数据局部性把这些任务分配给executor。


例如,如果需要转换的分区已经在某个特定节点的内存里,则任务的执行将被发送到该节点。


分区


根据上一节的描述,可以推断出RDD分区方式能极大地影响执行计划的创建方式,因此也会间接影响性能。现在我们看看分区是如何影响Spark应用性能的。


分区(partition)其实就是RDD中的数据被切分后形成的片段。当DAG调度器将job转换为stage时,每个分区将被处理成一个task,每个task需要一个CPU核来执行。这意味着Spark应用的并行度取决于RDD的分区数。因此,不难理解对Spark应用性能进行调优时,RDD的分区数可能是需要考虑的最重要的事情。


控制并行度

RDD的分区数与其创建方式高度相关。从文件创建的RDD都有默认的分区数。例如,如果文件存储在HDFS上,分区数将等于文件块数目(一个文件块对应一个分区)。这意味着可以通过在HDFS上写文件时的块大小,或者通过配置InputFormat创建的分片(split)的多少,来控制分区数。


你也可以通过并行化集合来创建RDD。在本例中,默认的分区数是由spark.default.parallelism属性决定的。这个默认值由集群管理器决定:对于运行在local模式的Spark 1.5.2来说,其值为CPU核的数目;对于细粒度模式的Mesos来说,其值为8;在其他情况下,分区数取2与所有executor上的CPU核总数的最大值。


但是,你可以控制这些默认值。对这两种创建RDD的方式,可以通过一个用户输入参数来控制分区的数量:


sc.textFile(<inputPath>, <minPartitions>)

sc.parallelize(<sequence>, <numSlices>)


最常见的创建RDD的方式,是对已有的RDD进行一些转换操作(transformation)。通常,一个RDD的分区数与它所依赖的RDD的分区数相同。然而,有一些转换,例如“union”(合并)就不受此规则的制约,因为它创建的RDD其分区数等于父RDD所有分区数的总和。


我们来看另一种会引起数据shuffle的转换操作。这类转换都是宽依赖,即计算RDD的一个分区需要处理父RDD的多个分区的数据。这种情况下,如果不特别指定,默认分区数将是所依赖的RDD的最大分区数。且看Pair RDD上的一个groupByKey转换的示例:


rdd.groupByKey(<numTasks>)


要想写一个高效的Spark应用程序,就必须设置最优的分区数。假设job生成的任务数少于可用的CPU数。这种情况下,可能面临两个性能问题:第一,不能充分发挥整体计算能力;第二,如果分区数量少,单个分区内的数据量将会比分区数量更多时大很多。对于更大的数据集,执行任务时还会有内存压力和垃圾回收的压力,导致运算速度减慢。


同样,如果一个分区内的数据太大无法加载到内存,数据将不得不溢写到磁盘以避免出现out-of-memory异常。但是溢写到磁盘需要排序及磁盘I/O等操作,会带来巨大的开销。


为充分利用集群的计算能力,分区数至少应当等于集群分配给应用程序的CPU数,但是分区过大的问题依然没有得到解决。如果你的数据集非常大,而集群又相当小,那么你的分区还是会过大。这种情形下,RDD的分区数必须远高于可用的CPU核数。


另一方面,你还得考虑周全以防落入另一种极端:分区数过多。分区数过多将会生成许多需要发送到worker节点执行的小任务,这将增加任务调度的开销。不过,启动任务所带来的性能损失比数据溢写到硬盘的损失要小。如果有许多任务几乎瞬时完成,或者这些任务根本没有执行任何读/写操作,这就表明你的并行度太高了。


对于RDD来说,很难计算出一个最佳的分区数,因为这很大程度上取决于数据集的大小、分区器(paritioner)本身,以及每个任务可用的总内存。为估算出一个比较精确的分区数,你需要了解你的数据及其分布情况。不过,建议把每个RDD的分区数量设置为CPU数的2到4倍。


分区器

我们讨论了如何控制RDD的分区数量,但是数据在这些分区上是怎样分布的呢?为了让分区中的数据分散到集群上,Spark使用分区器(paritioner),目前有两种内置的分区器:HashParitioner和RangePartitioner。

选择分区器的默认方式是,对这两个参数中的一个进行设置来决定使用何种分区器:

  1. 当任何输入的RDD用到了某一分区器,输出的RDD也会用此分区器来分区。

  2. 否则,在Pair RDD(键值对)情形下,默认使用HashPartitioner。


HashPartitioner基于键(key)的哈希码(hash code)把值分布到各个分区上。通过计算键的哈希码与分区数的模,得到分区索引值,在计算中也需要考虑到哈希码的正负情况。


RangePartitioner根据范围对可排序项进行分区。对RDD内容进行取样能决定大致的范围区间。最终的分区数可能小于配置的数。


不过,也不是非得使用这些分区器,你可以自己开发一个。如果你对使用场景的领域知识非常了解,会非常有帮助。假设你有一个Pair RDD,其中键是文件系统上文件的路径。如果使用HashPartitioner,foler1/firstFileName.txt与folder1/secondFileName.png 可能结束于不同节点的不同分区。如果你想把同一文件夹的所有文件放在同一分区内,可以编写自己的分区器,基于父文件夹来分发文件。


编写自定义分区器其实非常简单,只需要对分区器的org.apache.spark. Partitioner进行扩展,并实现下面的方法即可。


  1. getPartition(key: Any): Int——为特定的键提供分区ID。

  2. numPartitions: Int——指定分区器创建的分区数。

  3. equals and hashcode——用来将你的分区器同其他分区器进行比较的方法。


一旦实现了自定义分区器,使用起来会非常简单。既可以把它传递给partitionBy函数,也可以传递给基于shuffle的函数。


pairRdd.partitionBy(new MyPartitioner(3))

firstPairRdd.join(secondPairRdd, new MyParitioner(3))


有一些操作,譬如map函数,也会影响分区。在map操作中,可以改变一个Pair RDD中的键,这样分区就会发生变化。这种情形下,生成的RDD将没有分区器集。不过,你可以从两种方式中(mapValues及flatMapValues)选择一种,对Pair RDD中的值进行map处理以保留分区器。


活动规则

【4】本活动解释权Hadoop技术博文归所有。

以上是关于Spark 执行模型与性能调优,文末留言免费获取《Spark:大数据集群计算的生产实践》的主要内容,如果未能解决你的问题,请参考以下文章

Spark性能调优与故障处理

Spark调优内存模型与参数调优

MySQL性能测试 · 数据库调优攻略

Spark 数据分析调优

2022-02-26-Spark-46(性能调优SparkUI)

[3.3.0]数据倾斜与shuffle类性能调优