Storm设计一个Topology用来统计单词的TopN的实例

Posted brainstorm

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm设计一个Topology用来统计单词的TopN的实例相关的知识,希望对你有一定的参考价值。

Storm的单词统计设计

一:Storm的wordCount和Hadoop的wordCount实例对比

技术分享图片

 

二:Storm的wordCount的方案实例设计

技术分享图片

 

三:建立maven项目,添加maven相关依赖包
(1)输入:search.maven.org网址,在其中找到storm的核心依赖
(2)将核心依赖添加到pom.xml文件中

         <dependency>
            <groupId>com.github.aloomaio</groupId>
                    <artifactId>storm-core</artifactId>
                    <version>0.9.2-incubating</version>
        </dependency>

四:代码实现

一:WordCountTopology源码【启动topology的入口】

技术分享图片
 1 package com.yeepay.sxf.helloword;
 2 
 3 import backtype.storm.Config;
 4 import backtype.storm.LocalCluster;
 5 import backtype.storm.StormSubmitter;
 6 import backtype.storm.topology.TopologyBuilder;
 7 import backtype.storm.tuple.Fields;
 8 
 9 import com.yeepay.sxf.helloword.bolt.PrintBolt;
10 import com.yeepay.sxf.helloword.bolt.WordCountBolt;
11 import com.yeepay.sxf.helloword.bolt.WordNormalizerBolt;
12 import com.yeepay.sxf.helloword.spout.RandomSentenceSpout;
13 
14 public class WordCountTopology {
15 
16     private static TopologyBuilder builder=new TopologyBuilder();
17 
18     public static void main(String[] args) throws InterruptedException {
19         
20         
21         Config config=new Config();
22         
23         builder.setSpout("RandomSentence", new RandomSentenceSpout(),2);
24         builder.setBolt("WordNormalizer", new WordNormalizerBolt(),2).shuffleGrouping("RandomSentence");
25         builder.setBolt("WordCount", new WordCountBolt(),2).fieldsGrouping("WordNormalizer", new Fields("wordd"));
26         builder.setBolt("Print", new PrintBolt(),1).shuffleGrouping("WordCount");
27         
28         config.setDebug(false);//调试模式,会把所有的log都打印出来
29         
30         //通过是否有参数来判断是否启动集群,或者本地模式执行
31         if(args!=null&&args.length>0){
32             System.out.print("集群模式------------------------------------------------");
33             try {
34                 config.setNumWorkers(1);
35                 StormSubmitter.submitTopology(args[0], config, builder.createTopology());
36             } catch (Exception e) {
37                 // TODO: handle exception
38             }
39         }else{
40             System.out.print("本地模式------------------------------------------------");
41             //本地模式
42             config.setMaxTaskParallelism(1);
43             LocalCluster cluster=new LocalCluster();
44             cluster.submitTopology("wordcount",config,builder.createTopology() );
45             
46             Thread.sleep(10000L);
47             //关闭本地集群
48             cluster.shutdown();
49         }
50     }
51 }
View Code

二:RandomSentenceSpout源码【发送英语句子的消息源头】

