Hbase

Posted 捡黄金的少年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hbase相关的知识,希望对你有一定的参考价值。

1、HBase集群安装部署

    hbase安装包

(1)、解压文件

cd /kkb/soft/
tar -xzvf hbase-1.2.0-cdh5.14.2.tar.gz -C /kkb/install/

(2)、修改HBase配置文件

cd /kkb/install/hbase-1.2.0-cdh5.14.2/conf/
vim hbase-env.sh

 添加java地址

export JAVA_HOME=/opt/install/jdk1.8.0_141
export HBASE_MANAGES_ZK=false 

 (3)、修改 hbase-site.xml

vim hbase-site.xml

<configuration>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://node01:8020/hbase</value>  
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- 0.98后的新变动,之前版本没有.port,默认端口为60000 -->
    <property>
        <name>hbase.master.port</name>
        <value>16000</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>node01,node02,node03</value>
    </property>
    <!-- 此属性可省略,默认值就是2181 -->
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/kkb/install/zookeeper-3.4.5-cdh5.14.2/zkdatas</value>
    </property>
    <!-- 此属性可省略,默认值就是/hbase -->
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
</configuration> 

(4)、egionservers修改

[hadoop@node01 conf]$ vim regionservers

  • 指定HBase集群的从节点;原内容清空,添加如下三行

node01
node02
node03

(5)、back-masters

  • 创建back-masters配置文件,里边包含备份HMaster节点的主机名,每个机器独占一行,实现HMaster的高可用

  • vim backup-masters

  • 将node02作为备份的HMaster节点,问价内容如下

node02 

(6)、分发安装包

 cd /kkb/install
scp -r hbase-1.2.0-cdh5.14.2/ node02:$PWD

scp -r hbase-1.2.0-cdh5.14.2/ node03:$PWD

(7)、建立软连接

  • 注意:三台机器均做如下操作

  • 因为HBase集群需要读取hadoop的core-site.xml、hdfs-site.xml的配置文件信息,所以我们==三台机器==都要执行以下命令,在相应的目录创建这两个配置文件的软连接

ln -s /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop/core-site.xml  /kkb/install/hbase-1.2.0-cdh5.14.2/conf/core-site.xml

ln -s /kkb/install/hadoop-2.6.0-cdh5.14.2/etc/hadoop/hdfs-site.xml  /kkb/install/hbase-1.2.0-cdh5.14.2/conf/hdfs-site.xml

(7)、添加HBase环境变量

  • 注意:三台机器均执行以下命令,添加环境变量

vim /etc/profile

export HBASE_HOME=/kkb/install/hbase-1.2.0-cdh5.14.2
export PATH=$PATH:$HBASE_HOME/bin

重新编译/etc/profile,让环境变量生效

 source /etc/profile

(8)、Base的启动与停止

  • 需要提前启动HDFS及ZooKeeper集群

  • 第一台机器==node01==(HBase主节点)执行以下命令,启动HBase集群

 start-hbase.sh

  • 启动完后,jps查看HBase相关进程

    node01、node02上有进程HMaster、HRegionServer

    node03上有进程HRegionServer

  • 警告提示:HBase启动的时候会产生一个警告,这是因为jdk7与jdk8的问题导致的,如果linux服务器安装jdk8就会产生这样的一个警告  

查看当前所有节点Jps状态

2、hbase基础学习

2.1、rowkey行键

  • table的主键,table中的记录==按照rowkey 的字典序进行排序==

  • Row key行键可以是任意字符串(最大长度是 64KB,实际应用中长度一般为 10-100bytes)

2.2 Column Family列族

  • 列族或列簇

  • HBase表中的每个列,都归属与某个列族

  • 列族是表的schema的一部分(而列不是),即建表时至少指定一个列族

  • 比如创建一张表,名为user,有两个列族,分别是infodata,建表语句create 'user', 'info', 'data'

2.3 Column列

  • 列肯定是表的某一列族下的一个列,用列族名:列名表示,如info列族下的name列,表示为info:name

  • 属于某一个ColumnFamily,类似于我们mysql当中创建的具体的列

2.4 cell单元格

  • 指定row key行键、列族、列,可以确定的一个cell单元格

  • cell中的数据是没有类型的,全部是以字节数组进行存储

2.5 Timestamp时间戳

  • 可以对表中的Cell多次赋值,每次赋值操作时的时间戳timestamp,可看成Cell值的版本号version number

  • 即一个Cell可以有多个版本的值

Client客户端

  • Client是操作HBase集群的入口

    • 对于管理类的操作,如表的增、删、改操纵,Client通过RPC与HMaster通信完成

    • 对于表数据的读写操作,Client通过RPC与RegionServer交互,读写数据

  • Client类型:

    • HBase shell

    • Java编程接口

    • Thrift、Avro、Rest等等

 ZooKeeper集群

  • 作用

    • 实现了HMaster的高可用,多HMaster间进行主备选举

    • 保存了HBase的元数据信息meta表,提供了HBase表中region的寻址入口的线索数据

    • 对HMaster和HRegionServer实现了监控

 HMaster

  • HBase集群也是主从架构,HMaster是主的角色,是老大

  • 主要负责Table表和Region的相关管理工作:

  • 关于Table

    • 管理Client对Table的增删改的操作

    • 关于Region

      • 在Region分裂后,负责新Region分配到指定的HRegionServer上

      • 管理HRegionServer间的负载均衡,迁移region分布

      • 当HRegionServer宕机后,负责其上的region的迁移

