Storm之路-WordCount-实例

Posted ′ 咋说?。 °

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm之路-WordCount-实例相关的知识,希望对你有一定的参考价值。

初学storm,有不足的地方还请纠正。

网上看了很多wordcount实例,发现都不是我想要的。

实现场景:统计shengjing.txt中的词频并汇总到一个集合,最后一次性打印结果。

 

● 消息源Spout
  继承BaseRichSpout类 / 实现IRichSpout接口
    open,初始化动作;
    nextTuple,消息接入,执行数据发射;
    ack,tuple成功处理后调用;
    fail,tuple处理失败后调用;
    declareOutputFields,声明输出字段;

● 处理单元Bolt
  继承BaseBasicBolt类 / BaseWindowedBolt / 实现IRichBolt接口
    prepare,worker启动时初始化;
    execute,接收一个tuple / tupleWindow,执行处理逻辑并发射结果;
    cleanup,关闭前调用;
    declareOutputFields,声明输出字段;

 

● 项目结构

 

● pom.xml文件,配置项目jar依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.scps.storm</groupId>
    <artifactId>storm-example</artifactId>
    <version>0.0.1</version>
    <name>storm.example</name>
    <!-- NOTE(review): no maven-compiler-plugin source/target is configured, so the Java
         language level is Maven's default; confirm it matches the JDK in use. -->
    <dependencies>
        <!-- Storm API and runtime: TopologyBuilder, spouts/bolts, windowed bolts and the
             in-process LocalCluster used when WordTopology runs without arguments.
             NOTE(review): for jars submitted to a real cluster storm-core is typically
             declared with <scope>provided</scope>; confirm before adding a shaded jar. -->
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

 

● WordTopology.java文件,入口类,实例Topology、Spout、Bolt,配置等

 1 package com.scps.storm.helloword;
 2 
 3 import java.util.concurrent.TimeUnit;
 4 
 5 import org.apache.storm.Config;
 6 import org.apache.storm.LocalCluster;
 7 import org.apache.storm.StormSubmitter;
 8 import org.apache.storm.generated.AlreadyAliveException;
 9 import org.apache.storm.generated.AuthorizationException;
10 import org.apache.storm.generated.InvalidTopologyException;
11 import org.apache.storm.topology.TopologyBuilder;
12 import org.apache.storm.topology.base.BaseWindowedBolt.Duration;
13 import org.apache.storm.tuple.Fields;
14 
15 import com.scps.storm.helloword.bolt.SlidingWindowBolt;
16 import com.scps.storm.helloword.bolt.WordCountBolt;
17 import com.scps.storm.helloword.bolt.WordFinalBolt;
18 import com.scps.storm.helloword.bolt.WordSplitBolt;
19 import com.scps.storm.helloword.spout.WordReaderSpout;
20 
21 public class WordTopology {
22 
23     public static void main(String[] args) {
24 
25         TopologyBuilder builder = new TopologyBuilder();
26 
27         // 1个task去读文件
28         builder.setSpout("word-reader", new WordReaderSpout(), 1);
29         
30         // 2个task分割行
31         builder.setBolt("word-split", new WordSplitBolt(), 2).shuffleGrouping("word-reader");
32         
33         // 2个task分批统计,并发送相同的word到同一个task
34         builder.setBolt("word-count", new WordCountBolt(), 2).fieldsGrouping("word-split", new Fields("word"));
35 
36         // 1个task汇总,每隔3秒统计最近5秒的tuple,SlidingWindow滑动窗口(间隔)
37         // builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withWindow(new Duration(5, TimeUnit.SECONDS), new Duration(3, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
38         // 1个task汇总,统计5秒内的tuple,不能超过15秒?提示超时错误,TumblingWindow滚动窗口
39         builder.setBolt("sliding-window-bolt", new SlidingWindowBolt().withTumblingWindow(new Duration(5, TimeUnit.SECONDS)), 1).shuffleGrouping("word-count");
40         
41         // 1个task输出
42         builder.setBolt("word-final", new WordFinalBolt(), 1).shuffleGrouping("sliding-window-bolt");
43 
44         Config conf = new Config();
45 
46         conf.setDebug(false);
47 
48         if (args != null && args.length > 0) {
49             
50             // 在集群运行,需要mvn package编译
51             // bin/storm jar "/root/storm-example-0.0.1.jar" com.scps.storm.helloword.WordTopology "http://nimbus:8080/uploads/shengjing.txt" wordcount
52             
53             try {
54 
55                 String file = args[0];
56                 String name = args[1];
57 
58                 conf.put("file", file);
59                 // conf.setNumWorkers(2);
60 
61                 StormSubmitter.submitTopology(name, conf, builder.createTopology());
62 
63             } catch (AlreadyAliveException e) {
64 
65                 e.printStackTrace();
66 
67             } catch (InvalidTopologyException e) {
68 
69                 e.printStackTrace();
70 
71             } catch (AuthorizationException e) {
72 
73                 e.printStackTrace();
74             }
75 
76         } else {
77 
78             // 直接在eclipse中运行
79             
80             conf.put("file", "C:\\\\Users\\\\Administrator\\\\Downloads\\\\shengjing1.txt");
81             // conf.put("file", "http://192.168.100.170:8080/uploads/shengjing.txt");
82             // conf.setMaxTaskParallelism(2); // 设置最大task数
83             LocalCluster cluster = new LocalCluster();
84             cluster.submitTopology("wordcount", conf, builder.createTopology());
85         }
86     }
87 }
View Code

 