技术分享图片
  1 package com.yeepay.sxf.helloword.spout;
  2 import java.util.Map;
  3 import java.util.Random;
  4 
  5 import backtype.storm.spout.SpoutOutputCollector;
  6 import backtype.storm.task.TopologyContext;
  7 import backtype.storm.topology.OutputFieldsDeclarer;
  8 import backtype.storm.topology.base.BaseRichSpout;
  9 import backtype.storm.tuple.Fields;
 10 import backtype.storm.tuple.Values;
 11 import backtype.storm.utils.Utils;
 12 /**
 13  * 内存中随机选取待定的英文语句,作为数据源发射出去
 14  * @author sxf
 15  *随机发送一条内置消息,该spout继承BaseRichSpout/IRichSpout
 16  *
 17  *Storm的两个主要抽象是Spout和Bolt,Storm的第三个更强大的抽象是StateSpout
 18  */
 19 @SuppressWarnings("serial")
 20 public class RandomSentenceSpout extends BaseRichSpout {
 21     //发射消息
 22     SpoutOutputCollector spoutOutputCollector;
 23     Random random;
 24     
 25     /**
 26      * 【1】IComponent接口中的open()方法
 27      * 进行spout的一些初始化工作,包括参数传递。open()方法在该组件的一个任务在集群的工作进程内被初始化时被调用。提供了Spout执行的所需的环境。
 28      * Map:是这个Spout的Storm配置,提供给拓扑与这台主机上的集群配置一起进行合并。
 29      * TopologyContext:可以用来获取关于这个任务在拓扑中的位置信息,包括该任务的id,该任务的组件id,输入和输出信息等。
 30      * SpoutOutputCollector:是收集器,用于从这个Spout发射元组,元组可以随时被发射,包括open()和colse()方法。收集器是线程安全的,应该作为这个Spout对象的实例变量进行保存
 31      * 
 32      * 
 33      * 【2】IComponent接口中的colse()方法
 34      * 当一个Ispout即将关闭时被调用。不能保证colse()方法一定会被调用。因为Supervisor可以对集群的工作进程使用Kill -9命令强制杀死进程命令
 35      * 本地模式,当拓扑被杀死事,一定调用colse()方法
 36      * 
 37      * 【3】IComponent接口中的activate()方法
 38      * Activate()方法当Spout已经从失效模式中激活时被调用。该Spout的nextTuple()方法很快就会被调用。当使用Storm客户端操作拓扑时,Spout可以在失效状态之后变成激活模式。
 39      * 
 40      * 【4】
 41      */
 42     @Override
 43     public void open(Map arg0, TopologyContext arg01, SpoutOutputCollector arg2) {
 44         this.spoutOutputCollector=arg2;
 45         this.random=new Random();
 46     }
 47     
 48     /**
 49      * 进行Tuple处理的主要方法
 50      * 【4】IComponent的nextTuple()方法
 51      * 当调用nextTuple()方法时,Storm要求Spout发射元组到输出收集器(OutputCollector)
 52      * nextTuple()方法应该是非阻塞的,所以,如果Spout没有元组可以发射,该方法应该返回。
 53      * nextTuple(),ack()和fail()方法都在Spout任务的单一线程内紧密循环被调用。当没有元组可以发射时,
 54      * 可以让nextTuple()去sleep很短时间,例如1毫秒,这样不会浪费太多cpu资源。
 55      */
 56     @Override
 57     public void nextTuple() {
 58         //每两秒种发送一条消息
 59         Utils.sleep(2000);
 60         //自定义内置数组
 61         String[] sentences=new String[]{
 62                 "or 420 million US dollars",
 63                 "What happened is that a group",
 64                 "fight lasted hours overnight between",
 65                 "the air according to the residents",
 66                 "told me that one Malian soldier",
 67                 "military spokesman says security forces",
 68                 "that thousands of people who prayed",
 69                 "continuing to receive treatment for",
 70                 "freezing temperatures currently gripping",
 71                 "Central African Republic Michel Djotodia",
 72                 "freezing temperatures currently gripping",
 73                 "former opposition will make up most",
 74                 "The Syrian government has accused",
 75                 "Doctors in South Africa reporting",
 76                 "military spokesman says security forces",
 77                 "Late on Monday, Ms Yingluck invoked special powers allowing officials to impose curfews",
 78                 "Those who took up exercise were three times more likely to remain healthy over the next eight",
 79                 "The space dream, a source of national pride and inspiration",
 80                 "There was no time to launch the lifeboats because the ferry capsized with such alarming speed"
 81                 };
 82         
 83         //从sentences数组中,随机获取一条语句,作为这次spout发送的消息
 84         String sentence=sentences[random.nextInt(sentences.length)];
 85         //使用emit方法进行Tuple发布会,参数用Values申明
 86         spoutOutputCollector.emit(new Values(sentence.trim().toLowerCase()));
 87     }
 88 
 89     
 90     
 91     /**
 92      * 【5】IComponent的ack()方法
 93      * Storm已经断定该Spout发射的标识符为msgId的元组已经被完全处理时,会调用ack方法。
 94      * 通常情况下,ack()方法会将该消息移除队列以防止它被重发
 95      */
 96     @Override
 97     public void ack(Object msgId) {
 98         
 99     }
100 
101     
102     /**
103      * 【6】IComponent接口的fail()方法
104      * 该Spout发射的标识为msgId的元组未能被完全处理时,会调用fail()方法。
105      * 通常情况下,fail方法会将消息放回队列中,并在稍后重发消息
106      */
107     @Override
108     public void fail(Object msgId) {
109         
110     }
111 
112     //字段声明
113     @Override
114     public void declareOutputFields(OutputFieldsDeclarer arg0) {
115         arg0.declare(new Fields("wordd"));
116     }
117 
118     
119 }
View Code