HRegionServer

  • HBase集群中从的角色,是小弟

  • 作用

    • 响应客户端的读写数据请求

    • 负责管理一系列的Region

    • 切分在运行过程中变大的region

 Region

  • HBase集群中分布式存储的最小单元

  • 一个Region对应一个Table表的部分数据

基础命令操作 

进入hbase

hbase shell 

创建user表,以及info,data列族 

create 'user', 'info', 'data' 

向user表中插入信息,row key为rk0001,列族info中添加名为name的列,值为zhangsan 


HBase(main):011:0> put 'user', 'rk0001', 'info:name', 'zhangsan' 

根据rowkey查询该行所有列族的数据

get 'user', 'rk0001' 

 获取user表中row key为rk0001,info列族的name、age列的信息

get 'user', 'rk0001', 'info:name', 'info:age'

过滤值, 获取user表中row key为rk0001,cell的值为zhangsan的信息

 get 'user', 'rk0001', FILTER => "ValueFilter(=, 'binary:zhangsan')"

获取user表中row key为rk0001,列标示符中含有a的信息  

get 'user', 'rk0001', FILTER => "QualifierFilter(=,'substring:a')" 

 

 查看所有的表

list

查看表中所有数据

scan 'user' 

查看user表中,一个列族的所有信息 

 scan 'user', COLUMNS => 'info' 

 查询info:name列、data:pic列的数据

scan 'user', COLUMNS => ['info:name', 'data:pic']

将user表的f1列族版本数改为5

alter 'user', NAME => 'info', VERSIONS => 5 

删除user表row key为rk0001,列标示符为info:name的数据  

delete 'user', 'rk0001', 'info:name' 

删除user表row key为rk0001,列标示符为info:name,timestamp为1392383705316的数据  

delete 'user', 'rk0001', 'info:name', 1392383705316 

清空表数据

truncate 'user'

 删除表

  • 首先需要先让该表为disable状态,使用命令:

disable 'user'
  • 然后使用drop命令删除这个表

 drop 'user'

(注意:如果直接drop表,会报错:Drop the named table. Table must first be disabled)

查看所有的表

list 

统计表中有多少个行键(rowkey) 

count 'user'

javaApi操作 

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0-mr1-cdh5.14.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.0-cdh5.14.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.0-cdh5.14.2</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.14.3</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.junit.jupiter</groupId>
        <artifactId>junit-jupiter</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.1</version>
        <scope>compile</scope>
    </dependency>

</dependencies>

1、创建表

    @Test
    public void createTable() throws IOException 
        Configuration configuration = HBaseConfiguration.create();
        //连接HBase集群不需要指定HBase主节点的ip地址和端口号
        configuration.set("hbase.zookeeper.quorum","node01:2181,node02:2181,node03:2181");
        //创建连接对象
        Connection connection = ConnectionFactory.createConnection(configuration);
        //获取连接对象,创建一张表
        //获取管理员对象,来对手数据库进行DDL的操作
        Admin admin = connection.getAdmin();
        //指定我们的表名
        TableName myuser = TableName.valueOf("myuser");
        HTableDescriptor hTableDescriptor = new HTableDescriptor(myuser);
        //指定两个列族
        HColumnDescriptor f1 = new HColumnDescriptor("f1");
        HColumnDescriptor f2 = new HColumnDescriptor("f2");
        hTableDescriptor.addFamily(f1);
        hTableDescriptor.addFamily(f2);

        admin.createTable(hTableDescriptor);
        admin.close();
        connection.close();
    

2、添加数据

  private Connection connection;
    private final String TABLE_NAME = "myuser";
    private Table table;

    @Before
    public void initTable() throws IOException 
        System.out.println("1111111111");
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "node01:2181,node02:2181");
        connection = ConnectionFactory.createConnection(configuration);
        table = connection.getTable(TableName.valueOf(TABLE_NAME));
    

    @After
    public void close() throws IOException 
        System.out.println("222222");
        table.close();
        connection.close();
    

    /**
     * 向myuser表当中添加数据
     */
    @Test
    public void addData() throws IOException 
        //获取表
        //Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Put put = new Put("0001".getBytes());//创建put对象,并指定rowkey值
        put.addColumn("f2".getBytes(), "name".getBytes(), "zhangsan".getBytes());
        put.addColumn("f2".getBytes(), "age".getBytes(), Bytes.toBytes(18));
        put.addColumn("f2".getBytes(), "id".getBytes(), Bytes.toBytes(25));
        put.addColumn("f2".getBytes(), "address".getBytes(), Bytes.toBytes("地球人"));
        System.out.println("000000000000000");
        table.put(put);
        table.close();
    

