SparkSQL实现原理-SparkSQL如何支持Hive?

Posted 一 铭

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSQL实现原理-SparkSQL如何支持Hive?相关的知识,希望对你有一定的参考价值。

SparkSQL实现原理-SparkSQL对Hive的支持

SessionState介绍

SessionState用来保存SparkSession会话的明细信息。SessionState是Spark SQL会话之间的状态分离层,包括SQL配置、表、函数、UDF、SQL 解析器以及依赖于 SQLConf 的所有其他内容。

当启用Hive支持时,会创建一个HiveSessionStateBuilder对象,通过该对象来创建hive相关的元数据管理,分析其,规则解析器等对象。

启用hive的支持

在创建SparkSession时可以启用Hive的支持,只需要调用一个函数即可:

import org.apache.spark.sql.SparkSession
val spark = SparkSession
  .builder
  .enableHiveSupport()  // 启用Hive的支持
  .getOrCreate

enableHiveSupport函数

def enableHiveSupport(): Builder = synchronized 
  if (hiveClassesArePresent) 
    config(CATALOG_IMPLEMENTATION.key, "hive")
   else 
    throw new IllegalArgumentException(
      "Unable to instantiate SparkSession with Hive support because " +
        "Hive classes are not found.")
  

从该函数的代码实现来看,就是把选项spark.sql.catalogImplementation的值设置成了:hive字符串。而该参数默认的值是:in-memory。代码如下:

val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
  .internal()
  .stringConf
  .checkValues(Set("hive", "in-memory"))
  .createWithDefault("in-memory")

该参数会在SparkSession的sessionStateClassName()函数中使用。会根据该参数的值来返回SessionState的类名(SessionState会在后面介绍)。在SparkSession中,该函数的实现代码如下:

// 支持Hive时的SessionState类
private val HIVE_SESSION_STATE_BUILDER_CLASS_NAME =
    "org.apache.spark.sql.hive.HiveSessionStateBuilder"

private def sessionStateClassName(conf: SparkConf): String = 
    conf.get(CATALOG_IMPLEMENTATION) match 
      // 支持hive
      case "hive" => HIVE_SESSION_STATE_BUILDER_CLASS_NAME
      // 非hive情况
      case "in-memory" => classOf[SessionStateBuilder].getCanonicalName
    
  

从以上实现代码可以看出,支持两种类型的SessionState:

  • 一种是hive,对应的实现类是:HiveSessionStateBuilder。
  • 一种是in-memory,对应的实现类是:SessionStateBuilder

SessionState的创建

在SparkSession创建时会创建一个SessionState

代码如下:

  @Unstable
  @transient
  lazy val sessionState: SessionState = 
    parentSessionState
      .map(_.clone(this))
      .getOrElse 
        val state = SparkSession.instantiateSessionState(
          SparkSession.sessionStateClassName(sparkContext.conf),
          self,
          initialSessionOptions)
        state
      
  

instantiateSessionState函数的实现

private def instantiateSessionState(
    className: String,
    sparkSession: SparkSession,
    options: Map[String, String]): SessionState = 
  try 
    // invoke new [Hive]SessionStateBuilder(
    //   SparkSession,
    //   Option[SessionState],
    //   Map[String, String])
    val clazz = Utils.classForName(className)
    val ctor = clazz.getConstructors.head
    ctor.newInstance(sparkSession, None, options).asInstanceOf[BaseSessionStateBuilder].build()
   catch 
    case NonFatal(e) =>
      throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
  

可以看到,若是支持Hive的话,会调用HiveSessionStateBuilder的build()函数来实现。

在HiveSessionStateBuilder中会创建一个HiveClient,来完成和hive的交互。可以看一下下面的HiveSessionResourceLoader类的实现代码。可以看到,该代码构建了一个HiveClient的对象,用来操作Hive。

class HiveSessionResourceLoader(
    session: SparkSession,
    clientBuilder: () => HiveClient)
  extends SessionResourceLoader(session) 
  private lazy val client = clientBuilder()  // 构建一个HiveClient对象
  override def addJar(path: String): Unit = 
    client.addJar(path)
    super.addJar(path)
  

小结

本文介绍了spark支持HIve的大致实现过程。为了支持hive,spark在SparkSession的SessionState类中封装了hiveclient。

以上是关于SparkSQL实现原理-SparkSQL如何支持Hive?的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL架构工作原理及流程解析

SparkSQL 如何自定义函数

sparkSQL中udf的使用

求问怎么设置sparksql读取hive的数据库

源码详解 | SparkSQL底层解析原理

SparkSQL大数据实战:揭开Join的神秘面纱