spark shuffle

Posted 拱头

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark shuffle相关的知识,希望对你有一定的参考价值。

功能上看Spark的存储管理模型可以分为两部分:RDD缓存和Shuffle数据的持久化. RDD缓存,指的是RDD调用cache(),persist()或checkpoint,调用这个三个方法会将RDD对应的数据块结果存储到内存或者磁盘中, 可以将窄依赖的结果存储下来. Shuffle数据持久化,在一个Application中,可能会经过多次Shuffle过程,Shuffle的中间数据块是会被保存下来的直到Application结束. Shuffle数据持久化代码说明: def testShuffleAndCache (): Unit =
val conf = new SparkConf().setAppName( "run-algorithm" ).setMaster( "local" )
val sc = new SparkContext(conf)
val num = 100
val a1 = new Array[ Double ](num)
val a2 = new Array[ Double ](num)
val a3 = new Array[ Double ](num)
val r = new Random()
for (i <- a1.indices)
a1(i) = r.nextDouble()
a2(i) = r.nextDouble()
a3(i) = r.nextDouble()

val srcRDD = sc.parallelize(a1 zip a2)
/**Shuffle 持久化不能存储 map 的结果 , 因为 map 是窄依赖 , 但是如果在 map 后面加 cache/persist 就能将 map 结果存储下来 */
val sortedRDD = srcRDD.sortBy(x=>x._1).sortBy(x=>x._2).sortBy(x=>x._1).sortBy(x=>x._2).map(_._1)
/** 以下 sortedRDD 第一次被执行 , 查看前段的日志输出的时候 , 会发现输出结果之前执行了 5 stages*/
println (sortedRDD.count())
/** 以下 sortedRDD 不是第一次执行了 ,sortedRDD 已经保存了最后一次 Shuffle 的数据 , 所以只执行了一个 stage*/
println (sortedRDD.count())

在这里有必要说一下Spark的cache,cache只适合使用在不太大的RDD中,如果RDD太大不要使用cache,使用persist或者checkpoint,因为cache是将整个RDD存在内存中,RDD太大的话显然是放不下的。这里举个例子: 假设有100G的数据要处理,每个块128m,在分布式中,是每个core每次只将一个块读入内存中的,每次处理完一个再读取下一个入内存,上一个的处理结果存入磁盘,所以才能处理无穷大的数据量,但是cache是将所有数据存入内存,所以10G的机子肯定不可能cache 100G的数据。

以上是关于spark shuffle的主要内容,如果未能解决你的问题,请参考以下文章

Spark Shuffle原理解析

spark shuffle 内幕彻底解密课程

Spark性能优化--数据倾斜调优与shuffle调优

大数据之Spark:Spark调优之Shuffle调优

当 shuffle 分区大于 200 时会发生啥(数据帧中的 spark.sql.shuffle.partitions 200(默认情况下))

Spark面试题:GC导致的 Shuffle文件拉取失败,报错 Shuffle file not found