问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析

Posted 柚子聊大数据

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析相关的知识,希望对你有一定的参考价值。

1、现象说明

最近业务开发的同事在使用大数据平台提供的二次开发功能时,碰到一个很奇怪的问题——二次开发的jar包手动利用spark-submit脚本提交到yarn,代码执行速度远远快于大数据平台。

2、定位过程

2.1 从二次开发代码着手

首先检查二次开发功能实现的逻辑,主要就是以下两个步骤:

1、获取当前线程的ClassLoader,并将自定义功能的jar加载进classpath。

2、利用反射技术,执行自定义接口实现类的方法。

从代码上来看,真的是十分的简洁,没有优化的地方。

2.2 检查自定义功能的实现逻辑

自定义功能的逻辑说起来也十分的简单:

根据业务需求列出hdfs指定目录下的某些orc文件,并调用sparkcontext.read.orc(path)

从代码上来看,与大数据平台中唯一不同的就是构造FileSystem对象——在自定义功能中每次获取FileSystem都调用了FileSystem.get(Configuration),而在大数据平台中利用了双重校验锁实现的FileSystem单例对象。

在我先前的认知中,每个FileSystem的底层都会持有一个dfsClient对象,都是一个长链接,都会消耗系统的句柄。因此一个进程创建一个FileSystem对象即可,多了就是资源上的浪费。但是细看源码,便突破了我的认知。

String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme); if (conf.getBoolean(disableCacheName, false)) { LOGGER.debug("Bypassing cache to create filesystem {}", uri); return createFileSystem(uri, conf);    } return CACHE.get(uri, conf);

源码中scheme,即为hdfs,默认情况下fs.hdfs.impl.disable.cache没有配置值,因此默认情况下if条件不会成立,同一个进程中每次获取的都是同一个FileSystem对象。(这一点也通过远程debug的方式得到了验证)

2.3 从日志判断慢的地方

利用jstack命令,查看二次开发功能执行到哪里了?

# 查看进程中,最繁忙的线程# 注意,这边显示的是10进制,而jstack输出的线程编号是16进制,需要进程转换.top -Hp 进程号

详细堆栈信息:

"threadPoolTaskExecutor-2" #488 prio=5 os_prio=0 tid=0x00007f5d5c07e000 nid=0x14039 runnable [0x00007f5bed3f4000] java.lang.Thread.State: RUNNABLE at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method) at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269) at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93) at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86) - locked <0x000000068342dfb8> (a sun.nio.ch.Util$3) - locked <0x000000068342dfa8> (a java.util.Collections$UnmodifiableSet) - locked <0x000000068342dd80> (a sun.nio.ch.EPollSelectorImpl) at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97) at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335) at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157) at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.readChannelFully(PacketReceiver.java:258) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:209) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:171) at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:102) at org.apache.hadoop.hdfs.RemoteBlockReader2.readNextPacket(RemoteBlockReader2.java:207) at org.apache.hadoop.hdfs.RemoteBlockReader2.read(RemoteBlockReader2.java:156) - locked <0x000000063fc2e730> (a org.apache.hadoop.hdfs.RemoteBlockReader2) at org.apache.hadoop.hdfs.DFSInputStream$ByteArrayStrategy.doRead(DFSInputStream.java:744) at org.apache.hadoop.hdfs.DFSInputStream.readBuffer(DFSInputStream.java:800) - locked <0x000000063fc205a8> (a org.apache.hadoop.hdfs.DFSInputStream) at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:860) - locked <0x000000063fc205a8> (a org.apache.hadoop.hdfs.DFSInputStream) at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:903) - locked <0x000000063fc205a8> (a org.apache.hadoop.hdfs.DFSInputStream) at java.io.DataInputStream.readFully(DataInputStream.java:195) at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.extractMetaInfoFromFooter(ReaderImpl.java:369) at org.apache.hadoop.hive.ql.io.orc.ReaderImpl.<init>(ReaderImpl.java:316) at org.apache.hadoop.hive.ql.io.orc.OrcFile.createReader(OrcFile.java:187) at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:68) at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$getFileReader$2.apply(OrcFileOperator.scala:67) at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) at scala.collection.TraversableOnce$class.collectFirst(TraversableOnce.scala:143) at scala.collection.AbstractIterator.collectFirst(Iterator.scala:1194) at org.apache.spark.sql.hive.orc.OrcFileOperator$.getFileReader(OrcFileOperator.scala:69) at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:77) at org.apache.spark.sql.hive.orc.OrcFileOperator$$anonfun$readSchema$1.apply(OrcFileOperator.scala:77) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at org.apache.spark.sql.hive.orc.OrcFileOperator$.readSchema(OrcFileOperator.scala:77) at org.apache.spark.sql.hive.orc.OrcFileFormat.inferSchema(OrcFileFormat.scala:60) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177) at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$7.apply(DataSource.scala:177) at scala.Option.orElse(Option.scala:289) at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:176) at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366) at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178) at org.apache.spark.sql.DataFrameReader.orc(DataFrameReader.scala:582) at com.baidu.spark.util.hdfs.HdfsReader$.stageData(HdfsReader.scala:73) at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream$$anonfun$reduce$1.apply(ActivityAnalysisStream.scala:63) at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream$$anonfun$reduce$1.apply(ActivityAnalysisStream.scala:62) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) at scala.collection.mutable.ListBuffer.foreach(ListBuffer.scala:45) at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream.reduce(ActivityAnalysisStream.scala:62) at com.baidu.spark.job.apollo.machine.ActivityAnalysisStream.process(ActivityAnalysisStream.scala:38) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at com.baidu.shark.common.DataProcessImpl.process(DataProcessImpl.java:80) at com.baidu.shark.operator.common.JarWorkOperator.run(JarWorkOperator.scala:82) at com.baidu.shark.service.impl.common.JarWorkServiceImpl.execute(JarWorkServiceImpl.java:42) at com.baidu.shark.service.impl.common.CommonOperatorService.run(CommonOperatorService.java:257) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers: - <0x000000067d135998> (a java.util.concurrent.ThreadPoolExecutor$Worker)

