java连接hbase

Posted 超*

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java连接hbase相关的知识,希望对你有一定的参考价值。

对于较新版本的HBase,网上搜到的很多Java连接HBase的代码已经无法使用,或者在IDEA中被标记为已弃用(显示删除线)。我搜到的资料大多比较杂乱,经过自己的理解和整理,代码如下:

另外提示:连接配置中的IP也可以写成虚拟机的主机名,只需配置好主机名映射即可。方法是修改 C:\Windows\System32\drivers\etc 目录下的 hosts 文件(可能需要管理员权限;如果无法直接修改,可以先把该文件复制到桌面,修改保存后再复制回去),在 hosts 文件中添加一行 IP hostname(IP 与主机名之间用空格隔开!!!)。

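另外,以下整理的代码应该适用于 HBase 1.x 到 3.x 版本!!!(本人亲测过 hbase-1.7.1、hbase-2.4.13、hbase-3.0.0-alpha-3)。注意:代码使用的是 HBase 2.x 客户端 API(如 ColumnFamilyDescriptorBuilder、TableDescriptorBuilder、CompareOperator),因此 pom.xml 中的 hbase-client 依赖必须是 2.x 或更高版本;上述版本号应理解为所连接的 HBase 服务端版本。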

在 pom.xml 文件中添加以下依赖:

 <dependencies>
        <!-- HBase Java client: provides the org.apache.hadoop.hbase.client API
             (ConnectionFactory, Table, Admin, Put/Get/Scan/Delete) used by the code below.
             Pick a client version compatible with the HBase cluster you connect to. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.4.13</version>
        </dependency>
        <!-- NOTE(review): pom-type dependency on the top-level `hbase` artifact. hbase-client
             alone normally suffices for this code; confirm before removing. -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>2.4.13</version>
            <type>pom</type>
        </dependency>
    </dependencies>
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.util.Bytes;


import java.io.IOException;

import java.util.*;


public class zsgcHbase 
    public static void main(String[] args) throws IOException 
//        tj();
//        cx();
//        versioncx();
//        sc();
//        jb();
//        fwcx();
//        strRow();
//        filterRow();
        valueCol();
    
  //连接配置
    public static Connection pz() throws IOException 
        //获取配置
        Configuration conf = HBaseConfiguration.create();
        //指定HBase使用的zookeeper地址,多个用逗号隔开
        conf.set("hbase.zookeeper.quorum", "172.20.10.250:2181,172.20.10.251:2181,172.20.10.252:2181");
        //创建HBase连接,负责对HBase中数据的一些增删改查操作
        Connection conn = ConnectionFactory.createConnection(conf);
        return conn;
    

 //添加数据和更新数据
    public static void tj() throws IOException 
        Connection conn = pz();
        //获取Table对象,指定要操作的表名(这里需要提前创建好)
        Table table = conn.getTable(TableName.valueOf("test"));
        //指定Rowkey,返回put对象      这里进行批量插入数据再声明一个Put对象再添加一次put对象(也可以声明一个List<Put>集合添加list对象)
        Put put = new Put(Bytes.toBytes("001"));
        //向put对象中指定列族、列、值
        put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"),Bytes.toBytes("zhujuntao"));
        put.addColumn(Bytes.toBytes("info"),Bytes.toBytes("sex"),Bytes.toBytes("man"));
        put.addColumn(Bytes.toBytes("level"),Bytes.toBytes("course"),Bytes.toBytes("qq"));
        //向表中添加数据
        table.put(put);
        //关闭table连接
        table.close();
    

  //查询根据rowkey查询数据
    public static void cx() throws IOException 
        Connection conn = pz();
        Table table = conn.getTable(TableName.valueOf("test"));
        //指定Rowkey,返回Get对象
        Get get = new Get(Bytes.toBytes("001"));
        //可以在这里指定要查询指定Rowkey数据哪些列族中的列
        // 如果不指定,默认查询指定Rowkey所有列的内容
        //get.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"));
        //get.addColumn(Bytes.toBytes("info"),Bytes.toBytes("sex"));

        Result result = table.get(get);
        //可以使用listCells()获取所有cell,cell对应的是某一个列的数据
        List<Cell> cells = result.listCells();
        for (Cell cell: cells) 
            //这里获取的信息都是字节类型的,可以通过new String(bytes)转为字符串
            //列族
            byte[] famaily_bytes = CellUtil.cloneFamily(cell);
            //列
            byte[] column_bytes = CellUtil.cloneQualifier(cell);
            //值
            byte[] value_bytes = CellUtil.cloneValue(cell);
            System.out.println("列族:"+new String(famaily_bytes)+",列:"+new String(column_bytes)+",值:"+new String(value_bytes));
        
        //这里还可以直接使用getValue(family, qualifier)直接获取指定列族中指定列的数据
        //byte[] age_bytes = result.getValue(Bytes.toBytes("info"),Bytes.toBytes("age"));
        table.close();
    
//查询列值历史版本
    public static void versioncx() throws IOException 
        Connection conn = pz();
        Table table = conn.getTable(TableName.valueOf("test"));
        Get get = new Get(Bytes.toBytes("001"));
        //这里读取cell中的所有历史版本数据
        //可以通过get.readVersions(2)来指定获取多少个历史版本的数据
        get.readAllVersions();

        Result result = table.get(get);

        //获取指定列族中指定列的所有历史版本数据,必须设置get.readAllVersions()或者get.readVersions(2),否则只会获取最新数据
        List<Cell> columnCells = result.getColumnCells(Bytes.toBytes("info"), Bytes.toBytes("name"));
        for (Cell cell :columnCells) 
            byte[] value_bytes = CellUtil.cloneValue(cell);
            long timestamp = cell.getTimestamp();
            System.out.println("值为:"+new String(value_bytes)+",时间戳:"+timestamp);
        
        table.close();
    
 //根据rowkey删除数据
    public static void sc() throws IOException 
        Connection conn = pz();
        Table table = conn.getTable(TableName.valueOf("ideatest"));
        //指定Rowkey,返回Delete对象
        Delete delete = new Delete(Bytes.toBytes("laowang"));
        //这里还可以指定要删除指定Rowkey数据哪些列族中的列
        //delete.addColumn(Bytes.toBytes("info"),Bytes.toBytes("age"));

        table.delete(delete);
        table.close();
    
 //创建表
    public static void jb() throws IOException 
        Connection conn = pz();
        //获取管理权限,负责对HBase中的表进行操作
        Admin admin = conn.getAdmin();
        //指定列族信息
        ColumnFamilyDescriptor familyDesc1 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("info"))
                //给列族设置属性
                .setMaxVersions(3)//这个是最多存储多少个历史版本数据
                .build();
        ColumnFamilyDescriptor familyDesc2 = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("level"))
                .setMaxVersions(2)
                .build();
        ArrayList<ColumnFamilyDescriptor> familyList = new ArrayList<ColumnFamilyDescriptor>();
        familyList.add(familyDesc1);
        familyList.add(familyDesc2);
        //获取TableDescriptor对象
        TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("test"))//指定表名
                .setColumnFamilies(familyList)//指定列族
                .build();
        //创建表
        admin.createTable(desc);
    
    //删除表,先禁用表
    //admin.disableTable(TableName.valueOf("test"));
    //admin.deleteTable(TableName.valueOf("test"));
 //Limit查询前5行
    public static void fwcx() throws IOException 
        Connection conn = pz();
        Table tableRead = conn.getTable(TableName.valueOf("ideatest"));
        //获取Scan对象
        Scan scan = new Scan();
        //同startrow
        //scan.withStartRow(Bytes.toBytes("000"));
        //limit
        scan.setLimit(5);
        //接收scan对象
        ResultScanner scanner = tableRead.getScanner(scan);
        //这里我试了很久,一开始使用上面查询的遍历查询方法结果只能出来第一条后来经过思考就写成以下的方法就行了
        for (Result rs : scanner) 
                System.out.println("行键:"+new String(rs.getRow()));
                Get get = new Get(rs.getRow());
                Result result = tableRead.get(get);
                List<Cell> cells = result.listCells();
                for (Cell cell: cells) 
                byte[] famaily_bytes = CellUtil.cloneFamily(cell);
                byte[] column_bytes = CellUtil.cloneQualifier(cell);
                byte[] value_bytes = CellUtil.cloneValue(cell);
                System.out.println("列族:"+new String(famaily_bytes)+",列:"+new String(column_bytes)+",值:"+new String(value_bytes));
            
        
        scanner.close();
        tableRead.close();
    
  //分页查询,其实跟上边limit一样(上面有提示)
    public static void strRow() throws IOException 
        Connection conn = pz();
        Table tableRead = conn.getTable(TableName.valueOf("ideatest"));
        Scan scan = new Scan();
       scan.withStartRow(Bytes.toBytes("002"),false);   //这里false表示不包含002,默认为true
       scan.withStopRow(Bytes.toBytes("003"),true);
        ResultScanner scanner = tableRead.getScanner(scan);
        for (Result rs : scanner) 
            System.out.println("行键:"+new String(rs.getRow()));
            Get get = new Get(rs.getRow());
            Result result = tableRead.get(get);
            List<Cell> cells = result.listCells();
            for (Cell cell: cells) 
                byte[] famaily_bytes = CellUtil.cloneFamily(cell);
                byte[] column_bytes = CellUtil.cloneQualifier(cell);
                byte[] value_bytes = CellUtil.cloneValue(cell);
                System.out.println("列族:"+new String(famaily_bytes)+",列:"+new String(column_bytes)+",值:"+new String(value_bytes));
            
        
        scanner.close();
        tableRead.close();
    
   //查询rowkey中包含特定前缀的数据
    public static void filterRow() throws IOException 
        Connection conn = pz();
        Table tableRead = conn.getTable(TableName.valueOf("ideatest"));
        Scan scan = new Scan();
        //CompareOperator(EQUAL等于,CREATER大于,CREATE_OR_EQUAL大于或等于,LESS小于,LESS_OR_EQUAL大于或等于,NO_OP误操作,NOT_EQUAL不等于)
        RowFilter filter = new RowFilter(CompareOperator.EQUAL,new SubstringComparator("1"));   //CompareFilter.CompareOp.EQUAL已被弃用,改用CompareOperator.EQUAL
        /*
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);    用于存储多个条件
        FilterList.Operator.MUST_PASS_ALL --> 取交集 相当一and操作
        FilterList.Operator.MUST_PASS_ONE --> 取并集 相当于or 操作
         */
        scan.setFilter(filter);
        ResultScanner scanner = tableRead.getScanner(scan);
        for (Result rs : scanner) 
            System.out.println("行键:"+new String(rs.getRow()));
            Get get = new Get(rs.getRow());
            Result result = tableRead.get(get);
            List<Cell> cells = result.listCells();
            for (Cell cell: cells) 
                byte[] famaily_bytes = CellUtil.cloneFamily(cell);
                byte[] column_bytes = CellUtil.cloneQualifier(cell);
                byte[] value_bytes = CellUtil.cloneValue(cell);
                System.out.println("列族:"+new String(famaily_bytes)+",列:"+new String(column_bytes)+",值:"+new String(value_bytes));
            
        
        scanner.close();
        tableRead.close();
    
 //查询行键前缀为0的行    PrefixFilter
    public static void valueCol() throws IOException 
        Connection conn = pz();
        Table tableRead = conn.getTable(TableName.valueOf("ideatest"));
        Scan scan = new Scan();
        scan.setRowPrefixFilter(Bytes.toBytes("0"));
        ResultScanner scanner = tableRead.getScanner(scan);
        for (Result rs : scanner) 
            System.out.println("行键:"+new String(rs.getRow()));
            Get get = new Get(rs.getRow());
            Result result = tableRead.get(get);
            List<Cell> cells = result.listCells();
            for (Cell cell: cells) 
                byte[] famaily_bytes = CellUtil.cloneFamily(cell);
                byte[] column_bytes = CellUtil.cloneQualifier(cell);
                byte[] value_bytes = CellUtil.cloneValue(cell);
                System.out.println("列族:"+new String(famaily_bytes)+",列:"+new String(column_bytes)+",值:"+new String(value_bytes));
            
        
        scanner.close();
        tableRead.close();
    

点赞关注,持续更新

以上是关于java连接hbase的主要内容,如果未能解决你的问题,请参考以下文章

hbase单机模式下,使用java API远程连接hbase的问题。

java 连接hbase

Java 连接 HBase ( kerberized 集群 )

Java连接Hbase异常

java连接Hbase数据库

java连接hbase