Storm之路-WordCount-实例
Posted ′ 咋说?。 °
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm之路-WordCount-实例相关的知识,希望对你有一定的参考价值。
初学storm,有不足的地方还请纠正。
网上看了很多wordcount实例,发现都不是我想要的。
实现场景:统计shengjing.txt词频到集合,一次打印结果。
● 消息源Spout
继承BaseRichSpout类 / 实现IRichSpout接口
open,初始化动作;
nextTuple,消息接入,执行数据发射;
ack,tuple成功处理后调用;
fail,tuple处理失败后调用;
declareOutputFields,声明输出字段;
● 处理单元Bolt
继承BaseBasicBolt类 / BaseWindowedBolt / 实现IRichBolt接口
prepare,worker启动时初始化;
execute,接受一个tuple / tupleWindow并执行逻辑处理,发射出去;
cleanup,关闭前调用;
declareOutputFields,声明输出字段;
● 项目结构
● pom.xml文件,配置项目jar依赖
<!-- Minimal Maven build descriptor for the example: declares the project coordinates com.scps.storm:storm-example:0.0.1 and a single dependency on org.apache.storm:storm-core 1.1.0. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.scps.storm</groupId> <artifactId>storm-example</artifactId> <version>0.0.1</version> <name>storm.example</name> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.0</version> </dependency> </dependencies> </project>
● WordTopology.java文件,入口类,实例化Topology、Spout、Bolt,并进行配置等
package com.scps.storm.helloword;

import java.util.concurrent.TimeUnit;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
import org.apache.storm.tuple.Fields;

import com.scps.storm.helloword.bolt.SlidingWindowBolt;
import com.scps.storm.helloword.bolt.WordCountBolt;
import com.scps.storm.helloword.bolt.WordFinalBolt;
import com.scps.storm.helloword.bolt.WordSplitBolt;
import com.scps.storm.helloword.spout.WordReaderSpout;

/**
 * Entry point of the word-count example: wires the spout and bolts into a
 * topology and either submits it to a cluster or runs it in a local cluster.
 *
 * <p>Pipeline: word-reader -> word-split -> word-count -> sliding-window-bolt
 * -> word-final.
 */
public class WordTopology {

    /** Topology name used in local mode and when no name is given on the command line. */
    private static final String DEFAULT_TOPOLOGY_NAME = "wordcount";

    /**
     * Builds the topology and starts it.
     *
     * @param args empty to run in an in-process {@link LocalCluster}; otherwise
     *             {@code args[0]} is the file (local path or URL) to read and the
     *             optional {@code args[1]} is the topology name (defaults to
     *             {@code "wordcount"})
     */
    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        // One task reads the file.
        builder.setSpout("word-reader", new WordReaderSpout(), 1);

        // Two tasks split lines into words.
        builder.setBolt("word-split", new WordSplitBolt(), 2).shuffleGrouping("word-reader");

        // Two tasks count words; fields grouping sends the same word to the same task.
        builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));

        // Alternative: one aggregating task with a sliding window that evaluates the
        // last 5 seconds of tuples every 3 seconds.
        // builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");

        // One aggregating task with a 5 second tumbling window.
        // NOTE(review): the original author observed a timeout error for tumbling
        // windows longer than 15 seconds. This is presumably Storm's validation that
        // window length + sliding interval must not exceed
        // topology.message.timeout.secs (30 seconds by default) - confirm against
        // the Storm windowing documentation before raising the window size.
        builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withTumblingWindow(new Duration(5, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");

        // One task prints the result.
        builder.setBolt("word-final", new WordFinalBolt(), 1).shuffleGrouping("sliding-window-bolt");

        Config conf = new Config();
        conf.setDebug(false);

        if (args != null && args.length > 0) {

            // Cluster mode; the jar has to be built first with "mvn package", e.g.
            // bin/storm jar "/root/storm-example-0.0.1.jar" com.scps.storm.helloword.WordTopology "http://nimbus:8080/uploads/shengjing.txt" wordcount

            String file = args[0];
            // The name is optional: reading args[1] unconditionally used to throw
            // ArrayIndexOutOfBoundsException when only the file was supplied.
            String name = args.length > 1 ? args[1] : DEFAULT_TOPOLOGY_NAME;

            conf.put("file", file);
            // conf.setNumWorkers(2);

            submitToCluster(name, conf, builder);

        } else {

            // Local mode, e.g. started directly from the IDE.

            conf.put("file", "C:\\\\Users\\\\Administrator\\\\Downloads\\\\shengjing1.txt");
            // conf.put("file", "http://192.168.100.170:8080/uploads/shengjing.txt");
            // conf.setMaxTaskParallelism(2); // upper bound for the number of tasks
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(DEFAULT_TOPOLOGY_NAME, conf, builder.createTopology());
        }
    }

    /** Submits the topology to the cluster and prints the stack trace of any submission failure. */
    private static void submitToCluster(String name, Config conf, TopologyBuilder builder) {

        try {

            StormSubmitter.submitTopology(name, conf, builder.createTopology());

        } catch (AlreadyAliveException e) {

            e.printStackTrace();

        } catch (InvalidTopologyException e) {

            e.printStackTrace();

        } catch (AuthorizationException e) {

            e.printStackTrace();
        }
    }
}
● WordReaderSpout.java文件,读取txt文件,发送行
package com.scps.storm.helloword.spout;

