如何使用Java API操作Hbase

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何使用Java API操作Hbase相关的知识,希望对你有一定的参考价值。

HBase提供了对HBase进行一系列的管理涉及到对表的管理、数据的操作java api。
常用的API操作有:
  1、 对表的创建、删除、显示以及修改等,可以用HBaseAdmin,一旦创建了表,那么可以通过HTable的实例来访问表,每次可以往表里增加数据。
  2、 插入数据
    创建一个Put对象,在这个Put对象里可以指定要给哪个列增加数据,以及当前的时间戳等值,然后通过调用HTable.put(Put)来提交操作,子猴在这里提请注意的是:在创建Put对象的时候,你必须指定一个行(Row)值,在构造Put对象的时候作为参数传入。
  3、 获取数据
    要获取数据,使用Get对象,Get对象同Put对象一样有好几个构造函数,通常在构造的时候传入行值,表示取第几行的数据,通过HTable.get(Get)来调用。
  4、 浏览每一行
    通过Scan可以对表中的行进行浏览,得到每一行的信息,比如列名,时间戳等,Scan相当于一个游标,通过next()来浏览下一个,通过调用HTable.getScanner(Scan)来返回一个ResultScanner对象。HTable.get(Get)和HTable.getScanner(Scan)都是返回一个Result。Result是一个
KeyValue的链表。
  5、 删除
    使用Delete来删除记录,通过调用HTable.delete(Delete)来执行删除操作。(注:删除这里有些特别,也就是删除并不是马上将数据从表中删除。)
  6、 锁
    新增、获取、删除在操作过程中会对所操作的行加一个锁,而浏览却不会。
  7、 簇的访问
    客户端代码通过ZooKeeper来访问找到簇,也就是说ZooKeeper quorum将被使用,那么相关的类(包)应该在客户端的类(classes)目录下,即客户端一定要找到文件hbase-site.xml。
下面是一个例子程序:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseTest

private static Configuration conf = null;

/**
* 初始化配置
*/
static
Configuration HBASE_CONFIG = new Configuration();
//与hbase/conf/hbase-site.xml中hbase.zookeeper.quorum配置的值相同
HBASE_CONFIG.set("hbase.zookeeper.quorum", "10.1.1.1");
//与hbase/conf/hbase-site.xml中hbase.zookeeper.property.clientPort配置的值相同
HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
conf = HBaseConfiguration.create(HBASE_CONFIG);


/**
* 创建一张表
*/
public static void creatTable(String tableName, String[] familys) throws Exception
HBaseAdmin admin = new HBaseAdmin(conf);
if (admin.tableExists(tableName))
System.out.println("table already exists!");
else
HTableDescriptor tableDesc = new HTableDescriptor(tableName);
for(int i=0; i<familys.length; i++)
tableDesc.addFamily(new HColumnDescriptor(familys[i]));

admin.createTable(tableDesc);
System.out.println("create table " + tableName + " ok.");



/**
* 删除表
*/
public static void deleteTable(String tableName) throws Exception
try
HBaseAdmin admin = new HBaseAdmin(conf);
admin.disableTable(tableName);
admin.deleteTable(tableName);
System.out.println("delete table " + tableName + " ok.");
catch (MasterNotRunningException e)
e.printStackTrace();
catch (ZooKeeperConnectionException e)
e.printStackTrace();



/**
* 插入一行记录
*/
public static void addRecord (String tableName, String rowKey, String family, String qualifier, String value)
throws Exception
try
HTable table = new HTable(conf, tableName);
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(family),Bytes.toBytes(qualifier),Bytes.toBytes(value));
table.put(put);
System.out.println("insert recored " + rowKey + " to table " + tableName +" ok.");
catch (IOException e)
e.printStackTrace();



/**
* 删除一行记录
*/
public static void delRecord (String tableName, String rowKey) throws IOException
HTable table = new HTable(conf, tableName);
List list = new ArrayList();
Delete del = new Delete(rowKey.getBytes());
list.add(del);
table.delete(list);
System.out.println("del recored " + rowKey + " ok.");


/**
* 查找一行记录
*/
public static void getOneRecord (String tableName, String rowKey) throws IOException
HTable table = new HTable(conf, tableName);
Get get = new Get(rowKey.getBytes());
Result rs = table.get(get);
for(KeyValue kv : rs.raw())
System.out.print(new String(kv.getRow()) + " " );
System.out.print(new String(kv.getFamily()) + ":" );
System.out.print(new String(kv.getQualifier()) + " " );
System.out.print(kv.getTimestamp() + " " );
System.out.println(new String(kv.getValue()));