三:WordNormalizerBolt源码【将英语句子的切割成单词的处理逻辑单元】

 

技术分享图片
  1 package com.yeepay.sxf.helloword.bolt;
  2 
  3 import java.util.Map;
  4 
  5 import backtype.storm.task.OutputCollector;
  6 import backtype.storm.task.TopologyContext;
  7 import backtype.storm.topology.IRichBolt;
  8 import backtype.storm.topology.OutputFieldsDeclarer;
  9 import backtype.storm.tuple.Fields;
 10 import backtype.storm.tuple.Tuple;
 11 import backtype.storm.tuple.Values;
 12 /**
 13  * 消息预处理的bolt,将消息按单词切分
 14  * @author sxf
 15  *Bolt的生命周期如下:
 16  *在客户端主机上创建IBolt对象,IBolt被序列化到拓扑并提交到集群的主控节点(Nimbus)然后Nimbus启动工作进程(Worker)反序列化对象,
 17  *调用对象上的prepare()方法,然后开始处理元组。
 18  *如果你希望参数化一个IBolt,应该通过其构造函数设置参数并作为实例变量保存参数化状态。然后,实例变量会序列化,
 19  *并发送给跨集群的每个任务来执行这个Bolt
 20  *如果使用java来定义Bolt,应该使用IRichBolt接口,IRichBolt接口添加了使用java TopologyBuilder API的必要方法
 21  *
 22  * 
 23  * IBasicBolt与IRichBolt具有一样的同名方法,唯一不同,IBasicBolt的execute()方法会自动处理Acking机制,
 24  * 如果在execute中想让元组失败,可以显示抛出一个FailedException异常
 25  */
 26 @SuppressWarnings("serial")
 27 public class WordNormalizerBolt implements IRichBolt {
 28 
 29     //发射消息
 30     private OutputCollector outputCollector;
 31     
 32     /**
 33      * bolt的初始化方法
 34      *(配置的参数,上下文,发送器)
 35      *【1】IBolt接口的prepare()方法
 36      *在该组建的一个任务在集群的工作进程内被初始化时被调用,提供了Bolt执行的所需环境
 37      *Map参数:是这个Bolt的Storm配置,提供给拓扑与这台主机上的集群配置一起进行合并。
 38      *TopologyContext参数:可以用来获取关于这个任务在拓扑中的位置信息,比如任务的id,该任务的组件id,输入输出信息等。
 39      *OutputCollector参数:是收集器皿,用于从这个Bolt发射元组。元组随时被发射,包括prepare()和cleanup()方法。
 40      *收集器是线程安全,应该作为这个Bolt对象的实例变量进行保存。
 41      */
 42     @Override
 43     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
 44         this.outputCollector=arg2;
 45     }
 46     
 47     
 48     /**
 49      * 执行订阅的Tuple逻辑过程的方法
 50      * 【2】IBolt接口的execute()方法
 51      * 用于处理一个输入元组,元组对象包含元组来自哪个组件/流/任务的元数据。
 52      * 元组的值可以使用tuple.getValue()进行访问。Ibolt没有立即处理元组,而是完全地捕获一个元组在以后进行处理。
 53      * 元组应该使用prepare方法提供的OutputCollector进行发射。使用OutputCollector在某种程度上要求所有输入元组是ack或者fail
 54      * 否则Storm将无法确定来自Spout的元组什么时候处理完成。
 55      * 常见做法是:在execute()方法结束时对输入的元组调用ack方法,而IBasicBolt会自动处理该部分。
 56      * Tuple参数:为被处理的输入元组
 57      * 
 58      */
 59     @Override
 60     public void execute(Tuple tuple) {
 61         //获取订阅的tuple的内容
 62         String sentence=tuple.getString(0);
 63         //获取元组来自那个bolt或spout。返回它们的名字
 64         String ad=tuple.getSourceComponent();
 65         System.out.print(ad);//RandomSentence
 66         //进行单词分割
 67         String[] words=sentence.split(" ");
 68         //将单词发送出去
 69         for(String word:words){
 70             outputCollector.emit(new Values(word));
 71         }
 72     }
 73     
 74     /**
 75      * 此方法,在当前Bolt被关闭时,调用此方法来清理任何已经打开的资源,但不能保证这个方法会被集群调用
 76      * 【2】IBolt接口的cleanup()方法
 77      * 当一个Bolt即将关闭时被调用。不能保证cleanup()方法一定会被调用,
 78      * 因为Supervisor可以对集群的工作进程使用Kill -9命令强制杀死进程命令
 79      * 如果在本地模式下运行storm,当拓扑被杀死时一定会调用该方法
 80      *
 81      */
 82     @Override
 83     public void cleanup() {
 84         
 85     }
 86 
 87     
 88     /**
 89      * 此方法,用于声明当前Bolt类发射一个字段名为"wordd"的一个元组
 90      * 为该拓扑的所有流声明输出模式
 91      */
 92     @Override
 93     public void declareOutputFields(OutputFieldsDeclarer declarer) {
 94         declarer.declare(new Fields("wordd"));
 95     }
 96 
 97     /**
 98      * 此方法孕育你配置关于当前这个组件如何运行的很多参数,会被运行中调用
 99      */
