Apache Storm Trident - 动态创建拓扑

Posted

技术标签:

【中文标题】Apache Storm Trident - 动态创建拓扑【英文标题】:Apache storm Trident - creating topologies dynamically 【发布时间】:2016-06-20 15:20:39 【问题描述】:

有没有办法在 trident 中动态创建拓扑? 谁能提供例子?

【问题讨论】:

您可以将拓扑配置存储在某个属性文件 (JSON) 中,并且在部署拓扑时可以从该文件中读取它。但是一旦你部署了它,你就不能动态地改变它 【参考方案1】:

首先,您可能还知道创建拓扑不是 Trident 的一部分。 Trident 只是一个用于微批处理的 API。

根据定义,创建新拓扑是动态的。这就是TopologyBuilder 类正在做的事情。

所以要回答您的问题,是的,可以从 Trident 或简单的 Storm spouts and bolts 创建新拓扑。您唯一需要的是,您的拓扑创建逻辑应该可以访问 Storm 集群(类和其他资源),如果您在 Storm 中运行逻辑,这再次满足定义。

您需要做的最后一件事是找到一种方法来提交新创建的拓扑,这就是 StormSubmitter 类的用途,这也是 (!surprise :) ) 满足您的定义在 Trident 或普通 spout/bolt 中运行逻辑时的类路径。

出于好奇,您为什么打算这样做?您有什么要求?

例子:

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;

public class DynamicTopologySpout implements IBatchSpout 

    private static final long serialVersionUID = -3269435263455830842L;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context) 

    @Override
    public void emitBatch(long batchId, TridentCollector collector) 
        if (newTopologyNeeded()) 
            TopologyBuilder builder = new TopologyBuilder();
            builder
            .setSpout("spout", new BaseRichSpout() 
                private static final long serialVersionUID = 1L;
                @Override public void declareOutputFields(OutputFieldsDeclarer declarer) 
                @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) 
                @Override public void nextTuple() 
            , 1)
            .setMaxSpoutPending(15)
            .setNumTasks(1);
            StormTopology topology = builder.createTopology();
            Config config = new Config();
            try 
                StormSubmitter.submitTopology("dynamic-topology", config, topology);
             catch (Exception e) 
                e.printStackTrace();
                collector.reportError(e);
            
        
    

    private boolean newTopologyNeeded() 
        // Check if topology needed ...
        return false;
    

    @Override
    public void ack(long batchId) 

    @Override
    public void close() 

    @Override
    public Map<String, Object> getComponentConfiguration()  return null; 

    @Override
    public Fields getOutputFields()  return null; 


【讨论】:

以上是关于Apache Storm Trident - 动态创建拓扑的主要内容,如果未能解决你的问题,请参考以下文章

Trident整合Kafka

Storm Trident示例shuffle&parallelismHint

Apache Storm 官方文档 —— Trident State

如何修复 Apache Storm Trident 拓扑中的错误“组件:[x] 订阅不存在的组件 [y]”

Storm-HBase Trident - 同时查询多个列

storm trident function函数