为啥在这个最小示例中 Trident 不调用 ack() 或 fail()?
Posted
技术标签:
【中文标题】为啥在这个最小示例中 Trident 不调用 ack() 或 fail()?【英文标题】:Why does Trident not call ack() or fail() in this minimal example?为什么在这个最小示例中 Trident 不调用 ack() 或 fail()? 【发布时间】:2019-11-21 15:29:25 【问题描述】:我尝试在 Trident 中创建一个小示例。目标是查看在失败的情况下如何重放元组。下面是拓扑定义
Random rand = new Random();
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", new RandomIntegerSpout())
.map((MapFunction) tridentTuple ->
if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
(rand.nextInt(2) == 1))
System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
throw new ReportedFailedException("Divisible by 50");
return new Values(tridentTuple.toArray());
)
.peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));
我使用来自storm-starter 的RandomIntegerSpout
,它扩展了BaseRichSpout
,并且只生成随机数。然后我应用一个MapFunction
,它每 50 个元组抽取一个随机数,并随机使元组失败。
问题是,我没有收到任何ack
s 或fail
s。
我使用了 spout 并在调试模式下运行它,尝试了相同的示例输出,并使用标准风暴螺栓进行了尝试。锚定工作正常,只是没有被三叉戟调用。
我在 v1.2.3 和 v2.0.0 中使用 LocalCluster 和 StormSubmitter 重现了这个问题。
下面是 Storm UI 的截图: 对应于 map 的bolts 确认并按预期使元组失败,但这永远不会传播回 spout。
我认为 trident mastercoord 可能期望某种状态中的持久性来实现拓扑已完成,但是用一些持久性聚合替换 peek 并没有帮助。我还通过对each
执行相同操作来排除map
中的错误。
通过检查查看代码几乎是微不足道的 我可能误解了有关 Trident / Storm 的一些基本内容。如果批处理完成,我期望 trident 调用 spout 和 ack
方法是错误的吗?我意识到IBatchSpout
中没有fail
方法。 Trident 如何处理批次的重放?
【问题讨论】:
【参考方案1】:Trident spout 不会在单个元组级别确认或失败元组。相反,元组被确认为一个批次。
三叉戟喷嘴通常看起来像this interface。
M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);
这个想法是,Trident 将管理跟踪批处理元组的确认/失败,然后如果批处理失败,它会要求 spout 重复批处理,如果没有,它根本不会。
请注意这与标准 Storm spout 有何不同。对于普通的 spout,框架基本上会告诉 spout “嘿,发出一些东西。由你决定发出什么。”,然后使用 ack
和 fail
方法告诉 spout 是否应该发出特定的元组再次。
使用 Trident,spout 会被告知“嘿,(重新)发出批号 x”,然后由 spout 知道该批次中有哪些元组。有了这个模型,就不需要fail
方法。不过,一些 Trident spout 将具有 ack/succeed
方法,以允许 spout 丢弃它可能与特定正在进行的批次相关的任何状态。
对于包装的IRichSpouts
,有一些bridging code 将它们包装到Trident API 中。基本上,包装器调用nextTuple
直到它有一个完整的批次,然后它将ID 存储在缓存中。如果要求包装器重新发送一个批次,它会在 spout 上调用 fail
。否则,一旦批处理成功,它就会调用ack
。
我认为您在 Storm UI 中没有看到与此相关的任何内容的原因是 IRichBolt
实际上并没有在那里表示。相反,它被包装了,所以ack/fail
调用发生在spout-spout
组件内的“幕后”。如果您想确定是否调用了 ack/fail,请尝试在您的 IRichSpout
的 ack/fail
方法中添加一些日志记录。
【讨论】:
以上是关于为啥在这个最小示例中 Trident 不调用 ack() 或 fail()?的主要内容,如果未能解决你的问题,请参考以下文章