storm WC 2 (根据日志)
Posted by tangsonghuai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm WC 2 (根据日志)相关的知识,希望对你有一定的参考价值。
1.产生虚拟日志
// package les7.readFileTopo;  // NOTE(review): restore when placing this file in the project tree

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Random;

/**
 * Generates a synthetic access log, {@code track.log}, used as input by the
 * Storm word-count topology in this package.
 *
 * <p>Each of the 5000 lines is three TAB-separated fields:
 * {@code host \t session_id \t timestamp}, with session id and timestamp
 * picked uniformly at random from small fixed pools.
 */
public class GetData {

    public static void main(String[] args) {
        Random random = new Random();
        String[] hosts = { "movie information" };
        String[] sessionIds = {
            "ABYH6Y4V4SCVXTG6DPB4VH9U12",
            "XXYH6YCGFJYERTT834R52FDXV9U34",
            "BBYH61456FGHHJ7JL89RG5VV9UYU7",
            "CYYH6Y2345GHI899OFG4V9U567",
            "VVVYH6Y4V4SFXZ56JIPDPB4V678"
        };
        String[] times = {
            "2019-03-07 08:40:50", "2019-03-07 08:40:51",
            "2019-03-07 08:40:52", "2019-03-07 08:40:53",
            "2019-03-07 09:40:49", "2019-03-07 10:40:49",
            "2019-03-07 11:40:49", "2019-03-07 12:40:49"
        };

        // StringBuilder (not StringBuffer): single-threaded use, no need for
        // synchronized appends. Index bounds come from the arrays themselves
        // instead of hard-coded 5 / 8.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 5000; i++) {
            sb.append(hosts[0]).append('\t')
              .append(sessionIds[random.nextInt(sessionIds.length)]).append('\t')
              .append(times[random.nextInt(times.length)]).append('\n');
        }

        // try-with-resources guarantees the stream is closed even when
        // write() throws — the original leaked the FileOutputStream on the
        // error path. FileOutputStream creates the file itself, so the
        // exists()/createNewFile() dance is unnecessary. Charset is pinned
        // to UTF-8 instead of the platform default.
        try (FileOutputStream fs = new FileOutputStream("track.log")) {
            fs.write(sb.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
2.spout自定义数据流入拓扑逻辑
package les7.readFileTopo;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Spout that streams {@code track.log} line by line into the topology.
 * Each raw line is emitted unmodified on a single output field named "log".
 */
public class ReadFileSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;

    FileInputStream fis;
    InputStreamReader isr;
    BufferedReader br;
    SpoutOutputCollector collector = null;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Initialization: keep the collector and open the input file once.
        try {
            this.collector = collector;
            this.fis = new FileInputStream("track.log");
            this.isr = new InputStreamReader(fis, "UTF-8");
            this.br = new BufferedReader(isr);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() {
        // Release the file handle when the topology shuts down.
        // (The original left the reader open — a resource leak.)
        try {
            if (br != null) {
                br.close(); // also closes the wrapped isr and fis
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void activate() {
        // Called when the topology is (re)activated; nothing to do here.
    }

    @Override
    public void deactivate() {
        // Called when the topology is deactivated; nothing to do here.
    }

    @Override
    public void nextTuple() {
        // Core method, called repeatedly by Storm: drain the file and emit
        // one tuple per line. Once EOF is reached, readLine() keeps
        // returning null and subsequent calls become no-ops.
        try {
            String line;
            while ((line = br.readLine()) != null) {
                collector.emit(new Values(line));
            }
        } catch (Exception e) {
            // The original swallowed this silently; at least surface it.
            e.printStackTrace();
        }
    }

    @Override
    public void ack(Object msgId) {
        // With ackers enabled, called when a tuple tree completes
        // successfully. Tuples are emitted unanchored here, so unused.
    }

    @Override
    public void fail(Object msgId) {
        // With ackers enabled, called when a tuple tree fails — the place
        // to implement manual replay with a bounded retry count.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Single output field: the raw log line.
        declarer.declare(new Fields("log"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null; // No per-component configuration overrides.
    }
}
3.bolt处理逻辑
package les7.readFileTopo;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Splits each incoming log line on TAB and emits one ("word", 1) pair per
 * token, feeding the counting bolt downstream.
 */
public class FileBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        // Keep a handle on the collector for use in execute().
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Raw log line as emitted by ReadFileSpout ("log" field, index 0).
        String line = tuple.getString(0);
        // Tokenize on TAB: host \t session_id \t timestamp.
        for (String word : line.split("\t")) {
            collector.emit(new Values(word, 1));
        }
        // IRichBolt does not auto-ack; without this the tuple tree never
        // completes when ackers are enabled. (Missing in the original.)
        collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        // Called on shutdown; nothing to release here.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null; // No per-component configuration overrides.
    }
}
4.bolt输出逻辑
package les7.readFileTopo;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Accumulates a running count per word in an in-memory map, prints the map
 * after every update, and emits the word with its current total.
 *
 * <p>NOTE(review): state lives in this task's heap, so the topology must run
 * this bolt with parallelism 1 (or receive via fieldsGrouping on "word") for
 * the totals to be correct.
 */
public class PrintBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    private Map<String, Integer> result = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = tuple.getIntegerByField("count");

        // Accumulate: insert the count for a new word, add to the running
        // total for a known one (replaces the containsKey/get/put sequence).
        result.merge(word, count, Integer::sum);

        // Print the full tally so progress is visible in the worker log.
        System.out.println("统计的结果是:" + result);

        // Forward (word, running total) to any downstream component.
        this.collector.emit(new Values(word, result.get(word)));
        // Ack so the tuple tree completes when ackers are enabled.
        this.collector.ack(tuple);
    }

    @Override
    public void cleanup() {
        // Called on shutdown; nothing to release here.
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Must be declared: execute() emits (word, total) — emitting on an
        // undeclared stream fails at runtime (empty in the original).
        declarer.declare(new Fields("word", "count"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null; // No per-component configuration overrides.
    }
}
5.书写拓扑逻辑代码
package les7.readFileTopo; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; //展示信息数据 public class FileCountTopo public static void main(String[] args) // TODO Auto-generated method stub TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new ReadFileSpout(),1) ; builder.setBolt("b1", new FileBolt(),2).shuffleGrouping("spout"); builder.setBolt("PrintBolt", new PrintBolt(),1).shuffleGrouping("b1"); Config conf = new Config(); conf.setDebug(true); if (args.length > 0) try //提交到集群 StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); catch (Exception e) e.printStackTrace(); else //本地模式提交 LocalCluster localCluster = new LocalCluster(); localCluster.submitTopology("mytopology", conf, builder.createTopology());
以上是关于storm WC 2 (根据日志)的主要内容,如果未能解决你的问题,请参考以下文章