spark 参数调优
Posted 宝哥大数据[离职找工作中,大佬帮内推下]
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark 参数调优相关的知识,希望对你有一定的参考价值。
文章目录
spark = SparkSession.builder.appName(splitext(basename(__file__))[0]) \\
.config("spark.sql.crossJoin.enabled", "true") \\
.config("spark.sql.autoBroadcastJoinThreshold", "104857600") \\
.config("spark.dynamicAllocation.enabled", "true") \\
.config("spark.dynamicAllocation.executorIdleTimeout", "30s") \\
.config("spark.dynamicAllocation.schedulerBacklogTimeout", "10s") \\
.config("spark.dynamicAllocation.initialExecutors", "4") \\
.config("spark.dynamicAllocation.maxExecutors", "6") \\
.config("spark.dynamicAllocation.minExecutors", "1") \\
.config("spark.shuffle.service.enabled", "true") \\
.config("spark.executor.memory", "5G") \\
.config("spark.executor.cores", "6") \\
.config("spark.default.parallelism", "144") \\
.config("spark.sql.shuffle.partitions", "144") \\
.config("spark.sql.windowExec.buffer.spill.threshold", "40960") \\
// 3.0 cbo
.config("spark.sql.cbo.enabled", "true") \\
.config("spark.sql.cbo.joinReorder.enabled", "true") \\
.config("spark.sql.cbo.joinReorder.dp.star.filter", "true") \\
.config("spark.sql.cbo.starSchemaDetection", "true") \\
.config("spark.sql.statistics.histogram.enabled", "true") \\
.config("spark.executor.memoryOverhead", "2048") \\
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \\
.enableHiveSupport().getOrCreate()
一、shuffle 调优
在划分stage时,最后一个stage称为finalStage,它本质上是一个ResultStage对象,前面的所有stage被称为ShuffleMapStage。ShuffleMapStage的结束伴随着shuffle文件的写磁盘。
1.1、map和reduce端缓冲区大小
1.1.1、map端
在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁spill溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。
map端缓冲的默认配置是32KB,如果每个task处理640KB的数据,那么会发生640/32 = 20次溢写,如果每个task处理64000KB的数据,机会发生64000/32=2000此溢写,这对于性能的影响是非常严重的。
val conf = new SparkConf().set("spark.shuffle.file.buffer", "64")
1.1.2、reduce端
Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。
reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight
参数进行设置,默认为48MB
val conf = new SparkConf()
.set("spark.shuffle.file.buffer", "64")
1.2、reduce端重试次数和等待时间间隔
1.2.1、重试次数
Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(默认3次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
//reduce端拉取数据重试次数配置
val conf = new SparkConf()
.set("spark.shuffle.io.maxRetries", "6")
1.2.2、等待时间间隔
Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(默认5s),以增加shuffle操作的稳定性。
val conf = new SparkConf()
.set("spark.shuffle.io.retryWait", "60s")
1.3、bypass机制开启阈值
当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销。
val conf = new SparkConf()
.set("spark.shuffle.sort.bypassMergeThreshold", "400")
1.4、其他
1、spark.shuffle.file.buffer
:主要是设置的Shuffle过程中写文件的缓冲,默认32k,如果内存足够,可以适当调大,来减少写入磁盘的数量。
2、spark.reducer.maxSizeInFight
:主要是设置Shuffle过程中读文件的缓冲区,一次能够读取多少数据,如果内存足够,可以适当扩大,减少整个网络传输次数。
3、spark.shuffle.io.maxRetries
:主要是设置网络连接失败时,重试次数,适当调大能够增加稳定性。
4、spark.shuffle.io.retryWait
:主要设置每次重试之间的间隔时间,可以适当调大,增加程序稳定性。
5、spark.shuffle.memoryFraction
:Shuffle过程中的内存占用,如果程序中较多使用了Shuffle操作,那么可以适当调大该区域。
6、spark.shuffle.manager
:Hash和Sort方式,Sort是默认,Hash在reduce数量比较少的时候,效率会很高。
7、spark.shuffle.sort. bypassMergeThreshold
:设置的是Sort方式中,启用Hash输出方式的临界值,如果你的程序数据不需要排序,而且reduce数量比较少,那推荐可以适当增大临界值。
8、spark. shuffle.cosolidateFiles
:如果你使用Hash shuffle方式,推荐打开该配置,实现更少的文件输出。
以上是关于spark 参数调优的主要内容,如果未能解决你的问题,请参考以下文章