import java.io.BufferedReader;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Spout that reads one text file (local path or HTTP/HTTPS URL) a single time
 * and emits every line as a tuple with the single field {@code "line"}.
 *
 * <p>The file location is taken from the topology configuration entry
 * {@code "file"}. Tuples are emitted without a message id, so {@link #ack} and
 * {@link #fail} are intentionally empty.
 */
public class WordReaderSpout implements IRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector outputCollector;
    private String filePath;
    /** Set once the file has been read (or reading failed) so it is not read again. */
    private boolean completed = false;

    public void ack(Object arg0) {

    }

    public void activate() {

    }

    public void close() {

    }

    public void deactivate() {

    }

    public void fail(Object arg0) {

    }

    /**
     * Stores the collector and the file location from the topology configuration.
     *
     * @throws IllegalArgumentException if the configuration has no {@code "file"} entry
     */
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

        Object file = conf.get("file");
        if (file == null) {
            // Fail with a descriptive message instead of a bare NullPointerException.
            throw new IllegalArgumentException("Missing topology configuration entry \"file\"");
        }

        filePath = file.toString();
        outputCollector = collector;
    }

    /**
     * On the first call reads the whole file and emits one tuple per line; every
     * call then sleeps for 20 seconds.
     */
    public void nextTuple() {

        if (!completed) {

            String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
            System.out.println("WordReaderSpout nextTuple, " + time);

            InputStream inputStream = null;
            BufferedReader reader = null;

            try {

                inputStream = openSource(filePath);
                reader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));

                String line;
                while ((line = reader.readLine()) != null) {
                    outputCollector.emit(new Values(line));
                }

            } catch (IOException e) {
                // Covers malformed URLs, missing files and read errors. Report the
                // failure to Storm as well so it is not only visible on stdout.
                outputCollector.reportError(e);
                e.printStackTrace();
            } finally {
                // Mark as done even on failure so a broken source is not retried
                // on every nextTuple call.
                completed = true;
                // Closing the reader also closes the wrapped stream; fall back to
                // the raw stream if the reader was never created.
                closeQuietly(reader != null ? reader : inputStream);
            }
        }

        Utils.sleep(20000);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("line"));
    }

    public Map<String, Object> getComponentConfiguration() {

        return null;
    }

    /** Opens the source as a URL stream for http/https locations, otherwise as a local file. */
    private static InputStream openSource(String location) throws IOException {

        if (location.startsWith("http://") || location.startsWith("https://")) { // remote file
            URLConnection connection = new URL(location).openConnection();
            return connection.getInputStream();
        }

        return new FileInputStream(location); // local file
    }

    /** Closes the resource if present and prints, rather than propagates, a close failure. */
    private static void closeQuietly(Closeable resource) {

        if (resource == null) {
            return;
        }

        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
使用集群测试时,先把txt文件上传到nimbus的ui里,随机指派supervisor远程读取文件。
● WordSplitBolt.java文件,接收行,分割行,发送词
package com.scps.storm.helloword.bolt;

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that receives tuples with a {@code "line"} field, splits the line into
 * words and emits one tuple with the single field {@code "word"} per word.
 */
public class WordSplitBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;

    /**
     * Regular expression matching one word delimiter: a space or one of
     * {@code , . : ; ? ! ( ) [ ]}. Splitting on it yields the same words as
     * replacing each punctuation mark by a space and splitting on a space.
     */
    private static final String DELIMITER = "[ ,.:;?!()\\[\\]]";

    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        outputCollector = collector;
    }

    /**
     * Splits the incoming line and emits every non-empty word, anchored to the
     * input tuple, then acknowledges the input.
     *
     * @param input tuple carrying the text line in its {@code "line"} field
     */
    public void execute(Tuple input) {

        String line = input.getStringByField("line");

        for (String token : line.split(DELIMITER)) {
            // Consecutive delimiters produce empty tokens; trim() also strips
            // surrounding tabs and control characters.
            String word = token.trim();
            if (word.length() > 0) {
                outputCollector.emit(input, new Values(word));
            }
        }

        // An IRichBolt has to acknowledge its input itself; without this a
        // tracked tuple would never complete and would eventually time out.
        outputCollector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));
    }

    public void cleanup() {

    }

    public Map<String, Object> getComponentConfiguration() {

        return null;
    }
}
● WordCountBolt.java文件,接收词,统计词,发送集合
package com.scps.storm.helloword.bolt;

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Bolt that keeps a running count per word and, for every incoming
 * {@code "word"} tuple, emits the word together with its updated count as a
 * tuple with the fields {@code "word"} and {@code "count"}.
 */
