spark将数据写入hbase以及从hbase读取数据

Posted u013063153

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark将数据写入hbase以及从hbase读取数据相关的知识,希望对你有一定的参考价值。

原文:http://blog.csdn.net/u013468917/article/details/52822074

本文将介绍

1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset将RDD写入hbase

2、spark从hbase中读取数据并转化为RDD

操作方式为在eclipse本地运行spark连接到远程的hbase。

java版本:1.7.0

scala版本:2.10.4

zookeeper版本:3.4.5(禁用了hbase自带zookeeper,选择自己部署的)

hadoop版本:2.4.1

spark版本:1.6.1

hbase版本:1.2.3

集群:centos6.5_x64

将RDD写入hbase

注意点:

依赖:

将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath

此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar

$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar

不同的package中可能会有相同名称的类,不要导错

连接集群:

spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:

第一种是将hbase-site.xml文件加入classpath

第二种是在HBaseConfiguration实例中设置

如果不设置,默认连接的是localhost:2181会报错:connection refused 

本文使用的是第二种方式。

hbase创建表:

虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据

使用saveAsHadoopDataset写入数据

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.client.Put  
  5. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  6. import org.apache.hadoop.hbase.mapred.TableOutputFormat  
  7. import org.apache.hadoop.hbase.util.Bytes  
  8. import org.apache.hadoop.mapred.JobConf  
  9. import org.apache.spark.SparkConf  
  10. import org.apache.spark.SparkContext  
  11. import org.apache.spark.rdd.RDD.rddToPairRDDFunctions  
  12.   
  13. object TestHBase   
  14.   
  15.   def main(args: Array[String]): Unit =   
  16.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  17.     val sc = new SparkContext(sparkConf)  
  18.   
  19.     val conf = HBaseConfiguration.create()  
  20.     //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置  
  21.     conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  22.     //设置zookeeper连接端口,默认2181  
  23.     conf.set("hbase.zookeeper.property.clientPort", "2181")  
  24.   
  25.     val tablename = "account"  
  26.       
  27.     //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!  
  28.     val jobConf = new JobConf(conf)  
  29.     jobConf.setOutputFormat(classOf[TableOutputFormat])  
  30.     jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  31.       
  32.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  33.   
  34.   
  35.     val rdd = indataRDD.map(_.split(',')).maparr=>  
  36.       /*一个Put对象就是一行记录,在构造方法中指定主键  
  37.        * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换  
  38.        * Put.add方法接收三个参数:列族,列名,数据  
  39.        */  
  40.       val put = new Put(Bytes.toBytes(arr(0).toInt))  
  41.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  42.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  43.       //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset  
  44.       (new ImmutableBytesWritable, put)   
  45.       
  46.       
  47.     rdd.saveAsHadoopDataset(jobConf)  
  48.       
  49.     sc.stop()  
  50.     
  51.   
  52.   

使用saveAsNewAPIHadoopDataset写入数据


[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration  
  4. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat  
  5. import org.apache.spark._  
  6. import org.apache.hadoop.mapreduce.Job  
  7. import org.apache.hadoop.hbase.io.ImmutableBytesWritable  
  8. import org.apache.hadoop.hbase.client.Result  
  9. import org.apache.hadoop.hbase.client.Put  
  10. import org.apache.hadoop.hbase.util.Bytes  
  11.   
  12. object TestHBase3   
  13.   
  14.   def main(args: Array[String]): Unit =   
  15.     val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")  
  16.     val sc = new SparkContext(sparkConf)  
  17.       
  18.     val tablename = "account"  
  19.       
  20.     sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")  
  21.     sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")  
  22.     sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)  
  23.       
  24.     val job = new Job(sc.hadoopConfiguration)  
  25.     job.setOutputKeyClass(classOf[ImmutableBytesWritable])  
  26.     job.setOutputValueClass(classOf[Result])    
  27.     job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    
  28.   
  29.     val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))  
  30.     val rdd = indataRDD.map(_.split(',')).maparr=>  
  31.       val put = new Put(Bytes.toBytes(arr(0)))  
  32.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))  
  33.       put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))  
  34.       (new ImmutableBytesWritable, put)   
  35.       
  36.       
  37.     rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())  
  38.     
  39.   
  40.   


从hbase读取数据转化成RDD

本例基于官方提供的例子

[plain] view plain copy
  1. package com.test  
  2.   
  3. import org.apache.hadoop.hbase.HBaseConfiguration, HTableDescriptor, TableName  
  4. import org.apache.hadoop.hbase.client.HBaseAdmin  
  5. import org.apache.hadoop.hbase.mapreduce.TableInputFormat  
  6. import org.apache.spark._  
  7. import org.apache.hadoop.hbase.client.HTable  
  8. import org.apache.hadoop.hbase.client.Put  
  9. 以上是关于spark将数据写入hbase以及从hbase读取数据的主要内容,如果未能解决你的问题,请参考以下文章

    如何使用Spark Streaming读取HBase的数据并写入到HDFS

    使用 phoenix 连接器将 Spark 数据帧写入 Hbase

    spark 从 hbase 读取数据,worker 是不是需要从远程驱动程序中获取分区数据?

    在 saveAsNewAPIHadoopDataset 上阻止 python 的火花流将数据写入 Hbase

    Spark操作Hbase

    Spark DataFrame写入HBase的常用方式