通过线程池,从hbase中拿数据

Posted shiji7

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过线程池,从hbase中拿数据相关的知识,希望对你有一定的参考价值。

1.线程池类HbasePool

package com.example.demospringboothbase.common;

import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class HbasePool {
    private Logger log = Logger.getLogger(HbasePool.class);



    //代理类主要用于获取连接
    public class HbaseProxy {

        private String zk;
        private String zknode;
        private Connection connection;

        public HbaseProxy(String zk, String zknode) {
            this.zk = zk;
            this.zknode = zknode;
            init();
        }

        public void init() {
            Configuration entries = HBaseConfiguration.create();
            entries.set("hbase.zookeeper.quorum",zk);
            entries.set("zookeeper.znode.parent",zknode);
            try {
                this.connection = ConnectionFactory.createConnection(entries);
            } catch (IOException e) {
                log.error("获取连接失败!");
                e.printStackTrace();
            }

        }

        public Connection getConnection(){
            return this.connection;
        }

        public void close(){
            if(this.connection !=null){
                try {
                    this.connection.close();
                } catch (IOException e) {
                    log.error("链接关闭失败~");
                    e.printStackTrace();
                }
            }
        }

    }


    public class HbasePoolFactary extends BasePooledObjectFactory<HbaseProxy>{
        private String zk;
        private String zknode;

        public HbasePoolFactary(String zk, String zknode) {
            this.zk = zk;
            this.zknode = zknode;
        }

        @Override
        public HbaseProxy create() throws Exception {
            return new HbaseProxy(this.zk,this.zknode);
        }

        @Override
        public PooledObject<HbaseProxy> wrap(HbaseProxy hbaseProxy) {
            return new DefaultPooledObject<HbaseProxy>(hbaseProxy);
        }

        @Override
        public void destroyObject(PooledObject<HbaseProxy> p) throws Exception {
            HbaseProxy object = p.getObject();
            object.close();
            super.destroyObject(p);
        }
    }

    private  static HbasePool pool;
    //开始编写我们的单例池子
    private HbasePool(){}

    public static HbasePool getPool(){
        if(pool ==null){
            pool = new HbasePool();
        }
        return pool;
    }

    //还得写一个构造池子的单例方法。用通用的池子对象来进行构造
    private GenericObjectPool<HbaseProxy> gop;

    public GenericObjectPool<HbaseProxy> getGop(String zk,String zknode){
        if(gop ==null){
            HbasePoolFactary hbasePoolFactary = new HbasePoolFactary(zk, zknode);
            gop = new GenericObjectPool<HbaseProxy>(hbasePoolFactary);
            gop.setMaxTotal(10);
        }
        return gop;
    }
}

2.通过get来拿自己hbase中的数据

这里将逻辑类和测试类写一块了。


package com.example.demospringboothbase.serverce;

import com.alibaba.fastjson.JSON;
import com.example.demospringboothbase.common.HbasePool;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class test {
//从连接池中拿链接、
private HbasePool hbasePool = HbasePool.getPool();
//客户给定一个表名、rowkey、rowkey的规则、哪些列、列的规则、列簇
//输出的结果格式如下[{},{},{}]
public List<Map> resultByRowkey(String tableName, List<String> rowkey, String rowkeyAttr,
List<String> column, String columnAttr, String columnFamily) throws Exception {
//先定义一个List
ArrayList<Map> list = new ArrayList<>();
Table table = null;
//get操作是基于表名和rowkey来进行的
ArrayList<Get> gets = new ArrayList<>();
//这里将rowkey都放到gets中
for (String rk:rowkey){
Get get = null;
if (rowkeyAttr.equals("rowkey")){
get = new Get(rk.getBytes());
}
//在这里要指定列,因为只有指定列才会按照列输出,不指定列某人输出的是全部列
if (columnAttr.equals("column")){
for(String cl:column){
get.addColumn(columnFamily.getBytes(),cl.getBytes());
}
}
gets.add(get);
}
//和hbase取的联系
GenericObjectPool<HbasePool.HbaseProxy> gop = hbasePool.getGop("server3:2181", "/hbase-unsecure");
//从连接池中拿一个连接
HbasePool.HbaseProxy hbaseProxy = gop.borrowObject();
//指定表
table = hbaseProxy.getConnection().getTable(TableName.valueOf(tableName));
Result[] results = table.get(gets);
if (results!=null){
for (Result r:results){
HashMap map = new HashMap();
while (r.advance()){
Cell current = r.current();
String q = Bytes.toString(CellUtil.cloneQualifier(current));
String p = Bytes.toString(CellUtil.cloneValue(current));
map.put(q,p);
}
String rowkey1 = Bytes.toString(r.getRow());
map.put("rowkey",rowkey1);
list.add(map);
}
}else{
return list;
}
return list;
}
//测试是否成功
public static void main(String[] args) throws Exception {
test ceshi = new test();
ArrayList<String> rowkey = new ArrayList();
ArrayList<String> colum = new ArrayList();
rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222603");
// rowkey.add("000080fd3eaf6b381e33868ec6459c49_20111230222802");
rowkey.add("0001b04bf9473458af40acb4c13f1476_20111230002114");
colum.add("click");
colum.add("url");
colum.add("serch");
List<Map> maps = ceshi.resultByRowkey("sogo3", rowkey, "rowkey", colum, "colum", "oo");

System.out.println(JSON.toJSONString(maps));
}
}
输出结果:[{"serch":"福彩3d单选一注法","rank":"10","rowkey":"000080fd3eaf6b381e33868ec6459c49_20111230222603","click":"5","url":"http://www.18888.com/read-htm-tid-6069520.html"},{"serch":"淫淫网","rank":"1","rowkey":"0001b04bf9473458af40acb4c13f1476_20111230002114","click":"1","url":"http://www.244uu.com/"}]

以上是关于通过线程池,从hbase中拿数据的主要内容,如果未能解决你的问题,请参考以下文章

java线程池

从Tomcat的处理web请求分析Java的内存模型

Java中的Future模式原理自定义实现

如何获得thread线程的threadlocals的key值

线程池与并行度

happybase 的连接池