在 Storm 上使用 BaseRichSpout 无限次调用 nextTuple()

Posted

技术标签:

【中文标题】在 Storm 上使用 BaseRichSpout 无限次调用 nextTuple()【英文标题】:nextTuple() is called infinite times using BaseRichSpout on Storm 【发布时间】:2014-12-08 08:55:51 【问题描述】:

我实现了简单的 Storm 拓扑,它具有单个 spout 和一个在本地集群模式下运行的 bolt。

由于某种原因,spout 的 nextTuple() 被多次调用。

知道为什么吗?

代码:

喷口:

public class CommitFeedListener extends BaseRichSpout 
    private SpoutOutputCollector outputCollector;
    private List<String> commits;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
        declarer.declare(new Fields("commit"));
    

    @Override
    public void open(Map configMap,
                     TopologyContext context,
                     SpoutOutputCollector outputCollector) 
        this.outputCollector = outputCollector;
    

    **//that method is invoked more than once**
    @Override
    public void nextTuple() 

            outputCollector.emit(new Values("testValue"));

    

螺栓:

public class EmailExtractor extends BaseBasicBolt 
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) 
        declarer.declare(new Fields("email"));
    
    @Override
    public void execute(Tuple tuple,
                        BasicOutputCollector outputCollector) 
        String commit = tuple.getStringByField("commit");
        System.out.println(commit);        
      

运行配置:

public class LocalTopologyRunner 
    private static final int TEN_MINUTES = 600000;
    public static void main(String[] args) 
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("commit-feed-listener", new CommitFeedListener());
                builder
        .setBolt("email-extractor", new EmailExtractor())
                .shuffleGrouping("commit-feed-listener");
        Config config = new Config();
        config.setDebug(true);
        StormTopology topology = builder.createTopology();
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("github-commit-count-topology",
                config,
                topology);
        Utils.sleep(TEN_MINUTES);
        cluster.killTopology("github-commit-count");
        cluster.shutdown();
    

谢谢大家, 射线。

【问题讨论】:

【参考方案1】:

nextTuple() 被设计成无限循环调用。这样做是为了例如对外部资源(数据库、流、IO 等)进行脏检查。

如果你在 nextTuple() 中无事可做,你应该睡一会儿以防止 CPU 使用 backtype.storm.utils.Utils 发送垃圾邮件

Utils.sleep(pollIntervalInMilliseconds);

Storm 是一种实时处理架构,因此它确实是正确的行为。查看一些示例,了解如何根据您的需要实现 spout。

【讨论】:

是的,但如果我有一个输入文件。一旦我读完它,我不希望它再次重新处理它。你怎么能避免呢? 每次发送值后只需以较高的毫秒值休眠,以防止 CPU 垃圾邮件在您发送一次值后。但是 spout 作为实时检查器,总是会按设计调用方法。 但是如果我睡觉对我没有帮助。我想知道我什么时候读完文件,例如……或一个列表。我怎么知道该方法是否一遍又一遍地调用..? @rayman,你应该先描述你的用例。您无法将 Storm 设计用于您的用途,但您可以根据需要设计拓扑。我不明白你想做什么。 我理解你。我阅读了文档。这次做对了。【参考方案2】:

如何创建一些标志并在必要时设置它?

if (completed) 
    try 
        Utils.sleep(pollIntervalInMilliseconds);
     catch (InterruptedException e) 
        // Do nothing
    
    return;

【讨论】:

以上是关于在 Storm 上使用 BaseRichSpout 无限次调用 nextTuple()的主要内容,如果未能解决你的问题,请参考以下文章

Storm之路-WordCount-实例

55.storm 之 hello word(本地模式)

Storm学习日记

在 Windows 上使用“storm jar”命令远程部署 Storm 拓扑

3.storm-starter打包在storm集群上运行

如何在cloudfoundry上使用kafka和storm?