Spark MetadataFetchFailedException 问题排查
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark MetadataFetchFailedException 问题排查相关的知识,希望对你有一定的参考价值。
参考技术Ahttps://blog.csdn.net/u013332124/article/details/93629816
业务反馈某个任务运行失败,看Spark History Server发现多个Stage报如下错误:
从UI以及异常的信息来看,是下游Stage做Shuffle Read时发现数据丢失,之后上游Stage和本Stage都重新调度计算,但是重试又发生类似的情况,直到重试3次后整个application失败。
后面application又自动进行重试,但是依旧失败了,因此判断这不是偶然的情况。
所以问题只有一个: 为什么下游Stage要获取数据时,找不到上游Stage输出的数据了 ?
shuffle的过程主要是上游Stage将数据写到磁盘,然后下游Stage通过Executor的BlockManager来拉取数据。如果下游Stage要拉取数据时Executor已经异常下线了,就会导致下游Stage拉取不到数据。这时候就会报MetadataFetchFailedException。
看了下Driver的日志,确实发现很多 Lost executor 日志( 如果Executor不是Driver主动通知退出的,Driver发现Executor长时间没有响应就会输出这条日志)。
随便找了台Lost Executor的日志,确实有些问题:
Executor异常退出的最大可能就是OOM了。这个任务配置的Executor内存是10g,core数量是8个,一个task需要的cpu是1个,因此task并行度是8。如果task需要的内存大于 10/8=1.25g的话,那很可能会导致OOM。
但是观察了下spark History server上数据量的大小,发现各个stage处理的数据量都不大,最多几十G,stage的task数量一般都是1000个,分摊下来一个task也就处理几百M那里,不至于导致OOM。 当然,不排除数据倾斜的问题。
最重要的是,如果Executor发生OOM,是会有出错日志的,但是看了几个Executor的日志,都没找到相关的出错日志。
还有一种情况是linux在系统内存使用紧张时会根据一些算法kill某些进程,Executor可能就会被kill掉。
后面看了下那个时间点机器的使用内存,发现还有大量的剩余内存。因此和OOMKiller无关
如果某个NodeManager在运行过程中工作磁盘使用率达到了一定的阀值,该NodeManager会被标记为UnHealth,同时在上面运行的Container都会被停止。
用df -h命令检查了下机器的磁盘使用情况,也没什么问题。因此排除这种可能
和判断NodeManager是否健康的配置有:
yarn.nodemanager.disk-health-checker.enable : 是否开启磁盘健康检查,默认是true,表示开启
yarn.nodemanager.disk-health-checker.min-healthy-disks :NodeManager上最少保证健康磁盘比例,当健康磁盘比例低于该值时,NodeManager不会再接收和启动新的Container,默认值是0.25,表示25%;
yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage :一块磁盘的最高使用率,当一块磁盘的使用率超过该值时,则认为该盘为坏盘,不再使用该盘,默认是90,表示90%,可以适当调低;
yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb :一块磁盘最少保证剩余空间大小,当某块磁盘剩余空间低于该值时,将不再使用该盘,默认是0,表示0MB。
后面无意看到一个成功的Stage的某个失败Task日志,才知道yarn会因为内存问题会kill Executor:
异常信息也很明了,Executor使用的内存超过了限制,因此被Yarn Kill掉。
spark.yarn.executor.memoryOverHead默认配置是ExecutorMemory的10%,也就是1G。所以Executor在yarn这边申请的内存是11G。但是spark.yarn.executor.memoryOverHeap是堆外内存,主要用于JVM自身,字符串, NIO Buffer等开销,因此没做限制。
如果运行的task数量过多,OverHead的使用就会超过1G,最终Executor总的使用内存超过11G,yarn有个container内存使用检测机制,如果发现有container内存使用超标,就会主动kill这个Container。
这个问题的主要原因就是任务的task并行度是8,但是OverHead大小只有1G,在任务运行过程中不时的OverHead区域超过了1G,最终导致Executor被yarn给kill了。
举个例子,比如某个上游Stage的task运行成功将数据写入Executor A后,下游Stage的task又分发到这个Executor A上,这时运行过程中Executor A的OverHead区域使用超过了1G,整个Executor 被kill,这样Executor A上的shuffle数据就丢失了。后面的重试又不断的重复这样的场景,直到整个任务失败。
通过调整任务的参数可以解决这个问题。这个问题的主要原因在于OverHead的大小。因此我们有两个方向调整任务的参数:
Executor运行时会告诉yarn要申请的资源数量,如果要申请的内存超过yarn配置的 yarn.scheduler.maximux-allocation-mb值,yarn就会拒绝Executor的启动。
Executor启动后,yarn也有 Container 的内存监控机制。如果运行过程中Executor实际使用的内存超过了申请的内存,yarn发现了就会主动kill 这个Executor。比如Executor申请资源时指定要10G,但是运行过程过实际使用超过了10G,那么yarn就会主动kill这个Executor。
有两种情况可能导致Executor实际使用的内存超过预期值:
overhead的大小由spark.yarn.executor.memoryOverHead来配置,如果没配置,默认是ExecutorMemory的10%。
Executor向yarn申请内存时会自动算上overhead,但是还是会有overhead大小不够用的情况。
这种一般都发生在Executor同时运行的task数量比较多的情况,overhead区域主要用于JVM自身,字符串, NIO Buffer(Driect Buffer)等开销,如果task并发度太高,就会导致overhead区域不够用的情况。
因为overhead是堆外内存,因此它不会受JVM内存限制。但是overhead使用超过了预期的值后,yarn就开始kill这个 Executor了。
解决办法:
yarn对container内存的判断不单单只判断Executor的内存使用量,如果Executor又开启了一个子进程,这个子进程使用的内存也会算在Container的内存使用里面。
也就是说,yarn对container内存判断算的是整个Executor进程树的总内存大小。
典型的场景就是Executor中又进行了shell调用,shell运行的子程序又占用了一定大小,最终导致超过了预期值,然后Executor被yarn kill。
解决方法:
科普Spark,Spark是什么,如何使用Spark
科普Spark,Spark是什么,如何使用Spark
1.Spark基于什么算法的分布式计算(很简单)
2.Spark与MapReduce不同在什么地方
3.Spark为什么比Hadoop灵活
4.Spark局限是什么
5.什么情况下适合使用Spark
什么是Spark
Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。其架构如下图所示:
Spark与Hadoop的对比
Spark的中间数据放到内存中,对于迭代运算效率更高。
Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。
Spark比Hadoop更通用
Spark提供的数据集操作类型有很多种,不像Hadoop只提供了Map和Reduce两种操作。比如map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。
这些多种多样的数据集操作类型,给给开发上层应用的用户提供了方便。各个处理节点之间的通信模型不再像Hadoop那样就是唯一的Data Shuffle一种模式。用户可以命名,物化,控制中间结果的存储、分区等。可以说编程模型比Hadoop更灵活。
不过由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。
容错性
在分布式数据集计算时通过checkpoint来实现容错,而checkpoint有两种方式,一个是checkpoint data,一个是logging the updates。用户可以控制采用哪种方式来实现容错。
可用性
Spark通过提供丰富的Scala, Java,Python API及交互式Shell来提高可用性。
Spark与Hadoop的结合
Spark可以直接对HDFS进行数据的读写,同样支持Spark on YARN。Spark可以与MapReduce运行于同集群中,共享存储资源与计算,数据仓库Shark实现上借用Hive,几乎与Hive完全兼容。
Spark的适用场景
Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小(大数据库架构中这是是否考虑使用Spark的重要因素)
由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合。总的来说Spark的适用面比较广泛且比较通用。
运行模式
本地模式
Standalone模式
Mesoes模式
yarn模式
Spark生态系统
Shark ( Hive on Spark): Shark基本上就是在Spark的框架基础上提供和Hive一样的H iveQL命令接口,为了最大程度的保持和Hive的兼容性,Shark使用了Hive的API来实现query Parsing和 Logic Plan generation,最后的PhysicalPlan execution阶段用Spark代替Hadoop MapReduce。通过配置Shark参数,Shark可以自动在内存中缓存特定的RDD,实现数据重用,进而加快特定数据集的检索。同时,Shark通过UDF用户自定义函数实现特定的数据分析学习算法,使得SQL数据查询和运算分析能结合在一起,最大化RDD的重复使用。
Spark streaming: 构建在Spark上处理Stream数据的框架,基本的原理是将Stream数据分成小的时间片断(几秒),以类似batch批量处理的方式来处理这小部分数据。Spark Streaming构建在Spark上,一方面是因为Spark的低延迟执行引擎(100ms+)可以用于实时计算,另一方面相比基于Record的其它处理框架(如Storm),RDD数据集更容易做高效的容错处理。此外小批量处理的方式使得它可以同时兼容批量和实时数据处理的逻辑和算法。方便了一些需要历史数据和实时数据联合分析的特定应用场合。
Bagel: Pregel on Spark,可以用Spark进行图计算,这是个非常有用的小项目。Bagel自带了一个例子,实现了Google的PageRank算法。
以上是关于Spark MetadataFetchFailedException 问题排查的主要内容,如果未能解决你的问题,请参考以下文章