问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析
Posted 柚子聊大数据
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析相关的知识,希望对你有一定的参考价值。
1、现象说明
最近业务开发的同事在使用大数据平台提供的二次开发功能时,碰到一个很奇怪的问题——二次开发的jar包手动利用spark-submit脚本提交到yarn,代码执行速度远远快于大数据平台。
2、定位过程
2.1 从二次开发代码着手
首先检查二次开发功能实现的逻辑,主要就是以下两个步骤:
1、获取当前线程的ClassLoader,并将自定义功能的jar加载进classpath。
2、利用反射技术,执行自定义接口实现类的方法。
从代码上来看,真的是十分的简洁,没有优化的地方。
2.2 检查自定义功能的实现逻辑
自定义功能的逻辑说起来也十分的简单:
根据业务需求列出hdfs指定目录下的某些orc文件,并调用
sparkcontext.read.orc(path)
从代码上来看,与大数据平台中唯一不同的就是构造FileSystem对象——在自定义功能中每次获取FileSystem都调用了FileSystem.get(Configuration),而在大数据平台中利用了双重校验锁实现的FileSystem单例对象。
在我先前的认知中,每个FileSystem的底层都会持有一个dfsClient对象,都是一个长链接,都会消耗系统的句柄。因此一个进程创建一个FileSystem对象即可,多了就是资源上的浪费。但是细看源码,便突破了我的认知。
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
if (conf.getBoolean(disableCacheName, false)) {
LOGGER.debug("Bypassing cache to create filesystem {}", uri);
return createFileSystem(uri, conf);
}
return CACHE.get(uri, conf);
源码中scheme,即为hdfs,默认情况下fs.hdfs.impl.disable.cache没有配置值,因此默认情况下if条件不会成立,同一个进程中每次获取的都是同一个FileSystem对象。(这一点也通过远程debug的方式得到了验证)
2.3 从日志判断慢的地方
利用jstack命令,查看二次开发功能执行到哪里了?
# 查看进程中,最繁忙的线程
# 注意,这边显示的是10进制,而jstack输出的线程编号是16进制,需要进程转换.
top -Hp 进程号
详细堆栈信息:
#488 prio=5 os_prio=0 tid=0x00007f5d5c07e000 nid=0x14039 runnable [0x00007f5bed3f4000]
RUNNABLE :
at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
locked <0x000000068342dfb8> (a sun.nio.ch.Util$3)
locked <0x000000068342dfa8> (a java.util.Collections$UnmodifiableSet)
locked <0x000000068342dd80> (a sun.nio.ch.EPollSelectorImpl)
at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102)
at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:207)
at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:156)
locked <0x000000063fc2e730> (a org.apache.hadoop.hdfs.RemoteBlockReader2)
at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:744)
at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:800)
locked <0x000000063fc205a8> (a org.apache.hadoop.hdfs.DFSInputStream)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:860)
locked <0x000000063fc205a8> (a org.apache.hadoop.hdfs.DFSInputStream)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:903)
locked <0x000000063fc205a8> (a org.apache.hadoop.hdfs.DFSInputStream)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:369)
at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316)
at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:187)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:68)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:67)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.TraversableOnce$class.collectFirst(TraversableOnce.scala:143)
at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1194)
at org.apache.spark.sql.hive.orc.OrcFileOperator$.getFileReader(OrcFileOperator.scala:69)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:77)
at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:77)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.sql.hive.orc.OrcFileOperator$.readSchema(OrcFileOperator.scala:77)
at org.apache.spark.sql.hive.orc.OrcFileFormat.inferSchema(OrcFileFormat.scala:60)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177)
at scala.Option.orElse(Option.scala:289)
at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.orc(DataFrameReader.scala:582)
at com.baidu.spark.util.hdfs.HdfsReader$.stageData(HdfsReader.scala:73)
at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream$$anonfun$reduce$1.apply(ActivityAnalysisStream.scala:63)
at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream$$anonfun$reduce$1.apply(ActivityAnalysisStream.scala:62)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45)
at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream.reduce(ActivityAnalysisStream.scala:62)
at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream.process(ActivityAnalysisStream.scala:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.baidu.shark.common.DataProcessImpl.process(DataProcessImpl.java:80)
at com.baidu.shark.operator.common.JarWorkOperator.run(JarWorkOperator.scala:82)
at com.baidu.shark.service.impl.common.JarWorkServiceImpl.execute(JarWorkServiceImpl.java:42)
at com.baidu.shark.service.impl.common.CommonOperatorService.run(CommonOperatorService.java:257)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
<0x000000067d135998> (a java.util.concurrent.ThreadPoolExecutor$Worker)
从堆栈日志来看,程序正在获取orc文件的schema。(这边就是导致任务一直没有提交到yarn上去的地方)
2.4 观察GC时间
线程状态未出现死锁,难道是资源不够?用如下命令观察进程GC时间
# 更多的jstat功能可以用jstat -options来学习.
jstat -gcmetacapacity 156891 5000
结果发现metaspace区域使用率很高,基本维持在98%以上,且业务流程一运行GC次数、时间都会飙升。
关于metaspace,总的来说就是如下几点:
-
MaxMetaspaceSize是最大值,MetaspaceSize是初始值 -
MetaspaceSize在没有设置时,初始值大概就是约20.8MB -
当metaspace使用到达MetaspaceSize时,就会触发full gc,且扩容后再次达到阈值时,依然会出现full gc。
(关于metaspace更详细的介绍,可以参考链接1)
full gc肯定会影响性能,于是在spark-default.properties文件中设置如下的参数
spark.driver.extraJavaOptions -XX:MaxMetaspaceSize=512m -XX:MetaspaceSize=512m
结果:
-
GC时间在1s以内,但是与直接提交任务到yarn相比,速度仍然相差较大。
2.5 调整代码
在2.3节的分析过程中,我们发现代码是卡在获取orc文件的schema步骤中,那我们就尝试自己写代码去解析一个orc文件,得到schema,再调用spark接口,如下所示:
# 之前
sqlContext.read.orc(orcPath)
# 之后
sqlContext.read.schema(schema).orc(orcPath)
经现场测试,调整代码后,提交速度得到大大提高。
3、总结
1、同一个进程,同一个hdfs集群,使用FileSystem.get(Configuration)方法,获取的永远是同一个FileSystem对象,不需要从性能角度来考虑使用单例模式。
2、通过URLClassLoader及其实现类,可以将二次开发的jar动态的添加到classpath中。
3、top -Hp 进程号,可以查看进程内最忙线程,jstack可以打印出进程当前时刻的堆栈情况,从而判断出代码执行情况。
4、当metaspace每次扩容时,都会进行Full GC。
5、DataFrameReader.orc方法读取orc文件时,代码中还是尽量去指定schema。
4、待思考
同样的代码,该问题仅仅在yarn-client模式下出现的,且我司的大数据平台是一个长应用(所有的离线任务都共用一个driver),到底是什么导致获取schema这么慢呢???
5、参考链接
1、https://blog.csdn.net/u011381576/article/details/79635867
2、https://blog.csdn.net/aijiudu/article/details/78616064
以上是关于问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析的主要内容,如果未能解决你的问题,请参考以下文章
Spark MetadataFetchFailedException 问题排查
Apache Spark:Task not serializable异常的排查和解决
问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析
大数据问题排查系列 - SPARK STANDALONE HA 模式的一个缺陷点与应对方案