HBase操作Java api

Posted 来自遥远的水星

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了HBase操作Java api相关的知识,希望对你有一定的参考价值。

一.导入依赖

创建一个Maven模块并导入以下依赖。Maven默认的编译版本是1.5,因此需要通过maven-compiler-plugin显式指定使用1.8进行编译。

pom.xml

    <!-- Dependencies used by the HBase utility class and its JUnit test below. -->
    <dependencies>
        <!-- HBase client API (version kept in step with hbase-server). -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.0.5</version>
        </dependency>
        <!-- HBase server artifact, same version as the client. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>2.0.5</version>
        </dependency>
        <!-- JUnit 4, required by the @Test-annotated test class. -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Compile with Java 1.8 source/target instead of the Maven default level. -->
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

二.HBase工具类

package com.bigdata.hbasedemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Utility class wrapping common HBase 2.x client operations: table DDL, puts, deletes,
 * gets, scans and filter construction.
 *
 * <p>The shared {@link Connection} and {@link Admin} are created by the private constructor,
 * which only runs through {@link #getInstance()}. Call {@code getInstance()} before using any
 * of the static helpers; until then the shared connection is {@code null}.
 *
 * <p>I/O failures are reported with {@code printStackTrace()} and are not rethrown, so callers
 * get no exception when an operation fails.
 *
 * @author: HaoWu
 * @create: 2020/7/15 22:51
 */
public class HBaseUtils {
    /** Separator line printed after every cell by {@link #get} and {@link #scanTable}. */
    private static final String SEPARATOR = "***************************************************************************";

    private static Admin admin = null;
    private static Configuration configuration = null;
    private static Connection connection = null;

    /** Lazily created singleton; reset to {@code null} by {@link #close()}. */
    private static HBaseUtils instance = null;

    /**
     * Builds the client configuration and opens the shared connection and admin handle.
     * A connection failure is printed and leaves the shared fields {@code null}.
     */
    private HBaseUtils() {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop102:2181,hadoop103:2181,hadoop104:2181");
        // An HDFS URI needs the "//" authority marker; "hdfs:hadoop102:8020/hbase" is malformed.
        configuration.set("hbase.rootdir", "hdfs://hadoop102:8020/hbase");
        try {
            connection = ConnectionFactory.createConnection(configuration);
            admin = connection.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the singleton, creating it (and the shared connection) on first use or after
     * {@link #close()} has been called.
     *
     * @return the shared instance
     */
    public static synchronized HBaseUtils getInstance() {
        if (null == instance) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    /**
     * Closes the admin handle and the connection, then clears the singleton so that the next
     * {@link #getInstance()} call opens a fresh connection instead of reusing a closed one.
     */
    public static synchronized void close() {
        // Close each resource in its own try so a failure on admin cannot leak the connection.
        try {
            if (admin != null) {
                admin.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        try {
            if (connection != null) {
                connection.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        admin = null;
        connection = null;
        instance = null;
    }

    /**
     * Obtains a {@link Table} handle for the given namespace and table name.
     * The caller is responsible for closing the returned table.
     *
     * @param nameSpace namespace of the table
     * @param tableName table name without the namespace
     * @return the table handle, or {@code null} if obtaining it failed with an IOException
     */
    public static Table getTable(String nameSpace, String tableName) {
        Table table = null;
        try {
            table = connection.getTable(TableName.valueOf(nameSpace, tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Creates a pre-split table. If a table with the same name already exists it is dropped
     * first, including all of its data.
     *
     * @param nameSpace      namespace of the table
     * @param tableName      table name
     * @param splitsKey      split keys used for pre-splitting the table into regions
     * @param columnFamilies one or more column family names
     */
    public static void createTable(String nameSpace, String tableName, byte[][] splitsKey, String... columnFamilies) {
        TableName name = TableName.valueOf(nameSpace, tableName);
        try {
            if (admin.tableExists(name)) {
                dropTable(name);
            }
            List<ColumnFamilyDescriptor> families = new ArrayList<>();
            for (String columnFamily : columnFamilies) {
                // Bytes.toBytes encodes as UTF-8; String.getBytes() would use the platform charset.
                families.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamily)).build());
            }
            TableDescriptor descriptor = TableDescriptorBuilder.newBuilder(name).setColumnFamilies(families).build();
            admin.createTable(descriptor, splitsKey);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Alters a table in three steps: deletes column family 1, sets both the minimum and maximum
     * versions of column family 2 to 4, and adds column family 3. Nothing happens if the table
     * does not exist. The steps run one after another inside a single try block, so a failure
     * part-way leaves the earlier steps applied.
     *
     * @param nameSpace      namespace of the table
     * @param tableName      table name
     * @param columnFamfily1 column family to delete
     * @param columnFamfily2 column family whose version settings are changed
     * @param columnFamfily3 column family to add
     */
    public static void alterTable(String nameSpace, String tableName, String columnFamfily1, String columnFamfily2, String columnFamfily3) {
        TableName name = TableName.valueOf(nameSpace, tableName);
        try {
            if (admin.tableExists(name)) {
                // 1. Delete column family 1.
                admin.deleteColumnFamily(name, Bytes.toBytes(columnFamfily1));
                // 2. Modify column family 2.
                ColumnFamilyDescriptor modified = ColumnFamilyDescriptorBuilder
                        .newBuilder(Bytes.toBytes(columnFamfily2))
                        .setMinVersions(4)
                        .setMaxVersions(4)
                        .build();
                admin.modifyColumnFamily(name, modified);
                // 3. Add column family 3.
                ColumnFamilyDescriptor added = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(columnFamfily3)).build();
                admin.addColumnFamily(name, added);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Disables (if still enabled) and deletes a table.
     *
     * @param nameSpace namespace of the table
     * @param tableName table name
     */
    public static void deleteTable(String nameSpace, String tableName) {
        try {
            dropTable(TableName.valueOf(nameSpace, tableName));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Disables the table only when it is still enabled, then deletes it. */
    private static void dropTable(TableName name) throws IOException {
        // Disabling an already-disabled table fails, which would prevent the delete below.
        if (admin.isTableEnabled(name)) {
            admin.disableTable(name);
        }
        admin.deleteTable(name);
    }

    /**
     * Writes a single String value to one row / column family / column.
     *
     * @param nameSpace    namespace of the table
     * @param tableName    table name
     * @param timeStamp    timestamp to store with the cell
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @param value        String value to store
     */
    public static void putStringValue(String nameSpace, String tableName, long timeStamp, String rowKey, String columnFamily, String column, String value) {
        // Build the Put before opening the table so a bad argument cannot leak the handle.
        Put put = new Put(Bytes.toBytes(rowKey), timeStamp);
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        try (Table table = getTable(nameSpace, tableName)) {
            if (table == null) {
                return; // getTable already reported the failure
            }
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes a single int value (encoded with {@code Bytes.toBytes(int)}) to one
     * row / column family / column.
     *
     * @param nameSpace    namespace of the table
     * @param tableName    table name
     * @param timeStamp    timestamp to store with the cell
     * @param rowKey       row key
     * @param columnFamily column family name
     * @param column       column qualifier
     * @param value        int value to store
     */
    public static void putintValue(String nameSpace, String tableName, long timeStamp, String rowKey, String columnFamily, String column, int value) {
        Put put = new Put(Bytes.toBytes(rowKey), timeStamp);
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        try (Table table = getTable(nameSpace, tableName)) {
            if (table == null) {
                return; // getTable already reported the failure
            }
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes multiple cells, possibly spanning several rows, into one column family.
     *
     * @param nameSpace    namespace of the table
     * @param tableName    table name
     * @param columnFamily column family name
     * @param mapList      one map per cell with the keys {@code "rowkey"}, {@code "column"} and
     *                     {@code "value"}; every value is stored as a String, for example
     *                     {@code {column=name, rowkey=1001, value=lisi}}
     */
    public static void putMultipleRowMultipleColumnOneFamily(String nameSpace, String tableName, String columnFamily, List<Map<String, String>> mapList) {
        byte[] familyBytes = Bytes.toBytes(columnFamily);
        List<Put> puts = new ArrayList<>();
        for (Map<String, String> cell : mapList) {
            Put put = new Put(Bytes.toBytes(cell.get("rowkey")));
            put.addColumn(familyBytes, Bytes.toBytes(cell.get("column")), Bytes.toBytes(cell.get("value")));
            puts.add(put);
        }
        try (Table table = getTable(nameSpace, tableName)) {
            if (table == null) {
                return; // getTable already reported the failure
            }
            table.put(puts);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes multiple columns of a single row into one column family using a single Put.
     * Does nothing when {@code mapList} is empty.
     *
     * @param nameSpace    namespace of the table
     * @param tableName    table name
     * @param rowKey       row key shared by all columns
     * @param columnFamily column family name
     * @param mapList      one map per column with the keys {@code "column"} and {@code "value"};
     *                     every value is stored as a String (a {@code "rowkey"} entry is ignored)
     */
    public static void putOneRowMultipleColumnOneFamily(String nameSpace, String tableName, String rowKey, String columnFamily, List<Map<String, String>> mapList) {
        if (mapList.isEmpty()) {
            return; // a Put without any column cannot be submitted
        }
        byte[] familyBytes = Bytes.toBytes(columnFamily);
        // All columns belong to the same row, so they go into one Put instead of one Put per column.
        Put put = new Put(Bytes.toBytes(rowKey));
        for (Map<String, String> cell : mapList) {
            put.addColumn(familyBytes, Bytes.toBytes(cell.get("column")), Bytes.toBytes(cell.get("value")));
        }
        try (Table table = getTable(nameSpace, tableName)) {
            if (table == null) {
                return; // getTable already reported the failure
            }
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /*
     * Updating data uses the same put methods as inserting data.
     */

    /**
     * Deletes one column of one row in one column family.
     *
     * @param nameSpace namespace of the table
     * @param tableName table name
     * @param family    column family name
     * @param column    column qualifier
     * @param rowkey    row key
     */
    public static void deleteOneRowOneCloumn(String nameSpace, String tableName, String family, String column, String rowkey) {
        Delete delete = new Delete(Bytes.toBytes(rowkey));
        delete.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
        try (Table table = getTable(nameSpace, tableName)) {
            if (table == null) {
                return; // getTable already reported the failure
            }
            table.delete(delete);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Reads the given columns of one row in one column family and prints, for every returned
     * cell, the family, the qualifier, the value and a separator line to standard output.
     * Prints nothing when the row has no matching cells.
     *
     * @param nameSpace namespace of the table
     * @param tableName table name
     * @param family    column family name
     * @param rowkey    row key
     * @param columns   column qualifiers to fetch
     */
    public static void get(String nameSpace, String tableName, String family,  String rowkey,String... columns) {
        Get get = new Get(Bytes.toBytes(rowkey));
        byte[] familyBytes = Bytes.toBytes(family);
        for (String column : columns) {
            get.addColumn(familyBytes, Bytes.toBytes(column));
        }
        try (Table table = getTable(nameSpace, tableName)) {
            if (table == null) {
                return; // getTable already reported the failure
            }
            Result result = table.get(get);
            List<Cell> cells = result.listCells();
            if (cells == null) {
                return; // listCells() yields null when the result holds no cells
            }
            for (Cell cell : cells) {
                String qualifier = qualifierOf(cell);
                System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()));
                System.out.println(qualifier);
                System.out.println(valueAsText(cell, qualifier));
                System.out.println(SEPARATOR);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans one column family of a table and prints every cell as
     * {@code rowkey:qualifier:value:} followed by a separator line.
     *
     * @param nameSpace namespace of the table
     * @param tableName table name
     * @param family    column family name
     */
    public static void scanTable(String nameSpace, String tableName,String family){
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(family));
        // Disable block caching for this scan.
        scan.setCacheBlocks(false);
        try (Table table = getTable(nameSpace, tableName)) {
            if (table == null) {
                return; // getTable already reported the failure
            }
            // The scanner is closed as well as the table once iteration ends.
            try (ResultScanner scanner = table.getScanner(scan)) {
                for (Result result : scanner) {
                    printRowCells(result);
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Prints every cell of a scan result as {@code rowkey:qualifier:value:} plus the separator. */
    private static void printRowCells(Result result) {
        List<Cell> cells = result.listCells();
        if (cells == null) {
            return; // nothing to print for an empty result
        }
        for (Cell cell : cells) {
            String qualifier = qualifierOf(cell);
            System.out.print(Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) + ":");
            System.out.print(qualifier + ":");
            System.out.print(valueAsText(cell, qualifier) + ":");
            System.out.println(SEPARATOR);
        }
    }

    /** Decodes the column qualifier of a cell as a String. */
    private static String qualifierOf(Cell cell) {
        return Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength());
    }

    /**
     * Renders a cell value as text. The "age" column is decoded as an int when the value is
     * exactly int-sized; everything else, including an "age" written as a String by the
     * map-based put helpers, is decoded as a String.
     */
    private static String valueAsText(Cell cell, String qualifier) {
        if ("age".equals(qualifier) && cell.getValueLength() == Bytes.SIZEOF_INT) {
            return String.valueOf(Bytes.toInt(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        return Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
    }

    /**
     * Builds a single-column value filter that compares a String value.
     *
     * @param family column family name
     * @param column column qualifier
     * @param op     comparison operator
     *               (LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER)
     * @param value  value to compare against
     * @return the configured filter
     */
    public static SingleColumnValueFilter getStringSingleColumnValueFilter(String family,String column,CompareOperator op,String value){
        return new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column), op, Bytes.toBytes(value));
    }

    /**
     * Builds a single-column value filter that compares an int value encoded with
     * {@code Bytes.toBytes(int)}.
     *
     * <p>NOTE(review): the comparison presumably runs on the encoded bytes, in which case
     * ordering operators match numeric order only for non-negative values - confirm before
     * using with negative numbers.
     *
     * @param family column family name
     * @param column column qualifier
     * @param op     comparison operator
     *               (LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER)
     * @param value  value to compare against
     * @return the configured filter
     */
    public static SingleColumnValueFilter getIntSingleColumnValueFilter(String family,String column,CompareOperator op,int value){
        return new SingleColumnValueFilter(Bytes.toBytes(family), Bytes.toBytes(column), op, Bytes.toBytes(value));
    }

    /**
     * Builds a filter matching exactly the given column family.
     *
     * @param family column family name
     * @return the configured filter
     */
    public static FamilyFilter getFamilyFilter(String family){
        return new FamilyFilter(CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes(family)));
    }

    /**
     * Builds a filter matching exactly the given column qualifier.
     *
     * @param column column qualifier
     * @return the configured filter
     */
    public static QualifierFilter getQualifierFilter(String column){
        return new QualifierFilter(CompareOperator.EQUAL,new BinaryComparator(Bytes.toBytes(column)));
    }

    /**
     * Builds a value filter using a substring comparator on the given text. Combine it with
     * family and qualifier filters to get the effect of {@code where name like '%value%'}.
     *
     * @param value substring to look for
     * @return the configured filter
     */
    public static ValueFilter getValueFilter(String value){
        return new ValueFilter(CompareOperator.EQUAL, new SubstringComparator(value));
    }

    /**
     * Builds a filter list in which every filter must pass
     * ({@code filter1 and filter2 and filter3}).
     *
     * @param filters filters to combine
     * @return the combined filter list
     */
    public static FilterList getFilterListALL(Filter... filters){
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        for (Filter filter : filters) {
            filterList.addFilter(filter);
        }
        return filterList;
    }

    /**
     * Builds a filter list in which at least one filter must pass
     * ({@code filter1 or filter2 or filter3}).
     *
     * @param filters filters to combine
     * @return the combined filter list
     */
    public static FilterList getFilterListOne(Filter... filters){
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        for (Filter filter : filters) {
            filterList.addFilter(filter);
        }
        return filterList;
    }

}

三.测试类

package com.bigdata.hbasedemo;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * JUnit tests exercising the HBaseUtils helper methods. Each test issues calls against the
 * cluster configured in HBaseUtils and makes no assertions; results are checked by reading
 * the console output or inspecting the tables.
 *
 * @author: HaoWu
 * @create: 2020/7/16 8:16
 */
public class HBaseUtilsTest {
    // Obtaining the singleton runs the HBaseUtils constructor, which opens the shared
    // connection used by the static helper methods called below.
    HBaseUtils hBaseUtils = HBaseUtils.getInstance();

    /**
     * Creates a pre-split table with two column families.
     */
    @Test
    public void createTable() {
        String nameSpace = "bigdata";
        String tableName = "user6";
        byte[][] splitsKey = new byte[][]{
                Bytes.toBytes("10"),
                Bytes.toBytes("20"),
                Bytes.toBytes("30"),
                Bytes.toBytes("40")};
        String columnFamily1 = "info1";
        String columnFamily2 = "info2";
        HBaseUtils.createTable(nameSpace, tableName, splitsKey, columnFamily1, columnFamily2);
    }

    /**
     * Alters the table structure.
     */
    @Test
    public void alterTable() {
        HBaseUtils.alterTable("bigdata", "user", "info1", "info2", "info3");
    }

    /**
     * Deletes a table.
     */
    @Test
    public void deleteTable(){
        HBaseUtils.deleteTable("bigdata","user5");
    }

    /**
     * Inserts one cell whose value is a String.
     */
    @Test
    public void putStringValue() {
        HBaseUtils.putStringValue("bigdata", "user", 1221312312352L, "1004", "info2", "name", "lisi");
    }

    /**
     * Inserts one cell whose value is an int.
     */
    @Test
    public void putintValue() {
        HBaseUtils.putintValue("bigdata", "user", 1221312312321L, "1004", "info2", "age", 25);
    }

    /**
     * Inserts several cells across multiple rows and columns of a single column family.
     */
    @Test
    public void putMultipleRowMultipleColumnOneFamily() {
        // Data format: {{column=name, rowkey=1001, value=lisi}, {column=address, rowkey=1001, value=BeiJing}}
        List<Map<String, String>> mapList = new ArrayList<>();
        Map<String, String> map1 = new ConcurrentHashMap<>();
        map1.put("column","name");
        map1.put("rowkey","1003");
        map1.put("value","lisi1");
        System.out.println(map1);
        Map<String, String> map2 = new ConcurrentHashMap<>();
        map2.put("column","address");
        map2.put("rowkey","1003");
        map2.put("value","BeiJing");
        Map<String, String> map3 = new ConcurrentHashMap<>();
        map3.put("column","name");
        map3.put("rowkey","1004");
        map3.put("value","wangwu1");
        mapList.add(map1);
        mapList.add(map2);
        mapList.add(map3);
        System.out.println(mapList.toString());
        HBaseUtils.putMultipleRowMultipleColumnOneFamily("bigdata", "user", "info2", mapList);
    }

    /**
     * Deletes one column of one row in one column family.
     */
    @Test
    public void deleteOnerowOneColumn(){
        HBaseUtils.deleteOneRowOneCloumn("bigdata","user","info2","address","1001");
    }

    /**
     * Reads selected columns of one row in one column family.
     */
    @Test
    public void get(){
        HBaseUtils.get("bigdata","user","info2","1001","age","name");
    }

    /**
     * Scans one column family of a table.
     */
    @Test
    public void scanTable(){
        HBaseUtils.scanTable("bigdata","user","info2");
    }

    /**
     * Scans with a composite filter equivalent to
     * {@code where name="wangwu" or (name="lisi" and age>30)} and prints every matching cell.
     */
    @Test
    public void testFilter1() throws IOException {
        Scan scan = new Scan();
        // name="lisi"
        SingleColumnValueFilter filter1 = HBaseUtils.getStringSingleColumnValueFilter("info2", "name", CompareOperator.EQUAL, "lisi");
        // age>30
        SingleColumnValueFilter filter2 = HBaseUtils.getIntSingleColumnValueFilter("info2", "age", CompareOperator.GREATER,30);
        // name="lisi" and age>30
        FilterList filterList = HBaseUtils.getFilterListALL(filter1, filter2);
        // name="wangwu"
        SingleColumnValueFilter filter3 = HBaseUtils.getStringSingleColumnValueFilter("info2", "name", CompareOperator.EQUAL, "wangwu");
        // name="wangwu" or (name="lisi" and age>30)
        FilterList filters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
        filters.addFilter(filter3);
        filters.addFilter(filterList);
        scan.setFilter(filters);
        // try-with-resources closes both the scanner and the table, even if iteration fails.
        try (Table table = HBaseUtils.getTable("bigdata", "user");
             ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                List<Cell> cells = result.listCells();
                if (cells == null) {
                    continue; // listCells() yields null when the result holds no cells
                }
                for (Cell cell : cells) {
                    String qualifier = Bytes.toString(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength());
                    // Row key
                    System.out.print(Bytes.toString(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength())+":");
                    // Column qualifier
                    System.out.print(qualifier+":");
                    // Value: "age" is decoded as an int only when it is int-sized, because the
                    // map-based put helper stores every value, including age, as a String.
                    if("age".equals(qualifier) && cell.getValueLength() == Bytes.SIZEOF_INT){
                        System.out.print(Bytes.toInt(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength())+":");
                    }else {
                        System.out.print(Bytes.toString(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength())+":");
                    }
                    System.out.println("***************************************************************************");
                }
            }
        }
    }

    /**
     * Closes the shared admin handle and connection.
     */
    @Test
    public void close(){
        HBaseUtils.close();
    }

}

以上是关于HBase操作Java api的主要内容,如果未能解决你的问题,请参考以下文章

HBase新版本Java API编程实战及基本操作方法封装

hbase-api

hbase-api

如何使用Java API操作Hbase

HBase 6用Phoenix Java api操作HBase

hadoop2-HBase的Java API操作