从堆栈日志来看,程序正在获取orc文件的schema。(这边就是导致任务一直没有提交到yarn上去的地方)

2.4 观察GC时间

线程状态未出现死锁,难道是资源不够?用如下命令观察进程GC时间

# 更多的jstat功能可以用jstat -options来学习.jstat -gcmetacapacity 156891 5000

结果发现metaspace区域使用率很高,基本维持在98%以上,且业务流程一运行GC次数、时间都会飙升。

关于metaspace,总的来说就是如下几点:

  • MaxMetaspaceSize是最大值,MetaspaceSize是初始值
  • MetaspaceSize在没有设置时,初始值大概就是约20.8MB
  • 当metaspace使用到达MetaspaceSize时,就会触发full gc,且扩容后再次达到阈值时,依然会出现full gc。

(关于metaspace更详细的介绍,可以参考链接1)

full gc肯定会影响性能,于是在spark-default.properties文件中设置如下的参数

spark.driver.extraJavaOptions -XX:MaxMetaspaceSize=512m -XX:MetaspaceSize=512m

结果:

  • GC时间在1s以内,但是与直接提交任务到yarn相比,速度仍然相差较大。

2.5 调整代码

在2.3节的分析过程中,我们发现代码是卡在获取orc文件的schema步骤中,那我们就尝试自己写代码去解析一个orc文件,得到schema,再调用spark接口,如下所示:

# 之前sqlContext.read.orc(orcPath)# 之后sqlContext.read.schema(schema).orc(orcPath)

经现场测试,调整代码后,提交速度得到大大提高。

3、总结

1、同一个进程,同一个hdfs集群,使用FileSystem.get(Configuration)方法,获取的永远是同一个FileSystem对象,不需要从性能角度来考虑使用单例模式。

2、通过URLClassLoader及其实现类,可以将二次开发的jar动态的添加到classpath中。

3、top -Hp 进程号,可以查看进程内最忙线程,jstack可以打印出进程当前时刻的堆栈情况,从而判断出代码执行情况。

4、当metaspace每次扩容时,都会进行Full GC。

5、DataFrameReader.orc方法读取orc文件时,代码中还是尽量去指定schema。

4、待思考

同样的代码,该问题仅仅在yarn-client模式下出现的,且我司的大数据平台是一个长应用(所有的离线任务都共用一个driver),到底是什么导致获取schema这么慢呢???

5、参考链接

1https://blog.csdn.net/u011381576/article/details/796358672https://blog.csdn.net/aijiudu/article/details/78616064

以上是关于问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析的主要内容,如果未能解决你的问题,请参考以下文章

Spark MetadataFetchFailedException 问题排查

Apache Spark:Task not serializable异常的排查和解决

问题排查 | Spark OrcFileFormat inferSchema执行巨慢问题分析

大数据问题排查系列 - SPARK STANDALONE HA 模式的一个缺陷点与应对方案

线上 hive on spark 作业执行超时问题排查案例分享

线上 hive on spark 作业执行超时问题排查案例分享