Spark连接HBase
Posted 大数据技术汇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark连接HBase相关的知识,希望对你有一定的参考价值。
(一)、Spark读取HBase中的数据
hbase中的数据
1 import org.apache.hadoop.hbase.{HBaseConfiguration,HTableDescriptor, TableName}
2 import org.apache.hadoop.hbase.client.HBaseAdmin
3 import org.apache.hadoop.hbase.mapreduce.TableInputFormat
4 import org.apache.spark._
5 import org.apache.hadoop.hbase.util.Bytes
6
/**
 * Reads the HBase table `account` through `TableInputFormat` into an RDD,
 * prints the row count and then prints every row.
 *
 * Originally created on 2018/2/12.
 */
object SparkReadHBase {

  /**
   * Entry point: connects to HBase, makes sure the table exists, scans it as an
   * RDD of `(ImmutableBytesWritable, Result)` pairs and prints each row.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Imported locally because the file-level imports do not include it.
    import org.apache.hadoop.hbase.HColumnDescriptor

    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)

    try {
      val tablename = "account"
      // Bytes.toBytes always encodes as UTF-8, unlike String.getBytes which
      // depends on the platform default charset.
      val family = Bytes.toBytes("cf")
      val nameQualifier = Bytes.toBytes("name")
      val ageQualifier = Bytes.toBytes("age")

      val conf = HBaseConfiguration.create()
      // ZooKeeper quorum; could also come from an hbase-site.xml on the
      // classpath, but it is set explicitly here.
      conf.set("hbase.zookeeper.quorum", "node02,node03,node04")
      // ZooKeeper client port (2181 is the default).
      conf.set("hbase.zookeeper.property.clientPort", "2181")
      conf.set(TableInputFormat.INPUT_TABLE, tablename)

      // Create the table if it is not available yet. The admin handle is only
      // needed for this check, so it is closed right away, even on failure.
      val admin = new HBaseAdmin(conf)
      try {
        if (!admin.isTableAvailable(tablename)) {
          val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
          // A table needs at least one column family; register the one that
          // is read below.
          tableDesc.addFamily(new HColumnDescriptor(family))
          admin.createTable(tableDesc)
        }
      } finally {
        admin.close()
      }

      // Load the table as an RDD of (row key, Result) pairs.
      val hBaseRDD = sc.newAPIHadoopRDD(
        conf,
        classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
        classOf[org.apache.hadoop.hbase.client.Result])

      val count = hBaseRDD.count()
      println(count)

      hBaseRDD.foreach { case (_, result) =>
        // Row key.
        val key = Bytes.toString(result.getRow)
        // Cells are looked up by column family and qualifier. getValue returns
        // null for an absent cell; Bytes.toString passes null through, but
        // Bytes.toInt would throw, so the age is decoded only when present.
        val name = Bytes.toString(result.getValue(family, nameQualifier))
        val age = Option(result.getValue(family, ageQualifier))
          .map(bytes => Bytes.toInt(bytes).toString)
          .getOrElse("null")
        println(s"Row key:$key Name:$name Age:$age")
      }
    } finally {
      // Always release the Spark context, even when HBase access fails.
      sc.stop()
    }
  }

}
(二)、Spark写HBase
1.第一种方式:使用saveAsHadoopDataset(旧版mapred API)写入
1 import org.apache.hadoop.hbase.HBaseConfiguration
2 import org.apache.hadoop.hbase.client.Put
3 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
4 import org.apache.hadoop.hbase.mapred.TableOutputFormat
5 import org.apache.hadoop.hbase.util.Bytes
6 import org.apache.hadoop.mapred.JobConf
7 import org.apache.spark.{SparkConf, SparkContext}
8 import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
/**
 * Writes a small in-memory data set to the HBase table `account` using
 * `saveAsHadoopDataset` (the old `mapred` output API).
 *
 * Originally created on 2018/2/12.
 */
object SparkWriteHBaseOne {

