Java 操作HBase

Posted 叶子上的考拉

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 操作HBase相关的知识,希望对你有一定的参考价值。

Java 操作HBase

思路

1.建立连接

2.针对表的操作(创建表、删除表、判断表是否存在、使用/禁用表、列出表)

3.针对数据的操作(添加、删除、修改、查看)

4.关闭连接

HBase常用的Java API

Java API 接口 :可以理解为别人封装好的方法,可以直接调用

(一)Admin

管理HBase数据库信息(包括创建、删除表、列出表项、使表有效/无效、添加/删除表的列族成员、检查HBase运行状态等)

方法名释义
addColumn添加列族
createTable创建表
deleteTable删除表
disableTable使表无效
enableTable使表有效
tableExists判断表是否存在
listTable列出所有表项

(二)HBaseConfiguration

管理HBase的配置信息

方法名释义
create创建Configuration

(三)Table

Connection.getTable()返回Table的一个子对象,用于与HBase进行通信

方法名释义
close关闭
delete删除(单元格、行)
exists检查指定的Get对象是否存在
get获取指定的值
put添加数据
HTableDescriptor获取HTableDescriptor实例
getName获取当前表格的名字实例

(四)HTableDescriptor

HTableDescriptor包含了HBase中的表格的详细信息(例如表中的列族、表的类型、表是否只读、MemoStore的最大空间、Region分裂时机等)

方法名释义
addFamily添加列族
getFamilies返回表中所有列族的名字
getTableName返回表的名字实例
getValue获取某个属性的值
removeFamily删除某个列族
setValue设置属性的值

(五)HColumnDescriptor

HColumnDescriptor包含了列族的详细信息(例如列族的版本号、压缩设置等)。HColumnDescriptor通常在添加列族或者创建表时使用。列族一旦建立就不能被修改,只能通过删除再重新创建的方式来间接修改。一旦列族被删除,则其中包含的数据也会被删除

方法名释义
getName获取列族的名字
getValue获取单元格的值
setValue设置单元格的值

(六)Put

用来对单元格执行添加数据的操作

方法名释义
add将指定的列族、列对应的值添加到Put实例中
get获取列族和列限定符指定列中的所有单元格的值

(七)Get

用来获取当行的信息

方法名释义
addColumn根据列族和列限定符获取对应列
setFilter为获取具体的列设置相应的过滤器

(八)Result

用于存放Get或者Scan操作后的查询结果,以<key, value>的格式存储在map结构中。

该类不是线程安全的

方法名释义
containsColumn检查是否包含列族和列限定符指定的列
getColumnCells获得列族和列限定符指定列的列中的所有单元格
getValue获得列族和列限定符指定的单元格的最新值

(九)Scan

可以用Scan来限定需要查找的数据(包括但是不限于限定版本号、起始行号、终止行号、列族、列限定符、返回值的数量上限)

方法名释义
addFamily限定需要查找的列族
addColumn限定列族和列限定符指定的列
setFilter指定Filter来过滤掉不需要的数据
setStartRow限定开始的行,否则从表头开始
setStopRow限定结束的行(不包含此行)
setBatch限定最多返回的单元格数目。用于防止返回过多的数据导致OutofMemory(内存溢出)错误

Java 操作HBase实例

(一)编辑pom.xml文件

往其中添加

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.4.13</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.4.13</version>
        </dependency>

(二)编写操作HBase的文件ExampleForHBase.java

实例说明

1.创建一个学生信息表,用来存储学生的姓名(姓名作为行键,且假设姓名不会重复)以及考试成绩,其中考试成绩(score)是一个列族,存储了各个科目的考试成绩。然后向student中添加数据

2.student表结构及数据

namescore
English MathComputer
zhangsan 69 8677
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;