3、批量插入数据 

   /**
     * hbase的批量插入数据
     */
    @Test
    public void batchInsert() throws IOException 
        //创建put对象,并指定rowkey
        Put put = new Put("0002".getBytes());
        //f1
        put.addColumn("f1".getBytes(), "id".getBytes(), Bytes.toBytes(1));
        put.addColumn("f1".getBytes(), "name".getBytes(), Bytes.toBytes("曹操"));
        put.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(30));
        //f2
        put.addColumn("f2".getBytes(), "sex".getBytes(), Bytes.toBytes("1"));
        put.addColumn("f2".getBytes(), "address".getBytes(), Bytes.toBytes("沛国谯县"));
        put.addColumn("f2".getBytes(), "phone".getBytes(), Bytes.toBytes("16888888888"));
        put.addColumn("f2".getBytes(), "say".getBytes(), Bytes.toBytes("helloworld"));

        Put put2 = new Put("0003".getBytes());
        put2.addColumn("f1".getBytes(), "id".getBytes(), Bytes.toBytes(2));
        put2.addColumn("f1".getBytes(), "name".getBytes(), Bytes.toBytes("刘备"));
        put2.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(32));
        put2.addColumn("f2".getBytes(), "sex".getBytes(), Bytes.toBytes("1"));
        put2.addColumn("f2".getBytes(), "address".getBytes(), Bytes.toBytes("幽州涿郡涿县"));
        put2.addColumn("f2".getBytes(), "phone".getBytes(), Bytes.toBytes("17888888888"));
        put2.addColumn("f2".getBytes(), "say".getBytes(), Bytes.toBytes("talk is cheap , show me the code"));

        Put put3 = new Put("0004".getBytes());
        put3.addColumn("f1".getBytes(), "id".getBytes(), Bytes.toBytes(3));
        put3.addColumn("f1".getBytes(), "name".getBytes(), Bytes.toBytes("孙权"));
        put3.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(35));
        put3.addColumn("f2".getBytes(), "sex".getBytes(), Bytes.toBytes("1"));
        put3.addColumn("f2".getBytes(), "address".getBytes(), Bytes.toBytes("下邳"));
        put3.addColumn("f2".getBytes(), "phone".getBytes(), Bytes.toBytes("12888888888"));
        put3.addColumn("f2".getBytes(), "say".getBytes(), Bytes.toBytes("what are you 弄啥嘞!"));

        Put put4 = new Put("0005".getBytes());
        put4.addColumn("f1".getBytes(), "id".getBytes(), Bytes.toBytes(4));
        put4.addColumn("f1".getBytes(), "name".getBytes(), Bytes.toBytes("诸葛亮"));
        put4.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(28));
        put4.addColumn("f2".getBytes(), "sex".getBytes(), Bytes.toBytes("1"));
        put4.addColumn("f2".getBytes(), "address".getBytes(), Bytes.toBytes("四川隆中"));
        put4.addColumn("f2".getBytes(), "phone".getBytes(), Bytes.toBytes("14888888888"));
        put4.addColumn("f2".getBytes(), "say".getBytes(), Bytes.toBytes("出师表你背了嘛"));

        Put put5 = new Put("0006".getBytes());
        put5.addColumn("f1".getBytes(), "id".getBytes(), Bytes.toBytes(5));
        put5.addColumn("f1".getBytes(), "name".getBytes(), Bytes.toBytes("司马懿"));
        put5.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(27));
        put5.addColumn("f2".getBytes(), "sex".getBytes(), Bytes.toBytes("1"));
        put5.addColumn("f2".getBytes(), "address".getBytes(), Bytes.toBytes("哪里人有待考究"));
        put5.addColumn("f2".getBytes(), "phone".getBytes(), Bytes.toBytes("15888888888"));
        put5.addColumn("f2".getBytes(), "say".getBytes(), Bytes.toBytes("跟诸葛亮死掐"));


        Put put6 = new Put("0007".getBytes());
        put6.addColumn("f1".getBytes(), "id".getBytes(), Bytes.toBytes(5));
        put6.addColumn("f1".getBytes(), "name".getBytes(), Bytes.toBytes("xiaobubu—吕布"));
        put6.addColumn("f1".getBytes(), "age".getBytes(), Bytes.toBytes(28));
        put6.addColumn("f2".getBytes(), "sex".getBytes(), Bytes.toBytes("1"));
        put6.addColumn("f2".getBytes(), "address".getBytes(), Bytes.toBytes("内蒙人"));
        put6.addColumn("f2".getBytes(), "phone".getBytes(), Bytes.toBytes("15788888888"));
        put6.addColumn("f2".getBytes(), "say".getBytes(), Bytes.toBytes("貂蝉去哪了"));

        List<Put> listPut = new ArrayList<Put>();
        listPut.add(put);
        listPut.add(put2);
        listPut.add(put3);
        listPut.add(put4);
        listPut.add(put5);
        listPut.add(put6);

        table.put(listPut);
    

4、行键rowkey的查询

/**
     * 查询rowkey为0003的人
     * get -> Result
     */
    @Test
    public void getData() throws IOException 
        //Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
        //通过get对象,指定rowkey
        Get get = new Get(Bytes.toBytes("0007"));

        get.addFamily("f1".getBytes());//限制只查询f1列族下面所有列的值
        //查询f2  列族 phone  这个字段
        get.addColumn("f2".getBytes(), "address".getBytes());
        //通过get查询,返回一个result对象,所有的字段的数据都是封装在result里面了

        Result result = table.get(get);
        List<Cell> cells = result.listCells();  //获取一条数据所有的cell,所有数据值都是在cell里面 的

        if (cells != null) 
            for (Cell cell : cells) 
                byte[] family_name = CellUtil.cloneFamily(cell);//获取列族名
                byte[] column_name = CellUtil.cloneQualifier(cell);//获取列名
                byte[] rowkey = CellUtil.cloneRow(cell);//获取rowkey
                byte[] cell_value = CellUtil.cloneValue(cell);//获取cell值
                //需要判断字段的数据类型,使用对应的转换的方法,才能够获取到值
                if ("age".equals(Bytes.toString(column_name)) || "id".equals(Bytes.toString(column_name))) 
                    System.out.println(Bytes.toString(family_name));
                    System.out.println(Bytes.toString(column_name));
                    System.out.println(Bytes.toString(rowkey));
                    System.out.println(Bytes.toInt(cell_value));
                 else 
                    System.out.println(Bytes.toString(family_name));
                    System.out.println(Bytes.toString(column_name));
                    System.out.println(Bytes.toString(rowkey));
                    System.out.println(Bytes.toString(cell_value));
                
            
            //table.close();
        
    
