Spark学习笔记——读写Hbase
Posted tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spark学习笔记——读写Hbase相关的知识,希望对你有一定的参考价值。
1.首先在HBase中建立一张表,名字为student(下文代码使用的列族为course,例如在hbase shell中执行 create 'student', 'course')
一个cell的值,取决于Row,Column family,Column Qualifier和Timestamp
Hbase表结构
2.往HBase中写入数据:每次写入都需要指定列族(column family)和列名(column qualifier)
build.sbt
// Versions shared by the Spark and HBase modules listed below.
val sparkVersion = "2.1.0"
val hbaseVersion = "1.3.0"

// One dependency per line; the resolved set is the same as before.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "mysql" % "mysql-connector-java" % "5.1.31",
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.hbase" % "hbase-common" % hbaseVersion,
  "org.apache.hbase" % "hbase-client" % hbaseVersion,
  "org.apache.hbase" % "hbase-server" % hbaseVersion,
  // NOTE(review): this entry is pinned to 1.2.1 while the other HBase modules use 1.3.0,
  // and `hbase` looks like the parent/aggregator artifact - confirm whether it is needed.
  "org.apache.hbase" % "hbase" % "1.2.1"
)
在 HBase shell 中写入数据时,值是以 String 形式存储的;但在 IDEA 中写代码时,如果传给 Bytes.toBytes 的是 int 类型,存入的就是二进制字节,在 shell 中查看时会出现 \x00... 的情况
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties
import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

/**
 * Created by mi on 17-4-11.
 */
case class resultset(name: String, info: String, summary: String)

case class IntroItem(name: String, value: String)

case class BaikeLocation(name: String, url: String = "", info: Seq[IntroItem] = Seq(), summary: Option[String] = None)

case class MewBaikeLocation(name: String, url: String = "", info: Option[String] = None, summary: Option[String] = None)

/**
 * Example job that writes three sample rows into the HBase table `student`.
 *
 * Each input record has the form "rowKey,math,english"; the two scores are stored
 * under the column family `course` with the qualifiers `math` and `english`.
 */
object MysqlOpt {

  /**
   * Entry point: builds a local SparkContext, turns the sample records into HBase
   * `Put`s and saves them through `saveAsHadoopDataset`.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Run in local mode so the example is easy to test.
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    try {
      // The JobConf-based saveAsHadoopDataset call below needs the TableOutputFormat
      // from the org.apache.hadoop.hbase.mapred package, not the mapreduce one.
      val jobConf = new JobConf(HBaseConfiguration.create())
      jobConf.setOutputFormat(classOf[TableOutputFormat])
      jobConf.set(TableOutputFormat.OUTPUT_TABLE, "student")

      val family = Bytes.toBytes("course")
      val indataRDD = sc.makeRDD(Array("1,99,98", "2,97,96", "3,95,94"))

      val rdd = indataRDD.map(_.split(',')).map { arr =>
        // One Put is one row; the row key is passed to the constructor.
        // Every value must be converted with org.apache.hadoop.hbase.util.Bytes.toBytes.
        // The scores are kept as Strings here, so they are stored as string bytes.
        val put = new Put(Bytes.toBytes(arr(0)))
        // addColumn(family, qualifier, value) replaces the deprecated Put.add overload.
        put.addColumn(family, Bytes.toBytes("math"), Bytes.toBytes(arr(1)))
        put.addColumn(family, Bytes.toBytes("english"), Bytes.toBytes(arr(2)))
        // saveAsHadoopDataset requires an RDD[(ImmutableBytesWritable, Put)].
        (new ImmutableBytesWritable, put)
      }

      rdd.saveAsHadoopDataset(jobConf)
    } finally {
      // Always release the SparkContext, even when the write fails.
      sc.stop()
    }
  }
}
3.从Hbase中读取数据
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql._
import java.util.Properties
import com.google.common.collect.Lists
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Get, Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf

/**
 * Created by mi on 17-4-11.
 */
case class resultset(name: String, info: String, summary: String)

case class IntroItem(name: String, value: String)

case class BaikeLocation(name: String, url: String = "", info: Seq[IntroItem] = Seq(), summary: Option[String] = None)

case class MewBaikeLocation(name: String, url: String = "", info: Option[String] = None, summary: Option[String] = None)

/**
 * Example job that reads the HBase table `student` and prints, for every row,
 * its row key and the value stored in `course:math`.
 */
object MysqlOpt {

  /**
   * Entry point: builds a local SparkContext, loads the table as an RDD of
   * (ImmutableBytesWritable, Result) pairs, prints the row count and then one
   * line per row.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Run in local mode so the example is easy to test.
    val conf = new SparkConf().setAppName("WordCount").setMaster("local")
    val sc = new SparkContext(conf)

    try {
      // HBase configuration naming the table to scan.
      val hBaseConf = HBaseConfiguration.create()
      hBaseConf.set(TableInputFormat.INPUT_TABLE, "student")

      // Load the table through the new Hadoop API input format.
      val hBaseRDD = sc.newAPIHadoopRDD(
        hBaseConf,
        classOf[TableInputFormat],
        classOf[ImmutableBytesWritable],
        classOf[Result])
      println(hBaseRDD.count())

      // Encode the column coordinates with Bytes.toBytes instead of String.getBytes,
      // which depends on the JVM's default charset.
      val family = Bytes.toBytes("course")
      val qualifier = Bytes.toBytes("math")

      hBaseRDD.foreach { case (_, result) =>
        // Row key of the current row.
        val key = Bytes.toString(result.getRow)
        // Cell value addressed by column family and qualifier.
        val math = Bytes.toString(result.getValue(family, qualifier))
        println(s"Row key:$key Math:$math")
      }
    } finally {
      // Always release the SparkContext, even when the read fails.
      sc.stop()
    }
  }
}
输出
Row key: Math:99
Row key: Math:97
Row key: Math:95
Row key:1 Math:99
Row key:1000 Math:99
Row key:2 Math:97
Row key:3 Math:95
以上是关于Spark学习笔记——读写Hbase的主要内容,如果未能解决你的问题,请参考以下文章