As the old saying goes, knowledge from books always feels shallow; to truly understand a thing you have to do it yourself. Translated into plain terms: you can't learn anything without falling into a few potholes.
I had to fiddle with Storm for work, and the environment plus the first example took me forever. Looking back after it was done, it's maddeningly simple.
Storm has two modes, local and cluster. For getting started, local mode is the obvious choice.
1. Install the JDK and set up the Eclipse environment.
2. Create a Maven project and add this to pom.xml:
<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-core</artifactId>
    <version>1.1.2</version>
    <scope>compile</scope>
</dependency>
3. Let Maven build the project and download the dependencies.
Actually, the required storm-core-1.1.2.jar can also be found in the lib directory of the Storm package downloaded from the official site.
I'm no Java expert, so I won't say more on that.
4. Write some code based on the official word-count example (or any of the many tutorials out there).
5. Hit Run in Eclipse and that's it.
Storm daemons, Zookeeper... none of them are actually needed for this local-mode starter example!
It's that simple.
As for the code itself, WordCountTopology.java from the official storm-starter examples is a good fit for beginners.
There's just one pitfall in it:
the official version implements the sentence-splitting bolt in Python, so if your environment doesn't have it set up, it errors out as soon as you run it.
It's this part:
public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py");
    }
    // rest omitted
}
You can replace it with this class:
public static class SplitSentence extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // receive a sentence
        String sentence = tuple.getString(0);
        // split the sentence into words
        StringTokenizer iter = new StringTokenizer(sentence);
        // emit each word
        while (iter.hasMoreElements()) {
            collector.emit(new Values(iter.nextToken()));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // declare a single output field
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
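If you want to sanity-check the splitting logic on its own before wiring it into Storm, the StringTokenizer part can be exercised with plain JDK code (no Storm classes involved; the helper name splitWords and class name SplitCheck are mine, just for illustration):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

public class SplitCheck {
    // Mirrors the tokenization inside the SplitSentence bolt:
    // StringTokenizer with default delimiters splits on whitespace.
    static List<String> splitWords(String sentence) {
        List<String> words = new ArrayList<String>();
        StringTokenizer iter = new StringTokenizer(sentence);
        while (iter.hasMoreElements()) {
            words.add(iter.nextToken());
        }
        return words;
    }

    public static void main(String[] args) {
        System.out.println(splitWords("the cow jumped over the moon"));
        // [the, cow, jumped, over, the, moon]
    }
}
```

Each element of the returned list is what the bolt would emit as one "word" tuple.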
Once it's running, you can watch the details in Eclipse's Console window.
The complete code:
package storm.blueprints;

import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HelloWordCount {

    // A spout that emits random sentences
    public static class RandomSentenceSpout extends BaseRichSpout {
        private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class);

        SpoutOutputCollector _collector;
        Random _rand;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
        }

        @Override
        public void nextTuple() {
            Utils.waitForMillis(100);
            String[] sentences = new String[]{
                sentence("the cow jumped over the moon"),
                sentence("an apple a day keeps the doctor away"),
                sentence("four score and seven years ago"),
                sentence("snow white and the seven dwarfs"),
                sentence("i am at two with nature")};
            final String sentence = sentences[_rand.nextInt(sentences.length)];

            LOG.debug("Emitting tuple: {}", sentence);

            _collector.emit(new Values(sentence));

            System.out.println("***" + sentence);
        }

        protected String sentence(String input) {
            return input;
        }

        @Override
        public void ack(Object id) {
        }

        @Override
        public void fail(Object id) {
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    // A bolt that splits sentences into words
    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // receive a sentence
            String sentence = tuple.getString(0);
            // split the sentence into words
            StringTokenizer iter = new StringTokenizer(sentence);
            // emit each word
            while (iter.hasMoreElements()) {
                collector.emit(new Values(iter.nextToken()));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // declare a single output field
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    // A bolt that counts words
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);

            System.out.println(word + " " + count);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // declare two fields, word and count
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("main");
        // build a topology
        TopologyBuilder builder = new TopologyBuilder();
        // the spout, named "Spout", with parallelism 5
        builder.setSpout("Spout", new RandomSentenceSpout(), 5);
        // the "split" bolt, parallelism 8, fed by the spout
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("Spout");
        // the "count" bolt, parallelism 12, fed by "split"
        // (use fieldsGrouping("split", new Fields("word")) to partition by word instead)
        builder.setBolt("count", new WordCount(), 12).globalGrouping("split");

        Config conf = new Config();

        // an in-process local cluster
        LocalCluster cluster = new LocalCluster();

        System.out.println("LocalCluster");

        // submit the topology under the name "word-count"
        cluster.submitTopology("word-count", conf, builder.createTopology());

        System.out.println("submitTopology");

        Utils.waitForSeconds(10);
        cluster.killTopology("word-count");
        cluster.shutdown();
    }

    // Small sleep helpers (shadows org.apache.storm.utils.Utils, which
    // is not needed here)
    public static class Utils {

        public static void waitForSeconds(int seconds) {
            try {
                Thread.sleep(seconds * 1000);
            } catch (InterruptedException e) {
            }
        }

        public static void waitForMillis(long milliseconds) {
            try {
                Thread.sleep(milliseconds);
            } catch (InterruptedException e) {
            }
        }
    }
}
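Incidentally, once you strip away the Storm plumbing, the counting step inside WordCount is just a per-word tally in a HashMap. The same null-check-then-increment logic can be tried standalone (plain JDK sketch; the class and method names here are mine):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

public class CountCheck {
    // Same logic as WordCount.execute(), applied to every word in one sentence:
    // look up the current count, treat a missing key as 0, then increment.
    static Map<String, Integer> countWords(String sentence) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        StringTokenizer iter = new StringTokenizer(sentence);
        while (iter.hasMoreElements()) {
            String word = iter.nextToken();
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            counts.put(word, count + 1);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(countWords("the cow jumped over the moon"));
        // "the" is counted twice, every other word once
    }
}
```

In the real topology the map lives across calls to execute(), so counts keep growing for as long as the spout keeps emitting.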