Spark(二十二)Shuffle调优之调节Map端内存缓冲与Reduce端内存占比

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark(二十二)Shuffle调优之调节Map端内存缓冲与Reduce端内存占比相关的知识,希望对你有一定的参考价值。

参考技术A Map端内存缓冲,Reduce端内存占比;很多资料、网上视频,都会说,这两个参数,是调节Shuffle性能的不二选择,很有效果的样子,实际上,不是这样的。

以实际的生产经验来说,这两个参数没有那么重要,往往来说,shuffle的性能不是因为这方面的原因导致的

默认情况下,shuffle的map task,输出到磁盘文件的时候,统一都会先写入每个task自己关联的一个内存缓冲区。

这个缓冲区大小,默认是32kb。

每一次,当内存缓冲区满溢之后,才会进行spill操作,溢写操作,溢写到磁盘文件中去。

在实际生产环境中,我们在什么时候来调节两个参数?

2、看Spark UI,如果你的公司是决定采用standalone模式,那么狠简单,你的spark跑起来,会显示一个Spark UI的地址,4040的端口,进去看,依次点击进去,可以看到,你的每个stage的详情,有哪些executor,有哪些task,每个task的shuffle write和shuffle read的量,shuffle的磁盘和内存,读写的数据量;如果是用的yarn模式来提交,课程最前面,从yarn的界面进去,点击对应的application,进入Spark UI,查看详情。

3、如果发现shuffle 磁盘的write和read,很大。这个时候,就意味着最好调节一些shuffle的参数。进行调优。首先当然是考虑开启map端输出文件合并机制。

4、调节上面说的那两个参数。调节的时候的原则。spark.shuffle.file.buffer,每次扩大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。

5、不能调节的太大,太大了以后过犹不及,因为内存资源是有限的,你这里调节的太大了,其他环节的内存使用就会有问题了。

6、调节了以后,效果?map task内存缓冲变大了,减少spill到磁盘文件的次数;reduce端聚合内存变大了,减少spill到磁盘的次数,而且减少了后面聚合读取磁盘文件的数量。

大数据之Spark:Spark调优之Shuffle调优

目录

1. map和reduce端缓冲区大小

在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。

map端缓冲的默认配置是32KB,如果每个task处理640KB的数据,那么会发生640/32 = 20次溢写,如果每个task处理64000KB的数据,即会发生64000/32=2000次溢写,这对于性能的影响是非常严重的。

map端缓冲的配置方法:

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64")

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB。该参数的设置方法如下:

reduce端数据拉取缓冲区配置:

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96")

2. reduce端重试次数和等待时间间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3,该参数的设置方法如下:

reduce端拉取数据重试次数配置:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s,该参数的设置方法如下:

reduce端拉取数据等待间隔配置:

val conf = new SparkConf()
  .set("spark.shuffle.io.retryWait", "60s")

3. bypass机制开启阈值

对于SortShuffleManager,如果shuffle reduce task的数量小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

SortShuffleManager排序操作阈值的设置可以通过spark.shuffle.sort.bypassMergeThreshold这一参数进行设置,默认值为200,该参数的设置方法如下:

reduce端拉取数据等待间隔配置:

val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")

以上是关于Spark(二十二)Shuffle调优之调节Map端内存缓冲与Reduce端内存占比的主要内容,如果未能解决你的问题,请参考以下文章

Spark学习之路 SparkCore的调优之开发调优

Spark学习之路 SparkCore的调优之Shuffle调优

Spark学习之路 SparkCore的调优之Shuffle调优

大数据之Spark:Spark调优之RDD算子调优

Spark性能调优之解决数据倾斜

Spark学习之路 SparkCore的调优之Spark内存模型[转]