hadoop2-HBase的Java API操作
Posted 程序员理想
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop2-HBase的Java API操作相关的知识,希望对你有一定的参考价值。
Hbase提供了丰富的Java API,以及线程池操作,下面我用线程池来展示一下使用Java API操作Hbase。
项目结构如下:
我使用的Hbase的版本是
hbase-0.98.9-hadoop2-bin.tar.gz
大家下载后,可以拿到里面的lib目录下面的jar文件,即上图所示的hbase-lib资源。
接口类:
/hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java
package com.b510.hbase.util.dao;
import java.util.List;
import org.apache.hadoop.hbase.client.HTableInterface;
/**
* @author Hongten
* @created 7 Nov 2018
*/
public interface HbaseDao {
// borrow a table handle from the shared pool; callers must hand it back via closeTable()
public HTableInterface getHTableFromPool(String tableName);
// return true if the named table already exists (best effort: false on admin errors)
public boolean isHTableExist(String tableName);
// create a table with the given column families; no-op if it already exists
public void createHTable(String tableName, String[] columnFamilys);
// write a single cell: (rowKey, columnFamily:column) = value
public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);
// print every cell of the row identified by rowKey
public void getRow(String tableName, String rowKey);
// print every row in the table (full scan)
public void getAllRows(String tableName);
// print rows in [startRowKey, endRowKey) — the stop row is exclusive in HBase 0.98
public void getRowsByRange(String tableName, String startRowKey, String endRowKey);
// delete an entire row by its row key
public void delRow(String tableName, String rowKey);
// delete several rows, one per entry in rowKeys
public void delRowsByRowKeys(String tableName, List<String> rowKeys);
// disable client-side auto-flush so puts are buffered until the table is flushed/closed
public void closeAutoFlush(HTableInterface table);
// return the table handle to the pool (flushes any buffered mutations)
public void closeTable(HTableInterface table);
// shut down the shared table pool
public void closePoolConnection();
// disable and then drop the table
public void deleteHTable(String tableName);
}
实现类:
/hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java
package com.b510.hbase.util.dao.impl;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import com.b510.hbase.util.dao.HbaseDao;
/**
* @author Hongten
* @created 7 Nov 2018
*/
/**
 * {@link HbaseDao} implementation for HBase 0.98 that hands out table handles
 * from a shared {@link HTablePool}.
 *
 * <p>{@link HBaseAdmin} and {@link HTablePool} are deprecated in later HBase
 * releases; they are kept here deliberately because the target version is
 * hbase-0.98.9.
 *
 * @author Hongten
 * @created 7 Nov 2018
 */
@SuppressWarnings("deprecation")
public class HbaseDaoImpl implements HbaseDao {

    // Shared across all instances: one admin handle and one table pool per JVM.
    private static Configuration conf = null;
    private static HBaseAdmin hAdmin;
    private static HTablePool pool;
    // Used when the caller passes a non-positive pool size.
    private static int defaultPoolSize = 5;

