在 Storm 上使用 BaseRichSpout 无限次调用 nextTuple()
Posted
技术标签:
【中文标题】在 Storm 上使用 BaseRichSpout 无限次调用 nextTuple()【英文标题】:nextTuple() is called infinite times using BaseRichSpout on Storm 【发布时间】:2014-12-08 08:55:51 【问题描述】:我实现了简单的 Storm 拓扑,它具有单个 spout 和一个在本地集群模式下运行的 bolt。
由于某种原因,spout 的 nextTuple() 被多次调用。
知道为什么吗?
代码:
喷口:
public class CommitFeedListener extends BaseRichSpout
private SpoutOutputCollector outputCollector;
private List<String> commits;
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("commit"));
@Override
public void open(Map configMap,
TopologyContext context,
SpoutOutputCollector outputCollector)
this.outputCollector = outputCollector;
**//that method is invoked more than once**
@Override
public void nextTuple()
outputCollector.emit(new Values("testValue"));
螺栓:
public class EmailExtractor extends BaseBasicBolt
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
declarer.declare(new Fields("email"));
@Override
public void execute(Tuple tuple,
BasicOutputCollector outputCollector)
String commit = tuple.getStringByField("commit");
System.out.println(commit);
运行配置:
public class LocalTopologyRunner
private static final int TEN_MINUTES = 600000;
public static void main(String[] args)
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("commit-feed-listener", new CommitFeedListener());
builder
.setBolt("email-extractor", new EmailExtractor())
.shuffleGrouping("commit-feed-listener");
Config config = new Config();
config.setDebug(true);
StormTopology topology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("github-commit-count-topology",
config,
topology);
Utils.sleep(TEN_MINUTES);
cluster.killTopology("github-commit-count");
cluster.shutdown();
谢谢大家, 射线。
【问题讨论】:
【参考方案1】:nextTuple() 被设计成无限循环调用。这样做是为了例如对外部资源(数据库、流、IO 等)进行脏检查。
如果你在 nextTuple() 中无事可做,你应该睡一会儿以防止 CPU 使用 backtype.storm.utils.Utils 发送垃圾邮件
Utils.sleep(pollIntervalInMilliseconds);
Storm 是一种实时处理架构,因此它确实是正确的行为。查看一些示例,了解如何根据您的需要实现 spout。
【讨论】:
是的,但如果我有一个输入文件。一旦我读完它,我不希望它再次重新处理它。你怎么能避免呢? 每次发送值后只需以较高的毫秒值休眠,以防止 CPU 垃圾邮件在您发送一次值后。但是 spout 作为实时检查器,总是会按设计调用方法。 但是如果我睡觉对我没有帮助。我想知道我什么时候读完文件,例如……或一个列表。我怎么知道该方法是否一遍又一遍地调用..? @rayman,你应该先描述你的用例。您无法将 Storm 设计用于您的用途,但您可以根据需要设计拓扑。我不明白你想做什么。 我理解你。我阅读了文档。这次做对了。【参考方案2】:如何创建一些标志并在必要时设置它?
if (completed)
try
Utils.sleep(pollIntervalInMilliseconds);
catch (InterruptedException e)
// Do nothing
return;
【讨论】:
以上是关于在 Storm 上使用 BaseRichSpout 无限次调用 nextTuple()的主要内容,如果未能解决你的问题,请参考以下文章