Java 操作 HBase 教程

Posted Java3y

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 操作 HBase 教程相关的知识,希望对你有一定的参考价值。


本文公众号来源:美码师
作者:美码师
本文已收录至我的 GitHub

一、简介

在上一篇文章 中,我们已经介绍了 HBase 的一些基本概念,以及如何安装使用的方法。那么,作为一名 Javaer,自然是希望用 Java 的方式来与 HBase 进行对话了。所幸的是,HBase 本身就是用 Java 编写的,天生自带了 Java 原生API。我们可以通过 hbase-client 来实现 HBase 数据库的操作。所以,这次主要介绍该组件的基本用法。

在使用 hbase-client 之前,有几个要点需要注意:

  • 客户端需要能访问 Zoopkeeper,再获得 HMaster、RegionServer 实例进行操作

  • 客户端需运行在HBase/Hadoop 集群内,HBase会使用 hostname 来定位节点,因此要求客户端能访问到对应的主机名(或子域名) 如果是远程客户端则需要配置本地的hosts文件。

下面这个图,有助于理解 Client 与 HBase 集群的交互架构:

Java 操作 HBase 教程

下面开始介绍 client 的使用。


二、hbase-client 引入

在 Maven 的 pom.xml 中添加依赖:

 
   
   
 
  1. <dependency>

  2. <groupId>org.apache.hbase</groupId>

  3. <artifactId>hbase-client</artifactId>

  4. <version>2.1.5</version>

  5. <exclusions>

  6. <exclusion>

  7. <artifactId>slf4j-api</artifactId>

  8. <groupId>org.slf4j</groupId>

  9. </exclusion>

  10. <exclusion>

  11. <artifactId>slf4j-log4j12</artifactId>

  12. <groupId>org.slf4j</groupId>

  13. </exclusion>

  14. </exclusions>

  15. </dependency>

  16. <dependency>

  17. <groupId>org.apache.hbase</groupId>

  18. <artifactId>hbase</artifactId>

  19. <version>2.1.5</version>

  20. </dependency>

这里需要注意的是,客户端版本和 HBase 版本需要保持一致,否则可能会遇到不兼容的问题。


三、连接操作

示例代码:

 
   
   
 
  1. /**

  2. * 建立连接

  3. *

  4. * @return

  5. */

  6. public static Connection getConnection() {

  7. try {

  8. //获取配置

  9. Configuration configuration = getConfiguration();

  10. //检查配置

  11. HBaseAdmin.checkHBaseAvailable(configuration);

  12. return ConnectionFactory.createConnection(configuration);

  13. } catch (IOException | ServiceException e) {

  14. throw new RuntimeException(e);

  15. }

  16. }


  17. /**

  18. * 获取配置

  19. *

  20. * @return

  21. */

  22. private static Configuration getConfiguration() {

  23. try {

  24. Properties props = PropertiesLoaderUtils

  25. .loadAllProperties("hbase.properties");

  26. String clientPort = props

  27. .getProperty("hbase.zookeeper.property.clientPort");

  28. String quorum = props

  29. .getProperty("hbase.zookeeper.quorum");


  30. logger.info("connect to zookeeper {}:{}", quorum, clientPort);


  31. Configuration config = HBaseConfiguration.create();

  32. config.set("hbase.zookeeper.property.clientPort", clientPort);

  33. config.set("hbase.zookeeper.quorum", quorum);

  34. return config;

  35. } catch (IOException e) {

  36. throw new RuntimeException(e);

  37. }

  38. }

四、表操作

