spark thrift server 与 网易 kyuubi thrift server

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark thrift server 与 网易 kyuubi thrift server相关的知识,希望对你有一定的参考价值。

参考技术A thrift server可以实现通过jdbc, beeline等工具,实现连接到spark集群,并提交sql查询的机制。

默认情况下,cdh安装的spark没有包含thrift server模块,因此我们需要重新编译spark。

另外,为了不影响cdh自带的spark,而且spark目前都是基于yarn运行的,本身也没有什么独立的服务部署(除了history sever)。

所以,在一个集群中,可以部署安装多个版本的spark。

我们使用源码编译的spark 2.4.0(其中hive的版本是1.2.1)

cdh集成的spark版本和Hive版本如下:

使用jdk1.8
修改spark提供的mvn,使用自行安装的maven 3.8.1

使用make-distribution.sh可以帮助与我们编译之后打包成tgz文件

修改pom.xml文件的配置如下。

最后,执行编译命令如下:

这样打出的包,就含有thrift server的jar包了。

最终打包文件,根目录下。

之后就是解压到其他目录下后即可。

将hive-site.xml的文件连接过来,这样spark就可以读取hive的表了。

为了确保spark提交到yarn上运行,需要配置

cp spark-defaults.conf.template spar-defaults.conf

另外,可以在spark-env.sh中设置环境变量。

HADOOP_CONF_DIR

环境变量,也可以在/etc/profile中设置

启动日志可以查看,注意下端口占用问题,如下。

启动时候,使用beeline工具连接上,主要这里不用使用cdh默认安装hive提供的beeline工具,应为版本太高。

使用编译后spark生成beeline工具

参考beeline使用教程。

https://github.com/apache/incubator-kyuubi

kyuubi是基于thrift sever二次开发,在系能和安全上优于thrift server。

鉴于目前hive的版本是2.1,而最新的kyuubi的hive是2.3,所以采用前天版本的kyuubi,采用0.7版本,保证hive的版本小于当前集群中的hive版本。

使用build目录下的dist脚本进行编译和打包。

编译成功后,会在更目录下出现tar.gz的压缩文件,如上图。

之后解压到目录下。

配置bin/kyuubi-env.sh脚本,设置spark路径

执行bin/start-kyuubi.sh命令即可。

访问的方式同样采用beelin,注意使用上面章节的beeline工具。

访问后,可以通过beeline访问到hive的表(在spark中已经配置了hive-site.xml)

!connect jdbc: hive2://xxxx:10009 即可。

原创问题定位分享(18)beeline连接spark thrift有时会卡住

spark 2.1.1

 

beeline连接spark thrift之后,执行use database有时会卡住,而use database 在server端对应的是 setCurrentDatabase,

经过排查发现当时spark thrift正在执行insert操作,

 

org.apache.spark.sql.hive.execution.InsertIntoHiveTable

  protected override def doExecute(): RDD[InternalRow] = {
    sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
  }
...
  @transient private val externalCatalog = sqlContext.sharedState.externalCatalog

  protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
  ...
        externalCatalog.loadDynamicPartitions(
          externalCatalog.getPartitionOption(
          externalCatalog.loadPartition(
      externalCatalog.loadTable(

可见insert操作中可能会调用loadDynamicPartitions、getPartitionOption、loadPartition、loadTable等方法,

 

org.apache.spark.sql.hive.client.HiveClientImpl

  def loadTable(
      loadPath: String, // TODO URI
      tableName: String,
      replace: Boolean,
      holdDDLTime: Boolean): Unit = withHiveState {
...
  def loadPartition(
      loadPath: String,
      dbName: String,
      tableName: String,
      partSpec: java.util.LinkedHashMap[String, String],
      replace: Boolean,
      holdDDLTime: Boolean,
      inheritTableSpecs: Boolean): Unit = withHiveState {
...
  override def setCurrentDatabase(databaseName: String): Unit = withHiveState {

而HiveClientImpl中对应的方法都会执行withHiveState,而withHiveState有synchronized,所以insert操作中的部分代码(比如loadPartition)和use database操作会被同步执行,当insert执行很慢时就会卡住所有的其他操作;

 

spark thrift中实现原理详见 https://www.cnblogs.com/barneywill/p/10137672.html

 

以上是关于spark thrift server 与 网易 kyuubi thrift server的主要内容,如果未能解决你的问题,请参考以下文章

Spark的thrift端口

spark sql thrift server

身份验证和授权Spark Thrift Server

Spark Thrift Server改造:返回执行进度日志

对 Spark Thrift Server 的直线查询未在 Spark History UI 中显示任何内容

使用HAProxy代理控制Spark SQL Thrift Server服务的最大连接数