Apache Storm:一个本地执行任务的Demo

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Storm:一个本地执行任务的Demo相关的知识,希望对你有一定的参考价值。

当前版本:Apache Storm 2.1.1

1. 声明

当前内容主要为本人学习使用Apache Storm执行本地计算操作,当前内容主要参考:官方文档
当前内容:

  1. 使用模拟随机生成的无限流作为数据源
  2. 处理和过滤数据并存放到指定的Bolt位置

基本pom依赖

<dependency>
	<groupId>org.apache.storm</groupId>
	<artifactId>storm-core</artifactId>
	<version>2.1.1</version>
</dependency>

主要组件:

  1. Spout为数据源(就是产生数据的地方,或者拉取数据的地方)
  2. Bolt为数据处理的地方

2.基本Demo

public class App 
	public static void main(String[] args) 
		Map<String, Object> conf = new HashMap<>();
		TopologyBuilder builder = new TopologyBuilder();
		builder.setSpout("temperature", new DataStreamSource(), 3);
		builder.setBolt("high_temperature", new HighTemperatureBolt(), 3).shuffleGrouping("temperature");
		builder.setBolt("system_print", new SystemPrintBolt(), 3).shuffleGrouping("high_temperature");
		/*
		 * try (LocalCluster cluster = new LocalCluster())  // Interact with the
		 * cluster...
		 * 
		 * StormTopology createTopology = builder.createTopology(); LocalTopology
		 * submitTopology = cluster.submitTopology("test", conf, createTopology);
		 * cluster.activate("test"); cluster.addSupervisor();  catch (Exception e)  //
		 * TODO Auto-generated catch block e.printStackTrace(); 
		 */

		try (LocalDRPC drpc = new LocalDRPC();
				LocalCluster cluster = new LocalCluster();
				LocalTopology topo = cluster.submitTopology("drpc-demo", conf, builder.createTopology())) 

			System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
		 catch (Exception e) 
			// TODO Auto-generated catch block
			e.printStackTrace();
		
	

	// 无限流方式,
	public static class DataStreamSource extends BaseRichSpout 
		public static Logger LOG = LoggerFactory.getLogger(DataStreamSource.class);
		private SpoutOutputCollector collector;

		@Override
		public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) 
			this.collector = collector;
		

		@Override
		public void close() 
			// TODO Auto-generated method stub

		

		@Override
		public void nextTuple() 
			try 
				Thread.sleep(1000L);
			 catch (InterruptedException e) 
				// TODO Auto-generated catch block
				e.printStackTrace();
			
			// 产生随机温度
			double temperature = Math.random() * 10 + 34.5;
			/* System.out.println("产出温度==>" + temperature); */
			long timestamp = System.currentTimeMillis();
			Temperature data = new Temperature(timestamp, temperature);
			collector.emit(new Values(data));

		

		@Override
		public void ack(Object msgId) 
			// TODO Auto-generated method stub

		

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) 
			declarer.declare(new Fields("temperature"));
		

	

	private static class Temperature 
		private final long timestamp;
		private final double temperature;

		public Temperature(long timestamp, double temperature) 
			super();
			this.timestamp = timestamp;
			this.temperature = temperature;
		

		public long getTimestamp() 
			return timestamp;
		

		public double getTemperature() 
			return temperature;
		

	

	public static class SystemPrintBolt extends BaseRichBolt 
		private OutputCollector collector;

		@Override
		public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) 
			// TODO Auto-generated method stub
			this.collector = collector;
		

		@Override
		public void execute(Tuple input) 
			int size = input.size();
			for (int i = 0; i < size; i++) 
				Temperature temperature = (Temperature) input.getValue(i);
				System.out.println("high temperature:" + temperature.getTemperature() + ",timestamp:"
						+ temperature.getTimestamp());
			
			collector.ack(input);
		

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) 
			// TODO Auto-generated method stub

		

	

	public static class HighTemperatureBolt implements IRichBolt 
		OutputCollector _collector;
		private double highLimitTemperature = 37.5;

		@Override
		public void prepare(Map conf, TopologyContext context, OutputCollector collector) 
			_collector = collector;
		

		@Override
		public void execute(Tuple tuple) 
			int size = tuple.size();
			for (int i = 0; i < size; i++) 
				Temperature temperature = (Temperature) tuple.getValue(i);
				double currentValue = temperature.getTemperature();
				if (currentValue > highLimitTemperature) 
					// 收集温度超过特定限制的数据
					_collector.emit(tuple, new Values(temperature));

				
			
			_collector.ack(tuple);
		

		@Override
		public void cleanup() 
		

		@Override
		public void declareOutputFields(OutputFieldsDeclarer declarer) 
			declarer.declare(new Fields("high_temperature"));
		

		@Override
		public Map<String, Object> getComponentConfiguration() 
			return null;
		
	


这里发现无法使用LocalCluster方式执行任务

3. 测试


测试成功!

以上是关于Apache Storm:一个本地执行任务的Demo的主要内容,如果未能解决你的问题,请参考以下文章

原译文理解storm拓扑并行度

storm的定时任务

Storm WordCount

Storm并行中的“任务”是啥

在 Apache Storm 中重新平衡执行器

Storm集群上的开发 ,Storm的内部原理,storm提交任务的过程