在 Storm 中通过一系列螺栓进行 ACK 的正确方法
Posted
技术标签:
【中文标题】在 Storm 中通过一系列螺栓进行 ACK 的正确方法【英文标题】:Proper way to ACK in Storm in a chain of bolts 【发布时间】:2014-01-04 12:41:46 【问题描述】:只是想确保我了解 Ack-ing 在 Storm 中的工作原理。 我有 1 个喷口和 2 个螺栓链接在一起。 Spout 向 Bolt1 发出元组,Bolt1 又向 Bolt 2 发出一个元组。我希望 Bolt 2 确认从 Spout 发送的初始元组,但我不确定如何。
为了保证容错(即:重新发送元组),我想在螺栓 2 中确认 Spout 发出的元组,以防万一它在进程中的某个地方失败,以便可以重新发送。
考虑这个例子:
喷口:
_collector.emit(new Values(queue.dequeue())
螺栓1:
def execute(tuple: Tuple)
_collector.emit(tuple, new Values("stuff"))
此时 tuple 就是 spout 发送的元组。我可以在这里确认它没有问题。现在添加另一个监听 Bolt1 发出的元组的螺栓。
螺栓2:
def execute(tuple2: Tuple)
_collector.emit(tuple2, new Values("foo"))
此时 tuple2 中的元组是从 Bolt1 发送的元组(其中包含字符串“stuff”的元组)。 因此,如果我在 Bolt2 中发送一个确认,这将确认来自 Bolt1 的元组,而不是从 Spout 发送的元组。正确的?
如何确认从 spout 发送的元组?我是否应该在所有其他 spout 上搭载初始 spout,以便我可以在最后一个 Bolt 中检索它并确认它?
我阅读了 Nathan 的教程,我得到的印象是我可以在发出 tuple2 后立即确认在 Bolt1(来自 Spout)中收到的 tuple。这会将新发出的 tuple2 链接到 Spout 发送的原始元组,因此当 Bolt2 确认元组 2 时,它实际上确认了来自 Spout 的原始元组。这是真的?
如果我在解释中遗漏了什么,请告诉我。
【问题讨论】:
【参考方案1】:对于那些感兴趣的人,我通过在风暴组中询问找到了解决方案。 我需要在 Spout 中以以下方式发出元组(具有唯一 ID):
喷口:
//ties in tuple to this UID
_collector.emit(new Values(queue.dequeue(), *uniqueID*)
然后 Bolt1 只有在将元组发送到 Bolt2 后才会确认该元组
螺栓1:
//emit first then ack
_collector.emit(tuple, new Values("stuff")) //**anchoring** - read below to see what it means
_collector.ack(tuple)
此时,来自 Spout 的元组已在 Bolt1 中得到确认,但同时新发出到 Bolt2 的元组“东西”被“锚定”到来自 Spout 的元组。这意味着它仍然需要稍后被确认,否则在超时时它将被 spout 重新发送。
螺栓2:
_collector.ack(tuple)
Bolt2 需要确认从 Bolt1 收到的元组,该元组将发送 Spout 等待的最后一个确认。如果此时 Bolt2 发出元组,那么一定有一个 Bolt3 会得到它并确认它。如果元组在最后一点没有被确认,Spout 将超时并重新发送。
每次在螺栓到螺栓的emit
语句上完成锚定时,都会在“树”结构中构建一个新节点......在我的情况下更像是一个列表,因为我从未将相同的元组发送到 2或更多元组,我有一个 1 对 1 的关系。
需要确认树中的所有节点,然后才将元组标记为完全到达。如果元组没有被确认并且它使用 UID 发送并稍后锚定,那么它将永远保存在内存中(直到确认)。
希望这会有所帮助。
【讨论】:
如果我有两个来自螺栓 1 的平行螺栓,例如 bolt1 ---> bolt2 和 bolt1 --> bolt3?现在我应该从螺栓 2 和螺栓 3 确认 tuple2 吗?这有意义吗?【参考方案2】:您可以在official documentation 中了解此内容。
如果您想在所有螺栓中跟踪 touples 的执行,您可以使用 BaseBasicBolt 作为已定义此行为的父类。
在任何其他用例中(即您希望在最后一个螺栓执行之前确认元组),您应该手动定义元组之间的链接(称为锚定)。请参阅文档。
【讨论】:
【参考方案3】:你需要 anchor
tuple 。
看看Guaranteeing-message-processing
尤其是你需要这个:
List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));
【讨论】:
为什么这样更好?根据文档,它用于连接来自不同来源的元组以重播来自所有这些来源的元组的情况。我认为这不会正确执行 tuple1 和 tuple2 来自同一个 spout,也许当它失败时它可以重播源两次而不是一次。如果我错了,请纠正我。 @zenbeni 看到我的回复 - 这就是你要找的吗?以上是关于在 Storm 中通过一系列螺栓进行 ACK 的正确方法的主要内容,如果未能解决你的问题,请参考以下文章