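Storm Adventures, the Shallow End: A Storm Hello World Starter Example. Written for Storm beginners (the author is moving from C# to Java); experts are welcome to criticize and advise.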
Posted Yuema约吗
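As the saying goes, a picture is worth a thousand words. To get to know Storm, start with a few pictures that give an intuitive feel for it. Some are official images and some were drawn by other engineers; all of them come from the internet. Skim them quickly before reading the code, and don't worry if they don't make sense right away: a short text explanation follows the images. This article is written for Storm beginners; experts are welcome to criticize it or close the tab within a second.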
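Storm is a real-time stream processing framework: it keeps fetching data and keeps processing it, like flowing water. The official artwork uses a faucet to convey the idea. A data flow starts at a Spout, which fetches the data; the middle of the flow is a combination of Bolts, each of which is a unit that processes data. A Spout and its Bolts correspond to the start node and the processing steps of a flowchart. Together they form a topology job that you hand to Storm to run. Storm has a local mode and a remote mode; today's Storm Hello World uses local mode.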
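Code structure:
Figure 6
The code comes from https://www.cnblogs.com/xuwujing/p/8584684.html. Because I used the latest storm-core, 1.2.2, it needed a few changes. Judging from the errors Eclipse reported, some methods that used to live in a superclass in the Storm code appear to have moved into an interface, so some of the @Override annotations had to be removed.
POM file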
<!--https://mvnrepository.com/artifact/ring-cors/ring-cors -->
<dependency>
<groupId>ring-cors</groupId>
<artifactId>ring-cors</artifactId>
<version>0.1.12</version>
</dependency>
<!--https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.2.2</version>
<scope>provided</scope>
</dependency>
Corrected code
TestSpout
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class TestSpout extends BaseRichSpout {

    private static final long serialVersionUID = 225243592780939490L;

    private SpoutOutputCollector collector;
    private static final String field = "word";
    private int count = 1;
    private String[] message = {
        "Storm Hello World",
        "http://www.jishudao.com storm blog",
        "Play with storm"
    };

    /**
     * open() is defined in the ISpout interface and is called when the spout component is initialized.
     * It takes three parameters:
     * 1. the Map holding the Storm configuration;
     * 2. information about the components in the topology;
     * 3. the collector used to emit tuples.
     */
    public void open(Map map, TopologyContext arg1, SpoutOutputCollector collector) {
        System.out.println("open:" + map.get("test"));
        this.collector = collector;
    }

    /**
     * nextTuple() is the core of a spout implementation.
     * It is the main execution method; output is emitted through collector.emit.
     */
    public void nextTuple() {
        // Emit each of the three messages once; later calls only increment the counter.
        if (count <= message.length) {
            System.out.println("Sending data, round " + count + "...");
            this.collector.emit(new Values(message[count - 1]));
        }
        count++;
    }

    /**
     * declareOutputFields is defined in the IComponent interface and declares the data format,
     * that is, which fields an emitted tuple contains.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        System.out.println("Declaring output format...");
        declarer.declare(new Fields(field));
    }

    /**
     * Called when a tuple has been processed successfully.
     * Note: Storm only tracks tuples that were emitted with a message ID,
     * and nextTuple() above emits without one.
     */
    @Override
    public void ack(Object obj) {
        System.out.println("ack:" + obj);
    }

    /**
     * Called when the topology is stopped.
     */
    @Override
    public void close() {
        System.out.println("Closing...");
    }

    /**
     * Called when processing of a tuple has failed.
     */
    @Override
    public void fail(Object obj) {
        System.out.println("fail:" + obj);
    }
}
TestBolt
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class TestBolt extends BaseRichBolt {

    private static final long serialVersionUID = 4743224635827696343L;

    private OutputCollector collector;

    /**
     * Runs before the bolt starts and is the entry point for setting up its environment.
     * Typically used to instantiate objects that are not serializable.
     * Note: for serializable objects, the constructor is the better place.
     */
    public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
        System.out.println("prepare:" + map.get("test"));
        this.collector = collector;
    }

    /**
     * execute() is the core of a bolt implementation.
     * It is called every time the bolt receives a tuple from a stream it subscribes to.
     */
    public void execute(Tuple tuple) {
        String msg = tuple.getStringByField("word");
        System.out.println("Splitting into words: " + msg);
        String[] words = msg.toLowerCase().split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word)); // emit to the next bolt
        }
    }

    /**
     * Declares the data format.
     * The field is named "count" although it carries a single word;
     * Test2Bolt reads it back under that name.
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("count"));
    }

    /**
     * cleanup is defined in the IBolt interface and releases the resources held by the bolt.
     * Storm calls it before terminating a bolt.
     */
    @Override
    public void cleanup() {
        System.out.println("Releasing TestBolt resources");
    }
}
Test2Bolt
import java.util.HashMap;
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class Test2Bolt extends BaseRichBolt {

    private static final long serialVersionUID = 4743224635827696343L;

    /**
     * Holds each word and its count.
     */
    private HashMap<String, Integer> counts = null;

    private long count = 1;

    /**
     * Runs before the bolt starts and is the entry point for setting up its environment.
     * Typically used to instantiate objects that are not serializable.
     * Note: for serializable objects, the constructor is the better place.
     */
    public void prepare(Map map, TopologyContext arg1, OutputCollector collector) {
        System.out.println("prepare:" + map.get("test"));
        this.counts = new HashMap<String, Integer>();
    }

    /**
     * execute() is the core of a bolt implementation.
     * It is called every time the bolt receives a tuple from a stream it subscribes to.
     */
    public void execute(Tuple tuple) {
        String msg = tuple.getStringByField("count");
        System.out.println("Counting word occurrences, round " + count);
        // If the map does not contain the word yet, this is its first occurrence;
        // otherwise add 1 to the stored count.
        if (!counts.containsKey(msg)) {
            counts.put(msg, 1);
        } else {
            counts.put(msg, counts.get(msg) + 1);
        }
        count++;
    }

    /**
     * cleanup is defined in the IBolt interface and releases the resources held by the bolt.
     * Storm calls it before terminating a bolt.
     */
    @Override
    public void cleanup() {
        System.out.println("===========Word counts============");
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
        System.out.println("===========End============");
        System.out.println("Releasing Test2Bolt resources");
    }

    /**
     * Declares the data format. This is the last bolt and emits nothing, so nothing is declared.
     */
    public void declareOutputFields(OutputFieldsDeclarer arg0) {
    }
}
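To check the topology's output against something, here is a minimal plain-Java sketch of the same computation: lowercase each sentence and split it on spaces as TestBolt does, then count each word as Test2Bolt does. It uses no Storm classes, and the class name WordCountSketch and the method countWords are made up for this illustration; it only shows what numbers to expect, not how Storm runs the work.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for the topology; no Storm classes are involved.
// WordCountSketch and countWords are hypothetical names used only for this sketch.
class WordCountSketch {

    // The same three sentences that TestSpout emits.
    static final String[] MESSAGES = {
        "Storm Hello World",
        "http://www.jishudao.com storm blog",
        "Play with storm"
    };

    // Mirrors TestBolt (lowercase, split on a space) followed by Test2Bolt (count per word).
    static Map<String, Integer> countWords(String[] messages) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String message : messages) {
            for (String word : message.toLowerCase().split(" ")) {
                counts.merge(word, 1, Integer::sum); // insert 1, or add 1 to the existing count
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Integer> entry : countWords(MESSAGES).entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }
}
```

With these three sentences, "storm" is counted three times and each of the other six words once. Test2Bolt keeps its counts in a HashMap, so the order in which it prints them may differ from this sketch.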
App
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class App {

    private static final String test_spout = "test_spout";
    private static final String test_bolt = "test_bolt";
    private static final String test2_bolt = "test2_bolt";

    public static void main(String[] args) {
        // Define a topology
        TopologyBuilder builder = new TopologyBuilder();
        // Use one executor (thread); one is also the default
        builder.setSpout(test_spout, new TestSpout(), 1);
        // shuffleGrouping: tuples are distributed randomly
        // Use one executor (thread) and one task
        builder.setBolt(test_bolt, new TestBolt(), 1).setNumTasks(1).shuffleGrouping(test_spout);
        // fieldsGrouping: tuples are grouped by field
        // Use one executor (thread) and one task
        builder.setBolt(test2_bolt, new Test2Bolt(), 1).setNumTasks(1).fieldsGrouping(test_bolt, new Fields("count"));
        Config conf = new Config();
        conf.put("test", "test");
        try {
            // Run the topology
            if (args != null && args.length > 0) {
                // With arguments: submit the job to a cluster, using the first argument as the topology name
                System.out.println("Running in remote mode");
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } else {
                // Without arguments: submit locally
                // Start local mode
                System.out.println("Running in local mode");
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("Word-counts", conf, builder.createTopology());
                Thread.sleep(20000);
                // Shut down the local cluster
                cluster.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
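The difference between the two groupings used in App only shows up with more than one task. With fieldsGrouping, tuples that carry the same value in the grouping field always reach the same task, which is what lets a counting bolt keep a correct per-word total. The sketch below illustrates that idea with a simple hash-modulo rule. It is an assumption made for illustration and not Storm's actual routing code; FieldsGroupingSketch and chooseTask are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: this is NOT how Storm routes tuples internally.
// FieldsGroupingSketch and chooseTask are hypothetical names for this sketch.
class FieldsGroupingSketch {

    // Pick a task index from the grouping field value.
    // Equal values always produce the same index, which is the property fieldsGrouping gives you.
    static int chooseTask(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // hypothetical; the topology in this article uses a single task
        List<String> words = Arrays.asList("storm", "hello", "storm", "blog", "storm");
        for (String word : words) {
            System.out.println(word + " -> task " + chooseTask(word, numTasks));
        }
    }
}
```

Every "storm" line prints the same task index. With shuffleGrouping there is no such guarantee, which is fine for TestBolt because splitting a sentence does not depend on earlier tuples.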
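Pitfall: The POM for ring-cors:ring-cors:jar:0.1.5 is missing
Fix:
https://mvnrepository.com/artifact/ring-cors/ring-cors/0.1.12
Download ring-cors-0.1.12.jar and ring-cors-0.1.12.pom directly and put them into the local Maven repository (by default under ~/.m2/repository/ring-cors/ring-cors/0.1.12/).
Once you have climbed out of that pit, the Storm Hello World runs. After you have a first feel for it, go through the official material in more detail. Besides reading other people's translations, I strongly recommend comparing them against the official text. The official site has a detailed list of documents; take your time and read them one by one.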
Basics of Storm
Javadoc
Concepts
Scheduler
Configuration
Guaranteeing message processing
Daemon Fault Tolerance
Command line client
REST API
Understanding the parallelism of a Storm topology
FAQ
Layers on top of Storm
Trident
Trident is an alternative interface to Storm. It provides exactly-once processing, "transactional" datastore persistence, and a set of common stream analytics operations.
Trident Tutorial -- basic concepts and walkthrough
Trident API Overview -- operations for transforming and orchestrating data
Trident State -- exactly-once processing and fast, persistent aggregation
Trident spouts -- transactional and non-transactional data intake
Trident RAS API -- using the Resource Aware Scheduler with Trident.
Streams API
Stream APIs is another alternative interface to Storm. It provides a typed API for expressing streaming computations and supports functional style operations.
NOTE: Streams API is an experimental feature, and further works might break backward compatibility. We're also notifying it via annotating classes with marker interface @InterfaceStability.Unstable.
Streams API
SQL
The Storm SQL integration allows users to run SQL queries over streaming data in Storm.
NOTE: Storm SQL is an experimental feature, so the internals of Storm SQL and supported features are subject to change. But small change will not affect the user experience. We will notify the user when breaking UX change is introduced.
Storm SQL overview
Storm SQL example
Storm SQL reference
Storm SQL internal
Flux
Flux Data Driven Topology Builder
Setup and Deploying
Setting up a Storm cluster
Local mode
Troubleshooting
Running topologies on a production cluster
Building Storm with Maven
Setting up a Secure Cluster
CGroup Enforcement
Pacemaker reduces load on zookeeper for large clusters
Resource Aware Scheduler
Daemon Metrics/Monitoring
Windows users guide
Classpath handling
Intermediate
Serialization
Common patterns
DSLs and multilang adapters
Using non-JVM languages with Storm
Distributed RPC
Transactional topologies
Hooks
Metrics (Deprecated)
Metrics V2
State Checkpointing
Windowing
Joining Streams
Blobstore (Distcache)
Debugging
Dynamic Log Level Settings
Searching Worker Logs
Worker Profiling
Event Logging
Integration With External Systems, and Other Libraries
Apache Kafka Integration, New Kafka Consumer Integration
Apache HBase Integration
Apache HDFS Integration
Apache Hive Integration
Apache Solr Integration
Apache Cassandra Integration
Apache RocketMQ Integration
JDBC Integration
JMS Integration
MQTT Integration
Redis Integration
Event Hubs Integration
Elasticsearch Integration
Mongodb Integration
OpenTSDB Integration
Kinesis Integration
Druid Integration
PMML Integration
Kestrel Integration
Container, Resource Management System Integration
YARN Integration
Mesos Integration
Docker Integration
Kubernetes Integration
Advanced
Defining a non-JVM language DSL for Storm
Multilang protocol (how to provide support for another language)
Implementation docs
Storm Metricstore
References:
http://storm.apache.org/releases/current/index.html
https://www.cnblogs.com/xuwujing/p/8584684.html