public class WordCountBolt implements IRichBolt {

    private static final long serialVersionUID = 1L;
    /** Running count per word seen by this bolt instance; created in {@link #prepare}. */
    Map<String, Integer> counter;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        counter = new HashMap<String, Integer>();
        outputCollector = collector;
    }

    /**
     * Increments the count of the received word, emits the new count anchored to
     * the input tuple and acknowledges the input.
     *
     * @param input tuple carrying the word in its {@code "word"} field
     */
    public void execute(Tuple input) {

        String word = input.getStringByField("word");

        // Single lookup: null means the word has not been seen before.
        Integer previous = counter.get(word);
        int count = (previous == null) ? 1 : previous + 1;

        counter.put(word, count);
        outputCollector.emit(input, new Values(word, count));

        // An IRichBolt has to acknowledge its input itself; without this a
        // tracked tuple would never complete and would eventually time out.
        outputCollector.ack(input);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word", "count"));
    }

    public void cleanup() {

    }

    public Map<String, Object> getComponentConfiguration() {

        return null;
    }
}
● SlidingWindowBolt.java文件,接收集合,合并集合,发送集合
package com.scps.storm.helloword.bolt;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseWindowedBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.windowing.TupleWindow;

/**
 * Windowed bolt that merges the {@code ("word", "count")} tuples of each window
 * into one word-to-count map and emits that map as a single tuple with the
 * field {@code "counter"}.
 *
 * <p>The map is kept across windows, so each emitted map holds the most recent
 * count received so far for every word.
 */
public class SlidingWindowBolt extends BaseWindowedBolt {

    private static final long serialVersionUID = 1L;
    /** Latest count received per word; created in {@link #prepare} and never cleared. */
    Map<String, Integer> counter;
    private OutputCollector outputCollector;

    @SuppressWarnings("rawtypes")
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {

        counter = new HashMap<String, Integer>();
        outputCollector = collector;
    }

    /**
     * Records the count carried by every tuple in the window (a later tuple for
     * the same word overwrites an earlier one) and emits a snapshot of the map.
     *
     * @param inputWindow the tuples of the current window
     */
    public void execute(TupleWindow inputWindow) {

        String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.out.println("SlidingWindowBolt execute, " + time);

        for (Tuple input : inputWindow.get()) {

            String word = input.getStringByField("word");
            int count = input.getIntegerByField("count");

            counter.put(word, count);
        }

        // Emit a copy, not the live map: a consumer running in the same JVM can
        // receive the very same object, and this bolt keeps mutating `counter`
        // on the next window while the consumer may still be iterating over it.
        outputCollector.emit(new Values(new HashMap<String, Integer>(counter)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("counter"));
    }
}
● WordFinalBolt.java文件,接收集合,打印集合
package com.scps.storm.helloword.bolt;
(注:WordFinalBolt.java 的其余代码在此处被截断。)
以上是关于Storm之路-WordCount-实例的主要内容,如果未能解决你的问题,请参考以下文章