● WordReaderSpout.java文件,读取txt文件,发送行

  1 package com.scps.storm.helloword.spout;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.FileInputStream;
  5 import java.io.FileNotFoundException;
  6 import java.io.IOException;
  7 import java.io.InputStream;
  8 import java.io.InputStreamReader;
  9 import java.io.UnsupportedEncodingException;
 10 import java.net.MalformedURLException;
 11 import java.net.URL;
 12 import java.net.URLConnection;
 13 import java.text.SimpleDateFormat;
 14 import java.util.Date;
 15 import java.util.Map;
 16 
 17 import org.apache.storm.spout.SpoutOutputCollector;
 18 import org.apache.storm.task.TopologyContext;
 19 import org.apache.storm.topology.IRichSpout;
 20 import org.apache.storm.topology.OutputFieldsDeclarer;
 21 import org.apache.storm.tuple.Fields;
 22 import org.apache.storm.tuple.Values;
 23 import org.apache.storm.utils.Utils;
 24 
 25 public class WordReaderSpout implements IRichSpout {
 26 
 27     private static final long serialVersionUID = 1L;
 28     private SpoutOutputCollector outputCollector;
 29     private String filePath;
 30     private boolean completed = false;
 31 
 32     public void ack(Object arg0) {
 33 
 34     }
 35 
 36     public void activate() {
 37 
 38     }
 39 
 40     public void close() {
 41 
 42     }
 43 
 44     public void deactivate() {
 45 
 46     }
 47 
 48     public void fail(Object arg0) {
 49 
 50     }
 51 
 52     @SuppressWarnings("rawtypes")
 53     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 54 
 55         filePath = conf.get("file").toString();
 56         outputCollector = collector;
 57     }
 58 
 59     public void nextTuple() {
 60 
 61         if (!completed) {
 62 
 63             String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
 64             System.out.println("WordReaderSpout nextTuple, " + time);
 65 
 66             String line = "";
 67             InputStream inputStream = null;
 68             InputStreamReader inputStreamReader = null;
 69             BufferedReader reader = null;
 70 
 71             try {
 72 
 73                 // filePath = "http://192.168.100.170:8080/uploads/shengjing.txt";
 74                 // filePath = "C:\\\\Users\\\\Administrator\\\\Downloads\\\\shengjing.txt";
 75 
 76                 if (filePath.startsWith("http://")) { // 远程文件
 77                     URL url = new URL(filePath);
 78                     URLConnection urlConn = url.openConnection();
 79                     inputStream = urlConn.getInputStream();
 80                 } else { // 本地文件
 81                     inputStream = new FileInputStream(filePath);
 82                 }
 83 
 84                 inputStreamReader = new InputStreamReader(inputStream, "utf-8");
 85                 reader = new BufferedReader(inputStreamReader);
 86                 while ((line = reader.readLine()) != null) {
 87                     outputCollector.emit(new Values(line));
 88                 }
 89 
 90             } catch (MalformedURLException e) {
 91                 e.printStackTrace();
 92             } catch (FileNotFoundException e) {
 93                 e.printStackTrace();
 94             } catch (UnsupportedEncodingException e) {
 95                 e.printStackTrace();
 96             } catch (IOException e) {
 97                 e.printStackTrace();
 98             } finally {
 99                 completed = true;
100                 try {
101                     if (reader != null) {
102                         reader.close();
103                     }
104                     if (inputStreamReader != null) {
105                         inputStreamReader.close();
106                     }
107                     if (inputStream != null) {
108                         inputStream.close();
109                     }
110                 } catch (IOException e) {
111                     e.printStackTrace();
112                 }
113             }
114         }
115 
116         Utils.sleep(20000);
117     }
118 
119     public void declareOutputFields(OutputFieldsDeclarer declarer) {
120 
121         declarer.declare(new Fields("line"));
122     }
123 
124     public Map<String, Object> getComponentConfiguration() {
125 
126         return null;
127     }
128 }
View Code

  使用集群测试时,先把txt文件上传到集群中可通过http访问的位置(如示例中的http://nimbus:8080/uploads/shengjing.txt);由于spout会被随机指派到某个supervisor上运行,本地路径不一定存在,因此需要通过http远程读取文件。

 

● WordSplitBolt.java文件,接收行,分割行,发送词

 1 package com.scps.storm.helloword.bolt;
 2 
 3 import java.util.Map;
 4 
 5 import org.apache.storm.task.OutputCollector;
 6 import org.apache.storm.task.TopologyContext;
 7 import org.apache.storm.topology.IRichBolt;
 8 import org.apache.storm.topology.OutputFieldsDeclarer;
 9 import org.apache.storm.tuple.Fields;
10 import org.apache.storm.tuple.Tuple;
11 import org.apache.storm.tuple.Values;
12 
13 public class WordSplitBolt implements IRichBolt {
14 
15     private static final long serialVersionUID = 1L;
16     private OutputCollector outputCollector;
17 
18     @SuppressWarnings("rawtypes")
19     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
20 
21         outputCollector = collector;
22     }
23 
24     public void execute(Tuple input) {
25 
26         String line = input.getStringByField("line");
27 
28         line = line.trim();
29         line = line.replace(",", " ");
30         line = line.replace(".", " ");
31         line = line.replace(":", " ");
32         line = line.replace(";", " ");
33         line = line.replace("?", " ");
34         line = line.replace("!", " ");
35         line = line.replace("(", " ");
36         line = line.replace(")", " ");
37         line = line.replace("[", " ");
38         line = line.replace("]", " ");
39         line = line.trim();
40         
41         String[] words = line.split(" ");
42         for (String word : words) {
43             word = word.trim();
44             if (!"".equals(word)) {
45                 outputCollector.emit(new Values(word));
46             }
47         }
48     }
49 
50     public void declareOutputFields(OutputFieldsDeclarer declarer) {
51 
52         declarer.declare(new Fields("word"));
53     }
54 
55     public void cleanup() {
56 
57     }
58 
59     public Map<String, Object> getComponentConfiguration() {
60 
61         return null;
62     }
63 }
View Code

 