5、不知道rowkey的具体值,我想查询rowkey范围值是0003  到0006
 @Test
    public void scanData() throws IOException 
        //获取table
        //Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Scan scan = new Scan();//没有指定startRow以及stopRow  全表扫描
        //只扫描f1列族
        scan.addFamily("f1".getBytes());
        //扫描 f2列族 phone  这个字段
        scan.addColumn("f2".getBytes(), "phone".getBytes());
        scan.setStartRow("0003".getBytes());
        scan.setStopRow("0007".getBytes());
        //通过getScanner查询获取到了表里面所有的数据,是多条数据
        ResultScanner scanner = table.getScanner(scan);
        //遍历ResultScanner 得到每一条数据,每一条数据都是封装在result对象里面了
        for (Result result : scanner) 
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) 
                byte[] family_name = CellUtil.cloneFamily(cell);
                byte[] qualifier_name = CellUtil.cloneQualifier(cell);
                byte[] rowkey = CellUtil.cloneRow(cell);
                byte[] value = CellUtil.cloneValue(cell);
                //判断id和age字段,这两个字段是整形值
                if ("age".equals(Bytes.toString(qualifier_name)) || "id".equals(Bytes.toString(qualifier_name))) 
                    System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toInt(value));
                 else 
                    System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toString(value));
                
            
        
        //table.close();
    

 6、查询所有rowkey比0003小的数据

  @Test
    public void rowFilter() throws IOException 
        //Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Scan scan = new Scan();
        //获取我们比较对象
        BinaryComparator binaryComparator = new BinaryComparator("0003".getBytes());
        /***
         * rowFilter需要加上两个参数
         * 第一个参数就是我们的比较规则
         * 第二个参数就是我们的比较对象
         */
        RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, binaryComparator);
        //为我们的scan对象设置过滤器
        scan.setFilter(rowFilter);
        SubstringComparator substringComparator = new SubstringComparator("f2");
        //通过familyfilter来设置列族的过滤器
//        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
//        scan.se(familyFilter);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) 
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) 
                byte[] family_name = CellUtil.cloneFamily(cell);
                byte[] qualifier_name = CellUtil.cloneQualifier(cell);
                byte[] rowkey = CellUtil.cloneRow(cell);
                byte[] value = CellUtil.cloneValue(cell);
                //判断id和age字段,这两个字段是整形值
                if ("age".equals(Bytes.toString(qualifier_name)) || "id".equals(Bytes.toString(qualifier_name))) 
                    System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toInt(value));
                 else 
                    System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toString(value));
                
            
        
    

7、列族过滤器FamilyFilter

  • 查询列族名包含f2的所有列族下面的数据

 /**
     * 通过familyFilter来实现列族的过滤
     * 需要过滤,列族名包含f2
     * f1  f2   hello   world
     */
    @Test
    public void familyFilter() throws IOException 
        Table table = connection.getTable(TableName.valueOf(TABLE_NAME));
        Scan scan = new Scan();
        SubstringComparator substringComparator = new SubstringComparator("f2");
        //通过familyfilter来设置列族的过滤器
        FamilyFilter familyFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        scan.setFilter(familyFilter);
        ResultScanner scanner = table.getScanner(scan);
        for (Result result : scanner) 
            List<Cell> cells = result.listCells();
            for (Cell cell : cells) 
                byte[] family_name = CellUtil.cloneFamily(cell);
                byte[] qualifier_name = CellUtil.cloneQualifier(cell);
                byte[] rowkey = CellUtil.cloneRow(cell);
                byte[] value = CellUtil.cloneValue(cell);
                //判断id和age字段,这两个字段是整形值
                if ("age".equals(Bytes.toString(qualifier_name)) || "id".equals(Bytes.toString(qualifier_name))) 
                    System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toInt(value));
                 else 
                    System.out.println("数据的rowkey为" + Bytes.toString(rowkey) + "======数据的列族为" + Bytes.toString(family_name) + "======数据的列名为" + Bytes.toString(qualifier_name) + "==========数据的值为" + Bytes.toString(value));
                
            
        
    
