java操作HBase模板

Posted Z-hhhhh

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java操作HBase模板相关的知识,希望对你有一定的参考价值。

使用idea建立quickstart模板的Maven工程

自动导包后,删除不需要的test

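注意:把工程中三处的 JDK 版本统一成 1.8(在 IDEA 中通常是指 Project Structure → Project 的 SDK/语言级别、Project Structure → Modules 的语言级别,以及 Settings → Build → Compiler → Java Compiler 的目标字节码版本)。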

在pom.xml里添加如下依赖:

 <!-- HBase client library, version 2.3.5. -->
 <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.3.5</version>
    </dependency>
    <!-- Hadoop common library, version 3.1.3 (the Java code below imports
         org.apache.hadoop.conf.Configuration). -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>3.1.3</version>
    </dependency>
    <!-- Hadoop authentication module, pinned to the same 3.1.3 version as hadoop-common above. -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-auth</artifactId>
      <version>3.1.3</version>
    </dependency>

在 src/main 下新建与 java 目录同级的 resources 目录(在 IDEA 中将其标记为 Resources Root),并在其中添加 log4j.properties 文件。

log4j的标准版如下:

# Root logger: level INFO, attached to both the console and the file appender.
# (The file appender was configured below but never attached, so log/hd.log was never written.)
log4j.rootLogger=INFO, stdout, logfile

# Console appender. Pattern: date, level, [logger name] - message, newline.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

# File appender writing to log/hd.log (a relative path), using the same pattern as the console.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

接下来是Java操作hbase的核心代码

package cn.kgc.base;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class JavaToHBase 
    private static Configuration config = null;

    public static void init(String... items) 
        config = HBaseConfiguration.create();
        for (String item : items) 
            String[] ps = item.split("=");
            config.set(ps[0], ps[1]);
        
    

    private static void close(AutoCloseable... closes) 
        for (AutoCloseable close : closes) 
            if (null != close) 
                try 
                    close.close();
                 catch (Exception e) 
                    e.printStackTrace();
                
            

        
    

    private static Connection con() throws IOException 
        return ConnectionFactory.createConnection(config);
    

    private static Admin admin(Connection con) throws IOException 
        return con.getAdmin();
    

    private static boolean nameSpaceExists(String nameSpace, String[] nss) 
        for (String ns : nss) 
            if (nameSpace.equals(ns)) 
                return true;
            
        
        return false;
    

    public static void createNameSpace(String nameSpace) 
        Connection con = null;
        Admin admin = null;
        try 
            admin = admin(con = con());
            if (nameSpaceExists(nameSpace, admin.listNamespaces())) 
                throw new IOException("NAMESPACE [" + nameSpace + "] created in failure for existence");
            
            admin.createNamespace(NamespaceDescriptor.create(nameSpace).build());
            System.out.println("NAMESPACE [" + nameSpace + "] created in success");
         catch (IOException e) 
            e.printStackTrace();
            System.out.println("NAMESPACE [" + nameSpace + "] created in failure");
         finally 
            close(admin, con);
        
    

    //删除命名空间
    public static void dropNameSpace(String nameSpace) 
        Connection con = null;
        Admin admin = null;
        try 
            admin = admin(con = con());
            //命名空间存在
            if (nameSpaceExists(nameSpace, admin.listNamespaces())) 
                TableName[] tableNames = admin.listTableNamesByNamespace(nameSpace);
                for (TableName tableName : tableNames) 
                    if (admin.tableExists(tableName)) 
                        dropTable(String.valueOf(tableName));
                    
                
                //删除表
                admin.deleteNamespace(nameSpace);
            
            System.out.println("namespace [" + nameSpace + "] has been deleted");
         catch (IOException e) 
            e.printStackTrace();
         finally 
            close(admin, con);
        
    

    public static void createTable(String tableName, String columnFamily, String... columnFamilies) 
        Connection con = null;
        Admin admin = null;
        try 
            admin = admin(con = con());
            TableName tn = TableName.valueOf(tableName);
            if (admin.tableExists(tn)) 
                throw new IOException("table [" + tableName + "] created in failure for exitstence");
            
            //根据表明创建表 表描述构造器
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tn);
            //创建列簇集合
            List<ColumnFamilyDescriptor> list = new ArrayList<>();
            list.add(ColumnFamilyDescriptorBuilder.of(columnFamily));
            for (String family : columnFamilies) 
                list.add(ColumnFamilyDescriptorBuilder.of(family));
            
            //向表描述器中添加列簇
            builder.setColumnFamilies(list);

            admin.createTable(builder.build());

            System.out.println("table [" + tableName + "] created in success");
         catch (IOException e) 
            e.printStackTrace();
            System.out.println("table [" + tableName + "] created in fuilure");
         finally 
            close(admin, con);
        
    

    public static void dropTable(String tableName) 
        Connection con = null;
        Admin admin = null;
        try 
            admin = admin(con = con());
            TableName tn = TableName.valueOf(tableName);
            if (!admin.tableExists(tn)) 
                throw new IOException("table [" + tableName + "] dropped in failure for absence");
            
            if (admin.isTableEnabled(tn)) 
                admin.disableTable(tn);
                System.out.println("table [" + tableName + "] is enabled and disabled in success");
            
            admin.deleteTable(tn);
            System.out.println("table [" + tableName + "] dropped in success");
         catch (IOException e) 
            e.printStackTrace();
         finally 
            close(admin, con);
        
    

    private static boolean tableExists(Connection con, TableName tableName) 
        Admin admin = null;
        try 
            admin = admin(con);
            return admin.tableExists(tableName);
         catch (IOException e) 
            e.printStackTrace();
            return false;
         finally 
            close(admin);
        
    

    public static void put(String tableName, String rowKey, String family, String column, String value) 
        String msg = "put [" + rowKey + "=>" + column + "=> value" + value + "] into table [" + tableName + "]";
        TableName tn = TableName.valueOf(tableName);
        Connection con = null;
        Table table = null;
        try 
            con = con();
            if (!tableExists(con, tn)) 
                throw new IOException("table [" + tableName + "] not exists error");
            
            table = con.getTable(tn);
            //构造带有行间的put对象
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
            System.out.println(msg + "in success");
         catch (IOException e) 
            e.printStackTrace();
            System.err.println(msg + "in failure");
         finally 
            close(table, con);
        
    


    /**
     * 将file路径指向的文件数据映射至hbase
     * 文件名即表名,为了防止命名冲突:tablename_timestamp
     * 文件首行为表结构:    :key,cf:col,...
     * @param file
     */
    public static void putBatch(String file,String regexSep)
        File data = new File(file);
        Connection con = null;
        BufferedMutator mutator = null;
        BufferedReader br = null;
        try 
            if(!data.exists() || !data.isFile())
                throw new IOException(file + "not exist or not file error");
            
            //解析hbase表名
            String[] ns = data.getName().split("_|\\\\.");
            String tableName = ns[0]+":"+ns[1];
            TableName tn = TableName.valueOf(tableName);
            con = con();
            if(!tableExists(con,tn))
                throw new IOException("hbase table [ "+tableName+" ] not exist error");
            
            br = new BufferedReader(new FileReader(data));
            String line  = null;
            if(null == (line=br.readLine()))
                throw new IOException("file [ "+file + " ] empty error");
            
            String[] ps = line.split(regexSep);

            //创建批量插入异常侦听
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
            BufferedMutator.ExceptionListener listener = (e,_mutator)->
                System.err.println("put data into table [ "+tableName+" ] error "
                        +e.getNumExceptions()+" rows, retry put at "+dtf.format(LocalDateTime.now()));
                int count = 0;
                for (int i = 0; i < e.getNumExceptions(); i++) 
                    Row row = e.getRow(i);
                    try 
                        _mutator.mutate((Put)row);
                        count++;
                     catch (IOException ex) 
                        ex.printStackTrace();
                        System.err.println("retry put "+row+" error,please check it");
                    
                
                System.err.println("retry put data into table [ "+tableName+" ] from error total"
                        +e.getNumExceptions()+" rows, finish "+count+" rows ,at "+dtf.format(LocalDateTime.now()));
            ;

            BufferedMutatorParams bmp = new BufferedMutatorParams(tn)
                    .writeBufferSize(8*1024*1024).listener(listener);
            mutator = con.getBufferedMutator(bmp);

            int count = 0,CAPACITY = 1000;
            List<Put> list = new ArrayList<>(CAPACITY);
            Put put = null;
            while (null!=(line=br.readLine()))
                String[] arr = line.split(regexSep);
                put = new Put(Bytes.toBytes(arr[0]));
                for (int i = 1; i < arr.length ; i++) 
                    String[] ts = ps[i].split(":");
                    put.addColumn(Bytes.toBytes(ts[0]),Bytes.toBytes(ts[1]),Bytes.toBytes(arr[i]));
                
                list.add(put);
                if(list.size()==CAPACITY)
                    mutator.mutate(list);
                    count+=list.size();
                    list.clear();
                
            
            mutator.mutate(list);
            count+=list.size();
            list.clear();

            System.err.println("batch put into [ "+tableName+" , "+count+" rows ] from [ "+file+" ] in success");
        catch (IOException e)
            e.printStackTrace();
            System.err.println("batch put from [ "+file+" ] in failure");
        finally 
            close(br,mutator,con);
        
    


    public static void main(String[] args) 
        init("hbase.zookeeper.quorum=singlenick");
//        createNameSpace("dsj");
//        createTable("dsj:test","cf1","cf2","cf3");
//        dropTable("dsj:test");
//        put("dsj:test","0000002","cf1","gfName","angela");
        putBatch("D:\\\\javaproject\\\\hadoop\\\\data\\\\dsj_test_1624594633587", ",");
    

测试代码如下(放在 main 方法中执行,按需取消相应行的注释):

 // Connection setting: the value after '=' is a hostname or an IP address.
			init("hbase.zookeeper.quorum=singlenick");
//        createNameSpace("zwh");
//			dropNameSpace("zwh");
//        createTable("zwh:test","cf1","cf2","cf3");
//        dropTable("zwh:test");
//        put("zwh:test","0000002","cf1","gfName","angela");
//		Arguments: path of the data file, then the field separator.
//        putBatch("D:\\javaproject\\hadoop\\data\\dsj_test_1624594633587", ",");

以上是关于java操作HBase模板的主要内容,如果未能解决你的问题,请参考以下文章

Java读取Hbase数据,使用POI操作Excel模板,并定时发送带有Excel附件的邮件

HBase Java 操作 HBase 教程

HBase实战:使用JAVA操作分布式集群HBASE

Java 操作 HBase 教程

HBase的简单java操作

大数据学习笔记:利用JAVA项目操作HBase