Apache Storm:一个本地执行任务的Demo
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Storm:一个本地执行任务的Demo相关的知识,希望对你有一定的参考价值。
当前版本:Apache Storm 2.1.1
1. 声明
当前内容主要为本人学习使用Apache Storm执行本地计算操作,当前内容主要参考:官方文档
当前内容:
- 使用模拟随机生成的无限流作为数据源
- 处理和过滤数据并存放到指定的Bolt位置
基本pom依赖
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>2.1.1</version>
</dependency>
主要组件:
- Spout为数据源(就是产生数据的地方,或者拉取数据的地方)
- Bolt为数据处理的地方
2.基本Demo
public class App
public static void main(String[] args)
Map<String, Object> conf = new HashMap<>();
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("temperature", new DataStreamSource(), 3);
builder.setBolt("high_temperature", new HighTemperatureBolt(), 3).shuffleGrouping("temperature");
builder.setBolt("system_print", new SystemPrintBolt(), 3).shuffleGrouping("high_temperature");
/*
* try (LocalCluster cluster = new LocalCluster()) // Interact with the
* cluster...
*
* StormTopology createTopology = builder.createTopology(); LocalTopology
* submitTopology = cluster.submitTopology("test", conf, createTopology);
* cluster.activate("test"); cluster.addSupervisor(); catch (Exception e) //
* TODO Auto-generated catch block e.printStackTrace();
*/
try (LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
LocalTopology topo = cluster.submitTopology("drpc-demo", conf, builder.createTopology()))
System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
catch (Exception e)
// TODO Auto-generated catch block
e.printStackTrace();
// 无限流方式,
public static class DataStreamSource extends BaseRichSpout
public static Logger LOG = LoggerFactory.getLogger(DataStreamSource.class);
private SpoutOutputCollector collector;
@Override
public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector)
this.collector = collector;
@Override
public void close()
// TODO Auto-generated method stub
@Override
public void nextTuple()
try
Thread.sleep(1000L);
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
// 产生随机温度
double temperature = Math.random() * 10 + 34.5;
/* System.out.println("产出温度==>" + temperature); */
long timestamp = System.currentTimeMillis();
Temperature data = new Temperature(timestamp, temperature);
collector.emit(new Values(data));
@Override
public void ack(Object msgId)
// TODO Auto-generated method stub
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("temperature"));
private static class Temperature
private final long timestamp;
private final double temperature;
public Temperature(long timestamp, double temperature)
super();
this.timestamp = timestamp;
this.temperature = temperature;
public long getTimestamp()
return timestamp;
public double getTemperature()
return temperature;
public static class SystemPrintBolt extends BaseRichBolt
private OutputCollector collector;
@Override
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector)
// TODO Auto-generated method stub
this.collector = collector;
@Override
public void execute(Tuple input)
int size = input.size();
for (int i = 0; i < size; i++)
Temperature temperature = (Temperature) input.getValue(i);
System.out.println("high temperature:" + temperature.getTemperature() + ",timestamp:"
+ temperature.getTimestamp());
collector.ack(input);
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
// TODO Auto-generated method stub
public static class HighTemperatureBolt implements IRichBolt
OutputCollector _collector;
private double highLimitTemperature = 37.5;
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector)
_collector = collector;
@Override
public void execute(Tuple tuple)
int size = tuple.size();
for (int i = 0; i < size; i++)
Temperature temperature = (Temperature) tuple.getValue(i);
double currentValue = temperature.getTemperature();
if (currentValue > highLimitTemperature)
// 收集温度超过特定限制的数据
_collector.emit(tuple, new Values(temperature));
_collector.ack(tuple);
@Override
public void cleanup()
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("high_temperature"));
@Override
public Map<String, Object> getComponentConfiguration()
return null;
这里发现无法使用LocalCluster方式执行任务
3. 测试
测试成功!
以上是关于Apache Storm:一个本地执行任务的Demo的主要内容,如果未能解决你的问题,请参考以下文章