增删改查方法封装如下:

 
   
   
 
  1. /**

  2. * 创建表

  3. * @param connection

  4. * @param tableName

  5. * @param columnFamilies

  6. * @throws IOException

  7. */

  8. public static void createTable(Connection connection, TableName tableName, String... columnFamilies) throws IOException {

  9. Admin admin = null;

  10. try {

  11. admin = connection.getAdmin();

  12. if (admin.tableExists(tableName)) {

  13. logger.warn("table:{} exists!", tableName.getName());

  14. } else {

  15. TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);

  16. for (String columnFamily : columnFamilies) {

  17. builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamily));

  18. }

  19. admin.createTable(builder.build());

  20. logger.info("create table:{} success!", tableName.getName());

  21. }

  22. } finally {

  23. if (admin != null) {

  24. admin.close();

  25. }

  26. }

  27. }



  28. /**

  29. * 插入数据

  30. *

  31. * @param connection

  32. * @param tableName

  33. * @param rowKey

  34. * @param columnFamily

  35. * @param column

  36. * @param data

  37. * @throws IOException

  38. */

  39. public static void put(Connection connection, TableName tableName,

  40. String rowKey, String columnFamily, String column, String data) throws IOException {


  41. Table table = null;

  42. try {

  43. table = connection.getTable(tableName);

  44. Put put = new Put(Bytes.toBytes(rowKey));

  45. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));

  46. table.put(put);

  47. } finally {

  48. if (table != null) {

  49. table.close();

  50. }

  51. }

  52. }


  53. /**

  54. * 根据row key、column 读取

  55. *

  56. * @param connection

  57. * @param tableName

  58. * @param rowKey

  59. * @param columnFamily

  60. * @param column

  61. * @throws IOException

  62. */

  63. public static String getCell(Connection connection, TableName tableName, String rowKey, String columnFamily, String column) throws IOException {

  64. Table table = null;

  65. try {

  66. table = connection.getTable(tableName);

  67. Get get = new Get(Bytes.toBytes(rowKey));

  68. get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));


  69. Result result = table.get(get);

  70. List<Cell> cells = result.listCells();


  71. if (CollectionUtils.isEmpty(cells)) {

  72. return null;

  73. }

  74. String value = new String(CellUtil.cloneValue(cells.get(0)), "UTF-8");

  75. return value;

  76. } finally {

  77. if (table != null) {

  78. table.close();

  79. }

  80. }

  81. }


  82. /**

  83. * 根据rowkey 获取一行

  84. *

  85. * @param connection

  86. * @param tableName

  87. * @param rowKey

  88. * @return

  89. * @throws IOException

  90. */

  91. public static Map<String, String> getRow(Connection connection, TableName tableName, String rowKey) throws IOException {

  92. Table table = null;

  93. try {

  94. table = connection.getTable(tableName);

  95. Get get = new Get(Bytes.toBytes(rowKey));


  96. Result result = table.get(get);

  97. List<Cell> cells = result.listCells();


  98. if (CollectionUtils.isEmpty(cells)) {

  99. return Collections.emptyMap();

  100. }

  101. Map<String, String> objectMap = new HashMap<>();

  102. for (Cell cell : cells) {

  103. String qualifier = new String(CellUtil.cloneQualifier(cell));

  104. String value = new String(CellUtil.cloneValue(cell), "UTF-8");

  105. objectMap.put(qualifier, value);

  106. }

  107. return objectMap;

  108. } finally {

  109. if (table != null) {

  110. table.close();

  111. }

  112. }

  113. }


  114. /**

  115. * 扫描权标的内容

  116. *

  117. * @param connection

  118. * @param tableName

  119. * @param rowkeyStart

  120. * @param rowkeyEnd

  121. * @throws IOException

  122. */

  123. public static List<Map<String, String>> scan(Connection connection, TableName tableName, String rowkeyStart, String rowkeyEnd) throws IOException {

  124. Table table = null;

  125. try {

  126. table = connection.getTable(tableName);

  127. ResultScanner rs = null;

  128. try {

  129. Scan scan = new Scan();

  130. if (!StringUtils.isEmpty(rowkeyStart)) {

  131. scan.withStartRow(Bytes.toBytes(rowkeyStart));

  132. }

  133. if (!StringUtils.isEmpty(rowkeyEnd)) {

  134. scan.withStopRow(Bytes.toBytes(rowkeyEnd));

  135. }

  136. rs = table.getScanner(scan);


  137. List<Map<String, String>> dataList = new ArrayList<>();

  138. for (Result r : rs) {

  139. Map<String, String> objectMap = new HashMap<>();

  140. for (Cell cell : r.listCells()) {

  141. String qualifier = new String(CellUtil.cloneQualifier(cell));

  142. String value = new String(CellUtil.cloneValue(cell), "UTF-8");

  143. objectMap.put(qualifier, value);

  144. }

  145. dataList.add(objectMap);

  146. }

  147. return dataList;

  148. } finally {

  149. if (rs != null) {

  150. rs.close();

  151. }

  152. }

  153. } finally {

  154. if (table != null) {

  155. table.close();

  156. }

  157. }

  158. }


  159. /**

  160. * 删除表

  161. *

  162. * @param connection

  163. * @param tableName

  164. * @throws IOException

  165. */

  166. public static void deleteTable(Connection connection, TableName tableName) throws IOException {

  167. Admin admin = null;

  168. try {

  169. admin = connection.getAdmin();

  170. if (admin.tableExists(tableName)) {

  171. //现执行disable

  172. admin.disableTable(tableName);

  173. admin.deleteTable(tableName);

  174. }

  175. } finally {

  176. if (admin != null) {

  177. admin.close();

  178. }

  179. }

  180. }


五、运行测试

最后,我们仍然以上一篇文章中的设备数据作为例子:

  1. 建立 DeviceState 表;

  2. 定义 name/state 两个列簇;

  3. 写入列数据;

  4. 读取列、行,范围读取;

  5. 删除操作

