Storm Adventures, Shallow In and Shallow Out: a Storm Hello World example. Written for Storm beginners coming from C# to Java; experts, feel free to roast and correct.

Posted by Yuema (约吗)


As the saying goes, a picture is worth a thousand words, so before looking at any code let's get a feel for Storm through a few diagrams. Some are official, some were drawn by other engineers; all come from around the internet. Skim through them quickly to get a first impression; it's fine if they don't fully make sense yet, since a short written explanation follows the picture stream. This article is written for Storm beginners; experts are welcome to heckle, or to close the tab within a second.

 

[Figures 1-5: Storm overview diagrams; the images were lost in extraction.]

Storm is a real-time stream-processing framework: it continuously pulls data in and continuously processes it, like flowing water, which is why the official logo uses a tap to capture Storm's essence. A data pipeline starts with a Spout, which ingests data; the middle is a chain of Bolts, the units that actually process the data. Spout and Bolts are like the start node and intermediate steps of a flowchart. Together they form a topology, a job you hand to Storm to run (the App class at the end wires ours together). Storm has a local mode and a remote (cluster) mode; today's Storm Hello World uses local mode.

 

Code structure:

[Figure 6: project layout; image lost in extraction.]

The code is based on https://www.cnblogs.com/xuwujing/p/8584684.html, with some changes because this article uses the latest storm-core, 1.2.2. Judging by the Eclipse errors, some methods that used to live in a superclass of the Storm API were moved into interfaces, so a few of the @Override annotations had to be removed.

 

POM file

    <!-- https://mvnrepository.com/artifact/ring-cors/ring-cors -->
    <dependency>
        <groupId>ring-cors</groupId>
        <artifactId>ring-cors</artifactId>
        <version>0.1.12</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.2</version>
        <scope>provided</scope>
    </dependency>
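One caveat of my own about the provided scope, not from the original post: "storm jar" supplies storm-core on a real cluster, and IDEs usually keep provided dependencies on the run classpath, so local-mode runs from Eclipse work fine. If you instead launch local mode via java -jar or mvn exec:java, storm-core may be absent at runtime (NoClassDefFoundError); the simplest workaround while developing is to comment the scope out:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.2</version>
        <!-- <scope>provided</scope>  re-enable when packaging for "storm jar" -->
    </dependency>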

The corrected code

TestSpout

    import java.util.Map;

    import org.apache.storm.spout.SpoutOutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichSpout;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class TestSpout extends BaseRichSpout {

        private static final long serialVersionUID = 225243592780939490L;

        private SpoutOutputCollector collector;
        private static final String field = "word";
        private int count = 1;
        private String[] message = {
            "Storm Hello World",
            "http://www.jishudao.com storm blog",
            "Play with storm"
        };

        /**
         * open() is defined on the ISpout interface and is called when the
         * spout component is initialized. Its three parameters are:
         * 1. the Storm configuration map;
         * 2. information about the component's place in the topology;
         * 3. the collector used to emit tuples.
         */
        public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
            System.out.println("open: " + map.get("test"));
            this.collector = collector;
        }

        /**
         * nextTuple() is the heart of the spout: it is the main execution
         * method, producing output by emitting tuples via collector.emit().
         */
        public void nextTuple() {
            if (count <= message.length) {
                System.out.println("Sending data, round " + count + "...");
                this.collector.emit(new Values(message[count - 1]));
            }
            count++;
        }

        /**
         * declareOutputFields() is defined on the IComponent interface and
         * declares the output schema, i.e. which fields each tuple contains.
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            System.out.println("Declaring output fields...");
            declarer.declare(new Fields(field));
        }

        /**
         * Called when a tuple has been processed successfully.
         */
        @Override
        public void ack(Object obj) {
            System.out.println("ack: " + obj);
        }

        /**
         * Called when the topology is shut down.
         */
        @Override
        public void close() {
            System.out.println("Closing...");
        }

        /**
         * Called when processing a tuple has failed.
         */
        @Override
        public void fail(Object obj) {
            System.out.println("Failed: " + obj);
        }

    }
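A detail the original post doesn't mention: Storm calls nextTuple() in a tight loop, so once this spout has sent all three messages it will spin doing nothing. A common idiom (a sketch of my own using Utils.sleep from org.apache.storm.utils, not part of the original code) is to back off when idle:

    import org.apache.storm.utils.Utils;

    public void nextTuple() {
        if (count <= message.length) {
            System.out.println("Sending data, round " + count + "...");
            this.collector.emit(new Values(message[count - 1]));
            count++;
        } else {
            // nothing left to emit; sleep briefly so the calling loop doesn't burn CPU
            Utils.sleep(100);
        }
    }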

 

TestBolt

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;

    public class TestBolt extends BaseRichBolt {

        private static final long serialVersionUID = 4743224635827696343L;

        private OutputCollector collector;

        /**
         * prepare() runs before the bolt starts and is the hook for setting up
         * its environment. It is typically used to instantiate non-serializable
         * objects; serializable objects are better initialized in the constructor.
         */
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            System.out.println("prepare: " + map.get("test"));
            this.collector = collector;
        }

        /**
         * execute() is the heart of the bolt: it is called once for every
         * subscribed tuple the bolt receives from the stream.
         */
        public void execute(Tuple tuple) {
            String msg = tuple.getStringByField("word");
            System.out.println("Splitting sentence into words: " + msg);
            String[] words = msg.toLowerCase().split(" ");
            for (String word : words) {
                this.collector.emit(new Values(word)); // emit each word to the next bolt
            }
        }

        /**
         * Declare the output schema.
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("count"));
        }

        /**
         * cleanup() is defined on the IBolt interface and releases the bolt's
         * resources. Storm calls it before terminating the bolt.
         */
        @Override
        public void cleanup() {
            System.out.println("Releasing TestBolt's resources");
        }
    }
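Note that this bolt emits unanchored tuples and never acks, which is fine for a demo but gives up Storm's at-least-once guarantee. A hedged sketch of what execute() would look like with anchoring (my own addition, not in the original; replay also requires the spout to emit with a message ID):

    public void execute(Tuple tuple) {
        String msg = tuple.getStringByField("word");
        for (String word : msg.toLowerCase().split(" ")) {
            // anchor the new tuple to the input so a downstream failure is replayed from the spout
            this.collector.emit(tuple, new Values(word));
        }
        this.collector.ack(tuple); // tell Storm this input tuple is fully processed
    }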

 

Test2Bolt

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public class Test2Bolt extends BaseRichBolt {

        private static final long serialVersionUID = 4743224635827696343L;

        /**
         * Maps each word to its running count.
         */
        private HashMap<String, Integer> counts = null;

        private long count = 1;

        /**
         * prepare() runs before the bolt starts and is the hook for setting up
         * its environment. It is typically used to instantiate non-serializable
         * objects; serializable objects are better initialized in the constructor.
         */
        public void prepare(Map map, TopologyContext context, OutputCollector collector) {
            System.out.println("prepare: " + map.get("test"));
            this.counts = new HashMap<String, Integer>();
        }

        /**
         * execute() is the heart of the bolt: it is called once for every
         * subscribed tuple the bolt receives from the stream.
         */
        public void execute(Tuple tuple) {
            // the upstream bolt declared its single output field as "count", though it carries a word
            String msg = tuple.getStringByField("count");
            System.out.println("Counting word occurrences, round " + count);
            // first time we see this word in the map: start at 1; otherwise add 1
            if (!counts.containsKey(msg)) {
                counts.put(msg, 1);
            } else {
                counts.put(msg, counts.get(msg) + 1);
            }
            count++;
        }

        /**
         * cleanup() is defined on the IBolt interface and releases the bolt's
         * resources. Storm calls it before terminating the bolt.
         */
        @Override
        public void cleanup() {
            System.out.println("=========== Word counts ============");
            for (Map.Entry<String, Integer> entry : counts.entrySet()) {
                System.out.println(entry.getKey() + ": " + entry.getValue());
            }
            System.out.println("=========== End ============");
            System.out.println("Releasing Test2Bolt's resources");
        }

        /**
         * This bolt ends the pipeline and emits nothing, so there is no schema to declare.
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }
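A small aside of my own, not from the original post: on Java 8 and later, the contains/put dance in execute() collapses into a single call to Map.merge:

    // equivalent to the if/else above: insert 1 on first sight, otherwise add 1
    counts.merge(msg, 1, Integer::sum);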

   

 

 

App

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;

    public class App {

        private static final String test_spout = "test_spout";
        private static final String test_bolt = "test_bolt";
        private static final String test2_bolt = "test2_bolt";

        public static void main(String[] args) {
            // define a topology
            TopologyBuilder builder = new TopologyBuilder();
            // one executor (thread), which is the default
            builder.setSpout(test_spout, new TestSpout(), 1);
            // shuffleGrouping: tuples are distributed randomly
            // one executor (thread) and one task
            builder.setBolt(test_bolt, new TestBolt(), 1).setNumTasks(1).shuffleGrouping(test_spout);
            // fieldsGrouping: tuples are grouped by field value
            // one executor (thread) and one task
            builder.setBolt(test2_bolt, new Test2Bolt(), 1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields("count"));
            Config conf = new Config();
            conf.put("test", "test");
            try {
                // run the topology
                if (args != null && args.length > 0) {
                    // with arguments: submit to the cluster, using the first argument as the topology name
                    System.out.println("Running in remote mode");
                    StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
                } else {
                    // no arguments: run in local mode
                    System.out.println("Running in local mode");
                    LocalCluster cluster = new LocalCluster();
                    cluster.submitTopology("Word-counts", conf, builder.createTopology());
                    Thread.sleep(20000);
                    // shut down the local cluster
                    cluster.shutdown();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
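Two Config knobs worth knowing while experimenting (my own suggestion; both setters are standard org.apache.storm.Config methods):

    Config conf = new Config();
    conf.setDebug(true);      // log every emitted tuple -- noisy, but instructive on a first run
    conf.setNumWorkers(2);    // number of worker processes; only meaningful in remote mode
    conf.put("test", "test"); // arbitrary entries surface in open()/prepare() via the conf map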

 

 

Pitfall: "The POM for ring-cors:ring-cors:jar:0.1.5 is missing"

Workaround:

https://mvnrepository.com/artifact/ring-cors/ring-cors/0.1.12

Download ring-cors-0.1.12.jar and ring-cors-0.1.12.pom from the page above and put them in your local Maven repository.
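Instead of copying files into the repository layout by hand, Maven's install-file goal can register the downloaded jar for you (a sketch; adjust the file path to wherever you saved the artifacts):

    mvn install:install-file -Dfile=ring-cors-0.1.12.jar \
        -DgroupId=ring-cors -DartifactId=ring-cors \
        -Dversion=0.1.12 -Dpackaging=jar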

 


Once you've climbed out of that pit, the Storm hello world runs. After that first Hello World feeling, it's worth going through the official material in detail; beyond reading other people's translations, I strongly recommend reading them side by side with the official docs. The official site has a detailed document list; don't rush, work through it one item at a time.

Basics of Storm

  • Javadoc

  • Concepts

  • Scheduler

  • Configuration

  • Guaranteeing message processing

  • Daemon Fault Tolerance

  • Command line client

  • REST API

  • Understanding the parallelism of a Storm topology

  • FAQ

Layers on top of Storm

Trident

Trident is an alternative interface to Storm. It provides exactly-once processing, "transactional" datastore persistence, and a set of common stream analytics operations.

  • Trident Tutorial -- basic concepts and walkthrough

  • Trident API Overview -- operations for transforming and orchestrating data

  • Trident State -- exactly-once processing and fast, persistent aggregation

  • Trident spouts -- transactional and non-transactional data intake

  • Trident RAS API -- using the Resource Aware Scheduler with Trident.

Streams API

The Streams API is another alternative interface to Storm. It provides a typed API for expressing streaming computations and supports functional style operations.

NOTE: the Streams API is an experimental feature, and further work might break backward compatibility. This is also signalled by annotating the classes with the marker interface @InterfaceStability.Unstable.

  • Streams API

SQL

The Storm SQL integration allows users to run SQL queries over streaming data in Storm.

NOTE: Storm SQL is an experimental feature, so the internals of Storm SQL and the supported features are subject to change, but small changes will not affect the user experience. Users will be notified when a breaking UX change is introduced.

  • Storm SQL overview

  • Storm SQL example

  • Storm SQL reference

  • Storm SQL internal

Flux

  • Flux Data Driven Topology Builder

Setup and Deploying

  • Setting up a Storm cluster

  • Local mode

  • Troubleshooting

  • Running topologies on a production cluster

  • Building Storm with Maven

  • Setting up a Secure Cluster

  • CGroup Enforcement

  • Pacemaker reduces load on zookeeper for large clusters

  • Resource Aware Scheduler

  • Daemon Metrics/Monitoring

  • Windows users guide

  • Classpath handling

Intermediate

  • Serialization

  • Common patterns

  • DSLs and multilang adapters

  • Using non-JVM languages with Storm

  • Distributed RPC

  • Transactional topologies

  • Hooks

  • Metrics (Deprecated)

  • Metrics V2

  • State Checkpointing

  • Windowing

  • Joining Streams

  • Blobstore (Distcache)

Debugging

  • Dynamic Log Level Settings

  • Searching Worker Logs

  • Worker Profiling

  • Event Logging

Integration With External Systems, and Other Libraries

  • Apache Kafka Integration

  • New Kafka Consumer Integration

  • Apache HBase Integration

  • Apache HDFS Integration

  • Apache Hive Integration

  • Apache Solr Integration

  • Apache Cassandra Integration

  • Apache RocketMQ Integration

  • JDBC Integration

  • JMS Integration

  • MQTT Integration

  • Redis Integration

  • Event Hubs Integration

  • Elasticsearch Integration

  • Mongodb Integration

  • OpenTSDB Integration

  • Kinesis Integration

  • Druid Integration

  • PMML Integration

  • Kestrel Integration

Container, Resource Management System Integration

  • YARN Integration

  • Mesos Integration

  • Docker Integration

  • Kubernetes Integration

Advanced

  • Defining a non-JVM language DSL for Storm

  • Multilang protocol (how to provide support for another language)

  • Implementation docs

  • Storm Metricstore

 

 

References:

http://storm.apache.org/releases/current/index.html

https://www.cnblogs.com/xuwujing/p/8584684.html

