Lindrom 实践 | Spark 对接 Lindorm Phoenix 5.x 轻客户端

Posted HBase技术社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Lindrom 实践 | Spark 对接 Lindorm Phoenix 5.x 轻客户端相关的知识,希望对你有一定的参考价值。

1. 背景

Lindorm兼容Phoenix提供的是Phoenix 5.x轻客户端,在Spark官网上对接Phoenix的例子大多是Phoenix 4.x重客户端,因此本文给出Spark对接Phoenix 5.x轻客户端的例子,方便大家参考。


2. Spark对接Phoenix 5.x轻客户端

2.1 从Spark官网下载Spark安装包

从Spark官网下载Spark安装包,版本自行选择,本文以Spark-2.4.3版本为例。下载后解压。


2.2 从阿里云仓库下载Phoenix5.x轻客户端

从阿里云仓库下载Phoenix5.x轻客户端ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar, 放置于一个目录,比如/data/lib。


2.3 启动spark-shell

进入spark目录,运行spark-shell

./bin/spark-shell --jars /data/lib/ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar --driver-class-path /data/lib/ali-phoenix-shaded-thin-client-5.2.5-HBase-2.x.jar


2.4 粘贴运行代码

2.4.1 Phoenix Statement方式访问

在spark-shell上输入:paste可以输入多行文本

:paste

修改下面代码中的url, user, password为自己的实例集群信息,然后全部粘贴于spark-shell中。

import java.sql.{DriverManager, SQLException}
import java.util.Properties
val driver = "org.apache.phoenix.queryserver.client.Driver"
val url= "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF"
val info = new Properties()
info.put("user", "xxxx") //表示用户名是root
info.put("password", "xxxx") //表示密码是hadoop
try {
Class.forName(driver)
} catch {
case e: ClassNotFoundException => e.printStackTrace
}
val conn = DriverManager.getConnection(url, info)
val stmt = conn.createStatement
try {
stmt.execute("drop table if exists test")
stmt.execute("create table test(c1 integer primary key, c2 integer)")
stmt.execute("upsert into test(c1,c2) values(1,1)")
stmt.execute("upsert into test(c1,c2) values(2,2)")
val rs = stmt.executeQuery("select * from test limit 1000")
while (rs.next()) {
println(rs.getString(1) + " | " +
rs.getString(2) )
}
stmt.execute("drop table if exists test")
} catch {
case e: SQLException => e.printStackTrace()
} finally {
if (null != stmt) {
stmt.close()
}
if (null != conn) {
conn.close()
}
}

输入Ctrl+D 结束文本输入,即可看到运行结果, 会显示类似如下信息:

// Exiting paste mode, now interpreting.

1 | 1
2 | 2

2.4.2 DataFrame方式访问

DataFrame方式只能进行读写,建表操作和删表操作需要使用Phoenix Statement方式。


2.4.2.1 DataFrame方式读

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import org.apache.spark.sql.SQLContext
val sqlContext = new SQLContext(sc)

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:phoenix:thin:url=http://ld-bpxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "driver" -> "org.apache.phoenix.queryserver.client.Driver", "dbtable" -> "TEST","fetchsize" -> "10000", "user" -> "xxxx", "password" -> "xxxx")).load()
jdbcDF.show()


2.4.2.1 DataFrame方式写

输入:paste粘贴以下文本,然后输入Ctrl+D后开始运行。记得修改url,user,password信息。

import java.util.Properties
import org.apache.spark.sql.{SQLContext, Row}
import org.apache.spark.sql.types.{IntegerType,StructField, StructType}
val sqlContext = new SQLContext(sc)
val testRDD = sc.parallelize(Array("3 3","4 4")).map(_.split(" "))
//创建schema
val schema = StructType(List(StructField("c1", IntegerType, true),StructField("c2", IntegerType, true)))
//创建Row对象,每个Row对象都是rowRDD中的一行
val rowRDD = testRDD.map(p => Row(p(0).toInt,p(1).toInt))
//建立起Row对象和模式之间的对应关系,也就是把数据和模式对应起来
val testDataFrame = sqlContext.createDataFrame(rowRDD, schema)
//下面创建一个prop变量用来保存JDBC连接参数
val prop = new Properties()
prop.put("user", "xxxx") //表示用户名是root
prop.put("password", "xxxx") //表示密码是hadoop
prop.put("driver","org.apache.phoenix.queryserver.client.Driver")
//下面就可以连接数据库,采用append模式,表示追加记录到数据库spark的student表中
testDataFrame.write.mode("append").jdbc("jdbc:phoenix:thin:url=http://ld-xxxxxxxxxxxxx-proxy-phoenix-pub.lindorm.rds.aliyuncs.com:8765;serialization=PROTOBUF", "test", prop)


以上是关于Lindrom 实践 | Spark 对接 Lindorm Phoenix 5.x 轻客户端的主要内容,如果未能解决你的问题,请参考以下文章

spark-streaming对接kafka的两种方式

手把手教你如何配置DBeaver对接FusionInsigth MRS Spark2x

Spark On HBase

信息系统实践手记7-对接卡口平台细节

信息系统实践手记4-平台对接的一些思考

系统对接阿里云短信接口(Java开发实践)