● WordCountBolt.java文件,接收词,统计词,发送集合

 1 package com.scps.storm.helloword.bolt;
 2 
 3 import java.util.HashMap;
 4 import java.util.Map;
 5 
 6 import org.apache.storm.task.OutputCollector;
 7 import org.apache.storm.task.TopologyContext;
 8 import org.apache.storm.topology.IRichBolt;
 9 import org.apache.storm.topology.OutputFieldsDeclarer;
10 import org.apache.storm.tuple.Fields;
11 import org.apache.storm.tuple.Tuple;
12 import org.apache.storm.tuple.Values;
13 
14 public class WordCountBolt implements IRichBolt {
15 
16     private static final long serialVersionUID = 1L;
17     Map<String, Integer> counter;
18     private OutputCollector outputCollector;
19 
20     @SuppressWarnings("rawtypes")
21     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
22 
23         counter = new HashMap<String, Integer>();
24         outputCollector = collector;
25     }
26 
27     public void execute(Tuple input) {
28 
29         String word = input.getStringByField("word");
30         int count;
31 
32         if (!counter.containsKey(word)) {
33             count = 1;
34         } else {
35             count = counter.get(word) + 1;
36         }
37 
38         counter.put(word, count);
39         outputCollector.emit(new Values(word, count));
40     }
41 
42     public void declareOutputFields(OutputFieldsDeclarer declarer) {
43 
44         declarer.declare(new Fields("word", "count"));
45     }
46 
47     public void cleanup() {
48 
49     }
50 
51     public Map<String, Object> getComponentConfiguration() {
52 
53         return null;
54     }
55 }
View Code

 

● SlidingWindowBolt.java文件,接收集合,合并集合,发送集合

 1 package com.scps.storm.helloword.bolt;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 import java.util.HashMap;
 6 import java.util.Map;
 7 
 8 import org.apache.storm.task.OutputCollector;
 9 import org.apache.storm.task.TopologyContext;
10 import org.apache.storm.topology.OutputFieldsDeclarer;
11 import org.apache.storm.topology.base.BaseWindowedBolt;
12 import org.apache.storm.tuple.Fields;
13 import org.apache.storm.tuple.Tuple;
14 import org.apache.storm.tuple.Values;
15 import org.apache.storm.windowing.TupleWindow;
16 
17 public class SlidingWindowBolt extends BaseWindowedBolt {
18 
19     private static final long serialVersionUID = 1L;
20     Map<String, Integer> counter;
21     private OutputCollector outputCollector;
22 
23     @SuppressWarnings("rawtypes")
24     public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
25 
26         counter = new HashMap<String, Integer>();
27         outputCollector = collector;
28     }
29 
30     public void execute(TupleWindow inputWindow) {
31         
32         String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
33         System.out.println("SlidingWindowBolt execute, " + time);
34 
35         for (Tuple input : inputWindow.get()) {
36 
37             String word = input.getStringByField("word");
38             int count = input.getIntegerByField("count");
39             
40             counter.put(word, count);
41         }
42 
43         outputCollector.emit(new Values(counter));
44     }
45 
46     public void declareOutputFields(OutputFieldsDeclarer declarer) {
47 
48         declarer.declare(new Fields("counter"));
49     }
50 }
View Code

 

● WordFinalBolt.java文件,接收集合,打印集合

 1 package com.scps.storm.helloword.bolt;
以上是关于Storm之路-WordCount-实例的主要内容,如果未能解决你的问题,请参考以下文章

Storm设计一个Topology用来统计单词的TopN的实例

Storm入门WordCount示例

storm实战之WordCount

Storm常用操作命令及WordCount

storm经典例子的wordcount的实现

Storm WordCount