public class ExampleForHBase 

    public static Configuration conf;//管理HBase的配置信息
    public static Connection conn;//管理HBase的连接
    public static Admin admin;//管理HBase数据库的连接

    /**
     * 创建相关连接
     *
     * @throws IOException 可能出现的异常
     */
    public static void init() throws IOException 
        conf = HBaseConfiguration.create();
        System.setProperty("HADOOP_USER_NAME", "hadoop");
        conf.set("HADOOP_USER_NAME", "hadoop");
        conf.set("hbase.root.dir", "hdfs://master:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "master");//配置Zookeeper的ip地址
        conf.set("hbase.zookeeper.property.clientPort", "2181");//配置zookeeper的端口

        conn = ConnectionFactory.createConnection(conf);
        admin = conn.getAdmin();
    

    /**
     * 关闭所有连接
     *
     * @throws IOException 可能出现的异常
     */
    public static void close() throws IOException 
        if (admin != null)
            admin.close();
        if (conn != null)
            conn.close();
    

    /**
     * 创建表
     * @param myTableName 表名
     * @param colFamily 列族名的数组
     * @throws IOException 可能出现的异常
     */
    public static void createTable(String myTableName, String[] colFamily) throws IOException 
        TableName tableName = TableName.valueOf(myTableName);
        if (admin.tableExists(tableName)) 
            System.out.println(myTableName + "表已经存在");
         else 
            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);
            for (String str : colFamily) 
                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str);
                hTableDescriptor.addFamily(hColumnDescriptor);
            
            admin.createTable(hTableDescriptor);
        
    

    /**
     * 添加数据
     * @param tableName 表名
     * @param rowkey 行键
     * @param colFamily 列族
     * @param col 列
     * @param value 值
     * @throws IOException 可能出现的异常
     */
    public static void insertData(String tableName,String rowkey,String colFamily,String col,String value) throws IOException 
        Table table = conn.getTable(TableName.valueOf(tableName));
        Put put = new Put(rowkey.getBytes());
        put.addColumn(colFamily.getBytes(),col.getBytes(),value.getBytes());
        table.put(put);
        table.close();
    

    /**
     * 根据行键删除数据
     * @param tableName 表名
     * @param rowkey 行键
     * @throws IOException 可能出现的异常
     */
    public static void deleteData(String tableName,String rowkey) throws IOException 
        Table table = conn.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(rowkey.getBytes());
        table.delete(delete);
        table.close();
    

    /**
     * 获取数据
     * @param tableName 表名
     * @param rowkey 行键
     * @param colFamily 列族
     * @param col 列
     * @throws IOException 可能出现的异常
     */
    public static void getData(String tableName,String rowkey,String colFamily,String col) throws IOException 
        Table table = conn.getTable(TableName.valueOf(tableName));
        Get get = new Get(rowkey.getBytes());
        get.addColumn(colFamily.getBytes(),col.getBytes());
        Result result = table.get(get);
        System.out.println(new String(result.getValue(colFamily.getBytes(),col.getBytes())));
        table.close();
    

    public static void main(String[] args) throws IOException 
        init();
        createTable("student",new String[]"score");
        insertData("student","zhangsan","score","English","69");
        insertData("student","zhangsan","score","Math","86");
        insertData("student","zhangsan","score","Computer","77");
        getData("student","zhangsan","score","Computer");
        close();
    

HBase Java 操作 HBase 教程

一、简介

在上一篇文章 HBase 基础入门 中,我们已经介绍了 HBase 的一些基本概念,以及如何安装使用的方法。
那么,作为一名 Javaer,自然是希望用 Java 的方式来与 HBase 进行对话了。
所幸的是,HBase 本身就是用 Java 编写的,天生自带了 Java 原生API。 我们可以通过 hbase-client 来实现 HBase 数据库的操作。
所以,这次主要介绍该组件的基本用法。

在使用 hbase-client 之前,有几个要点需要注意:

  • 客户端需要能访问 Zoopkeeper,再获得 HMaster、RegionServer 实例进行操作
  • 客户端需运行在HBase/Hadoop 集群内,HBase会使用 hostname 来定位节点,因此要求客户端能访问到对应的主机名(或子域名)
    如果是远程客户端则需要配置本地的hosts文件。

下面这个图,有助于理解 Client 与 HBase 集群的交互架构:

下面开始介绍 client 的使用。

二、hbase-client 引入

在 Maven 的 pom.xml 中添加依赖:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.1.5</version>
    <exclusions>
        <exclusion>
            <artifactId>slf4j-api</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
        <exclusion>
            <artifactId>slf4j-log4j12</artifactId>
            <groupId>org.slf4j</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>2.1.5</version>
</dependency>

这里需要注意的是,客户端版本和 HBase 版本需要保持一致,否则可能会遇到不兼容的问题。

三、连接操作

示例代码:

/**
 * 建立连接
 *
 * @return
 */
public static Connection getConnection() {
    try {
        //获取配置
        Configuration configuration = getConfiguration();
        //检查配置
        HBaseAdmin.checkHBaseAvailable(configuration);
        return ConnectionFactory.createConnection(configuration);
    } catch (IOException | ServiceException e) {
        throw new RuntimeException(e);
    }
}

/**
 * 获取配置
 *
 * @return
 */
