Storm的 hello 输出流
Posted tangsonghuai
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm的 hello 输出流相关的知识,希望对你有一定的参考价值。
TestWordSpout(随机选取一个单词)-----> ExclamationBolt(加感叹号)-----> PrintBolt(加问候语)
TestWordSpout
package cn.ljh.storm.helloworld; import org.apache.storm.topology.OutputFieldsDeclarer; import java.util.Map; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.apache.storm.utils.Utils; import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TestWordSpout extends BaseRichSpout public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); SpoutOutputCollector _collector; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) _collector = collector; public void nextTuple() Utils.sleep(100); final String[] words = new String[] "nathan", "mike", "jackson", "golda", "bertels"; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields("word"));
ExclamationBolt
package cn.ljh.storm.helloworld; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; public class ExclamationBolt extends BaseRichBolt OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) _collector = collector; public void execute(Tuple tuple) _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); _collector.ack(tuple); public void declareOutputFields(OutputFieldsDeclarer declarer) declarer.declare(new Fields("word")); //指定名称
PrintBolt
package cn.ljh.storm.helloworld; import java.util.Map; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PrintBolt extends BaseRichBolt private static Logger LOG = LoggerFactory.getLogger(PrintBolt.class); OutputCollector _collector; public void prepare(Map conf, TopologyContext context, OutputCollector collector) _collector = collector; public void execute(Tuple tuple) LOG.info(tuple.getString(0) + " Hello World!"); _collector.ack(tuple); public void declareOutputFields(OutputFieldsDeclarer declarer) //不再下发
ExclamationTopology
package cn.ljh.storm.helloworld; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils; public class ExclamationTopology public static void main(String[] args) throws Exception TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 1); //指定消息ID 为word builder.setBolt("exclaim", new ExclamationBolt(), 1).shuffleGrouping("word"); //指定分发策略,由spout(wordID)进行下发 builder.setBolt("print", new PrintBolt(), 1).shuffleGrouping("exclaim"); //指定分组策略 Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) conf.setNumWorkers(3); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); //随机NAME else LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test3", conf, builder.createTopology()); Utils.sleep(20000); cluster.killTopology("test3"); cluster.shutdown();
以上是关于Storm的 hello 输出流的主要内容,如果未能解决你的问题,请参考以下文章
聊聊批计算、流计算、Hadoop、Spark、Storm、Flink等等