Using Spark with HBase
Posted by 尧字节
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, TableName, HBaseConfiguration}
import org.apache.hadoop.hbase.client._
import org.apache.spark.SparkContext
import scala.collection.JavaConversions._

/**
 * Example of basic CRUD operations with the new HBase 1.0.0 API
 **/
object HBaseNewAPI {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkHBase")
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    // Creating a Connection is a heavyweight operation; it is thread-safe and is the entry point for all HBase operations
    val conn = ConnectionFactory.createConnection(conf)
    // Obtain an Admin object from the Connection (replaces the old HBaseAdmin)
    val admin = conn.getAdmin
    // The table this example operates on
    val userTable = TableName.valueOf("user")

    // Create the `user` table
    val tableDescr = new HTableDescriptor(userTable)
    tableDescr.addFamily(new HColumnDescriptor("basic".getBytes))
    println("Creating table `user`. ")
    if (admin.tableExists(userTable)) {
      admin.disableTable(userTable)
      admin.deleteTable(userTable)
    }
    admin.createTable(tableDescr)
    println("Done!")

    try {
      // Get the `user` table
      val table = conn.getTable(userTable)
      try {
        // Prepare to insert a row with row key `id001`
        val p = new Put("id001".getBytes)
        // Specify the column and value for the Put (the old put.add method is deprecated)
        p.addColumn("basic".getBytes, "name".getBytes, "wuchong".getBytes)
        // Submit
        table.put(p)

        // Get a single row
        val g = new Get("id001".getBytes)
        val result = table.get(g)
        val value = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
        println("GET id001 :" + value)

        // Scan the data
        val s = new Scan()
        s.addColumn("basic".getBytes, "name".getBytes)
        val scanner = table.getScanner(s)
        try {
          for (r <- scanner) {
            println("Found row: " + r)
            println("Found value: " + Bytes.toString(r.getValue("basic".getBytes, "name".getBytes)))
          }
        } finally {
          // Make sure the scanner is closed
          scanner.close()
        }

        // Delete a row; usage is similar to Put
        val d = new Delete("id001".getBytes)
        d.addColumn("basic".getBytes, "name".getBytes)
        table.delete(d)
      } finally {
        if (table != null) table.close()
      }
    } finally {
      conn.close()
    }
  }
}
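The example above targets the HBase 1.0.0 client API. As a side note (not part of the original gist): on HBase 2.x the HTableDescriptor and HColumnDescriptor classes used above are deprecated in favor of builder-style descriptors. A minimal sketch of the same table creation under that assumption, meant to slot into the same main method and reusing the admin and userTable values defined above:

// Hypothetical HBase 2.x variant (not from the original post): builder-style descriptors
// replace HTableDescriptor / HColumnDescriptor. Assumes an HBase 2.x client on the classpath.
import org.apache.hadoop.hbase.client.{ColumnFamilyDescriptorBuilder, TableDescriptorBuilder}

val tableDescr2 = TableDescriptorBuilder.newBuilder(userTable)
  .setColumnFamily(ColumnFamilyDescriptorBuilder.of("basic"))   // same "basic" column family as above
  .build()
if (!admin.tableExists(userTable)) {
  admin.createTable(tableDescr2)
}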
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.SparkContext
import org.apache.hadoop.hbase.client._

/**
 * Reading from and writing to HBase with Spark
 **/
object SparkOnHBase {

  def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }

  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkOnHBase")
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "master")

    // ====== Save RDD to HBase ========
    // step 1: JobConf setup
    val jobConf = new JobConf(conf, this.getClass)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "user")

    // step 2: map the RDD to the table
    // An HBase table schema usually looks like:
    //   *row   cf:col_1   cf:col_2
    // In Spark we work with RDDs of tuples, e.g. (1,"lilei",14), (2,"hanmei",18).
    // We need to convert *RDD[(uid:Int, name:String, age:Int)]* into *RDD[(ImmutableBytesWritable, Put)]*,
    // which is what the `convert` function below does.
    def convert(triple: (Int, String, Int)) = {
      val p = new Put(Bytes.toBytes(triple._1))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("name"), Bytes.toBytes(triple._2))
      p.addColumn(Bytes.toBytes("basic"), Bytes.toBytes("age"), Bytes.toBytes(triple._3))
      (new ImmutableBytesWritable, p)
    }

    // step 3: read the RDD data from somewhere and convert it
    val rawData = List((1,"lilei",14), (2,"hanmei",18), (3,"someone",38))
    val localData = sc.parallelize(rawData).map(convert)

    // step 4: use `saveAsHadoopDataset` to save the RDD to HBase
    localData.saveAsHadoopDataset(jobConf)
    // =================================

    // ====== Load RDD from HBase ========
    // use `newAPIHadoopRDD` to load an RDD from HBase:
    // read data directly out of HBase into an RDD[K, V] that Spark can operate on

    // set the table to query
    conf.set(TableInputFormat.INPUT_TABLE, "user")

    // add a filter condition: age greater than or equal to 18
    val scan = new Scan()
    scan.setFilter(new SingleColumnValueFilter("basic".getBytes, "age".getBytes,
      CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(18)))
    conf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val count = usersRDD.count()
    println("Users RDD Count:" + count)
    usersRDD.cache()

    // iterate over the results and print them
    usersRDD.foreach { case (_, result) =>
      val key = Bytes.toInt(result.getRow)
      val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
      val age = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
      println("Row key:" + key + " Name:" + name + " Age:" + age)
    }
    // =================================
  }
}
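Once usersRDD has been loaded, it is an ordinary pair RDD and can feed into any further Spark transformations. Below is a small follow-up sketch (not in the original gist) that strips off the HBase-specific types and computes a simple aggregate; it is meant to go at the end of the same main method and reuses usersRDD, the "basic" column family, and the Bytes helper imported above.

// Hypothetical follow-up (not from the original post): map the
// (ImmutableBytesWritable, Result) pairs to plain (name, age) tuples
// so the rest of the job can use ordinary Spark transformations.
val users = usersRDD.map { case (_, result) =>
  val name = Bytes.toString(result.getValue("basic".getBytes, "name".getBytes))
  val age  = Bytes.toInt(result.getValue("basic".getBytes, "age".getBytes))
  (name, age)
}

// Compute the average age of the rows that passed the scan filter.
val totalAge = users.map(_._2).reduce(_ + _)
val avgAge   = totalAge.toDouble / users.count()
println("Average age of matched users: " + avgAge)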
Reposted from: https://gist.github.com/wuchong/95630f80966d07d7453b#file-hbasenewapi-scala
See also: http://wuchong.me/blog/2015/04/04/spark-on-yarn-cluster-deploy/