private static Configuration getConfiguration() {
    try {
        Properties props = PropertiesLoaderUtils.loadAllProperties("hbase.properties");
        String clientPort = props.getProperty("hbase.zookeeper.property.clientPort");
        String quorum = props.getProperty("hbase.zookeeper.quorum");

        logger.info("connect to zookeeper {}:{}", quorum, clientPort);

        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.property.clientPort", clientPort);
        config.set("hbase.zookeeper.quorum", quorum);
        return config;
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

四、表操作

增删改查方法封装如下:

/**
 * 创建表
 * @param connection
 * @param tableName
 * @param columnFamilies
 * @throws IOException
 */
public static void createTable(Connection connection, TableName tableName, String... columnFamilies) throws IOException {
    Admin admin = null;
    try {
        admin = connection.getAdmin();
        if (admin.tableExists(tableName)) {
            logger.warn("table:{} exists!", tableName.getName());
        } else {
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
            for (String columnFamily : columnFamilies) {
                builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamily));
            }
            admin.createTable(builder.build());
            logger.info("create table:{} success!", tableName.getName());
        }
    } finally {
        if (admin != null) {
            admin.close();
        }
    }
}


/**
 * 插入数据
 *
 * @param connection
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param column
 * @param data
 * @throws IOException
 */
public static void put(Connection connection, TableName tableName,
                       String rowKey, String columnFamily, String column, String data) throws IOException {

    Table table = null;
    try {
        table = connection.getTable(tableName);
        Put put = new Put(Bytes.toBytes(rowKey));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
        table.put(put);
    } finally {
        if (table != null) {
            table.close();
        }
    }
}

/**
 * 根据row key、column 读取
 *
 * @param connection
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param column
 * @throws IOException
 */
public static String getCell(Connection connection, TableName tableName, String rowKey, String columnFamily, String column) throws IOException {
    Table table = null;
    try {
        table = connection.getTable(tableName);
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));

        Result result = table.get(get);
        List<Cell> cells = result.listCells();

        if (CollectionUtils.isEmpty(cells)) {
            return null;
        }
        String value = new String(CellUtil.cloneValue(cells.get(0)), "UTF-8");
        return value;
    } finally {
        if (table != null) {
            table.close();
        }
    }
}

/**
 * 根据rowkey 获取一行
 *
 * @param connection
 * @param tableName
 * @param rowKey
 * @return
 * @throws IOException
 */
public static Map<String, String> getRow(Connection connection, TableName tableName, String rowKey) throws IOException {
    Table table = null;
    try {
        table = connection.getTable(tableName);
        Get get = new Get(Bytes.toBytes(rowKey));

        Result result = table.get(get);
        List<Cell> cells = result.listCells();

        if (CollectionUtils.isEmpty(cells)) {
            return Collections.emptyMap();
        }
        Map<String, String> objectMap = new HashMap<>();
        for (Cell cell : cells) {
            String qualifier = new String(CellUtil.cloneQualifier(cell));
            String value = new String(CellUtil.cloneValue(cell), "UTF-8");
            objectMap.put(qualifier, value);
        }
        return objectMap;
    } finally {
        if (table != null) {
            table.close();
        }
    }
}

/**
 * 扫描权标的内容
 *
 * @param connection
 * @param tableName
 * @param rowkeyStart
 * @param rowkeyEnd
 * @throws IOException
 */
public static List<Map<String, String>> scan(Connection connection, TableName tableName, String rowkeyStart, String rowkeyEnd) throws IOException {
    Table table = null;
    try {
        table = connection.getTable(tableName);
        ResultScanner rs = null;
        try {
            Scan scan = new Scan();
            if (!StringUtils.isEmpty(rowkeyStart)) {
                scan.withStartRow(Bytes.toBytes(rowkeyStart));
            }
            if (!StringUtils.isEmpty(rowkeyEnd)) {
                scan.withStopRow(Bytes.toBytes(rowkeyEnd));
            }
            rs = table.getScanner(scan);

            List<Map<String, String>> dataList = new ArrayList<>();
            for (Result r : rs) {
                Map<String, String> objectMap = new HashMap<>();
                for (Cell cell : r.listCells()) {
                    String qualifier = new String(CellUtil.cloneQualifier(cell));
                    String value = new String(CellUtil.cloneValue(cell), "UTF-8");
                    objectMap.put(qualifier, value);
                }
                dataList.add(objectMap);
            }
            return dataList;
        } finally {
            if (rs != null) {
                rs.close();
            }
        }
    } finally {
        if (table != null) {
            table.close();
        }
    }
}

/**
 * 删除表
 *
 * @param connection
 * @param tableName
 * @throws IOException
 */
