Storm Kafka Spout 上的最大元组重播数
Posted
技术标签:
【中文标题】Storm Kafka Spout 上的最大元组重播数【英文标题】:Max number of tuple replays on Storm Kafka Spout 【发布时间】:2015-12-30 22:49:57 【问题描述】:我们将 Storm 与 Kafka Spout 一起使用。当消息失败时,我们希望重播它们,但在某些情况下,错误的数据或代码错误会导致消息总是失败 Bolt,因此我们将进入无限重播循环。显然,当我们发现错误时,我们正在修复它们,但希望我们的拓扑通常具有容错性。重放 N 次以上后,我们如何 ack() 一个元组?
查看 Kafka Spout 的代码,我发现它旨在使用指数退避计时器和 comments on the PR 状态重试:
“spout 不会终止重试周期(我认为它不应该这样做,因为它无法报告有关发生中止请求的失败的上下文),它只处理延迟重试。一个螺栓仍然期望拓扑最终会调用 ack() 而不是 fail() 来停止循环。”
我看到了建议编写自定义 spout 的 *** 响应,但如果有推荐的方法在 Bolt 中执行此操作,我宁愿不要被困在维护 Kafka Spout 内部的自定义补丁。
在 Bolt 中执行此操作的正确方法是什么?我在元组中没有看到任何状态会显示它被重放了多少次。
【问题讨论】:
如果您在螺栓中有一些错误检查,您可以根据您的业务逻辑得出特定元组“坏”的结论,您可以“确认”而不是失败......所以它会不能重播..... 【参考方案1】:据我所知,Storm 不为此提供内置支持。
我已经应用了下面提到的实现:
public class AuditMessageWriter extends BaseBolt
private static final long serialVersionUID = 1L;
Map<Object, Integer> failedTuple = new HashMap<>();
public AuditMessageWriter()
/**
* @inheritDoc
*/
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector)
this.collector = collector;
//any initialization if u want
/**
* @inheritDoc
*/
@Override
public void execute(Tuple input)
try
//Write your processing logic
collector.ack(input);
catch (Exception e2)
//In case of any exception save the tuple in failedTuple map with a count 1
//Before adding the tuple in failedTuple map check the count and increase it and fail the tuple
//if failure count reaches the limit (message reprocess limit) log that and remove from map and acknowledge the tuple
log(input);
ExceptionHandler.LogError(e2, "Message IO Exception");
void log(Tuple input)
try
//Here u can pass result to dead queue or log that
//And ack the tuple
catch (Exception e)
ExceptionHandler.LogError(e, "Exception while logging");
@Override
public void cleanup()
// To declare output fields.Not required in this alert.
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
// To declare output fields.Not required in this alert.
@Override
public Map<String, Object> getComponentConfiguration()
return null;
【讨论】:
【参考方案2】:我们只是让螺栓在错误流上发出错误元组并确认它。另一个 Bolt 通过将错误写回专门针对错误的 Kafka 主题来处理错误。这使我们能够轻松地通过拓扑引导正常与错误数据流。
元组失败的唯一情况是因为某些必需的资源处于脱机状态,例如网络连接、数据库……这些都是可重试的错误。任何其他内容都被定向到错误流以进行适当的修复或处理。
当然,这一切都假设您不想造成任何数据丢失。如果您只想尽最大努力并在重试几次后忽略,那么我会考虑其他选项。
【讨论】:
【参考方案3】:我们也面临着类似的数据,其中有错误数据导致螺栓无限失效。
为了在运行时解决这个问题,我们又引入了一个螺栓,将其命名为“DebugBolt”以供参考。因此,spout 首先将消息发送到这个bolt,然后这个bolts 对坏消息进行所需的数据修复,然后将它们发送到所需的bolt。这样就可以即时修复数据错误。
另外,如果您需要删除一些消息,您实际上可以将一个 ignoreFlag 从您的 DebugBolt 传递给您的原始 Bolt,如果 ignoreFlag 为 True,您的原始 Bolt 应该只发送一个 ack 到 spout 而不进行处理。
【讨论】:
【参考方案4】:基本上是这样工作的:
-
如果您部署拓扑,它们应该是生产级的(也就是说,需要一定的质量水平,并且元组的数量很少)。
如果元组失败,请检查元组是否真正有效。
如果元组有效(例如,由于无法连接到外部数据库而未能插入,或类似情况),请回复。
如果元组格式错误并且永远无法处理(例如,数据库 id 是文本并且数据库需要整数)它应该是
ack
,您将永远无法修复此类问题或插入进入数据库。
应记录新类型的异常(以及元组内容本身)。您应该检查这些日志并生成规则以在将来验证元组。并最终添加代码以在未来正确处理它们 (ETL)。
不要记录所有内容,否则您的日志文件会很大,请谨慎选择要记录的内容。日志文件的内容应该是有用的,而不是一堆垃圾。
继续这样做,最终您将只涵盖所有情况。
【讨论】:
【参考方案5】:Storm 本身不为您的问题提供任何支持。因此,定制的解决方案是唯一的出路。即使您不想修补KafkaSpout
,我认为引入一个计数器并打破其中的重播周期也是最好的方法。作为替代方案,您也可以从KafkaSpout
继承并在您的子类中放置一个计数器。这当然有点类似于补丁,但可能不那么具有侵入性并且更容易实现。
如果您想使用 Bolt,您可以执行以下操作(这还需要对 KafkaSpout
或其子类进行一些更改)。
fieldsGrouping
在KafkaSpout
之后插入一个bolt(以确保重放的元组流式传输到同一个bolt 实例)。
在您的bolt 中,使用HashMap<ID,Counter>
来缓冲所有元组并计算(重新)尝试的次数。如果计数器小于您的阈值,则转发输入元组,以便它由后面的实际拓扑处理(当然,您需要适当地锚定元组)。如果计数大于您的阈值,请确认元组以中断循环并从HashMap
中删除其条目(您可能还希望记录所有失败的元组)。
为了从HashMap
中删除成功处理的元组,每次在KafkaSpout
中确认一个元组时,您需要将元组ID 转发给bolt,以便它可以从HashMap
中删除元组。只需为您的KafkaSpout
子类声明第二个输出流并覆盖Spout.ack(...)
(当然您需要调用super.ack(...)
以确保KafkaSpout
也得到确认)。
这种方法可能会消耗大量内存。作为在HashMap
中为每个元组创建一个条目的替代方法,您还可以使用第三个流(与其他两个流一样连接到螺栓),如果元组失败(即在@987654337 @)。每次,螺栓从第三个流中接收到“失败”消息,计数器就会增加。只要HashMap
中没有条目(或未达到阈值),bolt 就会简单地转发元组进行处理。这应该会减少使用的内存,但需要在 spout 和 bolt 中实现更多逻辑。
这两种方法都有缺点,每个 acked tuple 都会给新引入的 bolt 带来额外的消息(因此会增加网络流量)。对于第二种方法,您似乎只需要为之前失败的元组向螺栓发送“确认”消息。但是,您不知道哪些元组失败了,哪些没有。如果你想摆脱这种网络开销,你可以在KafkaSpout
中引入第二个HashMap
来缓冲失败消息的ID。因此,只有在成功重放失败的元组时,您才能发送“ack”消息。当然,这第三种方法使要实现的逻辑更加复杂。
如果不对KafkaSpout
进行某种程度的修改,我看不到您的问题的解决方案。我个人会修补KafkaSpout
或使用第三种方法,在KafkaSpout
子类和螺栓中使用HashMap
(因为与前两种解决方案相比,它消耗的内存很少,并且不会在网络上增加很多额外的负载)。
【讨论】:
Spout中的fail()方法是单线程调用的吗?我只是想确定是否需要 ConcurrentHashMap 来跟踪 msgIds-> errorCnt 还是简单的 HashMap 可以。谢谢nextTuple()
、ack()
和 fail()
由单个线程调用。使用HashMap
就足够了。有关详细信息,请参见此处:***.com/questions/32547935/…
另一件事,如果我有 N 个 spout,特定 msgId 的 fail() 方法是否会在 SAME 服务器/Spout 上调用?
我从未验证过,但我强烈认为是的。以上是关于Storm Kafka Spout 上的最大元组重播数的主要内容,如果未能解决你的问题,请参考以下文章
使用 Kafka Spout 的 Kafka Storm 集成
使用 Kafka Spout 的 Apache Storm 给出错误:IllegalStateException
storm问题记录 python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理