Apache Storm Trident - 动态创建拓扑
Posted
技术标签:
【中文标题】Apache Storm Trident - 动态创建拓扑【英文标题】:Apache storm Trident - creating topologies dynamically 【发布时间】:2016-06-20 15:20:39 【问题描述】:有没有办法在 trident 中动态创建拓扑? 谁能提供例子?
【问题讨论】:
您可以将拓扑配置存储在某个属性文件 (JSON) 中,并且在部署拓扑时可以从该文件中读取它。但是一旦你部署了它,你就不能动态地改变它 【参考方案1】:首先,您可能还知道创建拓扑不是 Trident 的一部分。 Trident 只是一个用于微批处理的 API。
根据定义,创建新拓扑是动态的。这就是TopologyBuilder
类正在做的事情。
所以要回答您的问题,是的,可以从 Trident 或简单的 Storm spouts and bolts 创建新拓扑。您唯一需要的是,您的拓扑创建逻辑应该可以访问 Storm 集群(类和其他资源),如果您在 Storm 中运行逻辑,这再次满足定义。
您需要做的最后一件事是找到一种方法来提交新创建的拓扑,这就是 StormSubmitter
类的用途,这也是 (!surprise :) ) 满足您的定义在 Trident 或普通 spout/bolt 中运行逻辑时的类路径。
出于好奇,您为什么打算这样做?您有什么要求?
例子:
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
public class DynamicTopologySpout implements IBatchSpout
private static final long serialVersionUID = -3269435263455830842L;
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context)
@Override
public void emitBatch(long batchId, TridentCollector collector)
if (newTopologyNeeded())
TopologyBuilder builder = new TopologyBuilder();
builder
.setSpout("spout", new BaseRichSpout()
private static final long serialVersionUID = 1L;
@Override public void declareOutputFields(OutputFieldsDeclarer declarer)
@Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
@Override public void nextTuple()
, 1)
.setMaxSpoutPending(15)
.setNumTasks(1);
StormTopology topology = builder.createTopology();
Config config = new Config();
try
StormSubmitter.submitTopology("dynamic-topology", config, topology);
catch (Exception e)
e.printStackTrace();
collector.reportError(e);
private boolean newTopologyNeeded()
// Check if topology needed ...
return false;
@Override
public void ack(long batchId)
@Override
public void close()
@Override
public Map<String, Object> getComponentConfiguration() return null;
@Override
public Fields getOutputFields() return null;
【讨论】:
以上是关于Apache Storm Trident - 动态创建拓扑的主要内容,如果未能解决你的问题,请参考以下文章
Storm Trident示例shuffle¶llelismHint
Apache Storm 官方文档 —— Trident State