/**
* 显示所有数据
*/
public static void getAllRecord (String tableName)
try
HTable table = new HTable(conf, tableName);
Scan s = new Scan();
ResultScanner ss = table.getScanner(s);
for(Result r:ss)
for(KeyValue kv : r.raw())
System.out.print(new String(kv.getRow()) + " ");
System.out.print(new String(kv.getFamily()) + ":");
System.out.print(new String(kv.getQualifier()) + " ");
System.out.print(kv.getTimestamp() + " ");
System.out.println(new String(kv.getValue()));


catch (IOException e)
e.printStackTrace();



public static void main (String [] agrs)
try
String tablename = "scores";
String[] familys = "grade", "course";
HBaseTest.creatTable(tablename, familys);

//add record zkb
HBaseTest.addRecord(tablename,"zkb","grade","","5");
HBaseTest.addRecord(tablename,"zkb","course","","90");
HBaseTest.addRecord(tablename,"zkb","course","math","97");
HBaseTest.addRecord(tablename,"zkb","course","art","87");
//add record baoniu
HBaseTest.addRecord(tablename,"baoniu","grade","","4");
HBaseTest.addRecord(tablename,"baoniu","course","math","89");

System.out.println("===========get one record========");
HBaseTest.getOneRecord(tablename, "zkb");

System.out.println("===========show all record========");
HBaseTest.getAllRecord(tablename);

System.out.println("===========del one record========");
HBaseTest.delRecord(tablename, "baoniu");
HBaseTest.getAllRecord(tablename);

System.out.println("===========show all record========");
HBaseTest.getAllRecord(tablename);
catch (Exception e)
e.printStackTrace();


参考技术A 1. HBaseConfiguration是每一个hbase client都会使用到的对象,它代表的是HBase配置信息。它有两种构造方式:
public HBaseConfiguration()
public HBaseConfiguration(final Configuration c)
2. HBaseAdmin来创建表。HBaseAdmin负责表的META信息处理。HBaseAdmin提供了createTable这个方法:
public void createTable(HTableDescriptor desc)
3. addFamily方法增加family
public void addFamily(final HColumnDescriptor family)
4. 删除表
删除表也是通过HBaseAdmin来操作,删除表之前首先要disable表。
disableTable和deleteTable分别用来disable和delete表。
5. 查询数据
单条查询是通过rowkey在table中查询某一行的数据。HTable提供了get方法来完成单条查询。
批量查询是通过制定一段rowkey的范围来查询。HTable提供了个getScanner方法来完成批量查询。
public Result get(final Get get)
public ResultScanner getScanner(final Scan scan)
6. 插入数据 HTable通过put方法来插入数据。
public void put(final Put put) throws IOException
public void put(final List puts) throws IOException
7. 切分表
HBaseAdmin提供split方法来将table 进行split.
public void split(final String tableNameOrRegionName)

Java 操作 HBase 教程


本文公众号来源:美码师
作者:美码师
本文已收录至我的 GitHub

一、简介

在上一篇文章 中,我们已经介绍了 HBase 的一些基本概念,以及如何安装使用的方法。那么,作为一名 Javaer,自然是希望用 Java 的方式来与 HBase 进行对话了。所幸的是,HBase 本身就是用 Java 编写的,天生自带了 Java 原生API。我们可以通过 hbase-client 来实现 HBase 数据库的操作。所以,这次主要介绍该组件的基本用法。

在使用 hbase-client 之前,有几个要点需要注意:

  • 客户端需要能访问 Zoopkeeper,再获得 HMaster、RegionServer 实例进行操作

  • 客户端需运行在HBase/Hadoop 集群内,HBase会使用 hostname 来定位节点,因此要求客户端能访问到对应的主机名(或子域名) 如果是远程客户端则需要配置本地的hosts文件。

下面这个图,有助于理解 Client 与 HBase 集群的交互架构:

Java 操作 HBase 教程

下面开始介绍 client 的使用。


二、hbase-client 引入

