SPARK 3.1.2 Driver端下载UDF jar包导致磁盘爆满
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SPARK 3.1.2 Driver端下载UDF jar包导致磁盘爆满相关的知识,希望对你有一定的参考价值。
背景
本文基于spark 3.1.2且配置spark.sql.catalogImplementation=hive
在以spark-sql形式运行sql任务时,发现运行driver端的机器的磁盘总是会达到95%以上的利用率.
分析
经过分析,我们发现是/tmp/$session_id_resources下的UDF jar包导致的磁盘问题。这就使我们不得怀疑是调用hive的UDF函数造成的,接下来直接说重点,直接到ResolveFunctions Rule,改rule是用来解析函数的规则:
case u @ UnresolvedFunction(funcId, arguments, isDistinct, filter) =>
withPosition(u)
v1SessionCatalog.lookupFunction(funcId, arguments) match
// AggregateWindowFunctions are AggregateFunctions that can only be evaluated within
// the context of a Window clause. They do not need to be wrapped in an
// AggregateExpression.
case wf: AggregateWindowFunction =>
if (isDistinct || filter.isDefined)
failAnalysis("DISTINCT or FILTER specified, " +
s"but $wf.prettyName is not an aggregate function")
这个函数最终会调用SessionCatalog的lookupFunction方法,继而调用loadFunctionResources方法,继而调用HiveSessionResourceLoader的loadResource方法:
class HiveSessionResourceLoader(
session: SparkSession,
clientBuilder: () => HiveClient)
extends SessionResourceLoader(session)
private lazy val client = clientBuilder()
override def addJar(path: String): Unit =
val uri = Utils.resolveURI(path)
resolveJars(uri).foreach p =>
client.addJar(p)
super.addJar(p)
之后调用HiveClientImpl.addJar:
def addJar(path: String): Unit =
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null)
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
else
// `path` is a URL with a scheme
uri.toURL
clientLoader.addJar(jarURL)
runSqlHive(s"ADD JAR $path")
注意*runSqlHive(s"ADD JAR $path")*这块代码,这块代码的作用是向hive客户端发动ADD JAR命令,而这个命令的作用就会把对应的UDF JAR包下载到driver端,具体的可参考Hive UDF源码解析【1】Create Function,或者可以跟着代码自己捋清楚(会调用AddResourceProcessor.run方法)。
解决
其实这个问题在spark master分支版本是不存在的,因为有个pr已经间接的解决了这个问题,SPARK-34955.
所以我们的做法很简单,就是直接和并过来对应的commit,事情证明这也很好的解决了这个问题。
说明
其实对于spark来说,下载UDF jar到driver端没有意义的,只有在Task的执行的时候,才会需要对应的UDFjar包,而task所需要的UDFjar是从SessionState的addJar来的
def addJar(path: String): Unit =
session.sparkContext.addJar(path)
val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null)
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
else
// `path` is a URL with a scheme
uri.toURL
session.sharedState.jarClassLoader.addURL(jarURL)
Thread.currentThread().setContextClassLoader(session.sharedState.jarClassLoader)
session.sparkContext.addJar(path) 方法会把jar包放到driver端,在Task运行的时候,会调用TaskRuner的run()方法:
override def run(): Unit =
...
updateDependencies(
taskDescription.addedFiles, taskDescription.addedJars, taskDescription.addedArchives)
updateDependencies 方法就会下载task所需要的jar包。
对应的还有SPARK-35286也存在类似问题
以上是关于SPARK 3.1.2 Driver端下载UDF jar包导致磁盘爆满的主要内容,如果未能解决你的问题,请参考以下文章