spark将数据写入hbase以及从hbase读取数据
Posted u013063153
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spark将数据写入hbase以及从hbase读取数据相关的知识,希望对你有一定的参考价值。
原文:http://blog.csdn.net/u013468917/article/details/52822074
本文将介绍
1、spark如何利用saveAsHadoopDataset和saveAsNewAPIHadoopDataset将RDD写入hbase
2、spark从hbase中读取数据并转化为RDD
操作方式为在eclipse本地运行spark连接到远程的hbase。
java版本:1.7.0
scala版本:2.10.4
zookeeper版本:3.4.5(禁用了hbase自带zookeeper,选择自己部署的)
hadoop版本:2.4.1
spark版本:1.6.1
hbase版本:1.2.3
集群:centos6.5_x64
将RDD写入hbase
注意点:
依赖:
将lib目录下的hadoop开头jar包、hbase开头jar包添加至classpath
此外还有lib目录下的:zookeeper-3.4.6.jar、metrics-core-2.2.0.jar(缺少会提示hbase RpcRetryingCaller: Call exception不断尝试重连hbase,不报错)、htrace-core-3.1.0-incubating.jar、guava-12.0.1.jar
$SPARK_HOME/lib目录下的 spark-assembly-1.6.1-hadoop2.4.0.jar
不同的package中可能会有相同名称的类,不要导错
连接集群:
spark应用需要连接到zookeeper集群,然后借助zookeeper访问hbase。一般可以通过两种方式连接到zookeeper:
第一种是将hbase-site.xml文件加入classpath
第二种是在HBaseConfiguration实例中设置
如果不设置,默认连接的是localhost:2181会报错:connection refused
本文使用的是第二种方式。
hbase创建表:
虽然可以在spark应用中创建hbase表,但是不建议这样做,最好在hbase shell中创建表,spark写或读数据
使用saveAsHadoopDataset写入数据
[plain] view plain copy
- package com.test
- import org.apache.hadoop.hbase.HBaseConfiguration
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable
- import org.apache.hadoop.hbase.mapred.TableOutputFormat
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.mapred.JobConf
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.rdd.RDD.rddToPairRDDFunctions
- object TestHBase
- def main(args: Array[String]): Unit =
- val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
- val sc = new SparkContext(sparkConf)
- val conf = HBaseConfiguration.create()
- //设置zooKeeper集群地址,也可以通过将hbase-site.xml导入classpath,但是建议在程序里这样设置
- conf.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
- //设置zookeeper连接端口,默认2181
- conf.set("hbase.zookeeper.property.clientPort", "2181")
- val tablename = "account"
- //初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
- val jobConf = new JobConf(conf)
- jobConf.setOutputFormat(classOf[TableOutputFormat])
- jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
- val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
- val rdd = indataRDD.map(_.split(',')).maparr=>
- /*一个Put对象就是一行记录,在构造方法中指定主键
- * 所有插入的数据必须用org.apache.hadoop.hbase.util.Bytes.toBytes方法转换
- * Put.add方法接收三个参数:列族,列名,数据
- */
- val put = new Put(Bytes.toBytes(arr(0).toInt))
- put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
- put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
- //转化成RDD[(ImmutableBytesWritable,Put)]类型才能调用saveAsHadoopDataset
- (new ImmutableBytesWritable, put)
- rdd.saveAsHadoopDataset(jobConf)
- sc.stop()
使用saveAsNewAPIHadoopDataset写入数据
[plain] view plain copy
- package com.test
- import org.apache.hadoop.hbase.HBaseConfiguration
- import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
- import org.apache.spark._
- import org.apache.hadoop.mapreduce.Job
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable
- import org.apache.hadoop.hbase.client.Result
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.util.Bytes
- object TestHBase3
- def main(args: Array[String]): Unit =
- val sparkConf = new SparkConf().setAppName("HBaseTest").setMaster("local")
- val sc = new SparkContext(sparkConf)
- val tablename = "account"
- sc.hadoopConfiguration.set("hbase.zookeeper.quorum","slave1,slave2,slave3")
- sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
- sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
- val job = new Job(sc.hadoopConfiguration)
- job.setOutputKeyClass(classOf[ImmutableBytesWritable])
- job.setOutputValueClass(classOf[Result])
- job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
- val indataRDD = sc.makeRDD(Array("1,jack,15","2,Lily,16","3,mike,16"))
- val rdd = indataRDD.map(_.split(',')).maparr=>
- val put = new Put(Bytes.toBytes(arr(0)))
- put.add(Bytes.toBytes("cf"),Bytes.toBytes("name"),Bytes.toBytes(arr(1)))
- put.add(Bytes.toBytes("cf"),Bytes.toBytes("age"),Bytes.toBytes(arr(2).toInt))
- (new ImmutableBytesWritable, put)
- rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
从hbase读取数据转化成RDD
本例基于官方提供的例子
[plain] view plain copy
- package com.test
- import org.apache.hadoop.hbase.HBaseConfiguration, HTableDescriptor, TableName
- import org.apache.hadoop.hbase.client.HBaseAdmin
- import org.apache.hadoop.hbase.mapreduce.TableInputFormat
- import org.apache.spark._
- import org.apache.hadoop.hbase.client.HTable
- import org.apache.hadoop.hbase.client.Put
- 以上是关于spark将数据写入hbase以及从hbase读取数据的主要内容,如果未能解决你的问题,请参考以下文章
如何使用Spark Streaming读取HBase的数据并写入到HDFS
使用 phoenix 连接器将 Spark 数据帧写入 Hbase
spark 从 hbase 读取数据,worker 是不是需要从远程驱动程序中获取分区数据?