数据的rowkey为0001======数据的列族为f2======数据的列名为address==========数据的值为地球人
数据的rowkey为0001======数据的列族为f2======数据的列名为age==========数据的值为18
数据的rowkey为0001======数据的列族为f2======数据的列名为id==========数据的值为25
数据的rowkey为0001======数据的列族为f2======数据的列名为name==========数据的值为zhangsan
数据的rowkey为0002======数据的列族为f2======数据的列名为address==========数据的值为沛国谯县
数据的rowkey为0002======数据的列族为f2======数据的列名为phone==========数据的值为16888888888
数据的rowkey为0002======数据的列族为f2======数据的列名为say==========数据的值为helloworld
数据的rowkey为0002======数据的列族为f2======数据的列名为sex==========数据的值为1
数据的rowkey为0003======数据的列族为f2======数据的列名为address==========数据的值为幽州涿郡涿县
数据的rowkey为0003======数据的列族为f2======数据的列名为phone==========数据的值为17888888888
数据的rowkey为0003======数据的列族为f2======数据的列名为say==========数据的值为talk is cheap , show me the code
数据的rowkey为0003======数据的列族为f2======数据的列名为sex==========数据的值为1
数据的rowkey为0004======数据的列族为f2======数据的列名为address==========数据的值为下邳
数据的rowkey为0004======数据的列族为f2======数据的列名为phone==========数据的值为12888888888
数据的rowkey为0004======数据的列族为f2======数据的列名为say==========数据的值为what are you 弄啥嘞!
数据的rowkey为0004======数据的列族为f2======数据的列名为sex==========数据的值为1
数据的rowkey为0005======数据的列族为f2======数据的列名为address==========数据的值为四川隆中
数据的rowkey为0005======数据的列族为f2======数据的列名为phone==========数据的值为14888888888
数据的rowkey为0005======数据的列族为f2======数据的列名为say==========数据的值为出师表你背了嘛
数据的rowkey为0005======数据的列族为f2======数据的列名为sex==========数据的值为1
数据的rowkey为0006======数据的列族为f2======数据的列名为address==========数据的值为哪里人有待考究
数据的rowkey为0006======数据的列族为f2======数据的列名为phone==========数据的值为15888888888
数据的rowkey为0006======数据的列族为f2======数据的列名为say==========数据的值为跟诸葛亮死掐
数据的rowkey为0006======数据的列族为f2======数据的列名为sex==========数据的值为1
数据的rowkey为0007======数据的列族为f2======数据的列名为address==========数据的值为内蒙人
数据的rowkey为0007======数据的列族为f2======数据的列名为phone==========数据的值为15788888888
数据的rowkey为0007======数据的列族为f2======数据的列名为say==========数据的值为貂蝉去哪了
数据的rowkey为0007======数据的列族为f2======数据的列名为sex==========数据的值为1

8、列过滤器QualifierFilter

/**
     * 列名过滤器 只查询包含name列的值
     */
    @Test
    public void  qualifierFilter() throws IOException 
        Scan scan = new Scan();
        SubstringComparator substringComparator = new SubstringComparator("name");
        //定义列名过滤器,只查询列名包含name的列
        QualifierFilter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        scan.setFilter(qualifierFilter);
        ResultScanner scanner = table.getScanner(scan);
        printResult(scanner);
    

9、列值过滤器ValueFilter

  • 查询所有列当中包含8的数据

 /**
     * 查询哪些字段值  包含数字8
     */
    @Test
    public void contains8() throws IOException 
        Scan scan = new Scan();
        SubstringComparator substringComparator = new SubstringComparator("8");
        //列值过滤器,过滤列值当中包含数字8的所有的列
        ValueFilter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        scan.setFilter(valueFilter);
        ResultScanner scanner = table.getScanner(scan);
        printResult(scanner);
    

10、专用过滤器使用

1、单列值过滤器 SingleColumnValueFilter

  • SingleColumnValueFilter会返回满足条件的cell。所在行的所有cell的值

  • 查询名字为刘备的数据

/**
     * select  *  from  myuser where name  = '刘备'
     * 会返回我们符合条件数据的所有的字段
     *
     * SingleColumnValueExcludeFilter  列值排除过滤器
     *  select  *  from  myuser where name  != '刘备'
     */
    @Test
    public void singleColumnValueFilter() throws IOException 
        //查询 f1  列族 name  列  值为刘备的数据
        Scan scan = new Scan();
        //单列值过滤器,过滤  f1 列族  name  列  值为刘备的数据
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes());
        scan.setFilter(singleColumnValueFilter);
        ResultScanner scanner = table.getScanner(scan);
        printResult(scanner);
    

11、列值排除过滤器SingleColumnValueExcludeFilter

  • 与SingleColumnValueFilter相反

    • 如果指定列的值符合filter条件,则会排除掉row中指定的列,其他的列全部返回

    • 如果列不存在或不符合filter条件,则不返回row中的列

11、rowkey前缀过滤器PrefixFilter

  • 查询以00开头的所有前缀的rowkey

	/**
     * 查询rowkey前缀以  00开头的所有的数据
     */
    @Test
    public  void  prefixFilter() throws IOException 
        Scan scan = new Scan();
        //过滤rowkey以  00开头的数据
        PrefixFilter prefixFilter = new PrefixFilter("00".getBytes());
        scan.setFilter(prefixFilter);
        ResultScanner scanner = table.getScanner(scan);
        printlReult(scanner);
    

12、分页过滤器PageFilter

  • 通过pageFilter实现分页过滤器

/**
     * HBase当中的分页
     */
    @Test
    public void hbasePageFilter() throws IOException 
        int pageNum= 3;
        int pageSize = 2;
        Scan scan = new Scan();
        if(pageNum == 1 )
            //获取第一页的数据
            //scan.setMaxResultSize(pageSize);
            scan.setStartRow("".getBytes());
            //使用分页过滤器来实现数据的分页
            PageFilter filter = new PageFilter(pageSize);
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan);
            printResult(scanner);
        else
            String  startRow = "";
            //扫描数据的调试 扫描五条数据
            int scanDatas = (pageNum - 1) * pageSize + 1;
            //scan.setMaxResultSize(scanDatas);//设置一步往前扫描多少条数据
            PageFilter filter = new PageFilter(scanDatas);
            scan.setFilter(filter);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) 
                byte[] row = result.getRow();//获取rowkey
                //最后一次startRow的值就是0005
                startRow= Bytes.toString(row);//循环遍历我们多有获取到的数据的rowkey
                //最后一条数据的rowkey就是我们需要的起始的rowkey
            
            //获取第三页的数据
            scan.setStartRow(startRow.getBytes());
            //scan.setMaxResultSize(pageSize);//设置我们扫描多少条数据
            PageFilter filter1 = new PageFilter(pageSize);
            scan.setFilter(filter1);
