Loading Data into HBase with Bulk Load (Three Approaches)

Posted by 上官沐雪

Using Spark to bulk load Hive data into HBase

Scenario: a large amount of data has recently accumulated in Hive, and the business now needs it in HBase.

Three approaches come to mind:

1. Map a Hive table onto the HBase table

CREATE external TABLE `hbase_website`(
  `key` string, 
  `ocid` string, 
  `companyname` string, 
  `createtime` bigint, 
  `updatetime` bigint, 
  `sitename` string, 
  `number` string, 
  `homeurl` string, 
  `checkdate` string, 
  `domain` string)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'    
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,info:ocid,info:companyname,info:createtime,info:updatetime,info:sitename,info:number,info:homeurl,info:checkdate,info:domain")    
TBLPROPERTIES("hbase.table.name" = "dingtalk:website");

Create the external Hive table shown above so that its columns map onto the corresponding HBase table, then simply INSERT into it. Every write, however, goes through HBase's normal write path (WAL, MemStore, flush, and so on), so performance is very poor: each inserted row effectively becomes an ordinary client Put, as sketched below.
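A minimal sketch of that Put path through the HBase client API (the rowkey and cell values below are placeholders; only the table name dingtalk:website and the info family come from the DDL above):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object PutPathSketch {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master1:2181,master2:2181,core1:2181")

    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("dingtalk:website"))

    // Each row written through the Hive mapping ends up as a Put like this one:
    // it is appended to the WAL and buffered in the MemStore before being flushed
    // to HFiles, which is what makes this approach slow for bulk volumes.
    val put = new Put(Bytes.toBytes("some-rowkey"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("ocid"), Bytes.toBytes("12345"))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("companyname"), Bytes.toBytes("demo"))
    table.put(put)

    table.close()
    conn.close()
  }
}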

2. Use HBase's built-in ImportTsv

## 1) Convert the delimited text file into HFiles
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
'-Dimporttsv.separator=,' \
-Dimporttsv.bulk.output=/tmp/test/hfile \
-Dimporttsv.columns=HBASE_ROW_KEY,info:en,info:ch my_test /tmp/test/test.tsv


## 2) Load the HFiles into HBase
HADOOP_CLASSPATH=`/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hbase/bin/hbase mapredcp` \
hadoop jar /opt/cloudera/parcels/CDH/lib/hbase/hbase-mapreduce.jar \
completebulkload /tmp/test/hfile default:my_test

This approach gets data into HBase quickly, but it fails whenever a field value contains the separator character, and the separator is limited to a single character.

3. Final approach: use Spark to bulk load Hive data into HBase

The code below can be copied and used as-is.

pom.xml:
 <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scala.version>2.11.8</scala.version>
        <scala.version.simple>2.11</scala.version.simple>
        <hadoop.version>2.6.0</hadoop.version>
        <spark.version>2.4.0</spark.version>
        <netlib.java.version>1.1.2</netlib.java.version>
        <hbase.version>2.1.0</hbase.version>
    </properties>

    <dependencies>

        <!-- Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version.simple}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version.simple}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- Hadoop Client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>


        <!-- hbase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>${hbase.version}</version>
        </dependency>


        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

 <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                </configuration>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Packaging plugin (builds a fat jar with all dependencies) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!--
                                        zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Main class for the jar (optional) -->
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

The Spark job:

package com.zhendao.hbase.bulkload

import org.apache.commons.lang3.time.FastDateFormat
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
  * @ Description   :
  * @ Author        :  lish.Li
  * @ CreateDate    :  2021/5/31 13:29
  * @ Version       :  1.0
  */

object Trademark_pre {


  val zookeeperQuorum = "master1:2181,master2:2181,core1:2181" // ZooKeeper quorum
  val hFilePath = "hdfs:///tmp/test/hfile/HFileTable" + System.currentTimeMillis() // HFile output path on HDFS
  val tableName = "dingtalk:trade_mark"
  val sql =
    """
      |select
      |key
      |,shangBiaoImage
      |,shangPin
      |,likenessGroup
      |,liuCheng
      |,zhuCeHao
      |,status
      |,shenQingTime
      |,clazzProduct
      |,clazz
      |,shangBiaoName
      |,applicantName
      |,applicantEName
      |,companyAdd
      |,companyEnglishAdd
      |,chuShenDate
      |,zhuCeDate
      |,commonTrademarks
      |,`type`
      |,jieZhiRi
      |,trademarkForm
      |,dateInternationalRegistration
      |,postSpecifiedDate
      |,priorityDate
      |,agent
      |from mongodb_dingtalk.hive_trademark_jx_pre where hid is not null
    """.stripMargin
  // Column family
  val familyName = "info"
  // Fields listed here are parsed as yyyy-MM-dd dates and stored as epoch millis
  private val toDateFields = List[String]()
  def main(args: Array[String]): Unit = {
    // Spark configuration
    val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //      .setMaster("local[2]")

    val sparkSession = SparkSession.builder()
      .enableHiveSupport()
      .config(sparkConf)
      .getOrCreate()

    // HBase / ZooKeeper configuration
    val hconf: Configuration = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.quorum", zookeeperQuorum)
    hconf.set("hbase.zookeeper.property.clientPort", "2181")
    // Allow regions/HFiles up to 10 GB before splitting
    hconf.set("hbase.hregion.max.filesize", "10737418240")
    hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "320")
    // Target table name for HFileOutputFormat2
    hconf.set("hbase.mapreduce.hfileoutputformat.table.name", tableName)


    // Obtain an HBase connection
    val hbaseConn: Connection = ConnectionFactory.createConnection(hconf)
    val table = hbaseConn.getTable(TableName.valueOf(tableName))
    val admin = hbaseConn.getAdmin


    /**
      * Save the generated HFiles.
      * Note: with bulk load the HFiles are first written out to HDFS and are then
      * loaded into HBase through the LoadIncrementalHFiles class.
      * Why the sortBy below matters:
      * 0. HBase lookups are driven by the rowkey, and rowkeys are stored in sorted
      *    order; the rowkey essentially acts as an index, which is one reason HBase
      *    reads are fast. So each row must be written at the position its rowkey belongs.
      * 1. Put-based inserts go through the WAL and HBase itself places each RowKey in
      *    the right position, so ordering is guaranteed automatically.
      * 2. Bulk load skips the WAL; much like a Hive LOAD it simply moves the HFiles
      *    into the target HBase directory, so without rewriting the HFiles the data
      *    must already be sorted. Sorting by rowkey alone used to be enough, but as of
      *    HBase 2.0.2 the column qualifier must be sorted as well, hence the sortBy
      *    on (rowkey, family, qualifier) below.
      */

    // 1. Prepare the data to be written into HFiles. The rowkeys must be sorted, otherwise the job fails with:
    // java.io.IOException: Added a key not lexically larger than previous.
    sparkSession.sql("use mongodb_dingtalk")
    val dataFrame: DataFrame = sparkSession.sql(sql)

    val hiveCopyrightRdd = dataFrame.rdd

    var fields: Array[String] = dataFrame.columns
    fields.foreach(println)
    // Drop the rowkey column; the remaining columns become HBase qualifiers
    fields = fields.filterNot(_ == "key")


    /**
      * The RDD built below has the shape:
      * (key, (("info", "ocid", ocid)))
      * i.e. rowKey, column family, qualifier, value
      */
    val hbaseRowRdd = hiveCopyrightRdd.flatMap(rows => {
      val key =rows.getAs[String]("key")
      fields.flatMap(field => {
        val fieldValue = rows.getAs[String](field)
        Array(
          (key,((familyName, field, fieldValue)))
        )
      }
      )
    })

    // Drop null cells, sort by (rowkey, family, qualifier) as HFileOutputFormat2 requires,
    // then turn every cell into a KeyValue.
    val rdd: RDD[(ImmutableBytesWritable, KeyValue)] = hbaseRowRdd.filter(line => {
      line._2._3 != null
    }).sortBy(line => (line._1, line._2._1, line._2._2)).map(ele => {
      val rowKey = Bytes.toBytes(ele._1)
      val family = Bytes.toBytes(ele._2._1)
      val qualifier = Bytes.toBytes(ele._2._2)
      var value: Array[Byte] = null
      if (toDateFields.nonEmpty && toDateFields.contains(ele._2._2)) {
        // Date fields are parsed as yyyy-MM-dd and stored as epoch millis (as a string)
        val fast = FastDateFormat.getInstance("yyyy-MM-dd")
        val firstDate: Long = fast.parse(ele._2._3.toString).getTime
        value = Bytes.toBytes(firstDate.toString)
      } else {
        value = Bytes.toBytes(ele._2._3.toString)
      }
      (new ImmutableBytesWritable(rowKey), new KeyValue(rowKey, family, qualifier, value))
    })

    // Save Hfiles on HDFS
    rdd.saveAsNewAPIHadoopFile(hFilePath, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], hconf)

    // Bulk-load the generated HFiles into HBase
    val bulkLoader = new LoadIncrementalHFiles(hconf)
    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
    bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)
    hbaseConn.close()
    sparkSession.stop()

  }
}
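To sanity-check the result, a single row can be read back with the HBase client; a minimal sketch (the rowkey below is a placeholder, pick any key you know exists in the Hive source):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object BulkLoadCheck {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "master1:2181,master2:2181,core1:2181")

    val conn = ConnectionFactory.createConnection(conf)
    val table = conn.getTable(TableName.valueOf("dingtalk:trade_mark"))

    // "some-rowkey" is a placeholder; substitute a key present in the source data.
    val result = table.get(new Get(Bytes.toBytes("some-rowkey")))
    val status = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("status"))
    println(if (status == null) "row not found" else Bytes.toString(status))

    table.close()
    conn.close()
  }
}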

That's it: the Hive data has been loaded into HBase smoothly.
