为啥在这个最小示例中 Trident 不调用 ack() 或 fail()?

Posted

技术标签:

【中文标题】为啥在这个最小示例中 Trident 不调用 ack() 或 fail()?【英文标题】:Why does Trident not call ack() or fail() in this minimal example?为什么在这个最小示例中 Trident 不调用 ack() 或 fail()? 【发布时间】:2019-11-21 15:29:25 【问题描述】:

我尝试在 Trident 中创建一个小示例。目标是查看在失败的情况下如何重放元组。下面是拓扑定义

        Random rand = new Random();

        Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(1);

        TridentTopology topology = new TridentTopology();

        topology.newStream("spout", new RandomIntegerSpout())
                .map((MapFunction) tridentTuple -> 
                    if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
                            (rand.nextInt(2) == 1)) 
                        System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
                        throw new ReportedFailedException("Divisible by 50");
                    
                    return new Values(tridentTuple.toArray());
                )
                .peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));

我使用来自storm-starter 的RandomIntegerSpout,它扩展了BaseRichSpout,并且只生成随机数。然后我应用一个MapFunction,它每 50 个元组抽取一个随机数,并随机使元组失败。

问题是,我没有收到任何acks 或fails。

我使用了 spout 并在调试模式下运行它,尝试了相同的示例输出,并使用标准风暴螺栓进行了尝试。锚定工作正常,只是没有被三叉戟调用。

我在 v1.2.3 和 v2.0.0 中使用 LocalCluster 和 StormSubmitter 重现了这个问题。

下面是 Storm UI 的截图: 对应于 map 的bolts 确认并按预期使元组失败,但这永远不会传播回 spout。

我认为 trident mastercoord 可能期望某种状态中的持久性来实现拓扑已完成,但是用一些持久性聚合替换 peek 并没有帮助。我还通过对each 执行相同操作来排除map 中的错误。

通过检查查看代码几乎是微不足道的 我可能误解了有关 Trident / Storm 的一些基本内容。如果批处理完成,我期望 trident 调用 spout 和 ack 方法是错误的吗?我意识到IBatchSpout 中没有fail 方法。 Trident 如何处理批次的重放?

【问题讨论】:

【参考方案1】:

Trident spout 不会在单个元组级别确认或失败元组。相反,元组被确认为一个批次。

三叉戟喷嘴通常看起来像this interface。

M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);

这个想法是,Trident 将管理跟踪批处理元组的确认/失败,然后如果批处理失败,它会要求 spout 重复批处理,如果没有,它根本不会。

请注意这与标准 Storm spout 有何不同。对于普通的 spout,框架基本上会告诉 spout “嘿,发出一些东西。由你决定发出什么。”,然后使用 ackfail 方法告诉 spout 是否应该发出特定的元组再次。

使用 Trident,spout 会被告知“嘿,(重新)发出批号 x”,然后由 spout 知道该批次中有哪些元组。有了这个模型,就不需要fail 方法。不过,一些 Trident spout 将具有 ack/succeed 方法,以允许 spout 丢弃它可能与特定正在进行的批次相关的任何状态。

对于包装的IRichSpouts,有一些bridging code 将它们包装到Trident API 中。基本上,包装器调用nextTuple 直到它有一个完整的批次,然后它将ID 存储在缓存中。如果要求包装器重新发送一个批次,它会在 spout 上调用 fail。否则,一旦批处理成功,它就会调用ack

我认为您在 Storm UI 中没有看到与此相关的任何内容的原因是 IRichBolt 实际上并没有在那里表示。相反,它被包装了,所以ack/fail 调用发生在spout-spout 组件内的“幕后”。如果您想确定是否调用了 ack/fail,请尝试在您的 IRichSpoutack/fail 方法中添加一些日志记录。

【讨论】:

以上是关于为啥在这个最小示例中 Trident 不调用 ack() 或 fail()?的主要内容,如果未能解决你的问题,请参考以下文章

Storm Trident示例ReducerAggregator

Storm Trident示例Aggregator

为啥这个表达不统一

最小生成树模板

最小示例中的无效挂钩调用错误[关闭]

Storm Trident示例shuffle&parallelismHint