java操作HBase模板
Posted Z-hhhhh
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java操作HBase模板相关的知识,希望对你有一定的参考价值。
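使用 IDEA 基于 maven-archetype-quickstart 模板新建一个 Maven 工程。
等待 Maven 自动导入依赖完成后,删除模板自带的、本例不需要的 test 目录。
ps:把工程中三处的 JDK 版本统一为 1.8:Project Structure 中的 Project SDK(及语言级别)、Modules 中的 Language level,以及 Settings → Build, Execution, Deployment → Compiler → Java Compiler 中的目标字节码版本。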
在pom.xml里添加如下依赖:
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.3.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>3.1.3</version>
</dependency>
在 src/main 目录下、与 java 目录同级的位置新建 resources 目录,并在其中添加 log4j.properties 文件。
log4j的标准版如下:
# Root logger: level INFO, output goes to the "stdout" appender only.
log4j.rootLogger=INFO, stdout
# Console appender.
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
# Pattern: date, level, [logger name], message, line break.
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
# File appender targeting log/hd.log, same pattern as the console.
# NOTE(review): "logfile" is not listed on log4j.rootLogger above and nothing else
# in this file references it; list it there (INFO, stdout, logfile) if file output
# is actually wanted.
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=log/hd.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
接下来是Java操作hbase的核心代码
package cn.kgc.base;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
/**
 * Small collection of static helpers for common HBase operations: namespace and
 * table DDL, single-cell puts, and batch loading of a delimited text file.
 *
 * <p>Call {@link #init(String...)} once before any other method; every operation
 * opens its own connection from the stored configuration and closes it when done.
 * Operations report their outcome on stdout/stderr and do not propagate
 * {@link IOException} to the caller.
 */
public class JavaToHBase {
    /** Number of rows collected locally before they are handed to the mutator. */
    private static final int BATCH_SIZE = 1000;
    /** Client-side write buffer used for batch loads (8 MiB). */
    private static final long WRITE_BUFFER_SIZE = 8L * 1024 * 1024;
    /** Timestamp format used in batch-load diagnostics. */
    private static final DateTimeFormatter TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static Configuration config = null;

    /**
     * Creates the HBase configuration and applies the given overrides.
     *
     * @param items settings in {@code key=value} form, e.g.
     *              {@code hbase.zookeeper.quorum=host1,host2}
     * @throws IllegalArgumentException if an item has no {@code =} or an empty key
     */
    public static void init(String... items) {
        config = HBaseConfiguration.create();
        for (String item : items) {
            // Limit 2: only the first '=' separates key and value, so values may contain '='.
            String[] ps = item.split("=", 2);
            if (ps.length != 2 || ps[0].isEmpty()) {
                throw new IllegalArgumentException(
                        "config item [" + item + "] is not of the form key=value");
            }
            config.set(ps[0], ps[1]);
        }
    }

    /**
     * Closes the given resources in order, skipping nulls. Best effort: a failure
     * to close one resource is printed and does not stop the remaining ones.
     */
    private static void close(AutoCloseable... closes) {
        for (AutoCloseable close : closes) {
            if (null != close) {
                try {
                    close.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Opens a new connection from the configuration built by {@link #init(String...)}.
     *
     * @throws IllegalStateException if {@code init} has not been called yet
     */
    private static Connection con() throws IOException {
        if (null == config) {
            throw new IllegalStateException("HBase configuration missing: call init(...) first");
        }
        return ConnectionFactory.createConnection(config);
    }

    /** Returns the Admin handle of the given connection. */
    private static Admin admin(Connection con) throws IOException {
        return con.getAdmin();
    }

    /** Returns true when {@code nameSpace} equals one of the names in {@code nss}. */
    private static boolean nameSpaceExists(String nameSpace, String[] nss) {
        for (String ns : nss) {
            if (nameSpace.equals(ns)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Creates a namespace; an already existing namespace is reported as a failure.
     *
     * @param nameSpace name of the namespace to create
     */
    public static void createNameSpace(String nameSpace) {
        Connection con = null;
        Admin admin = null;
        try {
            admin = admin(con = con());
            if (nameSpaceExists(nameSpace, admin.listNamespaces())) {
                throw new IOException("NAMESPACE [" + nameSpace + "] created in failure for existence");
            }
            admin.createNamespace(NamespaceDescriptor.create(nameSpace).build());
            System.out.println("NAMESPACE [" + nameSpace + "] created in success");
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("NAMESPACE [" + nameSpace + "] created in failure");
        } finally {
            close(admin, con);
        }
    }

    /**
     * Drops every table of a namespace and then the namespace itself. A namespace
     * that does not exist is reported as such instead of being reported as deleted.
     *
     * @param nameSpace name of the namespace to delete
     */
    public static void dropNameSpace(String nameSpace) {
        Connection con = null;
        Admin admin = null;
        try {
            admin = admin(con = con());
            if (!nameSpaceExists(nameSpace, admin.listNamespaces())) {
                System.out.println("namespace [" + nameSpace + "] does not exist, nothing to delete");
                return;
            }
            // Reuse this Admin for every table instead of opening a connection per table;
            // a table that cannot be dropped aborts the whole operation right here.
            for (TableName table : admin.listTableNamesByNamespace(nameSpace)) {
                if (admin.tableExists(table)) {
                    dropTable(admin, table.getNameAsString());
                }
            }
            // The namespace can only be deleted once its tables are gone.
            admin.deleteNamespace(nameSpace);
            System.out.println("namespace [" + nameSpace + "] has been deleted");
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("namespace [" + nameSpace + "] deleted in failure");
        } finally {
            close(admin, con);
        }
    }

    /**
     * Creates a table with one or more column families; an already existing table
     * is reported as a failure.
     *
     * @param tableName      table name, optionally qualified as {@code namespace:table}
     * @param columnFamily   first (mandatory) column family
     * @param columnFamilies additional column families
     */
    public static void createTable(String tableName, String columnFamily, String... columnFamilies) {
        Connection con = null;
        Admin admin = null;
        try {
            admin = admin(con = con());
            TableName tn = TableName.valueOf(tableName);
            if (admin.tableExists(tn)) {
                throw new IOException("table [" + tableName + "] created in failure for existence");
            }
            // Collect one descriptor per column family, mandatory family first.
            List<ColumnFamilyDescriptor> families = new ArrayList<>();
            families.add(ColumnFamilyDescriptorBuilder.of(columnFamily));
            for (String family : columnFamilies) {
                families.add(ColumnFamilyDescriptorBuilder.of(family));
            }
            TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tn);
            builder.setColumnFamilies(families);
            admin.createTable(builder.build());
            System.out.println("table [" + tableName + "] created in success");
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("table [" + tableName + "] created in failure");
        } finally {
            close(admin, con);
        }
    }

    /**
     * Disables (if needed) and deletes a table; a missing table is reported as a failure.
     *
     * @param tableName table name, optionally qualified as {@code namespace:table}
     */
    public static void dropTable(String tableName) {
        Connection con = null;
        Admin admin = null;
        try {
            admin = admin(con = con());
            dropTable(admin, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            close(admin, con);
        }
    }

    /** Disables (if enabled) and deletes {@code tableName} through an already open Admin. */
    private static void dropTable(Admin admin, String tableName) throws IOException {
        TableName tn = TableName.valueOf(tableName);
        if (!admin.tableExists(tn)) {
            throw new IOException("table [" + tableName + "] dropped in failure for absence");
        }
        if (admin.isTableEnabled(tn)) {
            admin.disableTable(tn);
            System.out.println("table [" + tableName + "] is enabled and disabled in success");
        }
        admin.deleteTable(tn);
        System.out.println("table [" + tableName + "] dropped in success");
    }

    /**
     * Checks table existence through a short-lived Admin of the given connection.
     * I/O errors are propagated so callers do not mistake them for "table absent".
     */
    private static boolean tableExists(Connection con, TableName tableName) throws IOException {
        Admin admin = null;
        try {
            admin = admin(con);
            return admin.tableExists(tableName);
        } finally {
            close(admin);
        }
    }

    /**
     * Writes a single cell.
     *
     * @param tableName table name, optionally qualified as {@code namespace:table}
     * @param rowKey    row key
     * @param family    column family
     * @param column    column qualifier
     * @param value     cell value
     */
    public static void put(String tableName, String rowKey, String family, String column, String value) {
        String msg = "put [" + rowKey + " => " + column + " => value " + value
                + "] into table [" + tableName + "]";
        TableName tn = TableName.valueOf(tableName);
        Connection con = null;
        Table table = null;
        try {
            con = con();
            if (!tableExists(con, tn)) {
                throw new IOException("table [" + tableName + "] not exists error");
            }
            table = con.getTable(tn);
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(value));
            table.put(put);
            System.out.println(msg + " in success");
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println(msg + " in failure");
        } finally {
            close(table, con);
        }
    }

    /**
     * Loads a delimited text file into an existing HBase table.
     *
     * <p>The target table is derived from the file name, which is split on
     * {@code _} and {@code .}: the first part is the namespace and the second the
     * table, so {@code dsj_test_1624594633587} maps to {@code dsj:test}. The
     * trailing parts (e.g. a timestamp that keeps file names unique) are ignored.
     *
     * <p>The first line is the header {@code :key,cf:col,...}: its first field is a
     * placeholder for the row key and every further field names the column
     * ({@code family:qualifier}) of the data field at the same position. Data lines
     * that are blank, lack a row key or values, or have more fields than the header
     * are skipped with a warning.
     *
     * @param file     path of the data file
     * @param regexSep regular expression separating the fields of a line
     */
    public static void putBatch(String file, String regexSep) {
        File data = new File(file);
        Connection con = null;
        BufferedMutator mutator = null;
        BufferedReader br = null;
        try {
            if (!data.exists() || !data.isFile()) {
                throw new IOException(file + " not exist or not file error");
            }
            // Derive namespace and table from the file name (split on '_' or a literal '.').
            String[] ns = data.getName().split("_|\\.");
            if (ns.length < 2 || ns[0].isEmpty() || ns[1].isEmpty()) {
                throw new IOException("file name [ " + data.getName()
                        + " ] is not of the form namespace_table_timestamp");
            }
            String tableName = ns[0] + ":" + ns[1];
            TableName tn = TableName.valueOf(tableName);
            con = con();
            if (!tableExists(con, tn)) {
                throw new IOException("hbase table [ " + tableName + " ] not exist error");
            }
            br = new BufferedReader(new FileReader(data));
            String line = br.readLine();
            if (null == line) {
                throw new IOException("file [ " + file + " ] empty error");
            }
            // Parse and validate the header once; index 0 is the row-key placeholder.
            String[] header = line.split(regexSep);
            byte[][] families = new byte[header.length][];
            byte[][] qualifiers = new byte[header.length][];
            for (int i = 1; i < header.length; i++) {
                String[] ts = header[i].split(":");
                if (ts.length < 2) {
                    throw new IOException("header column [ " + header[i]
                            + " ] of file [ " + file + " ] is not of the form cf:col");
                }
                families[i] = Bytes.toBytes(ts[0]);
                qualifiers[i] = Bytes.toBytes(ts[1]);
            }
            // Listener invoked by the mutator when buffered puts fail: resubmit each failed row once.
            BufferedMutator.ExceptionListener listener = (e, failedMutator) -> {
                System.err.println("put data into table [ " + tableName + " ] error "
                        + e.getNumExceptions() + " rows, retry put at "
                        + TIME_FORMAT.format(LocalDateTime.now()));
                int retried = 0;
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    Row row = e.getRow(i);
                    try {
                        failedMutator.mutate((Put) row);
                        retried++;
                    } catch (IOException ex) {
                        ex.printStackTrace();
                        System.err.println("retry put " + row + " error,please check it");
                    }
                }
                System.err.println("retry put data into table [ " + tableName + " ] from error total "
                        + e.getNumExceptions() + " rows, finish " + retried + " rows ,at "
                        + TIME_FORMAT.format(LocalDateTime.now()));
            };
            BufferedMutatorParams bmp = new BufferedMutatorParams(tn)
                    .writeBufferSize(WRITE_BUFFER_SIZE).listener(listener);
            mutator = con.getBufferedMutator(bmp);

            int total = 0;
            int skipped = 0;
            int lineNo = 1; // the header was line 1
            List<Put> batch = new ArrayList<>(BATCH_SIZE);
            while (null != (line = br.readLine())) {
                lineNo++;
                if (line.trim().isEmpty()) {
                    continue;
                }
                String[] arr = line.split(regexSep);
                // A Put needs a non-empty row key and at least one column, and every
                // data field needs a header column that names its family:qualifier.
                if (arr.length < 2 || arr[0].isEmpty() || arr.length > header.length) {
                    skipped++;
                    System.err.println("skip malformed line " + lineNo + " of [ " + file + " ]: " + line);
                    continue;
                }
                Put put = new Put(Bytes.toBytes(arr[0]));
                for (int i = 1; i < arr.length; i++) {
                    put.addColumn(families[i], qualifiers[i], Bytes.toBytes(arr[i]));
                }
                batch.add(put);
                if (batch.size() == BATCH_SIZE) {
                    mutator.mutate(batch);
                    total += batch.size();
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                mutator.mutate(batch);
                total += batch.size();
                batch.clear();
            }
            // Push out whatever is still buffered before claiming success.
            mutator.flush();
            if (skipped > 0) {
                System.err.println("batch put from [ " + file + " ] skipped " + skipped + " malformed lines");
            }
            System.out.println("batch put into [ " + tableName + " , " + total
                    + " rows ] from [ " + file + " ] in success");
        } catch (IOException e) {
            e.printStackTrace();
            System.err.println("batch put from [ " + file + " ] in failure");
        } finally {
            close(br, mutator, con);
        }
    }

    /** Demo entry point: configures the ZooKeeper quorum and runs one sample operation. */
    public static void main(String[] args) {
        init("hbase.zookeeper.quorum=singlenick");
        // createNameSpace("dsj");
        // createTable("dsj:test","cf1","cf2","cf3");
        // dropTable("dsj:test");
        // put("dsj:test","0000002","cf1","gfName","angela");
        putBatch("D:\\javaproject\\hadoop\\data\\dsj_test_1624594633587", ",");
    }
}
测试代码如下
// Configuration: after '=' put the ZooKeeper host name or its IP address
init("hbase.zookeeper.quorum=singlenick");
// createNameSpace("zwh");
// dropNameSpace("zwh");
// createTable("zwh:test","cf1","cf2","cf3");
// dropTable("zwh:test");
// put("zwh:test","0000002","cf1","gfName","angela");
// Arguments: path of the data file, then the field separator
// putBatch("D:\\\\javaproject\\\\hadoop\\\\data\\\\dsj_test_1624594633587", ",");
以上是关于java操作HBase模板的主要内容,如果未能解决你的问题,请参考以下文章