Spark连接HBase

Posted 大数据技术汇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark连接HBase相关的知识,希望对你有一定的参考价值。

(一)、Spark读取HBase中的数据

hbase中的数据

 

Spark连接HBase

 1 import org.apache.hadoop.hbase.{HBaseConfiguration,HTableDescriptor,            TableName} 
2
import org.apache.hadoop.hbase.client.HBaseAdmin
3
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
4
import org.apache.spark._
5
import org.apache.hadoop.hbase.util.Bytes
6

7
/**
8
 * Created by *** on 2018/2/12.
9
 *
10  * 从hbase读取数据转化成RDD
11 */
12 object SparkReadHBase {
13
14 def main(args: Array[String]): Unit = {
15 val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
16 val sc = new SparkContext(sparkConf)
17
18 val tablename = "account"
19 val conf = HBaseConfiguration.create()
20 //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21 conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22 //设置zookeeper连接端口,默认2181
23 conf.set("hbase.zookeeper.property.clientPort", "2181")
24    conf.set(TableInputFormat.INPUT_TABLE, tablename)
25
26 // 如果表不存在则创建表
27 val admin = new HBaseAdmin(conf)
28 if (!admin.isTableAvailable(tablename)) {
29 val tableDesc = new HTableDescriptor(TableName.valueOf(tablename))
30      admin.createTable(tableDesc)
31    }
32
33 //读取数据并转化成rdd
34 val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
35      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
36      classOf[org.apache.hadoop.hbase.client.Result])   
37
38 val count = hBaseRDD.count()
39    println(count)
40 hBaseRDD.foreach{case (_,result) =>{
41 //获取行键
42 val key = Bytes.toString(result.getRow)
43 //通过列族和列名获取列
44 val name = Bytes.toString(result.getValue("cf".getBytes,"name".getBytes))
45 val age = Bytes.toInt(result.getValue("cf".getBytes,"age".getBytes))
46 println("Row key:"+key+" Name:"+name+" Age:"+age)
47    }}
48
49    sc.stop()
50    admin.close()
51  }
52
53 }

Spark连接HBase

(二)、Spark写HBase

  1.第一种方式:

Spark连接HBase

 1 import org.apache.hadoop.hbase.HBaseConfiguration 
2
import org.apache.hadoop.hbase.client.Put
3
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
4
import org.apache.hadoop.hbase.mapred.TableOutputFormat
5
import org.apache.hadoop.hbase.util.Bytes
6
import org.apache.hadoop.mapred.JobConf
7
import org.apache.spark.{SparkConf, SparkContext}
8
import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
9
/**
10  * Created by *** on 2018/2/12.
11  *
12  * 使用saveAsHadoopDataset写入数据
13 */
14 object SparkWriteHBaseOne {
15 def main(args: Array[String]): Unit = {
16 val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
17 val sc = new SparkContext(sparkConf)
18
19 val conf = HBaseConfiguration.create()
20 //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
21 conf.set("hbase.zookeeper.quorum","node02,node03,node04")
22 //设置zookeeper连接端口,默认2181
23 conf.set("hbase.zookeeper.property.clientPort", "2181")
24
25 val tablename = "account"
26
27 //初始化jobconf,TableOutputFormat必org.apache.hadoop.hbase.mapred包下的!
28 val jobConf = new JobConf(conf)
29    jobConf.setOutputFormat(classOf[TableOutputFormat])
30    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
31
32 val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
33
34
35 val rdd = indataRDD.map(_.split(',')).map{arr=>{
36 /*一个Put对象就是一行记录,在构造方法中指定主键
37       * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换     
38       * Put.add方法接收三个参数:列族,列名,数据      
39 */
40 val put = new Put(Bytes.toBytes(arr(0).toInt))
41 put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
42 put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
43 //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
44 (new ImmutableBytesWritable, put)
45    }}
46
47    rdd.saveAsHadoopDataset(jobConf)    
48
49    sc.stop()   
50  } 
51 }

Spark连接HBase

  2.第二种方式:

 1 import org.apache.hadoop.hbase.client.{Put, Result} 
2
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
3
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
4
import org.apache.hadoop.hbase.util.Bytes
5
import org.apache.hadoop.mapreduce.Job
6
import org.apache.spark._
7
/**
8
 * Created by *** on 2018/2/12.
9
 *
10  * 使用saveAsNewAPIHadoopDataset写入数据
11 */
12 object SparkWriteHBaseTwo {
13 def main(args: Array[String]): Unit = {
14 val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
15 val sc = new SparkContext(sparkConf)
16
17 val tablename = "account"
18
19 sc.hadoopConfiguration.set("hbase.zookeeper.quorum","node02,node03,node04")
20 sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
21    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)   
22
23 val job = new Job(sc.hadoopConfiguration)
24    job.setOutputKeyClass(classOf[ImmutableBytesWritable])    
25    job.setOutputValueClass(classOf[Result])   
26    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    
27
28 val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
29 val rdd = indataRDD.map(_.split(',')).map{arr=>{
30 val put = new Put(Bytes.toBytes(arr(0)))
31 put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
32 put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
33 (new ImmutableBytesWritable, put)
34    }}    
35
36    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())   
37  }
38 }


以上是关于Spark连接HBase的主要内容,如果未能解决你的问题,请参考以下文章

使用 phoenix 连接器将 Spark 数据帧写入 Hbase

spark踩坑——dataframe写入hbase连接异常

Spark-on-Hbase:通过Spark的DataFrame访问Hbase表

可以使用 hbase-spark 连接器按性能良好的列对 hbase 行进行排序吗?

HBase连接的几种方式

spark集成hbase与hive数据转换与代码练习