spark hive类总是优先记载应用里面的jar包,跟spark.{driver/executor}.userClassPathFirst无关
Posted 鸿乃江边鸟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark hive类总是优先记载应用里面的jar包,跟spark.{driver/executor}.userClassPathFirst无关相关的知识,希望对你有一定的参考价值。
背景
最近在弄spark on k8s的时候,要集成同事的一些功能,其实这并没有什么,但是里面涉及到了hive的类问题(具体指这个org.apache.hadoop.hive.包下的类)。之后发现hive类总是优先加载应用jar包里的类,而忽略掉spark自带的系统jars包,这给我带了了很大的困扰,大约花了一两周的时间,终于把这个问题排查清楚了。
问题分析
直接分析:
我们知道在spark提交的时候,会获取URLClassLoader去加载类,如下:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
// Let the main class re-initialize the logging system once it starts.
if (uninitLog) {
Logging.uninitialize()
}
if (args.verbose) {
logInfo(s"Main class:\\n$childMainClass")
logInfo(s"Arguments:\\n${childArgs.mkString("\\n")}")
// sysProps may contain sensitive information, so redact before printing
logInfo(s"Spark config:\\n${Utils.redact(sparkConf.getAll.toMap).mkString("\\n")}")
logInfo(s"Classpath elements:\\n${childClasspath.mkString("\\n")}")
logInfo("\\n")
}
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
var mainClass: Class[_] = null
try {
mainClass = Utils.classForName(childMainClass)
} catch {
....
这里的getSubmitClassLoader方法就是获得URLClassloader:
private def getSubmitClassLoader(sparkConf: SparkConf): MutableURLClassLoader = {
val loader =
if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
new ChildFirstURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
} else {
new MutableURLClassLoader(new Array[URL](0),
Thread.currentThread.getContextClassLoader)
}
Thread.currentThread.setContextClassLoader(loader)
loader
}
区别就是ChildFirstURLClassLoader自定义了类加载的顺序,也就是会优先加载应用jar包里的顺序,可是我们的应用的并没有设置spark.{driver/executor}.userClassPathFirst,所以该hive类是跟这个加载器无关的。
就在百思不得其解的时候,突然想到了spark 对于hive metastore的兼容性随笔–通过classloader实现,这里的实现,这里就不得不分析一下IsolatedClientLoader这个类的细节:
private[hive] val classLoader: MutableURLClassLoader = {
val isolatedClassLoader =
if (isolationOn) {
if (allJars.isEmpty) {
// See HiveUtils; this is the Java 9+ + builtin mode scenario
baseClassLoader
} else {
val rootClassLoader: ClassLoader =
if (SystemUtils.JAVA_VERSION.split("\\\\.")(1).toInt>=9) {
// In Java 9, the boot classloader can see few JDK classes. The intended parent
// classloader for delegation is now the platform classloader.
// See http://java9.wtf/class-loading/
val platformCL =
classOf[ClassLoader].getMethod("getPlatformClassLoader").
invoke(null).asInstanceOf[ClassLoader]
// Check to make sure that the root classloader does not know about Hive.
assert(Try(platformCL.loadClass("org.apache.hadoop.hive.conf.HiveConf")).isFailure)
platformCL
} else {
// The boot classloader is represented by null (the instance itself isn't accessible)
// and before Java 9 can see all JDK classes
null
}
new URLClassLoader(allJars, rootClassLoader) {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
val loaded = findLoadedClass(name)
if (loaded == null) doLoadClass(name, resolve) else loaded
}
def doLoadClass(name: String, resolve: Boolean): Class[_] = {
val classFileName = name.replaceAll("\\\\.", "/") + ".class"
if (isBarrierClass(name)) {
// For barrier classes, we construct a new copy of the class.
val bytes = IOUtils.toByteArray(baseClassLoader.getResourceAsStream(classFileName))
logDebug(s"custom defining: $name - ${util.Arrays.hashCode(bytes)}")
defineClass(name, bytes, 0, bytes.length)
} else if (!isSharedClass(name)) {
logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
super.loadClass(name, resolve)
} else {
// For shared classes, we delegate to baseClassLoader, but fall back in case the
// class is not found.
logDebug(s"shared class: $name")
try {
baseClassLoader.loadClass(name)
} catch {
case _: ClassNotFoundException =>
super.loadClass(name, resolve)
}
}
}
}
}
} else {
baseClassLoader
}
如果是在JDK8的情况下rootClassLoader是为null的,这就导致了在加载hive相关的类的时候,super.loadClass方法就会直接执行URLClassLoader的findClass方法,进而从URL(也就是通过addURL方法加载进来的jar)查着相关的类
而在spark中,最终的任务提交是通过SparkSubmit的runMain方法来提交的,代码如第一块代码:
val loader = getSubmitClassLoader(sparkConf)
for (jar <- childClasspath) {
addJarToClasspath(jar, loader)
}
注意childClasspath这个是怎么来着,我们在提交任务的时候,是可以看到Classpath elements输出的,也就是会包括–jars指定的jar包和因公的jar包,所以在加载hive相关的类的时候,就会优先从childClassPath中加载对应的class,这是通过IolatedClassLoader实现。
解决方法
但是如果应用中用到的hive相关的类和系统的类不一致的话,该怎么解决,可以用maven shade插件进行重命名,使应用的jar包使用重名以后的类,而不会影响其他的类使用系统自带的hive相关的类
结论
hive相关的class(在org.apache.hadoop.hive包下),跟spark.{driver/executor}.userClassPathFirst无关,跟IolatedClassLoader实现有关。具体想看哪些hive类是从哪个jar包加载进来的,可以开启debug日志
以上是关于spark hive类总是优先记载应用里面的jar包,跟spark.{driver/executor}.userClassPathFirst无关的主要内容,如果未能解决你的问题,请参考以下文章