hadoop2-HBase的Java API操作

Posted 程序员理想

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了hadoop2-HBase的Java API操作相关的知识,希望对你有一定的参考价值。

HBase提供了丰富的Java API,以及表连接池(HTablePool)。下面我借助HTablePool来演示如何使用Java API操作HBase。

项目结构如下:





我使用的Hbase的版本是

hbase-0.98.9-hadoop2-bin.tar.gz

大家下载并解压后,可以拿到其中lib目录下的jar文件,即上图项目结构中所示的hbase-lib资源。

接口类:

/hbase-util/src/com/b510/hbase/util/dao/HbaseDao.java

package com.b510.hbase.util.dao;


import java.util.List;


import org.apache.hadoop.hbase.client.HTableInterface;



/**

* @author Hongten

* @created 7 Nov 2018

*/

public interface HbaseDao {


   // initial table

   public HTableInterface getHTableFromPool(String tableName);


   // check if the table is exist

   public boolean isHTableExist(String tableName);


   // create table

   public void createHTable(String tableName, String[] columnFamilys);


   // insert new row

   public void addRow(String tableName, String rowKey, String columnFamily, String column, String value);


   // get row by row key

   public void getRow(String tableName, String rowKey);


   public void getAllRows(String tableName);


   // get rows by giving range

   public void getRowsByRange(String tableName, String startRowKey, String endRowKey);


   //delete row

   public void delRow(String tableName, String rowKey);

   

   //delete rows by row keys

   public void delRowsByRowKeys(String tableName, List<String> rowKeys);


   // auto flush data when close

   public void closeAutoFlush(HTableInterface table);


   // close table

   public void closeTable(HTableInterface table);


   // close pool connection

   public void closePoolConnection();


   // delete table

   public void deleteHTable(String tableName);

}

实现类:

/hbase-util/src/com/b510/hbase/util/dao/impl/HbaseDaoImpl.java

package com.b510.hbase.util.dao.impl;


import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;

import com.b510.hbase.util.dao.HbaseDao;


/**

* @author Hongten

* @created 7 Nov 2018

*/

@SuppressWarnings("deprecation")

public class HbaseDaoImpl implements HbaseDao {


   private static Configuration conf = null;

   private static HBaseAdmin hAdmin;

   private static HTablePool pool;


   private static int defaultPoolSize = 5;


   public HbaseDaoImpl(int poolSize) {

       conf = HBaseConfiguration.create();

       conf.set("hbase.zookeeper.quorum", "node1:2888,node2:2888,node3:2888");

       try {

           hAdmin = new HBaseAdmin(conf);

           // the default pool size is 5.

           pool = new HTablePool(conf, poolSize <= 0 ? defaultPoolSize : poolSize);

       } catch (MasterNotRunningException e) {

           e.printStackTrace();

       } catch (ZooKeeperConnectionException e) {

           e.printStackTrace();

       } catch (IOException e) {

           e.printStackTrace();

       }

   }


   @Override

   public HTableInterface getHTableFromPool(String tableName) {

       HTableInterface table = pool.getTable(tableName);

       return table;

   }


   @Override

   public boolean isHTableExist(String tableName) {

       try {

           return hAdmin.tableExists(tableName);

       } catch (IOException e) {

           e.printStackTrace();

       }

       return false;

   }


   @Override

   public void createHTable(String tableName, String[] columnFamilys) {

       if (!isHTableExist(tableName)) {

           HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));

           // The Hbase suggested the number of column family should be less than 3.

           // Normally, there only have 1 column family.

           for (String cfName : columnFamilys) {

               HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(cfName);

               tableDescriptor.addFamily(hColumnDescriptor);

           }

           try {

               hAdmin.createTable(tableDescriptor);

           } catch (IOException e) {

               e.printStackTrace();

           }