在 Maven 的 pom.xml 中添加依赖:

 
   
   
 
  1. <dependency>

  2. <groupId>org.apache.hbase</groupId>

  3. <artifactId>hbase-client</artifactId>

  4. <version>2.1.5</version>

  5. <exclusions>

  6. <exclusion>

  7. <artifactId>slf4j-api</artifactId>

  8. <groupId>org.slf4j</groupId>

  9. </exclusion>

  10. <exclusion>

  11. <artifactId>slf4j-log4j12</artifactId>

  12. <groupId>org.slf4j</groupId>

  13. </exclusion>

  14. </exclusions>

  15. </dependency>

  16. <dependency>

  17. <groupId>org.apache.hbase</groupId>

  18. <artifactId>hbase</artifactId>

  19. <version>2.1.5</version>

  20. </dependency>

这里需要注意的是,客户端版本和 HBase 版本需要保持一致,否则可能会遇到不兼容的问题。


三、连接操作

示例代码:

 
   
   
 
  1. /**

  2. * 建立连接

  3. *

  4. * @return

  5. */

  6. public static Connection getConnection() {

  7. try {

  8. //获取配置

  9. Configuration configuration = getConfiguration();

  10. //检查配置

  11. HBaseAdmin.checkHBaseAvailable(configuration);

  12. return ConnectionFactory.createConnection(configuration);

  13. } catch (IOException | ServiceException e) {

  14. throw new RuntimeException(e);

  15. }

  16. }


  17. /**

  18. * 获取配置

  19. *

  20. * @return

  21. */

  22. private static Configuration getConfiguration() {

  23. try {

  24. Properties props = PropertiesLoaderUtils

  25. .loadAllProperties("hbase.properties");

  26. String clientPort = props

  27. .getProperty("hbase.zookeeper.property.clientPort");

  28. String quorum = props

  29. .getProperty("hbase.zookeeper.quorum");


  30. logger.info("connect to zookeeper {}:{}", quorum, clientPort);


  31. Configuration config = HBaseConfiguration.create();

  32. config.set("hbase.zookeeper.property.clientPort", clientPort);

  33. config.set("hbase.zookeeper.quorum", quorum);

  34. return config;

  35. } catch (IOException e) {

  36. throw new RuntimeException(e);

  37. }

  38. }

四、表操作