//            scan.setAllowPartialResults(true);
//            scan.setBatch(100);
            ResultScanner scanner1 = table.getScanner(scan);
            printResult(scanner1);
        
    

 分页过滤器二

@Test
    public void hbasePageFilter2() throws IOException 

        Scan scan = new Scan();
        int pageNum = 2;
        int pageSize = 2;
        String startRow = "000" + (pageNum - 1) * pageSize + 1;

        //获取第三页的数据
        scan.setStartRow(startRow.getBytes());
        //scan.setMaxResultSize(pageSize);//设置我们扫描多少条数据
        PageFilter filter1 = new PageFilter(pageSize);
        scan.setFilter(filter1);
//            scan.setAllowPartialResults(true);
//            scan.setBatch(100);
        ResultScanner scanner1 = table.getScanner(scan);
        printResult(scanner1);

    

13、多过滤器综合查询FilterList

  • 需求:使用SingleColumnValueFilter查询f1列族,name为刘备的数据,并且同时满足rowkey的前缀以00开头的数据(PrefixFilter)

   @Test
    public void filterList() throws IOException 
        Scan scan = new Scan();
        SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("f1".getBytes(), "name".getBytes(), CompareFilter.CompareOp.EQUAL, "刘备".getBytes());
        PrefixFilter prefixFilter = new PrefixFilter("00".getBytes());
        FilterList filterList = new FilterList();
        filterList.addFilter(singleColumnValueFilter);
        filterList.addFilter(prefixFilter);
        scan.setFilter(filterList);
        ResultScanner scanner = table.getScanner(scan);
        printResult(scanner);
    

14、根据rowkey删除数据

  • 删除rowkey为003的数据

/**
     * 删除数据
     */
    @Test
    public  void  deleteData() throws IOException 
        Delete delete = new Delete("0003".getBytes());
        table.delete(delete);
    

15、删除表

/**
     * 删除表
     */
    @Test
    public void deleteTable() throws IOException 
        //获取管理员对象,用于表的删除
        Admin admin = connection.getAdmin();
        //删除一张表之前,需要先禁用表
        admin.disableTable(TableName.valueOf(TABLE_NAME));
        admin.deleteTable(TableName.valueOf(TABLE_NAME));
    

HBase读写流程

  • 第一部分: HLog, WAL, 存储写入数据的指令, 而不是数据, HLog存储在hdfs之上

  • 第二部分: BlockCache

  • 第三部分: Region

    • 多个store模块, store模块和列族有关系

      • memoryStore

      • 多个StoreFile(HFile)

  • RegionServer --> n个Region

  • RegionServer -> 1HLog

  • RegionServer -> 1 BlockCache

  • Region -> n个store模块

  • store -> 1memorystroe(128) + n个storeFile

  1. Cli连接zk, 获取一张特殊表, hbase/meta表, 获取这张表对应的region, 存储在哪一个regionserver上, 然后将对应的信息缓存在客户端

  2. 客户端连接对应的regionserver, 读取对应的meta的信息(存储了hbase的元数据信息, 有哪些hbase表, 表对应的region在哪里, 以及每一个region管理的数据范围), hbase/meta表存储的是hbase的元数据信息 scan 'hbase:meta'

存储到blockcache: 为了提高后期查询的效率 (为了提高读的效率)

memorystore: 是为了提高写入的效率, 如果直接把文件写入到磁盘(阻塞) io效率差

  • 如果memorystore中的数据超过128M, memorystore中的数据会进入到一个flush队列, 书写到StoreFile

  • 刷写完成之后, 删除HLog中的历史数据

1、hbase读取数据流程 

说明:HBase集群,只有一张meta表,此表只有一个region,该region数据保存在一个HRegionServer上

  • 1、客户端首先与zk进行连接;

    • 从zk找到meta表的region位置,即meta表的数据存储在某一HRegionServer上;

    • 客户端与此HRegionServer建立连接,然后读取meta表中的数据;meta表中存储了所有用户表的region信息,我们可以通过scan 'hbase:meta'来查看meta表信息

  • 2、根据要查询的namespace、表名和rowkey信息。找到写入数据对应的region信息

  • 3、找到这个region对应的regionServer,然后发送请求

  • 4、查找并定位到对应的region

  • 5、先从memstore查找数据,如果没有,再从BlockCache上读取

    • HBase上Regionserver的内存分为两个部分

      • 一部分作为Memstore,主要用来写;

      • 另外一部分作为BlockCache,主要用于读数据;

  • 6、如果BlockCache中也没有找到,再到StoreFile上进行读取

    • 从storeFile中读取到数据之后,不是直接把结果数据返回给客户端,而是把数据先写入到BlockCache中,目的是为了加快后续的查询;然后再返回结果给客户端。