           System.out.println("The table [" + tableName + "]  is created.");

       } else {

           System.out.println("The table [" + tableName + "]  is existing already.");

       }


   }


   @Override

   public void addRow(String tableName, String rowKey, String columnFamily, String column, String value) {

       if (isHTableExist(tableName)) {

           HTableInterface table = getHTableFromPool(tableName);

           Put put = new Put(rowKey.getBytes());

           put.add(columnFamily.getBytes(), column.getBytes(), value.getBytes());

           try {

               table.put(put);

           } catch (IOException e) {

               e.printStackTrace();

           }

           System.out.println("Insert into table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + column + "], Vlaue=[" + value + "].");

           closeTable(table);

       } else {

           System.out.println("The table [" + tableName + "] does not exist.");

       }

   }


   @Override

   public void getRow(String tableName, String rowKey) {

       if (isHTableExist(tableName)) {

           HTableInterface table = getHTableFromPool(tableName);

           Get get = new Get(rowKey.getBytes());

           Result result;

           try {

               result = table.get(get);

               String columnName = "";

               String timeStamp = "";

               String columnFamily = "";

               String value = "";

               for (Cell cell : result.rawCells()) {

                   timeStamp = String.valueOf(cell.getTimestamp());

                   columnFamily = new String(CellUtil.cloneFamily(cell));

                   columnName = new String(CellUtil.cloneQualifier(cell));

                   value = new String(CellUtil.cloneValue(cell));


                   System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");

               }

           } catch (IOException e) {

               e.printStackTrace();

           }

           closeTable(table);

       } else {

           System.out.println("The table [" + tableName + "] does not exist.");

       }

   }


   @Override

   public void getAllRows(String tableName) {

       if (isHTableExist(tableName)) {

           Scan scan = new Scan();

           scanHTable(tableName, scan);

       } else {

           System.out.println("The table [" + tableName + "] does not exist.");

       }

   }


   private void scanHTable(String tableName, Scan scan) {

       try {

           HTableInterface table = getHTableFromPool(tableName);

           ResultScanner results = table.getScanner(scan);

           for (Result result : results) {

               String rowKey = "";

               String columnName = "";

               String timeStamp = "";

               String columnFamily = "";

               String value = "";

               for (Cell cell : result.rawCells()) {

                   rowKey = new String(CellUtil.cloneRow(cell));

                   timeStamp = String.valueOf(cell.getTimestamp());

                   columnFamily = new String(CellUtil.cloneFamily(cell));

                   columnName = new String(CellUtil.cloneQualifier(cell));

                   value = new String(CellUtil.cloneValue(cell));


                   System.out.println("Get from table [" + tableName + "], Rowkey=[" + rowKey + "], Column=[" + columnFamily + ":" + columnName + "], Timestamp=[" + timeStamp + "], Vlaue=[" + value + "].");

               }

           }

           closeTable(table);

       } catch (IOException e) {

           e.printStackTrace();

       }

   }


   @Override

   public void getRowsByRange(String tableName, String startRowKey, String endRowKey) {

       if (isHTableExist(tableName)) {

           Scan scan = new Scan();

           scan.setStartRow(startRowKey.getBytes());

           // not equals Stop Row Key, it mean the result does not include the stop row record(exclusive).

           // the hbase version is 0.98.9

           scan.setStopRow(endRowKey.getBytes());

           scanHTable(tableName, scan);

       } else {

           System.out.println("The table [" + tableName + "] does not exist.");

       }

   }


   @Override

   public void delRow(String tableName, String rowKey) {

       if (isHTableExist(tableName)) {

           HTableInterface table = getHTableFromPool(tableName);

           deleteRow(table, rowKey);

       } else {

           System.out.println("The table [" + tableName + "] does not exist.");

       }

   }


   private void deleteRow(HTableInterface table, String rowKey) {

       Delete del = new Delete(rowKey.getBytes());

       try {

           table.delete(del);

           System.out.println("Delete from table [" + new String(table.getTableName()) + "], Rowkey=[" + rowKey + "].");

           closeTable(table);

       } catch (IOException e) {

           e.printStackTrace();

       }

   }


   @Override

   public void delRowsByRowKeys(String tableName, List<String> rowKeys) {

       if (rowKeys != null && rowKeys.size() > 0) {

           for (String rowKey : rowKeys) {

               delRow(tableName, rowKey);

           }

       }

   }


   @Override

   public void deleteHTable(String tableName) {

       if (isHTableExist(tableName)) {

           try {

               hAdmin.disableTable(tableName.getBytes());

               hAdmin.deleteTable(tableName.getBytes());

               System.out.println("The table [" + tableName + "] is deleted.");

           } catch (IOException e) {

               e.printStackTrace();

           }

       } else {

           System.out.println("The table [" + tableName + "] does not exist.");

       }


   }


   @Override

   public void closeAutoFlush(HTableInterface table) {

       table.setAutoFlush(false, false);

   }


   @Override

   public void closeTable(HTableInterface table) {

       try {

           table.close();

       } catch (IOException e) {

           e.printStackTrace();

       }

   }


   @Override

   public void closePoolConnection() {

       try {

           pool.close();

       } catch (IOException e) {

           e.printStackTrace();

       }

   }


}

测试类:

/hbase-util/src/com/b510/hbase/util/dao/test/HbaseDaoTest.java

