Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime
Posted Import
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime相关的知识,希望对你有一定的参考价值。
1 package com.storm.hbaseTest; 2 3 import java.io.IOException; 4 import java.util.HashMap; 5 import java.util.Iterator; 6 import java.util.List; 7 import java.util.Map; 8 9 import org.apache.commons.lang.StringUtils; 10 import org.apache.hadoop.conf.Configuration; 11 import org.apache.hadoop.hbase.HBaseConfiguration; 12 import org.apache.hadoop.hbase.client.HConnection; 13 import org.apache.hadoop.hbase.client.HConnectionManager; 14 import org.apache.hadoop.hbase.client.HTableInterface; 15 import org.apache.hadoop.hbase.client.Put; 16 17 import backtype.storm.Config; 18 import backtype.storm.Constants; 19 import backtype.storm.task.OutputCollector; 20 import backtype.storm.task.TopologyContext; 21 import backtype.storm.topology.BasicOutputCollector; 22 import backtype.storm.topology.IRichBolt; 23 import backtype.storm.topology.OutputFieldsDeclarer; 24 import backtype.storm.topology.base.BaseBasicBolt; 25 import backtype.storm.tuple.Fields; 26 import backtype.storm.tuple.Tuple; 27 28 import org.slf4j.Logger; 29 import org.slf4j.LoggerFactory; 30 31 import com.google.common.collect.Lists; 32 33 /** 38 * @version V1.0 39 */ 40 @SuppressWarnings("all") 41 public class HbaseBout implements IRichBolt{ 42 43 private static final long serialVersionUID = 1L; 44 private static final Logger LOG = LoggerFactory.getLogger(HbaseBout.class); 45 46 protected OutputCollector collector; 47 protected HbaseClient hbaseClient; 48 protected String tableName; 49 protected String configKey ="hbase.conf"; 50 51 //批处理大小 52 protected int batchSize = 15000; 53 List<Put> batchMutations; 54 List<Tuple> tupleBatch; 55 //tick Time 56 int flushIntervalSecs = 1; 57 62 public void prepare(Map map, TopologyContext context, 63 OutputCollector collector) { 64 65 this.collector = collector; 66 Configuration hbConfig = HBaseConfiguration.create(); 67 Map<String,String> conf = (Map)map.get(this.configKey); 68 69 //获取Hbase配置 70 if (conf == null) { 71 throw new IllegalArgumentException("HBase configuration not found using key \'" + this.configKey + "\'"); 72 } 73 74 //批大小 75 if(map.get("batchSize") != null){ 76 this.batchSize = new Integer(map.get("batchSize").toString()); 77 } 78 //Tick Time 79 if(map.get("flushIntervalSecs") != null){ 80 this.flushIntervalSecs = Integer.valueOf(map.get("flushIntervalSecs").toString()); 81 } 82 83 //Hbase 配置 84 for (String key : conf.keySet()) { 85 hbConfig.set(key, String.valueOf(conf.get(key))); 86 } 87 this.hbaseClient = new HbaseClient(hbConfig, this.tableName); 88 } 89 94 @Override 95 public void execute(Tuple tuple) { 96 97 boolean flush = false; 98 99 try { 100 //Tick 101 if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){ 102 LOG.debug("TICK received! current batch status ", Integer.valueOf(this.tupleBatch.size()), Integer.valueOf(this.batchSize)); 103 flush = true; 104 }else{ 105 106 Put put = new Put(tuple.getStringByField("rowKey").getBytes()); 107 put.add("cf".getBytes(), "name".getBytes(), tuple.getStringByField("name").getBytes()); 108 put.add("cf".getBytes(), "sex".getBytes(), tuple.getStringByField("sex").getBytes()); 109 this.batchMutations.add(put); 110 this.tupleBatch.add(tuple); 111 112 //当前tuple批大小 113 if (this.tupleBatch.size() >= this.batchSize) { 114 flush = true; 115 } 116 } 117 //持久化操作 118 if ((flush) && (!this.tupleBatch.isEmpty())) { 119 this.hbaseClient.batchMutate(this.batchMutations); 120 LOG.debug("acknowledging tuples after batchMutate"); 121 for (Iterator<Tuple> tuples = this.tupleBatch.iterator(); tuples.hasNext(); ) { 122 Tuple t = (Tuple)tuples.next(); 123 this.collector.ack(t); 124 } 125 this.tupleBatch.clear(); 126 this.batchMutations.clear(); 127 } 128 } catch (Exception e) { 129 LOG.debug("inser batch fail"); 130 this.collector.reportError(e); 131 for (Tuple t : this.tupleBatch) { 132 this.collector.fail(t); 133 } 134 this.tupleBatch.clear(); 135 this.batchMutations.clear(); 136 } 137 } 138 143 public void declareOutputFields(OutputFieldsDeclarer declarer) { 144 145 } 146 151 @Override 152 public Map<String, Object> getComponentConfiguration() { 153 Config conf = new Config(); 154 conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,flushIntervalSecs); 155 return conf; 156 } 157 158 163 @Override 164 public void cleanup() { 165 166 } 167 } 168 177 class HbaseClient{ 178 179 private static final Logger LOG = LoggerFactory.getLogger(HbaseClient.class); 180 HConnection hTablePool = null; 181 HTableInterface table = null; 182 189 public HbaseClient(final Configuration configuration, final String tableName) 190 { 191 try 192 { 193 hTablePool = HConnectionManager.createConnection(configuration) ; 194 this.table = hTablePool.getTable(tableName); 195 } 196 catch (Exception e) { 197 throw new RuntimeException("HBase create failed: " + e.getMessage(), e); 198 } 199 } 200 209 public void batchMutate(List<Put> puts) throws Exception { 210 211 try { 212 this.table.put(puts); 213 } catch (Exception e) { 214 LOG.warn("Error insert batch to HBase.", e); 215 throw e; 216 } 217 } 218 }
以上是关于Storm写库,Hbase批入库,定时和批入库,batchSize,TickTime的主要内容,如果未能解决你的问题,请参考以下文章
利用hive-hbase表做hive表快速入库hbase功能,纬度表的查询