100     @Override
101     public Map<String, Object> getComponentConfiguration() {
102         return null;
103     }
104 
105 }
View Code

 

四:WordCountBolt源码【进行单词统计的处理逻辑单元】

技术分享图片
  1 package com.yeepay.sxf.helloword.bolt;
  2 
  3 import java.util.HashMap;
  4 import java.util.Map;
  5 
  6 import backtype.storm.task.OutputCollector;
  7 import backtype.storm.task.TopologyContext;
  8 import backtype.storm.topology.IRichBolt;
  9 import backtype.storm.topology.OutputFieldsDeclarer;
 10 import backtype.storm.tuple.Fields;
 11 import backtype.storm.tuple.Tuple;
 12 import backtype.storm.tuple.Values;
 13 
 14 import com.yeepay.sxf.helloword.util.MapSort;
 15 /**
 16  * 单词统计,并且实时获取词频前N的发射出去
 17  * @author sxf
 18  *
 19  */
 20 @SuppressWarnings("serial")
 21 public class WordCountBolt implements IRichBolt{
 22 
 23     //单词统计结果
 24     private Map<String, Integer> counters;
 25     //消息发射器皿
 26     private OutputCollector outputCollector;
 27     
 28     //bolt的初始化的方法
 29     @Override
 30     public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) {
 31         this.outputCollector=arg2;
 32         this.counters=new HashMap<String, Integer>();
 33     }
 34     
 35     //执行方法,进行该bolt的逻辑处理
 36     @Override
 37     public void execute(Tuple tuple) {
 38         //获得tuple
 39         String str=tuple.getString(0);
 40         //获取当前的tuple来自那个Bolt或spout的,返回他们的名字
 41         String ad=tuple.getSourceComponent();
 42         System.out.print(ad);//WordNormalizer
 43         //逻辑处理
 44         if(!counters.containsKey(str)){
 45             //如果不包含,则进行初始化统计
 46             counters.put(str, 1);
 47         }else{
 48             //如果包含,则数值加1
 49             Integer c=counters.get(str)+1;
 50             counters.put(str, c);
 51         }
 52         //我们取前n个的单词词频
 53         int num=8;
 54         int length=0;
 55         //使用工具类MapSort对map进行排序
 56         counters=MapSort.sortByValue(counters);
 57         
 58         if(num<counters.keySet().size()){
 59             length=num;
 60         }else{
 61             length=counters.keySet().size();
 62         }
 63         
 64         String word=null;
 65         StringBuffer st=new StringBuffer();
 66         //增量统计
 67         int count=0;
 68         for(String key:counters.keySet()){
 69             
 70             //获取前n个
 71             if(count>=length){
 72                 break;
 73             }
 74             
 75             if(count==0){
 76                 st.append("The first ").append(length).append("==>");
 77                 st.append("[").append(key).append(":").append(counters.get(key)).append("]");
 78             }else{
 79                 st.append(",[").append(key).append(":").append(counters.get(key)).append("]");
 80             }
 81             count++;
 82         }
 83         
 84         //将消息发射出去
 85         outputCollector.emit(new Values(st.toString()));
 86                 
 87     }
 88 
 89     
 90     /**
 91      * 此方法,用于声明当前Bolt类发射一个字段名为"oneword"的一个元组
 92      * 为该拓扑的所有流声明输出模式
 93      */
 94     @Override
 95     public void declareOutputFields(OutputFieldsDeclarer arg0) {
 96         
 97     arg0.declare(new Fields("oneword"));
 98     }
 99 
