Storm完整例子

Posted hadoop_dev

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm完整例子相关的知识,希望对你有一定的参考价值。

  1. import backtype.storm.spout.SpoutOutputCollector;  
  2. import backtype.storm.task.TopologyContext;  
  3. import backtype.storm.topology.base.BaseRichSpout;  
  4. import backtype.storm.utils.Utils;  
  5.   
  6. import backtype.storm.Config;  
  7. import backtype.storm.LocalCluster;  
  8. import backtype.storm.StormSubmitter;  
  9. import backtype.storm.task.ShellBolt;  
  10.   
  11. import backtype.storm.topology.BasicOutputCollector;  
  12. import backtype.storm.topology.IRichBolt;  
  13. import backtype.storm.topology.OutputFieldsDeclarer;  
  14. import backtype.storm.topology.TopologyBuilder;  
  15. import backtype.storm.topology.base.BaseBasicBolt;  
  16.   
  17. import backtype.storm.tuple.Fields;  
  18. import backtype.storm.tuple.Tuple;  
  19. import backtype.storm.tuple.Values;  
  20.   
  21. import java.util.*;  
  22. //import java.util.HashMap;  
  23. //import java.util.Map;  
  24. //import java.util.Random;  
  25. //import java.util.StringTokenizer;  
  26.   
  27. /* 
  28. ** WordCountTopolopgyAllInJava类(单词计数) 
  29. */  
  30. public class  WordCountTopolopgyAllInJava{  
  31.       
  32.     // 定义一个喷头,用于产生数据。该类继承自BaseRichSpout  
  33.     public static class RandomSentenceSpout extends BaseRichSpout {  
  34.         SpoutOutputCollector _collector;  
  35.         Random _rand;  
  36.           
  37.         @Override  
  38.         public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){  
  39.             _collector = collector;  
  40.             _rand = new Random();  
  41.         }  
  42.           
  43.         @Override  
  44.         public void nextTuple(){  
  45.               
  46.             // 睡眠一段时间后再产生一个数据  
  47.             Utils.sleep(100);  
  48.               
  49.             // 句子数组  
  50.             String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away",  
  51.                 "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };  
  52.               
  53.             // 随机选择一个句子  
  54.             String sentence = sentences[_rand.nextInt(sentences.length)];  
  55.               
  56.             // 发射该句子给Bolt  
  57.             _collector.emit(new Values(sentence));  
  58.         }  
  59.           
  60.         // 确认函数  
  61.         @Override  
  62.         public void ack(Object id){  
  63.         }  
  64.           
  65.         // 处理失败的时候调用  
  66.         @Override  
  67.         public void fail(Object id){  
  68.         }  
  69.           
  70.         @Override  
  71.         public void declareOutputFields(OutputFieldsDeclarer declarer){  
  72.             // 定义一个字段word  
  73.             declarer.declare(new Fields("word"));  
  74.         }  
  75.     }  
  76.       
  77.     // 定义个Bolt,用于将句子切分为单词  
  78.     public static class SplitSentence extends BaseBasicBolt{  
  79.         @Override  
  80.         public void execute(Tuple tuple, BasicOutputCollector collector){  
  81.             // 接收到一个句子  
  82.             String sentence = tuple.getString(0);  
  83.             // 把句子切割为单词  
  84.             StringTokenizer iter = new StringTokenizer(sentence);  
  85.             // 发送每一个单词  
  86.             while(iter.hasMoreElements()){  
  87.                 collector.emit(new Values(iter.nextToken()));  
  88.             }  
  89.         }  
  90.           
  91.         @Override  
  92.         public void declareOutputFields(OutputFieldsDeclarer declarer){  
  93.             // 定义一个字段  
  94.             declarer.declare(new Fields("word"));  
  95.         }  
  96.     }  
  97.       
  98.     // 定义一个Bolt,用于单词计数  
  99.     public static class WordCount extends BaseBasicBolt {  
  100.         Map<String, Integer> counts = new HashMap<String, Integer>();  
  101.           
  102.         @Override  
  103.         public void execute(Tuple tuple, BasicOutputCollector collector){  
  104.             // 接收一个单词  
  105.             String word = tuple.getString(0);  
  106.             // 获取该单词对应的计数  
  107.             Integer count = counts.get(word);  
  108.             if(count == null)  
  109.                 count = 0;  
  110.             // 计数增加  
  111.             count++;  
  112.             // 将单词和对应的计数加入map中  
  113.             counts.put(word,count);  
  114.             System.out.println("hello word!");  
  115.             System.out.println(word +"  "+count);  
  116.             // 发送单词和计数(分别对应字段word和count)  
  117.             collector.emit(new Values(word, count));  
  118.         }  
  119.           
  120.         @Override  
  121.         public void declareOutputFields(OutputFieldsDeclarer declarer){  
  122.             // 定义两个字段word和count  
  123.             declarer.declare(new Fields("word","count"));  
  124.         }  
  125.     }  
  126.     public static void main(String[] args) throws Exception   
  127.     {  
  128.         // 创建一个拓扑  
  129.         TopologyBuilder builder = new TopologyBuilder();  
  130.         // 设置Spout,这个Spout的名字叫做"Spout",设置并行度为5  
  131.         builder.setSpout("Spout", new RandomSentenceSpout(), 5);  
  132.         // 设置slot——“split”,并行度为8,它的数据来源是spout的  
  133.         builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");  
  134.         // 设置slot——“count”,你并行度为12,它的数据来源是split的word字段  
  135.         builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));  
  136.           
  137.         Config conf = new Config();  
  138.         conf.setDebug(false);  
  139.           
  140.         //if(args != null && args.length > 0){  
  141.         //if(false){  
  142.         //  conf.setNumWorkers(3);  
  143.         //  StormSubmitter.submitTopology(args[0], conf, builder.createTopology());  
  144.         //}else{  
  145.             conf.setMaxTaskParallelism(3);  
  146.               
  147.             // 本地集群  
  148.             LocalCluster cluster = new LocalCluster();  
  149.               
  150.             // 提交拓扑(该拓扑的名字叫word-count)  
  151.             cluster.submitTopology("word-count", conf, builder.createTopology() );  
  152.               
  153.             Thread.sleep(10000);  
  154.         //  cluster.shutdown();  
  155.         //}  
  156.     }  
  157. }  

