Storm入门--Storm编程
Posted jing-wang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Storm入门--Storm编程相关的知识,希望对你有一定的参考价值。
以电信通话记录为例
移动呼叫及其持续时间将作为对Apache Storm的输入,Storm将处理和分组在相同呼叫者和接收者之间的呼叫及其呼叫总数。
编程思想:
在storm中,把对数据的处理过程抽象成一个topology,这个topology包含的组件主要是spout、bolt,以及以tuple形式在组件之间传输的数据流。这个数据流在topology流一遍,就是对数据的一次处理。
1、创建Spout类
这一部分,是创建数据流的源头。
创建一个类,实现IRichSpout接口,实现相应方法。其中几个方法的含义:
- open -为Spout提供执行环境。执行器将运行此方法来初始化喷头。一般写一些第一次运行时要处理的逻辑
- nextTuple -通过收集器发出生成的数据。核心,用于生成数据流
- close -当spout将要关闭时调用此方法。
- declareOutputFields -声明元组的输出模式。即,声明了从此spout出去的流都的数据格式
- ack -确认处理了特定元组。
- fail -指定不处理和不重新处理特定元组。
open(Map conf, TopologyContext context, SpoutOutputCollector collector)
- conf - 为此spout提供storm配置。
- context - 提供有关拓扑中的spout位置,其任务ID,输入和输出信息的完整信息。
- collector - 使我们能够发出将由bolts处理的元组。
nextTuple()
nextTuple()从与ack()和fail()方法相同的循环中定期调用。它必须释放线程的控制,当没有工作要做,以便其他方法有机会被调用。因此,nextTuple的第一行检查处理是否已完成。如果是这样,它应该休眠至少一毫秒,以减少处理器在返回之前的负载。
declareOutputFields(OutputFieldsDeclarer declarer)
declarer -它用于声明输出流id,输出字段等,此方法用于指定元组的输出模式。
ack(Object msgId)
该方法确认已经处理了特定元组。
fail(Object o)
此方法通知特定元组尚未完全处理。 Storm将重新处理特定的元组
package com.jing.calllogdemo; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Random; /* spout类,负责产生数据流 */ public class CallLogSpout implements IRichSpout //spout 输出收集器 private SpoutOutputCollector collector; //是否完成 private boolean completed = false; //上下文对象 private TopologyContext context; //随机发生器 private Random randomGenerator = new Random(); //索引 private Integer idx = 0; @Override public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) //第一次运行要做的事 this.context = topologyContext; this.collector = spoutOutputCollector; @Override public void close() @Override public void activate() @Override public void deactivate() @Override public void nextTuple() //产生第一条数据, if (this.idx <= 1000) List<String> mobileNumbers = new ArrayList<String>(); mobileNumbers.add("1234123401"); mobileNumbers.add("1234123402"); mobileNumbers.add("1234123403"); mobileNumbers.add("1234123404"); Integer localIdx = 0; while (localIdx++ < 100 && this.idx++ <1000) //取出主叫 String caller = mobileNumbers.get(randomGenerator.nextInt(4)); //取出被叫 String callee = mobileNumbers.get(randomGenerator.nextInt(4)); while (caller == callee) //重新取出被叫 callee = mobileNumbers.get(randomGenerator.nextInt(4)); //模拟通话时长 Integer duration = randomGenerator.nextInt(60); //输出元祖 this.collector.emit(new Values(caller,callee,duration)); @Override public void ack(Object o) @Override public void fail(Object o) @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) //声明输出字段,定义元组的结构,定义输出字段名称 outputFieldsDeclarer.declare(new Fields("from", "to", "duration")); @Override public Map<String, Object> getComponentConfiguration() return null;
2、创建Bolt类
这一部分是完成对数据流的处理,Bolt把元组作为输入,对元组进行处理后,产生新的元组。
创建一个类,实现IRichBolt接口,实现相应方法。
- prepare -为bolt提供要执行的环境。执行器将运行此方法来初始化spout。
- execute -处理单个元组的输入
- cleanup -当spout要关闭时调用。
- declareOutputFields -声明元组的输出模式。
prepare(Map conf, TopologyContext context, OutputCollector collector)
- conf -为此bolt提供Storm配置。
- context -提供有关拓扑中的bolt位置,其任务ID,输入和输出信息等的完整信息。
- collector -使我们能够发出处理的元组。
execute(Tuple tuple)
这是bolt的核心方法,这里的元组是要处理的输入元组。execute方法一次处理单个元组。元组数据可以通过Tuple类的getValue方法访问。不必立即处理输入元组。多元组可以被处理和输出为单个输出元组。处理的元组可以通过使用OutputCollector类发出。
cleanup()
declareOutputFields(OutputFieldsDeclarer declarer)
这个方法用于指定元组的输出模式,参数declarer用于声明输出流id,输出字段等。
这里有两个bolt
呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”
package com.jing.calllogdemo; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.Map; /* 创建calllog日志的bolt */ public class CallLogCreatorBolt implements IRichBolt private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) this.collector = outputCollector; @Override public void execute(Tuple tuple) //处理新的同话记录 String from = tuple.getString(0); String to = tuple.getString(1); Integer duration = tuple.getInteger(2); //产生新的tuple String fromTO = from + "-" + to; collector.emit(new Values(fromTO, duration)); @Override public void cleanup() @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) //设置输出字段的名称 outputFieldsDeclarer.declare(new Fields("call", "duration")); @Override public Map<String, Object> getComponentConfiguration() return null;
呼叫日志创建者bolt接收呼叫日志元组。呼叫日志元组具有主叫方号码,接收方号码和呼叫持续时间。此bolt通过组合主叫方号码和接收方号码简单地创建一个新值。新值的格式为“来电号码 - 接收方号码”,并将其命名为新字段“呼叫”。
package com.jing.calllogdemo; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import java.util.HashMap; import java.util.Map; /* 通话记录计数器bolt */ public class CallLogCounterBolt implements IRichBolt Map<String, Integer> counterMap; private OutputCollector collector; @Override public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) this.counterMap = new HashMap<String, Integer>(); this.collector = outputCollector; @Override public void execute(Tuple tuple) String call = tuple.getString(0); Integer duration = tuple.getInteger(1); if(!counterMap.containsKey(call)) counterMap.put(call, 1); else Integer c = counterMap.get(call) + duration; counterMap.put(call, c); collector.ack(tuple); @Override public void cleanup() for(Map.Entry<String, Integer> entry : counterMap.entrySet()) System.out.println(entry.getKey() + " : " + entry.getValue()); @Override public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) outputFieldsDeclarer.declare(new Fields("call")); @Override public Map<String, Object> getComponentConfiguration() return null;
3、创建执行入口类,构建Topology
package com.jing.calllogdemo; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.StormSubmitter; import org.apache.storm.generated.AlreadyAliveException; import org.apache.storm.generated.AuthorizationException; import org.apache.storm.generated.InvalidTopologyException; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; public class App public static void main(String[] args) throws InterruptedException, InvalidTopologyException, AuthorizationException, AlreadyAliveException TopologyBuilder builder = new TopologyBuilder(); //设置spout builder.setSpout("spout", new CallLogSpout()); //设置creator-bolt builder.setBolt("creator-bolt", new CallLogCreatorBolt()).shuffleGrouping("spout"); //设置countor-bolt builder.setBolt("counter-bolt", new CallLogCounterBolt()). fieldsGrouping("creator-bolt", new Fields("call")); Config config = new Config(); config.setDebug(true); /*本地模式 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); */ StormSubmitter.submitTopology("myTop", config, builder.createTopology());
作者:raincoffee
链接:https://www.jianshu.com/p/7af9693d9ffc
来源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
生产环境的集群上运行topology
1)修改提交方式,在代码中
2)导出jar包 mvn
3)在linux上运行topologys
&>storm jar XXX.jar full.class.name
以上是关于Storm入门--Storm编程的主要内容,如果未能解决你的问题,请参考以下文章
Storm入门Twitter Storm源代码分析之CoordinatedBolt
Storm历险记之浅入浅出:Storm Hello World入门示例 本文适合Storm小白看C#转java,大神请吐槽指点。