Loading Data into HBase with Bulk Load (Three Approaches)
Posted by 上官沐雪
Bulk loading Hive data into HBase with Spark
Scenario: we recently ended up with a large amount of data in Hive, and business requirements called for moving it into HBase.
Three approaches came to mind:
1. Map a Hive table onto an HBase table
CREATE external TABLE `hbase_website`(
`key` string,
`ocid` string,
`companyname` string,
`createtime` bigint,
`updatetime` bigint,
`sitename` string,
`number` string,
`homeurl` string,
`checkdate` string,
`domain` string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ocid,info:companyname,info:createtime,info:updatetime,info:sitename,info:number,info:homeurl,info:checkdate,info:domain")
TBLPROPERTIES("hbase.table.name" = "dingtalk:website");
Create a Hive table as shown above so that its columns map onto the HBase table, then simply INSERT the data into it (see the sketch below).
However, writes through this mapped table go through the normal HBase write path (WAL, MemStore, flush, and so on), so performance is very poor.
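For illustration, a minimal insert might look like the sketch below; the source table website_src is a hypothetical name, not from the original article:
-- website_src is a hypothetical Hive source table; adjust the column list to your schema
INSERT OVERWRITE TABLE hbase_website
SELECT `key`, ocid, companyname, createtime, updatetime,
sitename, `number`, homeurl, checkdate, domain
FROM website_src;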
2. Use HBase's built-in ImportTsv
# 1) Convert the CSV file into HFiles
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
'-Dimporttsv.separator=,' \
-Dimporttsv.bulk.output=/tmp/test/hfile \
-Dimporttsv.columns=HBASE_ROW_KEY,info:en,info:ch my_test /tmp/test/test.tsv
# 2) Load the HFiles into HBase
HADOOP_CLASSPATH=`/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hbase/bin/hbase mapredcp` \
hadoop jar /opt/cloudera/parcels/CDH/lib/hbase/hbase-mapreduce.jar \
completebulkload /tmp/test/hfile default:my_test
This approach gets data into HBase quickly, but it fails if the data itself contains the separator character, and the separator is limited to a single character.
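One prerequisite worth noting: the target table, with a matching column family, must already exist before the HFiles can be bulk-loaded into it. A minimal sketch in the HBase shell, without any pre-splitting (adjust to your own table design):
# Create the target table with the 'info' column family used in the column mapping above
echo "create 'my_test', 'info'" | hbase shell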
3. Final approach: bulk load Hive data into HBase with Spark
The code is below and can be used as-is.
// pom.xml
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scala.version>2.11.8</scala.version>
<scala.version.simple>2.11</scala.version.simple>
<hadoop.version>2.6.0</hadoop.version>
<spark.version>2.4.0</spark.version>
<netlib.java.version>1.1.2</netlib.java.version>
<hbase.version>2.1.0</hbase.version>
</properties>
<dependencies>
<!-- Scala -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version.simple}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version.simple}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- Hadoop Client -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- hbase -->
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-mapreduce</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- Java compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding>-->
</configuration>
</plugin>
<!-- Scala compiler plugin -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- Shade plugin (bundles all dependencies into a fat jar) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!--
zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Main class of the jar (optional) -->
<mainClass></mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
The code:
package com.zhendao.hbase.bulkload
import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
* @ Description :
* @ Author : lish.Li
* @ CreateDate : 2021/5/31 13:29
* @ Version : 1.0
*/
object Trademark_pre {
val zookeeperQuorum = "master1:2181,master2:2181,core1:2181" // ZooKeeper quorum
val hFilePath = "hdfs:///tmp/test/hfile/HFileTable" + System.currentTimeMillis() // HDFS output path for the generated HFiles
val tableName = "dingtalk:trade_mark"
val sql =
"""
|select
|key
|,shangBiaoImage
|,shangPin
|,likenessGroup
|,liuCheng
|,zhuCeHao
|,status
|,shenQingTime
|,clazzProduct
|,clazz
|,shangBiaoName
|,applicantName
|,applicantEName
|,companyAdd
|,companyEnglishAdd
|,chuShenDate
|,zhuCeDate
|,commonTrademarks
|,`type`
|,jieZhiRi
|,trademarkForm
|,dateInternationalRegistration
|,postSpecifiedDate
|,priorityDate
|,agent
|from mongodb_dingtalk.hive_trademark_jx_pre where hid is not null
""".stripMargin
// Column family
val familyName = "info"
// Columns whose values should be parsed as dates before writing (empty here)
private val toDateFields = List()
def main(args: Array[String]): Unit = {
// Spark configuration
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$"))
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// .setMaster("local[2]")
val sparkSession = SparkSession.builder()
.enableHiveSupport()
.config(sparkConf)
.getOrCreate()
// HBase / ZooKeeper connection configuration
val hconf: Configuration = HBaseConfiguration.create()
hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)
hconf.set("hbase.zookeeper.property.clientPort", "2181")
// Region split threshold set to 10 GB
hconf.set("hbase.hregion.max.filesize", "10737418240")
// Allow up to 320 HFiles per region per column family during the bulk load
hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "320")
// Target table name used by HFileOutputFormat2
hconf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)
// Obtain the HBase connection, table, and admin handles
val hbaseConn: Connection = ConnectionFactory.createConnection(hconf)
val table = hbaseConn.getTable(TableName.valueOf(tableName))
val admin = hbaseConn.getAdmin
/**
* Save the generated HFiles.
* Note: the HFiles produced for a bulk load must be written out first,
* then loaded into HBase via LoadIncrementalHFiles.
* About the sortBy step below:
* 0. HBase looks up data by rowkey, and rowkeys are stored in sorted order;
*    in a sense the rowkey is an index, which is one reason HBase reads are fast.
*    So data has to be inserted at the position where its rowkey belongs.
* 1. Put-based writes go through the WAL, and HBase places each cell according to
*    its rowkey, so ordering is guaranteed by HBase itself.
* 2. Bulk load skips the WAL; much like Hive's LOAD, it simply moves the finished
*    HFiles into the target HBase directory, so the HFiles must already be sorted.
*    Sorting by rowkey alone used to be enough, but with version 2.0.2 the columns
*    must be sorted too, hence the sortBy over (rowkey, column family, qualifier).
*/
// 1. Prepare the data that goes into the HFiles. The rowkeys must be sorted, otherwise:
// java.io.IOException: Added a key not lexically larger than previous.
sparkSession.sql("use mongodb_dingtalk")
val dataFrame: DataFrame = sparkSession.sql(sql)
val hiveCopyrightRdd = dataFrame.rdd
var fields: Array[String] = dataFrame.columns
fields.foreach(println(_))
// Drop the rowkey column; the remaining columns become HBase qualifiers
fields = fields.filterNot(_.equals("key"))
/**
* The RDD below has the shape:
* (key, (familyName, field, fieldValue))
* i.e. rowkey, column family, column qualifier, value.
*/
val hbaseRowRdd = hiveCopyrightRdd.flatMap(rows => {
val key = rows.getAs[String]("key")
fields.map(field => {
val fieldValue = rows.getAs[String](field)
(key, (familyName, field, fieldValue))
})
})
val rdd: RDD[(ImmutableBytesWritable, KeyValue)] = hbaseRowRdd.filter(line => {
line._2._3 != null
}).sortBy(line => (line._1, line._2._1, line._2._2)).map(ele => {
val rowKey = Bytes.toBytes(ele._1)
val family = Bytes.toBytes(ele._2._1)
val qualifier = Bytes.toBytes(ele._2._2)
var value: Array[Byte] = null
if (toDateFields.nonEmpty && toDateFields.contains(ele._2._2)) {
// Parse date columns and store the epoch millis as a string
val fast = FastDateFormat.getInstance("yyyy-MM-dd")
val firstDate: Long = fast.parse(ele._2._3.toString).getTime
value = Bytes.toBytes(firstDate.toString)
} else {
value = Bytes.toBytes(ele._2._3.toString)
}
(new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, qualifier, value))
})
// Save the generated HFiles on HDFS
rdd.saveAsNewAPIHadoopFile(hFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], hconf)
// Bulk load the HFiles into HBase
val bulkLoader = new LoadIncrementalHFiles(hconf)
val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)
sparkSession.stop()
}
}
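After packaging the project (for example with mvn package), the job can be submitted roughly as follows. This is only a sketch: the jar name and the resource settings are assumptions; only the main class comes from the code above.
spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.zhendao.hbase.bulkload.Trademark_pre \
--num-executors 4 \
--executor-memory 4g \
hbase-bulkload-1.0.jar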
And that's it: the Hive data has been loaded into HBase. Awesome!