Apache Storm 简单实践

Posted _wangjianfeng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Storm 简单实践相关的知识,希望对你有一定的参考价值。

Apache Storm 简单实践

前两篇文章介绍了Apache Storm的一些基础知识以及核心架构。

本篇文章介绍一些Storm的简单实践场景。

创建一个Storm项目

实践场景为,基于Storm开发出一个实时统计句子中的单词个数的拓扑,实时数据我们通过随机发射句子,在实际应用场景中,实时数据可能来自MQ或者其他来源。

使用IDEA创建一个maven项目,在pom.xml文件中添加以下依赖:

<dependency>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>
     <version>1.1.0</version>
     <scope>provided</scope>
</dependency>

上两篇文章说了,Storm的数据源自于Spout。所以我们需要创建一个Spout,由于是一个简单的场景,实时数据我们通过随机发射句子:

Spout的代码如下:

public static class RandomSentenceSpout extends BaseRichSpout

        private SpoutOutputCollector collector;

        private Random random;

        public void open(Map conf, TopologyContext context,
                     SpoutOutputCollector collector)
            this.collector = collector;
            this.random = new Random();

        

        public void nextTuple()
            Utils.sleep(100);
            String[] sentences = new String[]
                "the cow jumped over the moon", 
                "an apple a day keeps the doctor away",
                "four score and seven years ago", 
                "snow white and the seven dwarfs",
                "i am at two with nature"
            ;
            String sentence = sentences[random.nextInt(sentences.length)];
            collector.emit(new Values(sentence));
        

        public void declareOutputFields(OutputFieldsDeclarer declarer)
            declarer.declare(new Fields("sentence"));
        

上面代码很简单,在拓扑启动的时候启动的时候会调用open方法,我们在这里保存了collector,然后Storm会不断的调用nextTuple方法,所以我们在这里把句子发射出去。然后在declareOutputFields声明了发射出去的句子的索引。

上面我们已经把句子发射出去了,接下来的工作就是把句子中的单词切割出来,然后再发射出去。

切割单词的任务交给一个bolt来做

  public static class SplitSentenceBolt extends BaseRichBolt
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context,
                     OutputCollector collector) 
            this.collector = collector;
        

        public void execute(Tuple tuple) 
            String sentence = tuple.getStringByField("sentence");
            String[] words = sentence.split(" ");
            for(String word : words) 
                collector.emit(new Values(word));
            
        

        /**
         * 定义发射出去的tuple,每个field的名称
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) 
            declarer.declare(new Fields("word"));
        
  

上面代码把句子切割后发射出去,最后我们还需要一个bolt来统计单词的数量。

 public static class WordCountBolt extends BaseRichBolt 

        private static final long serialVersionUID = 7208077706057284643L;

        private static final Logger LOGGER = LoggerFactory.getLogger(WordCountBolt.class);

        private OutputCollector collector;
        private Map<String, Long> wordCounts = new HashMap<String, Long>();

        @SuppressWarnings("rawtypes")
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) 
            this.collector = collector;
        

        public void execute(Tuple tuple) 
            String word = tuple.getStringByField("word");

            Long count = wordCounts.get(word);
            if(count == null) 
                count = 0L;
            
            count++;

            wordCounts.put(word, count);

            LOGGER.info("【单词计数】" + word + "出现的次数是" + count);

            collector.emit(new Values(word, count));
        

        public void declareOutputFields(OutputFieldsDeclarer declarer) 
            declarer.declare(new Fields("word", "count"));
        

写完了SpoutBolt之后,接下来要创建一个Topology类,将SpoutBolt组合成为一个拓扑:

public class WordCountTopolpgy

 public static void main(String[] args) 
        // 在main方法中,会去将spout和bolts组合起来,构建成一个拓扑
        TopologyBuilder builder = new TopologyBuilder();
        // 这里的第一个参数的意思,就是给这个spout设置一个名字
        // 第三个参数的意思,就是设置spout的executor有几个
        builder.setSpout("RandomSentence", new RandomSentenceSpout(), 2);
        builder.setBolt("SplitSentenceBolt", new SplitSentenceBolt(), 5)
                .setNumTasks(10)
                .shuffleGrouping("RandomSentence");
        builder.setBolt("WordCountBolt", new WordCountBolt(), 10)
                .setNumTasks(20)
                .fieldsGrouping("SplitSentenceBolt", new Fields("word"));

        Config config = new Config();

        // 说明是在命令行执行,打算提交到storm集群上去
        if(args != null && args.length > 0) 
            config.setNumWorkers(3);
            try 
                StormSubmitter.submitTopology(args[0], config, builder.createTopology());
             catch (Exception e) 
                e.printStackTrace();
            
         else 
            // 说明是在eclipse里面本地运行
            config.setMaxTaskParallelism(20);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("WordCountTopology", config, builder.createTopology());

            Utils.sleep(3000);

            cluster.shutdown();
        
    

上面已经开发完一个Topology了。接下来我们可以直接在本地运行,或者扔到Storm集群去运行。

如果在本地运行,则会模拟生成一个本地集群来运行,(注意本地运行需要去掉pom.xml Storm依赖中的<provided> 节点,真实集群运行需要加上这个节点)

如果要在集群中运行,需要执行命令:

mvn clean package 

得到一个jar包,将这个jar包上传到nimbus节点中,然后执行以下命令就可以运行了。

strom jar xxx.jar com.xxxx.WordCountTopology  WordCountTopology 

总结

本篇文章介绍了一个应用Storm的简单例子,演示了Storm的基本开发方式。可以通过这个例子,触类旁通,运行到真正的企业场景中去。

以上是关于Apache Storm 简单实践的主要内容,如果未能解决你的问题,请参考以下文章

Apache Flink 在汽车之家的应用与实践

Apache Flink 在汽车之家的应用与实践

Storm实时计算:流操作入门编程实践

Storm 投身 Apache 基金会,改名 Apache Storm

storm实现求和操作

Storm 第三章 Storm编程案例及Stream Grouping详解