HBase写数据流程

  • 1、客户端首先从zk找到meta表的region位置,然后读取meta表中的数据,meta表中存储了用户表的region信息

  • 2、根据namespace、表名和rowkey信息。找到写入数据对应的region信息

  • 3、找到这个region对应的regionServer,然后发送请求

  • 4、把数据分别写到HLog(write ahead log)和memstore各一份

  • 5、memstore达到阈值后把数据刷到磁盘,生成storeFile文件

  • 6、删除HLog中的历史数据

补充:
HLog(write ahead log):
    也称为WAL意为Write ahead log,类似mysql中的binlog,用来做灾难恢复时用,HLog记录数据的所有变更,一旦数据修改,就可以从log中进行恢复。 

 MemStore的flush

1、Flush触发条件

1、 memstore级别限制

  • 当Region中任意一个MemStore的大小达到了上限(hbase.hregion.memstore.flush.size,默认128MB),会触发Memstore刷新。

<property>
    <name>hbase.hregion.memstore.flush.size</name>
    <value>134217728</value>
</property>

2、region级别限制

  • 当Region中所有Memstore的大小总和达到了上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size,默认 2* 128M = 256M),会触发memstore刷新。

<property>
    <name>hbase.hregion.memstore.flush.size</name>
    <value>134217728</value>
</property>
<property>
    <name>hbase.hregion.memstore.block.multiplier</name>
    <value>4</value>
</property>   

3、Region Server级别限制

  • 当一个Region Server中所有Memstore的大小总和超过低水位阈值hbase.regionserver.global.memstore.size.lower.limit*hbase.regionserver.global.memstore.size(前者默认值0.95),RegionServer开始强制flush;

  • 先Flush Memstore最大的Region,再执行次大的,依次执行;

  • 如写入速度大于flush写出的速度,导致总MemStore大小超过高水位阈值hbase.regionserver.global.memstore.size(默认为JVM内存的40%),此时RegionServer会阻塞更新并强制执行flush,直到总MemStore大小低于低水位阈值

<property>
    <name>hbase.regionserver.global.memstore.size.lower.limit</name>
    <value>0.95</value>
</property>
<property>
    <name>hbase.regionserver.global.memstore.size</name>
    <value>0.4</value>
</property>

4、 HLog数量上限

  • 当一个Region Server中HLog数量达到上限(可通过参数hbase.regionserver.maxlogs配置)时,系统会选取最早的一个 HLog对应的一个或多个Region进行flush

5、定期刷新Memstore

  • 默认周期为1小时,确保Memstore不会长时间没有持久化。为避免所有的MemStore在同一时间都进行flush导致的问题,定期的flush操作有20000左右的随机延时。

6、手动flush

  • 用户可以通过shell命令flush ‘tablename’或者flush ‘region name’分别对一个表或者一个Region进行flush。

flush的流程

  • 为了减少flush过程对读写的影响,将整个flush过程分为三个阶段:

    • prepare阶段:遍历当前Region中所有的Memstore,将Memstore中当前数据集CellSkipListSet做一个快照snapshot;然后再新建一个CellSkipListSet。后期写入的数据都会写入新的CellSkipListSet中。prepare阶段需要加一把updateLock对写请求阻塞,结束之后会释放该锁。因为此阶段没有任何费时操作,因此持锁时间很短。

    • flush阶段:遍历所有Memstore,将prepare阶段生成的snapshot持久化为临时文件,临时文件会统一放到目录.tmp下。这个过程因为涉及到磁盘IO操作,因此相对比较耗时。

    • commit阶段:遍历所有Memstore,将flush阶段生成的临时文件移到指定的ColumnFamily目录下,针对HFile生成对应的storefile和Reader,把storefile添加到HStore的storefiles列表中,最后再清空prepare阶段生成的snapshot。

Compact合并机制

  • hbase为了==防止小文件过多==,以保证查询效率,hbase需要在必要的时候将这些小的store file合并成相对较大的store file,这个过程就称之为compaction。

  • 在hbase中主要存在两种类型的compaction合并

    • ==minor compaction 小合并==

    • ==major compaction 大合并==

minor compaction 小合并

  • 在将Store中多个HFile合并为一个HFile

    在这个过程中会选取一些小的、相邻的StoreFile将他们合并成一个更大的StoreFile,对于超过了==TTL的数据、更新的数据、删除的数据==仅仅只是做了标记。并没有进行物理删除,一次Minor Compaction的结果是更少并且更大的StoreFile。这种合并的触发频率很高。

  • minor compaction触发条件由以下几个参数共同决定:

<!--默认值3;表示一个store中至少有4个store file时,会触发minor compaction-->
<property>
    <name>hbase.hstore.compactionThreshold</name>
    <value>3</value>
</property>
​
<!--默认值10;表示一次minor compaction中最多合并10个store file-->
<property>
    <name>hbase.hstore.compaction.max</name>
    <value>10</value>
</property>
​
<!--默认值为128m;表示store file文件大小小于该值时,一定会加入到minor compaction的-->
<property>
    <name>hbase.hstore.compaction.min.size</name>
    <value>134217728</value>
</property>
​
<!--默认值为LONG.MAX_VALUE;表示store file文件大小大于该值时,一定会被minor compaction排除-->
<property>
    <name>hbase.hstore.compaction.max.size</name>
    <value>9223372036854775807</value>
</property>