100 
101     /**
102      * 此方法孕育你配置关于当前这个组件如何运行的很多参数,会被运行中调用
103      */
104     @Override
105     public Map<String, Object> getComponentConfiguration() {
106         // TODO Auto-generated method stub
107         return null;
108     }
109 
110     
111     /**
112      * 此方法,在当前Bolt被关闭时,调用此方法来清理任何已经打开的资源,但不能保证这个方法会被集群调用
113      */
114     @Override
115     public void cleanup() {
116         // TODO Auto-generated method stub
117         
118     }
119 
120     
121 
122 
123     
124 }
View Code

五:PrintBolt源码【进行单词topN的打印单元】

技术分享图片
 1 package com.yeepay.sxf.helloword.bolt;
 2 
 3 import java.text.SimpleDateFormat;
 4 import java.util.Date;
 5 
 6 import backtype.storm.topology.BasicOutputCollector;
 7 import backtype.storm.topology.OutputFieldsDeclarer;
 8 import backtype.storm.topology.base.BaseBasicBolt;
 9 import backtype.storm.tuple.Tuple;
10 /**
11  * 打印接受的数据的Bolt
12  * @author sxf
13  *
14  */
15 @SuppressWarnings("serial")
16 public class PrintBolt extends BaseBasicBolt {
17 
18     
19     @Override
20     public void execute(Tuple tuple, BasicOutputCollector arg1) {
21         //接收数据date
22         try {
23             String mesg=tuple.getString(0);
24             if(mesg!=null){
25                 System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date())+"===>"+mesg);
26             }
27         } catch (Exception e) {
28             e.printStackTrace();
29         }
30         
31     }
32     /**
33      * 此方法,用于声明当前Bolt类发射什么字段的元组,当前bolt只是打印,不再发射元组,所以不用定义
34      * 为该拓扑的所有流声明输出模式
35      */
36     @Override
37     public void declareOutputFields(OutputFieldsDeclarer declarer) {
38     }
39     
40     
41 
42     
43 }
View Code

六:MapSort源码【进行单词排序】

技术分享图片
 1 package com.yeepay.sxf.helloword.util;
 2 
 3 import java.util.Collections;
 4 import java.util.Comparator;
 5 import java.util.HashMap;
 6 import java.util.Iterator;
 7 import java.util.LinkedHashMap;
 8 import java.util.LinkedList;
 9 import java.util.List;
10 import java.util.Map;
11 import java.util.Map.Entry;
12 /**
13  * 对map排序的工具类
14  * @author sxf
15  *
16  */
17 public class MapSort {
18 
19     
20     public static Map<String, Integer> sortByValue(Map<String, Integer> map) {
21 
22         if (map == null) {
23             return null;
24         }
25 
26         List list = new LinkedList(map.entrySet());
27 
28         Collections.sort(list, new Comparator() {
29             
30             public int compare(Object o1, Object o2) {
31                 Comparable sort1 = (Comparable) ((Map.Entry) o1).getValue();
32                 Comparable sort2 = (Comparable) ((Map.Entry) o2).getValue();
33                 return sort2.compareTo(sort1);
34             }
35             
36         });
37 
38         Map result = new LinkedHashMap();
39 
40         for (Iterator it = list.iterator(); it.hasNext();) {
41             
42             Map.Entry entry = (Map.Entry) it.next();
43             result.put(entry.getKey(), entry.getValue());
44             
45         }
46 
47         return result;
48     }
49 
50     public static void main(String[] args) {
51         
52         Map<String, Integer> map = new HashMap<String, Integer> ();
53         map.put("test", 3);
54         map.put("hcy", 1);
55         map.put("put", 2);
56         
57         map = sortByValue(map);
58         
59         for (String key : map.keySet()) {
60             System.out.println( key + " ==> " + map.get(key));
61         }
62     }
63 
64 }
View Code

 












以上是关于Storm设计一个Topology用来统计单词的TopN的实例的主要内容,如果未能解决你的问题,请参考以下文章

Storm- 使用Storm实现词频汇总

Storm集群上的开发 ,Topology任务的编写 之 WordCountTopology数据流分析(storm编程模型)(一张图说明问题)

Storm集群上的开发 ,Topology任务的编写 之 WordCount Spout和Blot的分组策略(一张图说明问题)

Storm事务Topology的接口介绍

使用Storm进行词频统计

巧妙拆分bolt提升Storm集群吞吐量 增加并行处理速度 Storm & kafka处理实时日志实战topology经验谈