2021年大数据HBase(十五):HBase的Bulk Load批量加载操作

Posted Lansonli

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了2021年大数据HBase(十五):HBase的Bulk Load批量加载操作相关的知识,希望对你有一定的参考价值。

全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 

新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点。

目录

系列历史文章

HBase的Bulk Load批量加载操作

一、Bulk Load 基本介绍

二、需求说明

三、准备工作

1、在hbase中创建名称空间, 并创建hbase的表

2、创建 maven项目 加载相关的pom 依赖

3、在项目中创建包 和 添加配置文件log4j.properties

4、将转换csv数据上传到HDFS中: 数据在资料中

四、将CSV数据转换为HFile文件格式数据

map 程序的代码

驱动类的代码 

五、将Hfile文件格式数据加载HBase中

语法说明

案例


系列历史文章

2021年大数据HBase(十七):HBase的360度全面调优

2021年大数据HBase(十六):HBase的协处理器(Coprocessor)

2021年大数据HBase(十五):HBase的Bulk Load批量加载操作

2021年大数据HBase(十四):HBase的原理及其相关的工作机制

2021年大数据HBase(十三):HBase读取和存储数据的流程

2021年大数据HBase(十二):Apache Phoenix 二级索引

2021年大数据HBase(十一):Apache Phoenix的视图操作

2021年大数据HBase(十):Apache Phoenix的基本入门操作

2021年大数据HBase(九):Apache Phoenix的安装

2021年大数据HBase(八):Apache Phoenix的基本介绍

2021年大数据HBase(七):Hbase的架构!【建议收藏】

2021年大数据HBase(六):HBase的高可用!【建议收藏】

2021年大数据HBase(五):HBase的相关操作-JavaAPI方式!【建议收藏】

2021年大数据HBase(四):HBase的相关操作-客户端命令式!【建议收藏】

2021年大数据HBase(三):HBase数据模型

2021年大数据HBase(二):HBase集群安装操作

2021年大数据HBase(一):HBase基本简介

HBase的Bulk Load批量加载操作

一、Bulk Load 基本介绍

        很多时候,我们需要将外部的数据导入到HBase集群中,例如:将一些历史的数据导入到HBase做备份。我们之前已经学习了HBase的Java API,通过put方式可以将数据写入到HBase中,我们也学习过通过MapReduce编写代码将HDFS中的数据导入到HBase。但这些方式都是基于HBase的原生API方式进行操作的。这些方式有一个共同点,就是需要与HBase连接,然后进行操作。HBase服务器要维护、管理这些连接,以及接受来自客户端的操作,会给HBase的存储、计算、网络资源造成较大消耗。此时,在需要将海量数据写入到HBase时,通过Bulk load(大容量加载)的方式,会变得更高效。可以这么说,进行大量数据操作,Bulk load是必不可少的。

       我们知道,HBase的数据最终是需要持久化到HDFS。HDFS是一个文件系统,那么数据可定是以一定的格式存储到里面的。例如:Hive我们可以以ORC、Parquet等方式存储。而HBase也有自己的数据格式,那就是HFile。Bulk Load就是直接将数据写入到StoreFile(HFile)中,从而绕开与HBase的交互,HFile生成后,直接一次性建立与HBase的关联即可。使用BulkLoad,绕过了Write to WAL,Write to MemStore及Flush to disk的过程

原有的数据写入操作大致流转流程:

正常写入数据的流程: 数据写入到Hlog --> MemStore --> StoreFile --> Hfile 

如果以及有一批数据, 需要写入到Hbase中某个表中, 传统做法, 按照上述流程, 一步步将数据最终写入Hfile中, 此时整个region集群会经历大量的写入请求操作,HBase集群需要调度大量资源来满足本次的数据写入工作,如果这个时候, 又出现大量的读取数据请求也去访问这个表, 会发生什么问题呢? 读取性能有可能回受到影响 甚至出现卡顿现象

思考如何解决呢?
    对一批数据, 提前按照HBase的Hfile文件格式存储好, 然后将Hfile文件格式数据直接放置到Hbase对应数据目录下, 让Hbase直接加载, 此时不需要Hbase提供大量的写入资源, 即可完成全部数据写入操作
 
总结: 

    第一个步骤: 将数据文件转换为HFile文件格式   -- MapReduce
    第二个步骤: 将Hfile文件格式数据加载到Hbase中

二、需求说明

需求: 需要将每一天的银行转账记录的数据 存储到HBase中 , 数据量比较的庞大

  • 数据所在位置: HDFS中,

  • 数据格式为: CSV 分割符号为逗号

三、准备工作

1、在hbase中创建名称空间, 并创建hbase的表

