JavaAPI操作hbase
Posted kevin&learn
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JavaAPI操作hbase相关的知识,希望对你有一定的参考价值。
JavaAPI操作Hbase
准备工作:
- 确保集群各节点服务运行正常
- 确保zookeeper可以正常工作
- 已经开始hbase-master,hbase-regionserver
- 环境:windows7下eclipse,集群正常工作。
- 由于我们是在windows7下运行,所以我们将服务器中的hadoop程序拷贝到本地。
- 在系统的环境变量中设置HADOOP_HOME,并且将%HADOOP_HOME%/bin添加到path当中。
- 接下来在运行中,执行程序的时候eclipse会报错说找不到winutils.exe。所以我们还要下载一个winutils.exe,我这里提供一个github上的下载链接https://github.com/srccodes/hadoop-common-2.2.0-bin,下载以后解压到bin下即可。(版本虽然很旧,但是可以使用)
连接hbase并进行简单操作:
- 创建JavaProject,导入需要的jar包,jar包来自于服务器上hbase中的lib文件夹下的jar文件,所以将lib直接拷贝到当前工程中,并build path(导入这些额外的包)。
- 同时拷贝hbase下的log4j.properties文件到项目中,在执行过程中可以查看到执行过程中产生的日志。
- 创建连接hbase需要的配置信息
- Java客户端其实就是shell客户端的一种实现,操作命令基本上就是shell客户端命令的一个映射。
- Java客户端使用的配置信息是被映射到了HbaseConfiguration的实例对象中的,使用create方法创建实例化对象的时候,会从classpath中获取hbase-site.xml文件并进行配置文件内容的读取。同时也会读取hadoop的配置文件信息。这里我们给定zookeeper的相关配置信息即可。
- 流程:先通过zookeeper拿到hbase:namespace的路径,然后从这个路径中拿到hbase:meta表的信息,接着就拿到了用户表的路径
- 代码实现如下
1 package com.hblink.demo; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.hbase.HBaseConfiguration; 7 8 public class Hblink { 9 /** 10 * 获取hbase的配置文件信息 11 * 12 * @return 13 * @throws IOException 14 */ 15 public static Configuration getHBaseConfiguration() throws IOException { 16 Configuration conf = HBaseConfiguration.create(); 17 // zookeeper的配置信息 18 conf.set("hbase.zookeeper.quorum", "kslave5,kslave6,kslave7");// zookeeper节点信息 19 conf.set("hbase.zookeeper.property.clientPort", "2181");// zookeeper端口 20 // conf.set("dfs.socket.timeout", "180000"); 21 return conf; 22 } 23 }
- 有了配置信息以后,我们开始通过配置信息连接hbase
HBaseAdmin类:是主要进行DDL操作相关的一个接口类,主要包括命名空间管理,用户表管理。通过该接口我们可以创建、删除、获取用户表,也可以进行用户表的分割,紧缩等操作。
HTable类:是hbase中的用户表的一个映射的java实例,通过该类进行表数据的操作,包括数据的增删改查,也就是在这里我们可以类似shell中put,get和sacn进行数据的操作。HTableDescriptor类:是hbase用户表的具体描述信息类,一般我们创建表获取表信息,就是通过该类进行的。
1 package com.hblink.test; 2 3 import java.io.IOException; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.hbase.HColumnDescriptor; 7 import org.apache.hadoop.hbase.HTableDescriptor; 8 import org.apache.hadoop.hbase.TableName; 9 import org.apache.hadoop.hbase.client.HBaseAdmin; 10 11 import com.hblink.demo.Hblink; 12 13 public class HbTest { 14 public static void main(String[] args) throws Exception { 15 Configuration configuration = Hblink.getHBaseConfiguration(); 16 HBaseAdmin hBaseAdmin = new HBaseAdmin(configuration); 17 try { 18 createTestTable(hBaseAdmin); 19 } finally { 20 hBaseAdmin.close(); // 资源释放 21 } 22 } 23 24 /** 25 * 测试创建表table 26 * 27 * @throws IOException 28 */ 29 static void createTestTable(HBaseAdmin hbAdmin) throws IOException { 30 TableName tableName = TableName.valueOf("stock-info"); // 创建表名 31 HTableDescriptor hDescriptor = new HTableDescriptor(tableName); 32 hDescriptor.addFamily(new HColumnDescriptor("f"));// 给定列族 33 hbAdmin.createTable(hDescriptor); 34 System.out.println("创建表成功!"); 35 } 36 }
- 接下来是数据插入
1 package com.hblink.test; 2 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileInputStream; 6 import java.io.FileNotFoundException; 7 import java.io.IOException; 8 import java.io.InputStreamReader; 9 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.hbase.client.HTable; 12 import org.apache.hadoop.hbase.client.Put; 13 import org.apache.hadoop.hbase.util.Bytes; 14 15 import com.hblink.demo.Hblink; 16 17 public class TableTest { 18 public static int count = 0; 19 20 public static void main(String[] args) throws IOException { 21 22 HTable hTable = null; 23 Configuration configuration = Hblink.getHBaseConfiguration(); 24 25 hTable = new HTable(configuration, "stock-info"); 26 27 testPut(hTable);//插入数据 28 29 hTable.close();//释放资源 30 31 } 32 33 /** 34 * 测试往表里插入数据 35 * 36 * @param hTable 37 * @throws IOException 38 */ 39 static void testPut(HTable hTable) throws IOException { 40 41 File file = new File("./20171120sh.txt"); //获取本地文件 42 InputStreamReader isr = null; 43 try { 44 isr = new InputStreamReader(new FileInputStream(file), "utf-8"); 45 } catch (FileNotFoundException e) { 46 e.printStackTrace(); 47 } 48 if (isr == null) { 49 return; 50 } 51 BufferedReader br = new BufferedReader(isr); 52 String re = ""; 53 while ((re = br.readLine()) != null) { 54 String[] sarr = re.split(","); 55 // System.out.println(sarr[0] + "-" + sarr[1] + "-" + sarr[2] + "-" + sarr[3] + 56 // sarr[4] + "-" + sarr[5] + "-" + sarr[6]); 57 // System.out.println(sarr[0]); 58 59 Put put = new Put(Bytes.toBytes(sarr[0])); 60 put.add(Bytes.toBytes("f"), Bytes.toBytes("Stock"), Bytes.toBytes(sarr[1])); 61 put.add(Bytes.toBytes("f"), Bytes.toBytes("Date"), Bytes.toBytes(sarr[2])); 62 put.add(Bytes.toBytes("f"), Bytes.toBytes("Top"), Bytes.toBytes(sarr[3])); 63 put.add(Bytes.toBytes("f"), Bytes.toBytes("Change-rate"), Bytes.toBytes(sarr[4])); 64 put.add(Bytes.toBytes("f"), Bytes.toBytes("Volume"), Bytes.toBytes(sarr[5])); 65 put.add(Bytes.toBytes("f"), Bytes.toBytes("Turnover"), Bytes.toBytes(sarr[6])); 66 hTable.put(put); 67 count++; 68 } 69 System.out.println("插入" + (count - 1) + "条数据成功!"); 70 } 71 }
- 对表中数据的查询
1 package com.hblink.test; 2 3 import java.io.IOException; 4 import java.util.Scanner; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.HTable; 8 import org.apache.hadoop.hbase.client.Result; 9 import org.apache.hadoop.hbase.client.ResultScanner; 10 import org.apache.hadoop.hbase.client.Scan; 11 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 12 import org.apache.hadoop.hbase.filter.Filter; 13 import org.apache.hadoop.hbase.filter.RegexStringComparator; 14 import org.apache.hadoop.hbase.filter.RowFilter; 15 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 16 import org.apache.hadoop.hbase.util.Bytes; 17 18 import com.hblink.demo.Hblink; 19 20 public class ScanHbase { 21 public static Boolean flag = true; 22 public static String string = null; 23 24 public static void main(String[] args) throws IOException { 25 while (flag) { 26 HTable hTable = null; 27 Configuration configuration = Hblink.getHBaseConfiguration(); 28 hTable = new HTable(configuration, "stock-info"); 29 30 Scanner sc = new Scanner(System.in); 31 System.out.print("请输入需要查询的股票代码:"); 32 string = sc.next(); 33 34 // scanTestCell(hTable); 35 scanTestRow(hTable); 36 hTable.close(); 37 if (string.equals("quit")) { 38 flag = false; 39 } 40 } 41 } 42 43 /** 44 * 通过列查询 45 * 46 * @param hTable 47 * @throws IOException 48 */ 49 static void scanTestCell(HTable hTable) throws IOException { 50 51 // 设置过滤器 52 SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(Bytes.toBytes("f"), 53 Bytes.toBytes("Date"), CompareOp.EQUAL, Bytes.toBytes(string)); 54 // 设置全表扫描封装类 55 Scan scan = new Scan(); 56 // 添加过滤器(通过股票代码查询) 57 scan.setFilter(singleColumnValueFilter); 58 // 扫描 59 ResultScanner resultScanner = hTable.getScanner(scan); 60 for (Result result : resultScanner) { 61 byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date")); 62 byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock")); 63 byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top")); 64 byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate")); 65 byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume")); 66 byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover")); 67 68 System.out.print(Bytes.toString(data) + ";"); 69 System.out.print(Bytes.toString(stock) + ";"); 70 if (Bytes.toString(top).equals("--")) { 71 System.out.print(Bytes.toString(top) + ";"); 72 } else { 73 System.out.print(Bytes.toInt(top) + ";"); 74 } 75 System.out.print(Bytes.toString(change_rate) + ";"); 76 System.out.print(Bytes.toString(volume) + ";"); 77 System.out.print(Bytes.toString(turnover)); 78 System.out.println(); 79 80 } 81 82 } 83 84 /** 85 * 通过正则--匹配行键 86 * 87 * @param hTable 88 * @throws IOException 89 */ 90 static void scanTestRow(HTable hTable) throws IOException { 91 RegexStringComparator re = new RegexStringComparator("^" + string + ""); 92 Filter filter = new RowFilter(CompareOp.EQUAL, re); 93 Scan scan = new Scan(); 94 // 添加过滤器(通过股票代码查询) 95 scan.setFilter(filter); 96 // 扫描 97 ResultScanner resultScanner = hTable.getScanner(scan); 98 for (Result result : resultScanner) { 99 byte[] data = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Date")); 100 byte[] stock = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Stock")); 101 byte[] top = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Top")); 102 byte[] change_rate = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Change-rate")); 103 byte[] volume = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Volume")); 104 byte[] turnover = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("Turnover")); 105 106 System.out.print(Bytes.toString(data) + ";"); 107 System.out.print(Bytes.toString(stock) + ";"); 108 if (Bytes.toString(top).equals("--")) { 109 System.out.print(Bytes.toString(top) + ";"); 110 } else { 111 System.out.print(Bytes.toInt(top) + ";"); 112 } 113 System.out.print(Bytes.toString(change_rate) + ";"); 114 System.out.print(Bytes.toString(volume) + ";"); 115 System.out.print(Bytes.toString(turnover)); 116 System.out.println(); 117 118 } 119 } 120 }
ps.继续学习中=====
以上是关于JavaAPI操作hbase的主要内容,如果未能解决你的问题,请参考以下文章