Big Data: Storing Data in HBase

Posted by 潇洒哥浩浩


This post shows how to sink a Flink DataStream into HBase: a small streaming job reads purchase records from a text file and writes them into an HBase table through a custom RichSinkFunction.
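The input is a comma-separated purchase log. The actual buy_log.txt is not reproduced in the original post, but from the parsing code below (split on ",", a string id followed by a numeric count) each line has the form id,count, so a hypothetical sample might look like:

1001,38.5
1002,12.0
1003,99.9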

package com.sjw.flink

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}


object HBaseSinkTest {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the purchase log as a stream of CSV lines.
    val dataDS: DataStream[String] = env.readTextFile("src/main/resources/buy_log.txt")

    // Parse each "id,count" line into a Buy record.
    val buyDS: DataStream[Buy] = dataDS.map(data => {
      val arr: Array[String] = data.split(",")
      Buy(arr(0).trim, arr(1).trim.toDouble)
    })

    // Write every record to HBase through the custom sink defined below.
    buyDS.addSink(new MyHBaseSink())

    env.execute()
  }
}

// One record of the purchase log: a buyer id and a purchase amount.
case class Buy(id: String, count: Double)

// A sink that writes each Buy record into the HBase table "stu".
class MyHBaseSink() extends RichSinkFunction[Buy] {

  // HBase connection; heavyweight, so it is created once per parallel sink instance in open().
  var conn: Connection = _

  override def open(parameters: Configuration): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "sunjunwei1.com,sunjunwei2.com,sunjunwei3.com")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    conn = ConnectionFactory.createConnection(conf)
  }

  override def invoke(value: Buy, context: SinkFunction.Context[_]): Unit = {
    // Table handles are lightweight: get one per write and close it afterwards.
    val table = conn.getTable(TableName.valueOf("stu"))
    try {
      // Row key = buyer id; the count goes into column info:count as a string.
      val put = new Put(Bytes.toBytes(value.id))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(value.count.toString))
      table.put(put)
    } finally {
      table.close()
    }
  }

  override def close(): Unit = {
    if (conn != null) conn.close()
  }
}
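The sink assumes the target table already exists; the job never creates it. Before running, create a table named stu with a column family info (the names the sink writes to) from the HBase shell, and scan it afterwards to verify the writes:

create 'stu', 'info'
scan 'stu'

Note the design choice in the sink: the Connection is opened once in open() because it is expensive to build and not serializable, while Table handles are cheap and are obtained (and closed) per record in invoke().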