  /**
   * Entry point: builds one `Put` per input record and saves the resulting
   * pair RDD through `TableOutputFormat`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)

    try {
      val conf = HBaseConfiguration.create()
      // ZooKeeper quorum; could also come from an hbase-site.xml on the
      // classpath, but it is set explicitly here.
      conf.set("hbase.zookeeper.quorum", "node02,node03,node04")
      // ZooKeeper client port (2181 is the default).
      conf.set("hbase.zookeeper.property.clientPort", "2181")

      val tablename = "account"

      // saveAsHadoopDataset takes a JobConf, so TableOutputFormat must be the
      // one from the org.apache.hadoop.hbase.mapred package.
      val jobConf = new JobConf(conf)
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

      val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))

      val family = Bytes.toBytes("cf")

      val rdd = indataRDD.map(_.split(',')).map { arr =>
        // One Put is one row; the row key is passed to the constructor.
        // The key is stored as a string so that it matches
        // SparkWriteHBaseTwo and can be decoded with Bytes.toString when the
        // table is read back (an Int key would be four raw bytes).
        val put = new Put(Bytes.toBytes(arr(0)))
        // Every value has to be converted with Bytes.toBytes.
        // Put.add takes: column family, qualifier, value.
        // NOTE(review): newer HBase client versions name this addColumn -
        // confirm against the HBase version on the classpath.
        put.add(family, Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
        put.add(family, Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
        // saveAsHadoopDataset is only available on an
        // RDD[(ImmutableBytesWritable, Put)].
        (new ImmutableBytesWritable, put)
      }

      rdd.saveAsHadoopDataset(jobConf)
    } finally {
      // Always release the Spark context, even when the write fails.
      sc.stop()
    }
  }
}
2.第二种方式:使用saveAsNewAPIHadoopDataset(新版mapreduce API)写入
1 import org.apache.hadoop.hbase.client.{Put, Result}
2 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
3 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
4 import org.apache.hadoop.hbase.util.Bytes
5 import org.apache.hadoop.mapreduce.Job
6 import org.apache.spark._
/**
 * Writes a small in-memory data set to the HBase table `account` using
 * `saveAsNewAPIHadoopDataset` (the new `mapreduce` output API).
 *
 * Originally created on 2018/2/12.
 */
object SparkWriteHBaseTwo {

  /**
   * Entry point: builds one `Put` per input record and saves the resulting
   * pair RDD through the `mapreduce` `TableOutputFormat`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
    val sc = new SparkContext(sparkConf)

    try {
      val tablename = "account"

      // Job.getInstance copies the configuration it is given, so the HBase
      // settings below go onto the job's own copy and the context-wide
      // sc.hadoopConfiguration is left untouched.
      val job = Job.getInstance(sc.hadoopConfiguration)
      val jobConf = job.getConfiguration
      jobConf.set("hbase.zookeeper.quorum", "node02,node03,node04")
      jobConf.set("hbase.zookeeper.property.clientPort", "2181")
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)

      job.setOutputKeyClass(classOf[ImmutableBytesWritable])
      // The values written below are Put mutations, not Result objects.
      job.setOutputValueClass(classOf[Put])
      job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

      val indataRDD = sc.makeRDD(Array("1,jack,15", "2,Lily,16", "3,mike,16"))

      val family = Bytes.toBytes("cf")

      val rdd = indataRDD.map(_.split(',')).map { arr =>
        // One Put is one row, keyed by the first CSV field stored as a string.
        val put = new Put(Bytes.toBytes(arr(0)))
        // Put.add takes: column family, qualifier, value.
        // NOTE(review): newer HBase client versions name this addColumn -
        // confirm against the HBase version on the classpath.
        put.add(family, Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
        put.add(family, Bytes.toBytes("age"), Bytes.toBytes(arr(2).toInt))
        (new ImmutableBytesWritable, put)
      }

      rdd.saveAsNewAPIHadoopDataset(jobConf)
    } finally {
      // The original never stopped the context; release it in every case.
      sc.stop()
    }
  }
}
以上是关于Spark连接HBase的主要内容,如果未能解决你的问题,请参考以下文章
使用 phoenix 连接器将 Spark 数据帧写入 Hbase
Spark-on-Hbase:通过Spark的DataFrame访问Hbase表