Storm StreamID Usage Example (Version 1.0.2)

Posted by 大墨垂杨


I gave the StreamID feature a quick try; this post is a note to myself on how it works.

 

==Sample Data==

The spout emits simulated express-delivery records shaped like this:

{
    "Address": "小桥镇小桥中学对面",
    "CityCode": "511300",
    "CountyCode": "511322",
    "EnterpriseCode": "YUNDA",
    "MailNo": "667748320345",
    "Mobile": "183****5451",
    "Name": "王***",
    "ProvCode": "510000",
    "Weight": "39"
}
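
Of these fields, EnterpriseCode, ProvCode, and CityCode are the ones the three counting bolts key on; the whole record travels between components as a JSON string.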

 

==Topology Structure==
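
In outline (component IDs as registered in TestTopology below):

Spout1 --Stream1--> Count1 (CountBolt1) --Stream3--> Top (TopBolt)
Spout1 --Stream2--> Count2 (CountBolt2) --Stream4--> Top (TopBolt)
Spout1 --Stream2--> Count3 (CountBolt3) --Stream4--> Top (TopBolt)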

 

==Source Code==
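
The sources below import two helpers, common.constants.Constants and common.simulate.DataRandom, that are not shown in this post. A minimal sketch that would satisfy those imports follows; the key names, company list, and record values are assumptions inferred from how the sample code uses them.

// file: common/constants/Constants.java (sketch)
package common.constants;

public class Constants {
    // Key names are assumptions inferred from usage in the sample code.
    public static final String SpoutInterval = "spout.interval";
    public static final String EnterpriseCode = "EnterpriseCode";
    public static final String ProvCode = "ProvCode";
    public static final String CityCode = "CityCode";
}

// file: common/simulate/DataRandom.java (sketch)
package common.simulate;

import com.alibaba.fastjson.JSONObject;

import java.util.Random;

public class DataRandom {
    private static final DataRandom INSTANCE = new DataRandom();
    private final Random _random = new Random();
    // Sample values only; YUNDA comes from the record above, the others are made up.
    private final String[] _companies = {"YUNDA", "STO", "ZTO"};

    private DataRandom() {
    }

    public static DataRandom getInstance() {
        return INSTANCE;
    }

    // Builds a randomized express-delivery record shaped like the sample data.
    public JSONObject getRandomExpressData() {
        JSONObject o = new JSONObject();
        o.put("EnterpriseCode", _companies[_random.nextInt(_companies.length)]);
        o.put("ProvCode", "510000");
        o.put("CityCode", "511300");
        o.put("MailNo", String.valueOf(100000000000L + (long) (_random.nextDouble() * 900000000000L)));
        o.put("Weight", String.valueOf(1 + _random.nextInt(100)));
        return o;
    }
}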

<Spout1>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import common.simulate.DataRandom;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class Spout1 extends BaseRichSpout {
    private SpoutOutputCollector _collector = null;
    private DataRandom _dataRandom = null;
    private int _timeInterval = 1000;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare two named streams; each carries a single "json" field.
        declarer.declareStream("Stream1", new Fields("json"));
        declarer.declareStream("Stream2", new Fields("json"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _dataRandom = DataRandom.getInstance();
        if (conf.containsKey(Constants.SpoutInterval)) {
            _timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));
        }
    }

    @Override
    public void nextTuple() {
        try {
            Thread.sleep(_timeInterval);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        JSONObject jsonObject = _dataRandom.getRandomExpressData();
        System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");
        // Emit the same record on both declared streams.
        _collector.emit("Stream1", new Values(jsonObject.toJSONString()));
        _collector.emit("Stream2", new Values(jsonObject.toJSONString()));
    }
}
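
Note that the spout emits without a message ID, so the tuples are untracked: there is no ack or replay if one fails downstream. That keeps the demo simple; a production spout would usually pass a message ID as the third argument to emit().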

 

<CountBolt1>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt1 extends BaseRichBolt {
    private OutputCollector _collector = null;
    private int taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("Stream3", new Fields("company", "count"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        taskId = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String company = jsonObject.getString(Constants.EnterpriseCode);

        // Per-task running count, keyed by courier company.
        int count = 0;
        if (_map.containsKey(company)) {
            count = _map.get(company);
        }
        count++;
        _map.put(company, count);

        _collector.emit("Stream3", new Values(company, count));
        System.out.print("[---CountBolt1---]" +
                "taskId=" + taskId + ", company=" + company + ", count=" + count + "\n");
    }
}
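
The count map is local to each task. Since Count1 receives a shuffleGrouping and the topology leaves this bolt at the default parallelism of 1, the counts stay consistent; raising the parallelism would split each company's count across tasks, because tuples for the same company could land on different tasks.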

 

<CountBolt2>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CountBolt2 extends BaseRichBolt {
    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String prov = jsonObject.getString(Constants.ProvCode);

        int count = 0;
        if (_map.containsKey(prov)) {
            count = _map.get(prov);
        }
        count++;
        _map.put(prov, count);

        _collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));
        System.out.print("[---CountBolt2---]" +
                "taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "\\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));
    }
}

 

<CountBolt3>

package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CountBolt3 extends BaseRichBolt {
    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");

        JSONObject jsonObject = JSONObject.parseObject(str);
        String city = jsonObject.getString(Constants.CityCode);

        int count = 0;
        if (_map.containsKey(city)) {
            count = _map.get(city);
        }
        count++;
        _map.put(city, count);

        _collector.emit("Stream4", new Values(city, count));
        System.out.print("[---CountBolt3---]" +
                "taskId=" + _taskId + ", city=" + city + ", count=" + count + "\\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Also named "Stream4" (as in CountBolt2) but with a different schema;
        // stream IDs are scoped to the declaring component, so the two do not clash.
        outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));
    }
}

 

<TopBolt>

package test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.List;
import java.util.Map;

public class TopBolt extends BaseRichBolt {

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    @Override
    public void execute(Tuple tuple) {
        // Report which stream the tuple arrived on, then dump its values.
        System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");
        List<Object> values = tuple.getValues();
        for (Object value : values) {
            System.out.print("[---TopBolt---]value=" + value + "\n");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
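
Both Count2 and Count3 emit on a stream named "Stream4", so getSourceStreamId() alone cannot tell their tuples apart. One way to disambiguate, sketched here as a variant of execute() rather than part of the original post, is to also check Tuple.getSourceComponent():

    @Override
    public void execute(Tuple tuple) {
        // "Stream4" arrives from both Count2 and Count3; split on the source component.
        String stream = tuple.getSourceStreamId();
        String component = tuple.getSourceComponent();
        if ("Stream4".equals(stream) && "Count2".equals(component)) {
            System.out.print("[---TopBolt---]prov=" + tuple.getStringByField("prov") + "\n");
        } else if ("Stream4".equals(stream) && "Count3".equals(component)) {
            System.out.print("[---TopBolt---]city=" + tuple.getStringByField("city") + "\n");
        } else {
            // Anything else here is "Stream3" from Count1.
            System.out.print("[---TopBolt---]company=" + tuple.getStringByField("company") + "\n");
        }
    }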

 

<TestTopology>

package test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TestTopology {
    public static void main(String[] args)
            throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Spout1", new Spout1());
        builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");
        builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Top", new TopBolt())
                .fieldsGrouping("Count1", "Stream3", new Fields("company"))
                .fieldsGrouping("Count2", "Stream4", new Fields("prov"))
                .fieldsGrouping("Count3", "Stream4", new Fields("city"));

        Config config = new Config();
        config.setNumWorkers(1);
        // args[1]: the spout emit interval in milliseconds, read back in Spout1.open().
        config.put(common.constants.Constants.SpoutInterval, args[1]);

        // args[0]: "true" submits to a real cluster; anything else runs a LocalCluster.
        if (Boolean.valueOf(args[0])) {
            StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("TestTopology1", config, builder.createTopology());
        }
    }
}
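
For a quick local run, pass something like false 1000 as the two arguments: args[0] = false selects the LocalCluster branch, and args[1] = 1000 makes the spout emit one record per second.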

 

==Result Log==

[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小桥镇小桥中学对面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"}
[---CountBolt1---]taskId=1, company=YUNDA, count=1
[---CountBolt3---]taskId=3, city=511300, count=1
[---CountBolt2---]taskId=2, prov=510000, count=1
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=510000
[---TopBolt---]value=1
[---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=511300
[---TopBolt---]value=1
[---TopBolt---]StreamID=Stream3
[---TopBolt---]value=YUNDA
[---TopBolt---]value=1
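
Note how TopBolt receives two different "Stream4" tuples: the one from Count2 carries three values (prov, count, and the random UUID), while the one from Count3 carries only two (city, count). The "Stream3" tuple from Count1 carries the company and its count.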

 
