storm实现求和操作

Posted bj-xiaodao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了storm实现求和操作相关的知识,希望对你有一定的参考价值。

storm求和简单操作

 

package com.xiaodao.big;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;

/**
 * 累积求和
 */
public class LocalSumStormTopology {

    /**
     * spout 需要继承baserichspout
     * 数据源需要产生并发送数据
     */
    public static class DataSourceSpout extends BaseRichSpout{


        private SpoutOutputCollector collector;
        /**
         * 初始化方法只会被调用一次
         *
         * @param conf 配置参数
         * @param context   上下文
         * @param collector 数据发射器
         */
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

            int num = 0;
        /**
         * 会产生数据,在生产上肯定是从消息队列中获取数据
         * 这个方法是一个死循环,是因为storm一直运行,会一直不行的执行
         */
        public void nextTuple() {
            collector.emit( new Values(num++));
            System.out.println("Spout:发送 "+ num);
                Utils.sleep(2000);

        }

        /**
         * 声明下一个blot接受的名称,不然blot不知道接受到了什么
         * @param declarer
         */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("no"));
        }
    }

    /**
     * 数据的累积求和 blot,接受数据,并处理
     */
    public static class SumBlot extends BaseRichBolt{

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        }

        int sum =0;
        /**
         * 也是一个自旋锁.(死循环)
         * @param input
         */
        public void execute(Tuple input) {
            //这里获取方式有很多
            Integer no =  input.getIntegerByField("no");
            sum +=no;
            System.out.println("Blot: sum = ["+ sum+"]");

        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {

        }
    }

    public static void main(String[] args) {
        //任何一个作业都需要topology
        //需要控制好blot spout 顺序
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("DataSourceSpout",new DataSourceSpout(),1);
        builder.setBolt("SumBlot",new SumBlot(),1).shuffleGrouping("DataSourceSpout");
        Config conf = new Config();
        conf.setNumWorkers(2);
        //如果到200个消息就不发送了
        conf.setMaxSpoutPending(200);
        //创建一个本地的模式,不需要搭建
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalSumStormTopology",conf,builder.createTopology());
    }
}

执行运行就可以

以上是关于storm实现求和操作的主要内容,如果未能解决你的问题,请参考以下文章

使用Storm进行词频统计

代码片段 - Golang 实现集合操作

Storm- 使用Storm实现词频汇总

JStorm与Storm源码分析--BasicBoltExecutor与装饰模式

以下代码片段的算法复杂度

storm的trident编程模型