public static void deleteTable(Connection connection, TableName tableName) throws IOException {
    Admin admin = null;
    try {
        admin = connection.getAdmin();
        if (admin.tableExists(tableName)) {
            //现执行disable
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }
    } finally {
        if (admin != null) {
            admin.close();
        }
    }
}

五、运行测试

最后,我们仍然以上一篇文章中的设备数据作为例子:

  1. 建立 DeviceState 表;
  2. 定义 name/state 两个列簇;
  3. 写入列数据;
  4. 读取列、行,范围读取;
  5. 删除操作

最终实现的代码如下:

private static final Logger logger = LoggerFactory.getLogger(HBaseTest.class);

public static void main(String[] args) {

    Connection connection = null;
    try {
        connection = getConnection();
        TableName tableName = TableName.valueOf("DeviceState");

        //创建DeviceState表
        createTable(connection, tableName, "name", "state");

        logger.info("创建表 {}", tableName.getNameAsString());

        //写入数据
        put(connection, tableName, "row1", "name", "c1", "空调");
        put(connection, tableName, "row1", "state", "c2", "打开");
        put(connection, tableName, "row2", "name", "c1", "电视机");
        put(connection, tableName, "row2", "state", "c2", "关闭");

        logger.info("写入数据.");

        String value = getCell(connection, tableName, "row1", "state", "c2");
        logger.info("读取单元格-row1.state:{}", value);

        Map<String, String> row = getRow(connection, tableName, "row2");
        logger.info("读取单元格-row2:{}", JsonUtil.toJson(row));

        List<Map<String, String>> dataList = scan(connection, tableName, null, null);
        logger.info("扫描表结果-:\\n{}", JsonUtil.toPrettyJson(dataList));

        //删除DeviceState表
        deleteTable(connection, tableName);
        logger.info("删除表 {}", tableName.getNameAsString());

        logger.info("操作完成.");
    } catch (Exception e) {
        logger.error("操作出错", e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                logger.error("error occurs", e);
            }
        }
    }

}

执行代码,控制台输出如下:

INFO -createTable(HBaseTest.java:89) - create table:[68, 101, 118, 105, 99, 101, 83, 116, 97, 116, 101] success!
INFO -main(HBaseTest.java:32) - 创建表 DeviceState
INFO -main(HBaseTest.java:40) - 写入数据.
INFO -main(HBaseTest.java:43) - 读取单元格-row1.state:打开
INFO -main(HBaseTest.java:46) - 读取单元格-row2:{"c1":"电视机","c2":"关闭"}
INFO -main(HBaseTest.java:49) - 扫描表结果-:
[ {
  "c1" : "空调",
  "c2" : "打开"
}, {
  "c1" : "电视机",
  "c2" : "关闭"
} ]
INFO -HBaseAdmin$9.call(HBaseAdmin.java:1380) - Started disable of DeviceState
INFO -HBaseAdmin$DisableTableFuture.postOperationResult(HBaseAdmin.java:1409) - Disabled DeviceState
INFO -HBaseAdmin$DeleteTableFuture.postOperationResult(HBaseAdmin.java:965) - Deleted DeviceState
INFO -main(HBaseTest.java:53) - 删除表 DeviceState
INFO -main(HBaseTest.java:55) - 操作完成.

此时Java Client已经完成制作。

FAQ

  • 提示报错 找不到winutils程序

Failed to locate the winutils binary in the hadoop binary path
原因是在Windows下依赖一个winutils.exe程序,该程序通过${HADOOP_HOME}/bin 来查找。
该报错不影响程序执行,但如果要规避问题,需要下载hadoop-commons-master,再配置变量HADOOP_HOME
可参考地址:https://blog.csdn.net/ycf921244819/article/details/81706119

  • 提示报错,UnknownHostException,无法找到节点..
    原因是客户端无法解析HMaster实例节点的主机名
    需要编辑 C:\\Windows\\System32\\drivers\\etc\\hosts 文件,添加对应的映射,如下:
47.xx.8x.xx izwz925kr63w5jitjys6dtt

参考文档

官方文档
https://hbase.apache.org/book.html#quickstart
Java HBase客户端API
https://www.baeldung.com/hbase

以上是关于Java 操作HBase的主要内容,如果未能解决你的问题,请参考以下文章

大数据技术之HBaseHBase简介HBase快速入门HBase进阶

大数据技术之HBaseHBase简介HBase快速入门HBase进阶

HBaseHBase集群Shell操作

主流 NoSQL 数据库对比

头歌 HBase(相关的五个实验)

HBaseHBase入门简介