增删改查方法封装如下:

 
   
   
 
  1. /**

  2. * 创建表

  3. * @param connection

  4. * @param tableName

  5. * @param columnFamilies

  6. * @throws IOException

  7. */

  8. public static void createTable(Connection connection, TableName tableName, String... columnFamilies) throws IOException {

  9. Admin admin = null;

  10. try {

  11. admin = connection.getAdmin();

  12. if (admin.tableExists(tableName)) {

  13. logger.warn("table:{} exists!", tableName.getName());

  14. } else {

  15. TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);

  16. for (String columnFamily : columnFamilies) {

  17. builder.setColumnFamily(ColumnFamilyDescriptorBuilder.of(columnFamily));

  18. }

  19. admin.createTable(builder.build());

  20. logger.info("create table:{} success!", tableName.getName());

  21. }

  22. } finally {

  23. if (admin != null) {

  24. admin.close();

  25. }

  26. }

  27. }



  28. /**

  29. * 插入数据

  30. *

  31. * @param connection

  32. * @param tableName

  33. * @param rowKey

  34. * @param columnFamily

  35. * @param column

  36. * @param data

  37. * @throws IOException

  38. */

  39. public static void put(Connection connection, TableName tableName,

  40. String rowKey, String columnFamily, String column, String data) throws IOException {


  41. Table table = null;

  42. try {

  43. table = connection.getTable(tableName);

  44. Put put = new Put(Bytes.toBytes(rowKey));

  45. put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));

  46. table.put(put);

  47. } finally {

  48. if (table != null) {

  49. table.close();

  50. }

  51. }

  52. }


  53. /**

  54. * 根据row key、column 读取

  55. *

  56. * @param connection

  57. * @param tableName

  58. * @param rowKey

  59. * @param columnFamily

  60. * @param column

  61. * @throws IOException

  62. */

  63. public static String getCell(Connection connection, TableName tableName, String rowKey, String columnFamily, String column) throws IOException {

  64. Table table = null;

  65. try {

  66. table = connection.getTable(tableName);

  67. Get get = new Get(Bytes.toBytes(rowKey));

  68. get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));


  69. Result result = table.get(get);

  70. List<Cell> cells = result.listCells();


  71. if (CollectionUtils.isEmpty(cells)) {

  72. return null;

  73. }

  74. String value = new String(CellUtil.cloneValue(cells.get(0)), "UTF-8");

  75. return value;

  76. } finally {

  77. if (table != null) {

  78. table.close();

  79. }

  80. }

  81. }


  82. /**

  83. * 根据rowkey 获取一行

  84. *

  85. * @param connection

  86. * @param tableName

  87. * @param rowKey

  88. * @return

  89. * @throws IOException

  90. */

  91. public static Map<String, String> getRow(Connection connection, TableName tableName, String rowKey) throws IOException {

  92. Table table = null;

  93. try {

  94. table = connection.getTable(tableName);

  95. Get get = new Get(Bytes.toBytes(rowKey));


  96. Result result = table.get(get);

  97. List<Cell> cells = result.listCells();


  98. if (CollectionUtils.isEmpty(cells)) {

  99. return Collections.emptyMap();

  100. }

  101. Map<String, String> objectMap = new HashMap<>();

  102. for (Cell cell : cells) {

  103. String qualifier = new String(CellUtil.cloneQualifier(cell));

  104. String value = new String(CellUtil.cloneValue(cell), "UTF-8");

  105. objectMap.put(qualifier, value);

  106. }

  107. return objectMap;

  108. } finally {

  109. if (table != null) {

  110. table.close();

  111. }

  112. }

  113. }


  114. /**

  115. * 扫描权标的内容

  116. *

  117. * @param connection

  118. * @param tableName

  119. * @param rowkeyStart

  120. * @param rowkeyEnd

  121. * @throws IOException

  122. */

  123. public static List<Map<String, String>> scan(Connection connection, TableName tableName, String rowkeyStart, String rowkeyEnd) throws IOException {

  124. Table table = null;

  125. try {

  126. table = connection.getTable(tableName);

  127. ResultScanner rs = null;

  128. try {

  129. Scan scan = new Scan();

  130. if (!StringUtils.isEmpty(rowkeyStart)) {

  131. scan.withStartRow(Bytes.toBytes(rowkeyStart));

  132. }

  133. if (!StringUtils.isEmpty(rowkeyEnd)) {

  134. scan.withStopRow(Bytes.toBytes(rowkeyEnd));

  135. }

  136. rs = table.getScanner(scan);


  137. List<Map<String, String>> dataList = new ArrayList<>();

  138. for (Result r : rs) {

  139. Map<String, String> objectMap = new HashMap<>();

  140. for (Cell cell : r.listCells()) {

  141. String qualifier = new String(CellUtil.cloneQualifier(cell));

  142. String value = new String(CellUtil.cloneValue(cell), "UTF-8");

  143. objectMap.put(qualifier, value);

  144. }

  145. dataList.add(objectMap);

  146. }

  147. return dataList;

  148. } finally {

  149. if (rs != null) {

  150. rs.close();

  151. }

  152. }

  153. } finally {

  154. if (table != null) {

  155. table.close();

  156. }

  157. }

  158. }


  159. /**

  160. * 删除表

  161. *

  162. * @param connection

  163. * @param tableName

  164. * @throws IOException

  165. */

  166. public static void deleteTable(Connection connection, TableName tableName) throws IOException {

  167. Admin admin = null;

  168. try {

  169. admin = connection.getAdmin();

  170. if (admin.tableExists(tableName)) {

  171. //现执行disable

  172. admin.disableTable(tableName);

  173. admin.deleteTable(tableName);

  174. }

  175. } finally {

  176. if (admin != null) {

  177. admin.close();

  178. }

  179. }

  180. }


五、运行测试

最后,我们仍然以上一篇文章中的设备数据作为例子:

  1. 建立 DeviceState 表;

  2. 定义 name/state 两个列簇;

  3. 写入列数据;

  4. 读取列、行,范围读取;

  5. 删除操作