# 创建名称空间: 
create_namespace 'IT_BANK'
# 在名称空间下, 创建目标表: 
create 'IT_BANK:TRANSFER_RECORD' ,NAME=>'C1',COMPRESSION=>'GZ',NUMREGIONS=>6,SPLITALGO=>'HexStringSplit'

2、创建 maven项目 加载相关的pom 依赖

说明: 如果将此全部导入到本项目中, 出现全部爆红错误, 可以将此内容放置到父工程的pom依赖中

此时 有可能导致其他某个项目爆红(不用管), 只需要保证当前自己的项目没有任何问题即可

<repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>


    <dependencies>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-auth</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>

3、在项目中创建包 和 添加配置文件log4j.properties

4、将转换csv数据上传到HDFS中: 数据在资料中

hdfs dfs -mkdir -p /bulkLoad/output

hdfs dfs -put bank_record.csv /bulkLoad/output

四、将CSV数据转换为HFile文件格式数据

map 程序的代码

package com.it.bulkload.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BulkLoadMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> 
    private  ImmutableBytesWritable k2 = new ImmutableBytesWritable();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 

        //1. 获取一行数据
        String line = value.toString();

        if(line != null && !"".equalsIgnoreCase(line))
            //2. 对数据进行切割操作
            String[] fields = line.split(",");
            //3. 封装k2 和 v2的数据
            //封装 k2
            String rowkeyStr = fields[0];
            k2.set(rowkeyStr.getBytes());

            // 封装v2
            Put v2 = new Put(rowkeyStr.getBytes());
            v2.addColumn("C1".getBytes(),"code".getBytes(),fields[1].getBytes());
            v2.addColumn("C1".getBytes(),"rec_account".getBytes(),fields[2].getBytes());
            v2.addColumn("C1".getBytes(),"rec_bank_name".getBytes(),fields[3].getBytes());
            v2.addColumn("C1".getBytes(),"rec_name".getBytes(),fields[4].getBytes());
            v2.addColumn("C1".getBytes(),"pay_account".getBytes(),fields[5].getBytes());
            v2.addColumn("C1".getBytes(),"pay_name".getBytes(),fields[6].getBytes());
            v2.addColumn("C1".getBytes(),"pay_comments".getBytes(),fields[7].getBytes());
            v2.addColumn("C1".getBytes(),"pay_channel".getBytes(),fields[8].getBytes());
            v2.addColumn("C1".getBytes(),"pay_way".getBytes(),fields[9].getBytes());
            v2.addColumn("C1".getBytes(),"status".getBytes(),fields[10].getBytes());
            v2.addColumn("C1".getBytes(),"timestamp".getBytes(),fields[11].getBytes());
            v2.addColumn("C1".getBytes(),"money".getBytes(),fields[12].getBytes());


            //4. 输出
            context.write(k2,v2);

        


    

驱动类的代码 

package com.it.bulkload.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class BulkLoadDriver 


    public static void main(String[] args) throws Exception 

        //1. 获取job对象
        //Configuration conf = new Configuration();
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
        Job job = Job.getInstance(conf, "BulkLoadDriver");

        //2. 配置集群运行的必备项
        job.setJarByClass(BulkLoadDriver.class);

        //3. 配置 MR的天龙八部

        //3.1: 指定输入类, 及其输入的路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://node1:8020/bulkLoad/input/bank_record.csv"));

        //3.2: 指定map类 及其输出的k2和v2的类型
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //3.3 : 指定 shuffle操作:  分区 排序 规约 分组  默认即可

        //3.7: 指定reduce类, 及其输出 k3 和 v3的类型
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);


        //3.8: 设置输出类, 及其输出的路径: HFile文件格式
        job.setOutputFormatClass(HFileOutputFormat2.class);
        HFileOutputFormat2.setOutputPath(job,new Path("hdfs://node1:8020/bulkLoad/output"));

        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("IT_BANK:TRANSFER_RECORD"));

        HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("IT_BANK:TRANSFER_RECORD")));

        //4. 提交任務
        boolean flag = job.waitForCompletion(true);

        //5. 退出程序
        System.exit(flag ? 0 : 1);
    


五、将Hfile文件格式数据加载HBase中

语法说明

hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles  数据路径 Hbase表名

案例

hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles  hdfs://node1:8020/bulkLoad/output/  IT_BANK:TRANSFER_RECORD

  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢大数据系列文章会每天更新,停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

以上是关于2021年大数据HBase(十五):HBase的Bulk Load批量加载操作的主要内容,如果未能解决你的问题,请参考以下文章

2021年大数据HBase:HBase数据模型!!!建议收藏

2021年大数据HBase(十三):HBase读取和存储数据的流程

2021年大数据HBase(十三):HBase读取和存储数据的流程

2021年大数据HBase:Hbase的架构!建议收藏

2021年大数据HBase:Hbase的架构!建议收藏

2021年大数据Hbase:Hbase基本简介