最终实现的代码如下:

 
   
   
 
  1. private static final Logger logger = LoggerFactory.getLogger(HBaseTest.class);


  2. public static void main(String[] args) {


  3. Connection connection = null;

  4. try {

  5. connection = getConnection();

  6. TableName tableName = TableName.valueOf("DeviceState");


  7. //创建DeviceState表

  8. createTable(connection, tableName, "name", "state");


  9. logger.info("创建表 {}", tableName.getNameAsString());


  10. //写入数据

  11. put(connection, tableName, "row1", "name", "c1", "空调");

  12. put(connection, tableName, "row1", "state", "c2", "打开");

  13. put(connection, tableName, "row2", "name", "c1", "电视机");

  14. put(connection, tableName, "row2", "state", "c2", "关闭");


  15. logger.info("写入数据.");


  16. String value = getCell(connection, tableName, "row1", "state", "c2");

  17. logger.info("读取单元格-row1.state:{}", value);


  18. Map<String, String> row = getRow(connection, tableName, "row2");

  19. logger.info("读取单元格-row2:{}", JsonUtil.toJson(row));


  20. List<Map<String, String>> dataList = scan(connection, tableName, null, null);

  21. logger.info("扫描表结果-: {}", JsonUtil.toPrettyJson(dataList));


  22. //删除DeviceState表

  23. deleteTable(connection, tableName);

  24. logger.info("删除表 {}", tableName.getNameAsString());


  25. logger.info("操作完成.");

  26. } catch (Exception e) {

  27. logger.error("操作出错", e);

  28. } finally {

  29. if (connection != null) {

  30. try {

  31. connection.close();

  32. } catch (IOException e) {

  33. logger.error("error occurs", e);

  34. }

  35. }

  36. }


  37. }

执行代码,控制台输出如下:

 
   
   
 
  1. INFO -createTable(HBaseTest.java:89) - create table:[68, 101, 118, 105, 99, 101, 83, 116, 97, 116, 101] success!

  2. INFO -main(HBaseTest.java:32) - 创建表 DeviceState

  3. INFO -main(HBaseTest.java:40) - 写入数据.

  4. INFO -main(HBaseTest.java:43) - 读取单元格-row1.state:打开

  5. INFO -main(HBaseTest.java:46) - 读取单元格-row2:{"c1":"电视机","c2":"关闭"}

  6. INFO -main(HBaseTest.java:49) - 扫描表结果-:

  7. [ {

  8. "c1": "空调",

  9. "c2": "打开"

  10. }, {

  11. "c1": "电视机",

  12. "c2": "关闭"

  13. } ]

  14. INFO -HBaseAdmin$9.call(HBaseAdmin.java:1380) - Started disable of DeviceState

  15. INFO -HBaseAdmin$DisableTableFuture.postOperationResult(HBaseAdmin.java:1409) - DisabledDeviceState

  16. INFO -HBaseAdmin$DeleteTableFuture.postOperationResult(HBaseAdmin.java:965) - DeletedDeviceState

  17. INFO -main(HBaseTest.java:53) - 删除表 DeviceState

  18. INFO -main(HBaseTest.java:55) - 操作完成.

此时Java Client已经完成制作。


FAQ

A. 提示报错 找不到 winutils 程序

Failed to locate the winutils binary in the hadoop binary path

原因

在Windows下依赖一个 winutils.exe 程序,该程序通过${HADOOPHOME}/bin 来查找。该报错不影响程序执行。

解决办法

需要下载hadoop-commons-master,再配置变量 HADOOPHOME

B. 提示报错,UnknownHostException,无法找到节点..
原因
客户端无法解析HMaster实例节点的主机名 
解决办法
需要编辑 C:WindowsSystem32driversetchosts 文件,添加对应的映射,如下:
 
   
   
 

  1. 47.xx.8x.xx izwz925kr63w5jitjys6dtt


参考文档

官方文档 https://hbase.apache.org/book.html#quickstart Java HBase客户端API https://www.baeldung.com/hbase



欢迎加入 交流群 学习,备注 加群
说实话在这个群,哪怕您不说话,光看聊天记录,都能学到东西Java 操作 HBase 教程


Java 操作 HBase 教程

推荐阿里云推广服务器89/年,229/3年,买来送自己,送女朋友马上过年再合适不过了,买了搭建个项目给面试官看也香,还可以熟悉技术栈,(老用户用家人账号买就好了,我用我女朋友的

以上是关于Java 操作 HBase 教程的主要内容,如果未能解决你的问题,请参考以下文章

暑期——第八周总结(Hbase命令详解)

HBase Thrift协议编程入门教程

通过Java操作HBase

HBase Shell基本操作 一万字

如何使用Java API操作Hbase

Hbase-2.0.0_02_常用操作