最终实现的代码如下:

 
   
   
 
  1. private static final Logger logger = LoggerFactory.getLogger(HBaseTest.class);


  2. public static void main(String[] args) {


  3. Connection connection = null;

  4. try {

  5. connection = getConnection();

  6. TableName tableName = TableName.valueOf("DeviceState");


  7. //创建DeviceState表

  8. createTable(connection, tableName, "name", "state");


  9. logger.info("创建表 {}", tableName.getNameAsString());


  10. //写入数据

  11. put(connection, tableName, "row1", "name", "c1", "空调");

  12. put(connection, tableName, "row1", "state", "c2", "打开");

  13. put(connection, tableName, "row2", "name", "c1", "电视机");

  14. put(connection, tableName, "row2", "state", "c2", "关闭");


  15. logger.info("写入数据.");


  16. String value = getCell(connection, tableName, "row1", "state", "c2");

  17. logger.info("读取单元格-row1.state:{}", value);


  18. Map<String, String> row = getRow(connection, tableName, "row2");

  19. logger.info("读取单元格-row2:{}", JsonUtil.toJson(row));


  20. List<Map<String, String>> dataList = scan(connection, tableName, null, null);

  21. logger.info("扫描表结果-: {}", JsonUtil.toPrettyJson(dataList));


  22. //删除DeviceState表

  23. deleteTable(connection, tableName);

  24. logger.info("删除表 {}", tableName.getNameAsString());


  25. logger.info("操作完成.");

  26. } catch (Exception e) {

  27. logger.error("操作出错", e);

  28. } finally {

  29. if (connection != null) {

  30. try {

  31. connection.close();

  32. } catch (IOException e) {

  33. logger.error("error occurs", e);

  34. }

  35. }

  36. }


  37. }

执行代码,控制台输出如下:

 
   
   
 
  1. INFO -createTable(HBaseTest.java:89) - create table:[68, 101, 118, 105, 99, 101, 83, 116, 97, 116, 101] success!

  2. INFO -main(HBaseTest.java:32) - 创建表 DeviceState

  3. INFO -main(HBaseTest.java:40) - 写入数据.

  4. INFO -main(HBaseTest.java:43) - 读取单元格-row1.state:打开

  5. INFO -main(HBaseTest.java:46) - 读取单元格-row2:{"c1":"电视机","c2":"关闭"}

  6. INFO -main(HBaseTest.java:49) - 扫描表结果-:

  7. [ {

  8. "c1": "空调",

  9. "c2": "打开"

  10. }, {

  11. "c1": "电视机",

  12. "c2": "关闭"

  13. } ]

  14. INFO -HBaseAdmin$9.call(HBaseAdmin.java:1380) - Started disable of DeviceState

  15. INFO -HBaseAdmin$DisableTableFuture.postOperationResult(HBaseAdmin.java:1409) - DisabledDeviceState

  16. INFO -HBaseAdmin$DeleteTableFuture.postOperationResult(HBaseAdmin.java:965) - DeletedDeviceState

  17. INFO -main(HBaseTest.java:53) - 删除表 DeviceState

  18. INFO -main(HBaseTest.java:55) - 操作完成.

此时Java Client已经完成制作。


FAQ

A. 提示报错 找不到 winutils 程序

Failed to locate the winutils binary in the hadoop binary path

原因

在Windows下依赖一个 winutils.exe 程序,该程序通过${HADOOPHOME}/bin 来查找。该报错不影响程序执行。

解决办法

需要下载hadoop-commons-master,再配置变量 HADOOPHOME

B. 提示报错,UnknownHostException,无法找到节点..
原因
客户端无法解析HMaster实例节点的主机名 
解决办法
需要编辑 C:WindowsSystem32driversetchosts 文件,添加对应的映射,如下:
 
   
   
 

  1. 47.xx.8x.xx izwz925kr63w5jitjys6dtt


参考文档

官方文档 https://hbase.apache.org/book.html#quickstart Java HBase客户端API https://www.baeldung.com/hbase



欢迎加入 交流群 学习,备注 加群
说实话在这个群,哪怕您不说话,光看聊天记录,都能学到东西Java 操作 HBase 教程


Java 操作 HBase 教程

推荐阿里云推广服务器89/年,229/3年,买来送自己,送女朋友马上过年再合适不过了,买了搭建个项目给面试官看也香,还可以熟悉技术栈,(老用户用家人账号买就好了,我用我女朋友的

以上是关于如何使用Java API操作Hbase的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Java API操作Hbase

HBASE基础使用Java API实现DDL与DML

HBase Java 操作 HBase 教程

如何使用java api像jdbc一样直接发送hbase shell命令?

Java 操作 HBase 教程

hbase单机模式下,使用java API远程连接hbase的问题。