SparkSQL实现原理-SparkSQL如何支持Hive?
Posted 一 铭
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL实现原理-SparkSQL如何支持Hive?相关的知识,希望对你有一定的参考价值。
SparkSQL实现原理-SparkSQL对Hive的支持
SessionState介绍
SessionState用来保存SparkSession会话的明细信息。SessionState是Spark SQL会话之间的状态分离层,包括SQL配置、表、函数、UDF、SQL 解析器以及依赖于 SQLConf 的所有其他内容。
当启用Hive支持时,会创建一个HiveSessionStateBuilder对象,通过该对象来创建hive相关的元数据管理,分析其,规则解析器等对象。
启用hive的支持
在创建SparkSession时可以启用Hive的支持,只需要调用一个函数即可:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.enableHiveSupport() // 启用Hive的支持
.getOrCreate
enableHiveSupport函数
def enableHiveSupport(): Builder = synchronized
if (hiveClassesArePresent)
config(CATALOG_IMPLEMENTATION.key, "hive")
else
throw new IllegalArgumentException(
"Unable to instantiate SparkSession with Hive support because " +
"Hive classes are not found.")
从该函数的代码实现来看,就是把选项spark.sql.catalogImplementation的值设置成了:hive字符串。而该参数默认的值是:in-memory。代码如下:
val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
该参数会在SparkSession的sessionStateClassName()函数中使用。会根据该参数的值来返回SessionState的类名(SessionState会在后面介绍)。在SparkSession中,该函数的实现代码如下:
// 支持Hive时的SessionState类
private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME =
"org.apache.spark.sql.hive.HiveSessionStateBuilder"
private def sessionStateClassName(conf: SparkConf): String =
conf.get(CATALOG_IMPLEMENTATION) match
// 支持hive
case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
// 非hive情况
case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
从以上实现代码可以看出,支持两种类型的SessionState:
- 一种是hive,对应的实现类是:HiveSessionStateBuilder。
- 一种是in-memory,对应的实现类是:SessionStateBuilder
SessionState的创建
在SparkSession创建时会创建一个SessionState
代码如下:
@Unstable
@transient
lazy val sessionState: SessionState =
parentSessionState
.map(_.clone(this))
.getOrElse
val state = SparkSession.instantiateSessionState(
SparkSession.sessionStateClassName(sparkContext.conf),
self,
initialSessionOptions)
state
instantiateSessionState函数的实现
private def instantiateSessionState(
className: String,
sparkSession: SparkSession,
options: Map[String, String]): SessionState =
try
// invoke new [Hive]SessionStateBuilder(
// SparkSession,
// Option[SessionState],
// Map[String, String])
val clazz = Utils.classForName(className)
val ctor = clazz.getConstructors.head
ctor.newInstance(sparkSession, None, options).asInstanceOf[BaseSessionStateBuilder].build()
catch
case NonFatal(e) =>
throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
可以看到,若是支持Hive的话,会调用HiveSessionStateBuilder的build()函数来实现。
在HiveSessionStateBuilder中会创建一个HiveClient,来完成和hive的交互。可以看一下下面的HiveSessionResourceLoader类的实现代码。可以看到,该代码构建了一个HiveClient的对象,用来操作Hive。
class HiveSessionResourceLoader(
session: SparkSession,
clientBuilder: () => HiveClient)
extends SessionResourceLoader(session)
private lazy val client = clientBuilder() // 构建一个HiveClient对象
override def addJar(path: String): Unit =
client.addJar(path)
super.addJar(path)
小结
本文介绍了spark支持HIve的大致实现过程。为了支持hive,spark在SparkSession的SessionState类中封装了hiveclient。
以上是关于SparkSQL实现原理-SparkSQL如何支持Hive?的主要内容,如果未能解决你的问题,请参考以下文章