2 major compaction 大合并

  • 合并Store中所有的HFile为一个HFile

    将所有的StoreFile合并成一个StoreFile,这个过程还会清理三类无意义数据:被删除的数据、TTL过期数据、版本号超过设定版本号的数据。合并频率比较低,默认7天执行一次,并且性能消耗非常大,建议生产关闭(设置为0),在应用空闲时间手动触发。一般可以是手动控制进行合并,防止出现在业务高峰期。

  • major compaction触发时间条件

    <!--默认值为7天进行一次大合并,-->
    <property>
        <name>hbase.hregion.majorcompaction</name>
        <value>604800000</value>
    </property>
  • 手动触发

    ##使用major_compact命令
    major_compact tableName

HBase表的预分区

  • 当一个table刚被创建的时候,Hbase默认的分配一个region给table。也就是说这个时候,所有的读写请求都会访问到同一个regionServer的同一个region中,这个时候就达不到负载均衡的效果了,集群中的其他regionServer就可能会处于比较空闲的状态。

  • 解决这个问题可以用pre-splitting,在创建table的时候就配置好,生成多个region。

  为何要预分区?

  • 增加数据读写效率

  • 负载均衡,防止数据倾斜

  • 方便集群容灾调度region

  • 优化Map数量

2、预分区原理

  • 每一个region维护着startRow与endRowKey,如果加入的数据符合某个region维护的rowKey范围,则该数据交给这个region维护。

3、手动指定预分区

  • 三种方式

第一种

create 'person','info1','info2',SPLITS => ['1000','2000','3000','4000'] 

 

方式二:也可以把分区规则创建于文件中

cd /kkb/install

vim split.txt

aaa
bbb
ccc
ddd 

create 'student','info',SPLITS_FILE => '/kkb/install/split.txt' 

方式三: HexStringSplit 算法

  • HexStringSplit会将数据从“00000000”到“FFFFFFFF”之间的数据长度按照n等分之后算出每一段的起始rowkey和结束rowkey,以此作为拆分点。

  • 例如:

create 'mytable', 'base_info',' extra_info', NUMREGIONS => 15, SPLITALGO => 'HexStringSplit' 

region的冷合并和热合并 

region合并说明

  • Region的合并不是为了性能, 而是出于便于运维的目的 .

  • 比如删除了大量的数据 ,这个时候每个Region都变得很小 ,存储多个Region就浪费了 ,这个时候可以把Region合并起来,进而可以减少一些Region服务器节点

一、冷合并 

这里通过org.apache.hadoop.hbase.util.Merge类来实现,不需要进入hbase shell,直接执行(==需要先关闭hbase集群==):

hbase org.apache.hadoop.hbase.util.Merge test test,,1565940912661.62d28d7d20f18debd2e7dac093bc09d8. test,1000,1565940912661.5b6f9e8dad3880bcc825826d12e81436.

2、通过online_merge热合并Region

  • ==不需要关闭hbase集群==,在线进行合并

  • 与冷合并不同的是,online_merge的传参是Region的hash值,而Region的hash值就是Region名称的最后那段在两个.之间的字符串部分。

  • 需求:需要把test表中的2个region数据进行合并: test,2000,1565940912661.c2212a3956b814a6f0d57a90983a8515. test,3000,1565940912661.553dd4db667814cf2f050561167ca030.

  • 需要进入hbase shell:

    merge_region 'a34185c9f32b1e3211d6d688664266f9','abf69cdc7335880918a60f3c917093e9'

hbase实战 

1、hbase实战一(hbase通过MR将一个表中数据,筛选到宁外一张表中)

  • 需求:==读取HBase当中myuser这张表的f1:name、f1:age数据,将数据写入到另外一张myuser2表的f1列族里面去==

  • 第一步:创建myuser2这张hbase表

注意:列族的名字要与myuser表的列族名字相同

create 'myuser2','f1'

 HBaseReadMapper类,继承TableMapper,拿到每条数据,并对数据进行过滤并封装

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * myuser f1: name&age => myuser2 f1
 */
public class HBaseReadMapper extends TableMapper<Text, Put> 
    /**
     *
     * @param key rowkey
     * @param value rowkey此行的数据 Result类型
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException 
        //获得roweky的字节数组
        byte[] rowkey_bytes = key.get();
        String rowkeyStr = Bytes.toString(rowkey_bytes);
        Text text = new Text(rowkeyStr);

        //输出数据 -> 写数据 -> Put 构建Put对象
        Put put = new Put(rowkey_bytes);
        //获取一行中所有的Cell对象
        Cell[] cells = value.rawCells();
        //将f1 : name& age输出
        for(Cell cell: cells) 
            //当前cell是否是f1
            //列族
            byte[] family_bytes = CellUtil.cloneFamily(cell);
            String familyStr = Bytes.toString(family_bytes);
            if("f1".equals(familyStr)) 
                //在判断是否是name | age
                byte[] qualifier_bytes = CellUtil.cloneQualifier(cell);
                String qualifierStr = Bytes.toString(qualifier_bytes);
                if("name".equals(qualifierStr)) 
                    put.add(cell);
                
                if("age".equals(qualifierStr)) 
                    put.add(cell);
                
            
        

        //判断是否为空;不为空,才

以上是关于Hbase的主要内容,如果未能解决你的问题,请参考以下文章

浅谈HBase的数据分布

为啥微信小程序青云一次只能上传一张图片,上限是五张,每一次添加都会换掉之前的图片?

HBase最佳实践-读性能优化策略

HBase读延迟的12种优化套

扫雷实用帖:HBase读延迟的12种优化套路

Facebook 连接图状态对象的评论上限为 25