Spark Shuffle FetchFailedException

Posted 常生果

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark Shuffle FetchFailedException相关的知识,希望对你有一定的参考价值。

spark大规模数据处理中,有个比较常见的错误:

org.apache.spark.shuffle.MetadataFetchFailedException: 
Missing an output location for shuffle 0

ERROR shuffle.RetryingBlockFetcher: Failed to fetch block shuffle_0_1300_106, and will not retry (0 retries)

java.lang.RuntimeException: java.lang.RuntimeException: Failed to open file: /data03/yarn/nm/usercache/vendorszry/appcache/application_1533223718021_249359/blockmgr-980fe20e-f8ef-4646-a01c-2e55bb07b333/01/shuffle_0_343_0.index at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:243)

.............
..............
................

Caused by: java.io.FileNotFoundException: /data03/yarn/nm/usercache/vendorszry/appcache/application_1533223718021_249359/blockmgr-980fe20e-f8ef-4646-a01c-2e55bb07b333/01/shuffle_0_343_0.index (No such file or directory)
	at java.io.FileInputStream.open(Native Method)
	at java.io.FileInputStream.<init>(FileInputStream.java:146)
	at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.getSortBasedShuffleBlockData(ExternalShuffleBlockResolver.java:232)

 

原因分析如下:

     spark  Map 操作将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立计算,也就是并行化,但是Map后进行reduce相关操作,spark为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布,即 shuffle 操作.

shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle. 
  宽依赖主要有两个过程: shuffle write 和 shuffle fetch. 类似 Hadoop 的 Map 和 Reduce 阶段.shuffle write 将 ShuffleMapTask 任务产生的中间结果缓存到内存中, shuffle fetch 获得 ShuffleMapTask 缓存的中间结果进行 ShuffleReduceTask 计算,这个过程容易造成OutOfMemory. 

shuffle分为shuffle write和shuffle read两部分。
shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。

shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。

shuffle read 以简单理解为读取中间数据结果,进行Reduce操作,这时候数据的分区数则是由spark提供的一些参数控制。

可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。

Shuffle内存分配过程:
  shuffle 过程内存分配使用 ShuffleMemoryManager 类管理,会针对每个 Task 分配内存,Task 任务完成后通过 Executor 释放空间.这里可以把 Task 理解成不同 key 的数据对应一个 Task. 早期的内存分配机制使用公平分配,即不同 Task 分配的内存是一样的,但是这样容易造成内存需求过多的 Task 的 OutOfMemory, 从而造成多余的 磁盘 IO 过程,影响整体的效率.(例:某一个 key 下的数据明显偏多,但因为大家内存都一样,这一个 key 的数据就容易 OutOfMemory).1.5版以后 Task 共用一个内存池,内存池的大小默认为 JVM 最大运行时内存容量的16%,分配机制如下:假如有 N 个 Task,ShuffleMemoryManager 保证每个 Task 溢出之前至少可以申请到1/2N 内存,且至多申请到1/N,N 为当前活动的 shuffle Task 数,因为N 是一直变化的,所以 manager 会一直追踪 Task 数的变化,重新计算队列中的1/N 和1/2N.但是这样仍然容易造成内存需要多的 Task 任务溢出,所以最近有很多相关的研究是针对 shuffle 过程内存优化的. 

以上是关于Spark Shuffle FetchFailedException的主要内容,如果未能解决你的问题,请参考以下文章

spark 4种 shuffle机制与mapreduce shuffle机制对比

Spark的Shuffle是怎么回事

Spark中的Spark Shuffle详解

Spark shuffle调优

只了解MapReduce的Shuffle?Spark Shuffle了解一下

Spark Shuffle