package com.b510.hbase.util.dao.test; import java.util.ArrayList; import java.util.List; import org.junit.Test; import com.b510.hbase.util.dao.HbaseDao; import com.b510.hbase.util.dao.impl.HbaseDaoImpl; /** * @author Hongten * @created 7 Nov 2018 */ public class HbaseDaoTest {    HbaseDao dao = new HbaseDaoImpl(4);    public static final String tableName = "t_test";    public static final String columnFamilyName = "cf1";    public static final String[] CFs = { columnFamilyName };    public static final String COLUMN_NAME_NAME = "name";    public static final String COLUMN_NAME_AGE = "age";    @Test    public void main() {        createTable();        addRow();        getRow();        getAllRows();        getRowsByRange();        delRow();        delRowsByRowKeys();        deleteHTable();    }    public void createTable() {        System.out.println("=== create table ====");        dao.createHTable(tableName, CFs);    }    public void addRow() {        System.out.println("=== insert record ====");        dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_NAME, "Hongten");        dao.addRow(tableName, "12345566", columnFamilyName, COLUMN_NAME_AGE, "22");        dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_NAME, "Tom");        dao.addRow(tableName, "12345567", columnFamilyName, COLUMN_NAME_AGE, "25");        dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_NAME, "Jone");        dao.addRow(tableName, "12345568", columnFamilyName, COLUMN_NAME_AGE, "30");        dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_NAME, "Jobs");        dao.addRow(tableName, "12345569", columnFamilyName, COLUMN_NAME_AGE, "24");    }    public void getRow() {        System.out.println("=== get record ====");        dao.getRow(tableName, "12345566");    }    public void getAllRows() {        System.out.println("=== scan table ====");        dao.getAllRows(tableName);    }    public void getRowsByRange() {        
System.out.println("=== scan record by giving range ====");        // it will return the '12345567' and '12345568' rows.        dao.getRowsByRange(tableName, "12345567", "12345569");    }    public void delRow() {        System.out.println("=== delete record ====");        dao.delRow(tableName, "12345568");        // only '12345567' row.        getRowsByRange();    }    public void delRowsByRowKeys() {        System.out.println("=== delete batch records ====");        List<String> rowKeys = new ArrayList<String>();        rowKeys.add("12345566");        rowKeys.add("12345569");        dao.delRowsByRowKeys(tableName, rowKeys);        // can not find the '12345566' and '12345569'        getAllRows();    }    public void deleteHTable() {        System.out.println("=== delete table ====");        dao.deleteHTable(tableName);    } }


测试结果:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. === create table ==== The table [t_test]  is created. === insert record ==== Insert into table [t_test], Rowkey=[12345566], Column=[cf1:name], Vlaue=[Hongten]. Insert into table [t_test], Rowkey=[12345566], Column=[cf1:age], Vlaue=[22]. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:name], Vlaue=[Tom]. Insert into table [t_test], Rowkey=[12345567], Column=[cf1:age], Vlaue=[25]. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:name], Vlaue=[Jone]. Insert into table [t_test], Rowkey=[12345568], Column=[cf1:age], Vlaue=[30]. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:name], Vlaue=[Jobs]. Insert into table [t_test], Rowkey=[12345569], Column=[cf1:age], Vlaue=[24]. === get record ==== Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22]. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten]. === scan table ==== Get from table [t_test], Rowkey=[12345566], Column=[cf1:age], Timestamp=[1541652952697], Vlaue=[22]. Get from table [t_test], Rowkey=[12345566], Column=[cf1:name], Timestamp=[1541652952626], Vlaue=[Hongten]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone]. Get from table [t_test], Rowkey=[12345569], Column=[cf1:age], Timestamp=[1541652952928], Vlaue=[24]. 
Get from table [t_test], Rowkey=[12345569], Column=[cf1:name], Timestamp=[1541652952869], Vlaue=[Jobs]. === scan record by giving range ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:age], Timestamp=[1541652952834], Vlaue=[30]. Get from table [t_test], Rowkey=[12345568], Column=[cf1:name], Timestamp=[1541652952807], Vlaue=[Jone]. === delete record ==== Delete from table [t_test], Rowkey=[12345568]. === scan record by giving range ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. === delete batch records ==== Delete from table [t_test], Rowkey=[12345566]. Delete from table [t_test], Rowkey=[12345569]. === scan table ==== Get from table [t_test], Rowkey=[12345567], Column=[cf1:age], Timestamp=[1541652952779], Vlaue=[25]. Get from table [t_test], Rowkey=[12345567], Column=[cf1:name], Timestamp=[1541652952743], Vlaue=[Tom]. === delete table ==== The table [t_test] is deleted.



以上是关于hadoop2-HBase的Java API操作的主要内容,如果未能解决你的问题,请参考以下文章

我的Android进阶之旅OKHttp出现错误 java.lang.IllegalStateException: Expected Android API level 21+ but was 19(代

转android出现注: 某些输入文件使用或覆盖了已过时的 API。 注: 有关详细信息, 请使用 -Xlint:deprecation 重新编译。 注: 某些输入文件使用了未经检查或不安全的操作(代

《深入理解java虚拟机》笔记JVM调优(分代垃圾收集器)

replace File.separator出现异常:java.lang.IllegalArgumentException: character to be escaped is missing((代

replace File.separator出现异常:java.lang.IllegalArgumentException: character to be escaped is missing((代

Hibernate框架入门