1. RDD1 和 RDD2 进行 Join 操作, 其中采用采样的方式发现 RDD1 中有严重的数据倾斜的 Key
第一步: 采用 Spark RDD 中提供的采样接口,基于采样的数据可以计算出哪个(哪些)Key 的 Values 个数最多
第二步:把全休数据分成两部分,即把原来一个RDD1 变成 RDD11 和 RDD12, 其中 RDD11 代表导致数据倾斜的 Key, RDD12 中包含的是不会产生数据倾斜的 Key
第三步:RDD11 join RDD2 + RDD12 join RDD2 得出结果就是 RDD1 join RDD2 的结果
上述流程中:
第一种情况: 如果 RDD11 中的数据量不是很多,可以采用 Map 端的 Join 操作,避免了 Shuffle 和数据倾斜。
第二种情况: 如果 RDD11 中的数据量特别多, 此时之所以能够缓解数据倾斜,是因为采用了 SparkCore天然的并行机制对RDD11 中的同样一个 Key 的数据进行了拆分 。从而达到让原本倾斜的 Key 分散到不同的 Task的目的,就缓解了数据倾斜。
2. 数据倾斜通常发生在有 Shuffle 操作时,某个 Key 具有较大的数据量。一般解决方案是降低此 Key 的数据量。可以通过以下几个方面考虑:
(1)根据业务,把 Key 分成更细粒度的 Key。例如:原先只是用某个 ID 作为 Key,可以考虑把 Key 变为 ID+省市。
(2)在 Key 之前或之后加上某一范围的随机数(如 1 ~ 100)。这样这个 Key 就被分解成了 100 个 Key。到最后再去掉前轻或后辍
最终,可以通过加内存和Core
3. Reduce 端发生OOM
Mapper 端和 Reduce 端之间有一个 Cache (默认大小 48M)层,发生OOM时,通过设置 spark.reducer.maxSizeInFlight 以减少缓存层的大小。另外,如果内存够用,可以增大这个值,以减少网络传输的次数,从而提高性能。
4. Shuffle file not found
当 Reducer 端根据 Driver 端提供的信息到 Mapper 中指定的位置去获取属于自己的数据的时候,首先会定位数据所在的文件,此时可能发生 Shuffle file not found 的错误。重试的时候,又没有问题了。一般原因是 Mapper 在进行GC,我们去请求数据的时候没有响应。有个设置项 spark.shuffle.io.maxRetries = 3
,spark.shuffle.io.retryWait
= 5s 表示 3*5 秒后,仍然找不到文件。这也说明 GC 的速度太慢了。
5. Spark on Yarn cluster 模式下, Driver 端产生 OOM,而在 Client 模式下则没有。
在 Client 模式下,加载本地配置的时候,Driver 所在的 JVM 默认的永久代大小是 128M; 而在 Cluster 模式下,默认的是82M。