    /**
     * Connects to the cluster and creates the table pool.
     *
     * @param poolSize maximum pooled references per table; values <= 0 fall
     *                 back to the default of 5
     */
    public HbaseDaoImpl(int poolSize) {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");
        try {
            hAdmin = new HBaseAdmin(conf);
            pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);
        } catch (IOException e) {
            // MasterNotRunningException and ZooKeeperConnectionException are
            // both IOException subclasses, so a single catch covers all three.
            e.printStackTrace();
        }
    }

    /** Borrows a table handle from the pool; return it via {@link #closeTable}. */
    @Override
    public HTableInterface getHTableFromPool(String tableName) {
        return pool.getTable(tableName);
    }

    /** Returns true if the table exists; false on lookup failure (best effort). */
    @Override
    public boolean isHTableExist(String tableName) {
        try {
            return hAdmin.tableExists(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return false;
    }

    /** Creates the table with the given column families; no-op if it exists. */
    @Override
    public void createHTable(String tableName, String[] columnFamilys) {
        if (!isHTableExist(tableName)) {
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // The HBase docs suggest fewer than 3 column families; normally just 1.
            for (String cfName : columnFamilys) {
                tableDescriptor.addFamily(new HColumnDescriptor(cfName));
            }
            try {
                hAdmin.createTable(tableDescriptor);
                // Report success only after createTable actually succeeded
                // (previously this printed even when the call threw).
                System.out.println("The table [" + tableName + "] is created.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("The table [" + tableName + "] is existing already.");
        }
    }

    /** Writes a single cell: (rowKey, columnFamily:column) = value. */
    @Override
    public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {
        if (isHTableExist(tableName)) {
            HTableInterface table = getHTableFromPool(tableName);
            Put put = new Put(rowKey.getBytes());
            put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());
            try {
                table.put(put);
                System.out.println("Insert into table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + column + "], Value=[" + value + "].");
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // Always return the handle to the pool, even when the put failed.
                closeTable(table);
            }
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Prints every cell of the row identified by rowKey. */
    @Override
    public void getRow(String tableName, String rowKey) {
        if (isHTableExist(tableName)) {
            HTableInterface table = getHTableFromPool(tableName);
            Get get = new Get(rowKey.getBytes());
            try {
                Result result = table.get(get);
                for (Cell cell : result.rawCells()) {
                    String timeStamp = String.valueOf(cell.getTimestamp());
                    String columnFamily = new String(CellUtil.cloneFamily(cell));
                    String columnName = new String(CellUtil.cloneQualifier(cell));
                    String value = new String(CellUtil.cloneValue(cell));
                    System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Value=[" + value + "].");
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                closeTable(table);
            }
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Prints every row in the table (full scan). */
    @Override
    public void getAllRows(String tableName) {
        if (isHTableExist(tableName)) {
            Scan scan = new Scan();
            scanHTable(tableName, scan);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Runs the given scan and prints every cell; shared by getAllRows/getRowsByRange. */
    private void scanHTable(String tableName, Scan scan) {
        HTableInterface table = getHTableFromPool(tableName);
        ResultScanner results = null;
        try {
            results = table.getScanner(scan);
            for (Result result : results) {
                for (Cell cell : result.rawCells()) {
                    String rowKey = new String(CellUtil.cloneRow(cell));
                    String timeStamp = String.valueOf(cell.getTimestamp());
                    String columnFamily = new String(CellUtil.cloneFamily(cell));
                    String columnName = new String(CellUtil.cloneQualifier(cell));
                    String value = new String(CellUtil.cloneValue(cell));
                    System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Value=[" + value + "].");
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Scanners hold server-side resources and must be closed explicitly
            // (previously the scanner was never closed).
            if (results != null) {
                results.close();
            }
            closeTable(table);
        }
    }

    /** Prints rows in [startRowKey, endRowKey) — stop row is exclusive in 0.98. */
    @Override
    public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {
        if (isHTableExist(tableName)) {
            Scan scan = new Scan();
            scan.setStartRow(startRowKey.getBytes());
            // The stop row is exclusive: the result never includes endRowKey itself.
            scan.setStopRow(endRowKey.getBytes());
            scanHTable(tableName, scan);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Deletes an entire row by its row key. */
    @Override
    public void delRow(String tableName, String rowKey) {
        if (isHTableExist(tableName)) {
            HTableInterface table = getHTableFromPool(tableName);
            deleteRow(table, rowKey);
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Issues the Delete and returns the handle to the pool. */
    private void deleteRow(HTableInterface table, String rowKey) {
        Delete del = new Delete(rowKey.getBytes());
        try {
            table.delete(del);
            System.out.println("Delete from table [" + new String(table.getTableName()) + "], Rowkey=[" + rowKey + "].");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Return the handle even when the delete failed.
            closeTable(table);
        }
    }

    /** Deletes one row per entry in rowKeys; null/empty lists are a no-op. */
    @Override
    public void delRowsByRowKeys(String tableName, List<String> rowKeys) {
        if (rowKeys != null && !rowKeys.isEmpty()) {
            for (String rowKey : rowKeys) {
                delRow(tableName, rowKey);
            }
        }
    }

    /** Disables and then drops the table. */
    @Override
    public void deleteHTable(String tableName) {
        if (isHTableExist(tableName)) {
            try {
                // A table must be disabled before it can be deleted.
                hAdmin.disableTable(tableName.getBytes());
                hAdmin.deleteTable(tableName.getBytes());
                System.out.println("The table [" + tableName + "] is deleted.");
            } catch (IOException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("The table [" + tableName + "] does not exist.");
        }
    }

    /** Disables client-side auto-flush: puts buffer until flush/close. */
    @Override
    public void closeAutoFlush(HTableInterface table) {
        table.setAutoFlush(false, false);
    }

    /** Returns a pooled table handle (flushes buffered mutations). */
    @Override
    public void closeTable(HTableInterface table) {
        try {
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /** Shuts down the shared table pool. */
    @Override
    public void closePoolConnection() {
        try {
            pool.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
测试类:
/hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java
package com.b510.hbase.util.dao.test;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import com.b510.hbase.util.dao.HbaseDao;
import com.b510.hbase.util.dao.impl.HbaseDaoImpl;

/**
 * Drives every {@link HbaseDao} operation end-to-end against a live cluster:
 * create table, insert, point get, full scan, range scan, single and batch
 * delete, and finally drop the table.
 *
 * @author Hongten
 * @created 7 Nov 2018
 */
public class HbaseDaoTest {

    public static final String tableName = "t_test";
    public static final String columnFamilyName = "cf1";
    public static final String[] CFs = { columnFamilyName };
    public static final String COLUMN_NAME_NAME = "name";
    public static final String COLUMN_NAME_AGE = "age";

    HbaseDao dao = new HbaseDaoImpl(4);

    @Test
    public void main() {
        createTable();
        addRow();
        getRow();
        getAllRows();
        getRowsByRange();
        delRow();
        delRowsByRowKeys();
        deleteHTable();
    }

    public void createTable() {
        System.out.println("=== create table ====");
        dao.createHTable(tableName, CFs);
    }

    public void addRow() {
        System.out.println("=== insert record ====");
        dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");
        dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");
        dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");
        dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");
        dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");
        dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");
        dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");
        dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");
    }

    public void getRow() {
        System.out.println("=== get record ====");
        dao.getRow(tableName, "12345566");
    }

    public void getAllRows() {
        System.out.println("=== scan table ====");
        dao.getAllRows(tableName);
    }

    public void getRowsByRange() {
        System.out.println("=== scan record by giving range ====");
        // stop row is exclusive, so this prints rows '12345567' and '12345568'
        dao.getRowsByRange(tableName, "12345567", "12345569");
    }

    public void delRow() {
        System.out.println("=== delete record ====");
        dao.delRow(tableName, "12345568");
        // after the delete only '12345567' remains within the range
        getRowsByRange();
    }

    public void delRowsByRowKeys() {
        System.out.println("=== delete batch records ====");
        List<String> keysToDelete = new ArrayList<String>();
        keysToDelete.add("12345566");
        keysToDelete.add("12345569");
        dao.delRowsByRowKeys(tableName, keysToDelete);
        // '12345566' and '12345569' should no longer appear in the scan
        getAllRows();
    }

    public void deleteHTable() {
        System.out.println("=== delete table ====");
        dao.deleteHTable(tableName);
    }
}
测试结果:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. === create table ==== The table [t_test] is created. === insert record ==== Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten]. Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22]. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom]. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25]. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone]. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30]. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs]. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24]. === get record ==== Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22]. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten]. === scan table ==== Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22]. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone]. Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24]. 
Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs]. === scan record by giving range ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone]. === delete record ==== Delete from table [t_test], Rowkey=[12345568]. === scan record by giving range ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. === delete batch records ==== Delete from table [t_test], Rowkey=[12345566]. Delete from table [t_test], Rowkey=[12345569]. === scan table ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. === delete table ==== The table [t_test] is deleted.
以上是关于hadoop2-HBase的Java API操作的主要内容,如果未能解决你的问题,请参考以下文章
我的Android进阶之旅OKHttp出现错误 java.lang.IllegalStateException: Expected Android API level 21+ but was 19(代
转android出现注: 某些输入文件使用或覆盖了已过时的 API。 注: 有关详细信息, 请使用 -Xlint:deprecation 重新编译。 注: 某些输入文件使用了未经检查或不安全的操作(代
replace File.separator出现异常:java.lang.IllegalArgumentException: character to be escaped is missing((代
replace File.separator出现异常:java.lang.IllegalArgumentException: character to be escaped is missing((代