使用maven编译该项目: mvn clean package

运行:storm jar word-count-1.0.jar WordCountTopolopgyAllInJava 

结果如下:

hello word!
moon    811
hello word!
an      829
hello word!
apple   829
hello word!
a       829
hello word!
keeps   829
hello word!
day     829
hello word!
score   800
hello word!

pom.xml文件定义如下

 

[html] view plain copy
 
 技术分享技术分享
    1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    2.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
    3.   <modelVersion>4.0.0</modelVersion>  
    4.   
    5.  <groupId>storm-yqj</groupId>  
    6.   <artifactId>word-count</artifactId>  
    7.   <version>1.0</version>  
    8.   <packaging>jar</packaging>  
    9.   
    10.   <name>word-count</name>  
    11.   <url>http://maven.apache.org</url>  
    12.   
    13.   <properties>  
    14.     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>  
    15.   </properties>  
    16.   
    17.     
    18.   
    19.   <dependencies>  
    20.     <dependency>  
    21.       <groupId>junit</groupId>  
    22.       <artifactId>junit</artifactId>  
    23.       <version>3.8.1</version>  
    24.       <scope>test</scope>  
    25.     </dependency>  
    26.     <dependency>  
    27.       <groupId>org.testng</groupId>  
    28.       <artifactId>testng</artifactId>  
    29.       <version>6.8.5</version>  
    30.       <scope>test</scope>  
    31.     </dependency>  
    32.     <dependency>  
    33.       <groupId>org.mockito</groupId>  
    34.       <artifactId>mockito-all</artifactId>  
    35.       <version>1.9.0</version>  
    36.       <scope>test</scope>  
    37.     </dependency>  
    38.     <dependency>  
    39.       <groupId>org.easytesting</groupId>  
    40.       <artifactId>fest-assert-core</artifactId>  
    41.       <version>2.0M8</version>  
    42.       <scope>test</scope>  
    43.     </dependency>  
    44.     <dependency>  
    45.       <groupId>org.jmock</groupId>  
    46.       <artifactId>jmock</artifactId>  
    47.       <version>2.6.0</version>  
    48.       <scope>test</scope>  
    49.     </dependency>  
    50.     <dependency>  
    51.       <groupId>org.apache.storm</groupId>  
    52.       <artifactId>storm-core</artifactId>  
    53.       <version>0.9.1-incubating</version>  
    54.     </dependency>  
    55.   
    56.   
    57.     <dependency>  
    58.       <groupId>commons-collections</groupId>  
    59.       <artifactId>commons-collections</artifactId>  
    60.       <version>3.2.1</version>  
    61.     </dependency>  
    62.     <dependency>  
    63.       <groupId>com.google.guava</groupId>  
    64.       <artifactId>guava</artifactId>  
    65.       <version>15.0</version>  
    66.     </dependency>  
    67.   </dependencies>  
    68.   
    69.   <build>  
    70.     <resources>  
    71.       <resource>  
    72.         <directory>${basedir}/multilang</directory>  
    73.       </resource>  
    74.     </resources>  
    75.   
    76.     <plugins>  
    77.       <plugin>  
    78.         <artifactId>maven-assembly-plugin</artifactId>  
    79.         <configuration>  
    80.           <descriptorRefs>  
    81.             <descriptorRef>jar-with-dependencies</descriptorRef>  
    82.           </descriptorRefs>  
    83.           <archive>  
    84.             <manifest>  
    85.               <mainClass></mainClass>  
    86.             </manifest>  
    87.           </archive>  
    88.         </configuration>  
    89.         <executions>  
    90.           <execution>  
    91.             <id>make-assembly</id>  
    92.             <phase>package</phase>  
    93.             <goals>  
    94.               <goal>single</goal>  
    95.             </goals>  
    96.           </execution>  
    97.         </executions>  
    98.       </plugin>  
    99.   
    100.       <plugin>  
    101.         <groupId>com.theoryinpractise</groupId>  
    102.         <artifactId>clojure-maven-plugin</artifactId>  
    103.         <version>1.3.12</version>  
    104.         <extensions>true</extensions>  
    105.         <configuration>  
    106.           <sourceDirectories>  
    107.             <sourceDirectory>src/clj</sourceDirectory>  
    108.           </sourceDirectories>  
    109.         </configuration>  
    110.         <executions>  
    111.           <execution>  
    112.             <id>compile</id>  
    113.             <phase>compile</phase>  
    114.             <goals>  
    115.               <goal>compile</goal>  
    116.             </goals>  
    117.           </execution>  
    118.           <execution>  
    119.             <id>test</id>  
    120.             <phase>test</phase>  
    121.             <goals>  
    122.               <goal>test</goal>  
    123.             </goals>  
    124.           </execution>  
    125.         </executions>  
    126.       </plugin>  
    127.   
    128.       <plugin>  
    129.         <groupId>org.codehaus.mojo</groupId>  
    130.         <artifactId>exec-maven-plugin</artifactId>  
    131.         <version>1.2.1</version>  
    132.         <executions>  
    133.           <execution>  
    134.             <goals>  
    135.               <goal>exec</goal>  
    136.             </goals>  
    137.           </execution>  
    138.         </executions>  
    139.         <configuration>  
    140.           <executable>java</executable>  
    141.           <includeProjectDependencies>true</includeProjectDependencies>  
    142.           <includePluginDependencies>false</includePluginDependencies>  
    143.           <classpathScope>compile</classpathScope>  
    144.           <mainClass>${storm.topology}</mainClass>  
    145.         </configuration>  
    146.       </plugin>  
    147.   
    148.       <plugin>  
    149.         <groupId>org.apache.maven.plugins</groupId>  
    150.         <artifactId>maven-compiler-plugin</artifactId>  
    151.         <configuration>  
    152.           <source>1.6</source>  
    153.           <target>1.6</target>  
    154.         </configuration>  
    155.       </plugin>  
    156.   
    157.     </plugins>  
    158.   </build>  
    159. </project>  
















以上是关于Storm完整例子的主要内容,如果未能解决你的问题,请参考以下文章

storm的例子,一个非常好的网址

Storm消息可靠处理机制

Storm集成Siddhi

Storm:最火的流式处理框架

Storm:最火的流式处理框架

Storm 入门教程