通过线程池,从hbase中拿数据
Posted shiji7
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过线程池,从hbase中拿数据相关的知识,希望对你有一定的参考价值。
1.连接池类HbasePool(基于commons-pool2的HBase连接池,注意这里池化的是连接而不是线程)
package com.example.demospringboothbase.common;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * Lazily-created holder of a commons-pool2 {@link GenericObjectPool} whose pooled objects
 * ({@link HbaseProxy}) each wrap one HBase {@link Connection}.
 *
 * <p>Obtain the holder with {@link #getPool()} and the underlying object pool with
 * {@link #getGop(String, String)}. Both lazy initialisations are synchronized so that
 * concurrent first calls cannot create two instances.
 */
@Component
public class HbasePool {

    private static final Logger log = Logger.getLogger(HbasePool.class);

    /**
     * Proxy around a single HBase {@link Connection}; this is the object type stored in the pool.
     */
    public class HbaseProxy {
        private String zk;
        private String zknode;
        private Connection connection;

        /**
         * Stores the ZooKeeper settings and opens the connection immediately.
         *
         * @param zk     value for {@code hbase.zookeeper.quorum}
         * @param zknode value for {@code zookeeper.znode.parent}
         * @throws IllegalStateException if the connection cannot be created
         */
        public HbaseProxy(String zk, String zknode) {
            this.zk = zk;
            this.zknode = zknode;
            init();
        }

        /**
         * Builds an HBase configuration from the stored ZooKeeper settings and opens the connection.
         *
         * @throws IllegalStateException if {@link ConnectionFactory#createConnection} fails; the
         *         original {@link IOException} is kept as the cause
         */
        public void init() {
            Configuration entries = HBaseConfiguration.create();
            entries.set("hbase.zookeeper.quorum", zk);
            entries.set("zookeeper.znode.parent", zknode);
            try {
                this.connection = ConnectionFactory.createConnection(entries);
            } catch (IOException e) {
                log.error("获取连接失败!", e);
                // Fail fast: a proxy holding a null connection would otherwise be handed out by
                // the pool and only blow up later with a NullPointerException at the call site.
                throw new IllegalStateException(
                        "Failed to create HBase connection (zk=" + zk + ", znode=" + zknode + ")", e);
            }
        }

        /**
         * @return the wrapped HBase connection
         */
        public Connection getConnection() {
            return this.connection;
        }

        /**
         * Closes the wrapped connection if there is one. A failure to close is logged and not
         * rethrown, so destroying a pooled object stays best-effort.
         */
        public void close() {
            if (this.connection != null) {
                try {
                    this.connection.close();
                } catch (IOException e) {
                    log.error("链接关闭失败~", e);
                }
            }
        }
    }

    /**
     * commons-pool2 factory that creates, wraps and destroys {@link HbaseProxy} instances.
     */
    public class HbasePoolFactary extends BasePooledObjectFactory<HbaseProxy> {
        private String zk;
        private String zknode;

        /**
         * @param zk     ZooKeeper quorum passed to every created proxy
         * @param zknode ZooKeeper parent znode passed to every created proxy
         */
        public HbasePoolFactary(String zk, String zknode) {
            this.zk = zk;
            this.zknode = zknode;
        }

        /** Creates a new proxy, which opens its connection in the constructor. */
        @Override
        public HbaseProxy create() throws Exception {
            return new HbaseProxy(this.zk, this.zknode);
        }

        /** Wraps the proxy in the default pooled-object container. */
        @Override
        public PooledObject<HbaseProxy> wrap(HbaseProxy hbaseProxy) {
            return new DefaultPooledObject<HbaseProxy>(hbaseProxy);
        }

        /** Closes the proxy's connection before delegating to the base implementation. */
        @Override
        public void destroyObject(PooledObject<HbaseProxy> p) throws Exception {
            HbaseProxy object = p.getObject();
            object.close();
            super.destroyObject(p);
        }
    }

    private static HbasePool pool;

    // Private constructor: instances are meant to be obtained through getPool().
    private HbasePool() {
    }

    /**
     * Returns the shared {@code HbasePool}, creating it on first use.
     *
     * @return the single instance managed by this static accessor
     */
    public static synchronized HbasePool getPool() {
        if (pool == null) {
            pool = new HbasePool();
        }
        return pool;
    }

    // The generic object pool, built once per HbasePool instance.
    private GenericObjectPool<HbaseProxy> gop;

    /**
     * Returns the object pool, creating it on first use with at most 10 pooled proxies.
     *
     * <p>The arguments are only used by the call that creates the pool; later calls return the
     * existing pool regardless of the values passed.
     *
     * @param zk     ZooKeeper quorum used when the pool is first created
     * @param zknode ZooKeeper parent znode used when the pool is first created
     * @return the shared pool of {@link HbaseProxy} objects
     */
    public synchronized GenericObjectPool<HbaseProxy> getGop(String zk, String zknode) {
        if (gop == null) {
            HbasePoolFactary hbasePoolFactary = new HbasePoolFactary(zk, zknode);
            gop = new GenericObjectPool<HbaseProxy>(hbasePoolFactary);
            gop.setMaxTotal(10);
        }
        return gop;
    }
}
2.通过Get请求(按rowkey批量查询)读取HBase中的数据
这里将逻辑类和测试类写一块了。
package com.example.demospringboothbase.serverce;
import com.alibaba.fastjson.JSON;
import com.example.demospringboothbase.common.HbasePool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class test {
    // Holder of the shared connection pool.
    private HbasePool hbasePool = HbasePool.getPool();

    /**
     * Fetches the given rows from an HBase table with a batched Get and returns one map per
     * returned {@link Result}, i.e. a structure that serialises as {@code [{},{},{}]}.
     *
     * <p>Each map contains qualifier -> value for every cell in the row, plus the row key under
     * the key {@code "rowkey"}. All bytes are converted with {@link Bytes}.
     *
     * @param tableName    name of the table to read
     * @param rowkey       row keys to fetch
     * @param rowkeyAttr   row-key mode; only {@code "rowkey"} is supported
     * @param column       qualifiers to restrict the read to (used only in {@code "column"} mode)
     * @param columnAttr   column mode; {@code "column"} restricts the read to {@code column},
     *                     any other value reads all columns
     * @param columnFamily column family the qualifiers belong to
     * @return list of row maps, empty if the batch returned nothing
     * @throws IllegalArgumentException if there is at least one row key and {@code rowkeyAttr}
     *         is not {@code "rowkey"}
     * @throws Exception if borrowing from the pool or the HBase read fails
     */
    public List<Map> resultByRowkey(String tableName, List<String> rowkey, String rowkeyAttr,
                                    List<String> column, String columnAttr, String columnFamily) throws Exception {
        List<Map> list = new ArrayList<>();

        // A Get is addressed by table name + row key; build one per requested row.
        boolean byRowkey = "rowkey".equals(rowkeyAttr);
        boolean restrictColumns = "column".equals(columnAttr);
        List<Get> gets = new ArrayList<>();
        for (String rk : rowkey) {
            if (!byRowkey) {
                // Previously this path left the Get null and failed later with a
                // NullPointerException; report the real problem instead.
                throw new IllegalArgumentException("Unsupported rowkeyAttr: " + rowkeyAttr);
            }
            // Bytes.toBytes encodes as UTF-8, independent of the platform default charset.
            Get get = new Get(Bytes.toBytes(rk));
            // Only explicitly added columns are returned; without any, the whole row is read.
            if (restrictColumns) {
                byte[] family = Bytes.toBytes(columnFamily);
                for (String cl : column) {
                    get.addColumn(family, Bytes.toBytes(cl));
                }
            }
            gets.add(get);
        }

        GenericObjectPool<HbasePool.HbaseProxy> gop = hbasePool.getGop("server3:2181", "/hbase-unsecure");
        // Borrow a pooled connection proxy; it must be handed back or the pool runs dry.
        HbasePool.HbaseProxy hbaseProxy = gop.borrowObject();
        try (Table table = hbaseProxy.getConnection().getTable(TableName.valueOf(tableName))) {
            Result[] results = table.get(gets);
            if (results != null) {
                for (Result r : results) {
                    Map<String, String> row = new HashMap<>();
                    while (r.advance()) {
                        Cell current = r.current();
                        String qualifier = Bytes.toString(CellUtil.cloneQualifier(current));
                        String value = Bytes.toString(CellUtil.cloneValue(current));
                        row.put(qualifier, value);
                    }
                    row.put("rowkey", Bytes.toString(r.getRow()));
                    list.add(row);
                }
            }
        } finally {
            gop.returnObject(hbaseProxy);
        }
        return list;
    }

    // Manual smoke test against a live cluster.
    public static void main(String[] args) throws Exception {
        test ceshi = new test();
        List<String> rowkey = new ArrayList<>();
        List<String> colum = new ArrayList<>();
        rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222603");
        // rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222802");
        rowkey.add("0001b04bf9473458af40acb4c13f1476_20111230002114");
        colum.add("click");
        colum.add("url");
        colum.add("serch");
        // NOTE: "colum" is not equal to "column", so no column restriction is applied here and
        // every column of the row is returned. Pass "column" to read only the listed qualifiers.
        List<Map> maps = ceshi.resultByRowkey("sogo3", rowkey, "rowkey", colum, "colum", "oo");
        System.out.println(JSON.toJSONString(maps));
    }
}
输出结果:[{"serch":"福彩3d单选一注法","rank":"10","rowkey":"000080fd3eaf6b381e33868ec6459c49_20111230222603","click":"5","url":"http://www.18888.com/read-htm-tid-6069520.html"},{"serch":"淫淫网","rank":"1","rowkey":"0001b04bf9473458af40acb4c13f1476_20111230002114","click":"1","url":"http://www.244uu.com/"}]
以上是关于通过线程池,从hbase中拿数据的主要内容,如果未能解决你的问题,请参考以下文章