Storm Kafka Spout 上的最大元组重播数

Posted

技术标签:

【中文标题】Storm Kafka Spout 上的最大元组重播数【英文标题】:Max number of tuple replays on Storm Kafka Spout 【发布时间】:2015-12-30 22:49:57 【问题描述】:

我们将 Storm 与 Kafka Spout 一起使用。当消息失败时,我们希望重播它们,但在某些情况下,错误的数据或代码错误会导致消息总是失败 Bolt,因此我们将进入无限重播循环。显然,当我们发现错误时,我们正在修复它们,但希望我们的拓扑通常具有容错性。重放 N 次以上后,我们如何 ack() 一个元组?

查看 Kafka Spout 的代码,我发现它旨在使用指数退避计时器和 comments on the PR 状态重试:

“spout 不会终止重试周期(我认为它不应该这样做,因为它无法报告有关发生中止请求的失败的上下文),它只处理延迟重试。一个螺栓仍然期望拓扑最终会调用 ack() 而不是 fail() 来停止循环。”

我看到了建议编写自定义 spout 的 *** 响应,但如果有推荐的方法在 Bolt 中执行此操作,我宁愿不要被困在维护 Kafka Spout 内部的自定义补丁。

在 Bolt 中执行此操作的正确方法是什么?我在元组中没有看到任何状态会显示它被重放了多少次。

【问题讨论】:

如果您在螺栓中有一些错误检查,您可以根据您的业务逻辑得出特定元组“坏”的结论,您可以“确认”而不是失败......所以它会不能重播..... 【参考方案1】:

据我所知,Storm 不为此提供内置支持。

我已经应用了下面提到的实现:

public class AuditMessageWriter extends BaseBolt 

        private static final long serialVersionUID = 1L;
        Map<Object, Integer> failedTuple = new HashMap<>();

        public AuditMessageWriter() 

        

        /**
         * @inheritDoc
         */
        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) 
            this.collector = collector;
            //any initialization if u want
        

        /**
         * @inheritDoc
         */
        @Override
        public void execute(Tuple input) 
            try 

            //Write your processing logic
            collector.ack(input);
             catch (Exception e2) 
            //In case of any exception save the tuple in failedTuple map with a count 1
            //Before adding the tuple in failedTuple map check the count and increase it and fail the tuple

            //if failure count reaches the limit (message reprocess limit) log that and remove from map and acknowledge the tuple
            log(input);
            ExceptionHandler.LogError(e2, "Message IO Exception");
            

        

        void log(Tuple input) 

            try 
                //Here u can pass result to dead queue or log that
//And ack the tuple 
             catch (Exception e) 
                ExceptionHandler.LogError(e, "Exception while logging");
            
        

        @Override
        public void cleanup() 
            // To declare output fields.Not required in this alert.
        

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) 
            // To declare output fields.Not required in this alert.
        

        @Override
        public Map<String, Object> getComponentConfiguration() 
            return null;
        

    

【讨论】:

【参考方案2】:

我们只是让螺栓在错误流上发出错误元组并确认它。另一个 Bolt 通过将错误写回专门针对错误的 Kafka 主题来处理错误。这使我们能够轻松地通过拓扑引导正常与错误数据流。

元组失败的唯一情况是因为某些必需的资源处于脱机状态,例如网络连接、数据库……这些都是可重试的错误。任何其他内容都被定向到错误流以进行适当的修复或处理。

当然,这一切都假设您不想造成任何数据丢失。如果您只想尽最大努力并在重试几次后忽略,那么我会考虑其他选项。

【讨论】:

【参考方案3】:

我们也面临着类似的数据,其中有错误数据导致螺栓无限失效。

为了在运行时解决这个问题,我们又引入了一个螺栓,将其命名为“DebugBolt”以供参考。因此,spout 首先将消息发送到这个bolt,然后这个bolts 对坏消息进行所需的数据修复,然后将它们发送到所需的bolt。这样就可以即时修复数据错误。

另外,如果您需要删除一些消息,您实际上可以将一个 ignoreFlag 从您的 DebugBolt 传递给您的原始 Bolt,如果 ignoreFlag 为 True,您的原始 Bolt 应该只发送一个 ack 到 spout 而不进行处理。

【讨论】:

【参考方案4】:

基本上是这样工作的:

    如果您部署拓扑,它们应该是生产级的(也就是说,需要一定的质量水平,并且元组的数量很少)。 如果元组失败,请检查元组是否真正有效。 如果元组有效(例如,由于无法连接到外部数据库而未能插入,或类似情况),请回复。 如果元组格式错误并且永远无法处理(例如,数据库 id 是文本并且数据库需要整数)它应该是ack,您将永远无法修复此类问题或插入进入数据库。 应记录新类型的异常(以及元组内容本身)。您应该检查这些日志并生成规则以在将来验证元组。并最终添加代码以在未来正确处理它们 (ETL)。 不要记录所有内容,否则您的日志文件会很大,请谨慎选择要记录的内容。日志文件的内容应该是有用的,而不是一堆垃圾。 继续这样做,最终您将只涵盖所有情况。

【讨论】:

【参考方案5】:

Storm 本身不为您的问题提供任何支持。因此,定制的解决方案是唯一的出路。即使您不想修补KafkaSpout,我认为引入一个计数器并打破其中的重播周期也是最好的方法。作为替代方案,您也可以从KafkaSpout 继承并在您的子类中放置一个计数器。这当然有点类似于补丁,但可能不那么具有侵入性并且更容易实现。

如果您想使用 Bolt,您可以执行以下操作(这还需要对 KafkaSpout 或其子类进行一些更改)。

为每个元组分配一个唯一 ID 作为附加属性(也许,已经有一个唯一 ID 可用;否则,您可以引入一个“计数器 ID”或只引入整个元组,即所有属性,以标识每个元组)。 在 ID 上通过fieldsGroupingKafkaSpout 之后插入一个bolt(以确保重放的元组流式传输到同一个bolt 实例)。 在您的bolt 中,使用HashMap&lt;ID,Counter&gt; 来缓冲所有元组并计算(重新)尝试的次数。如果计数器小于您的阈值,则转发输入元组,以便它由后面的实际拓扑处理(当然,您需要适当地锚定元组)。如果计数大于您的阈值,请确认元组以中断循环并从HashMap 中删除其条目(您可能还希望记录所有失败的元组)。 为了从HashMap 中删除成功处理的元组,每次在KafkaSpout 中确认一个元组时,您需要将元组ID 转发给bolt,以便它可以从HashMap 中删除元组。只需为您的KafkaSpout 子类声明第二个输出流并覆盖Spout.ack(...)(当然您需要调用super.ack(...) 以确保KafkaSpout 也得到确认)。

这种方法可能会消耗大量内存。作为在HashMap 中为每个元组创建一个条目的替代方法,您还可以使用第三个流(与其他两个流一样连接到螺栓),如果元组失败(即在@987654337 @)。每次,螺栓从第三个流中接收到“失败”消息,计数器就会增加。只要HashMap 中没有条目(或未达到阈值),bolt 就会简单地转发元组进行处理。这应该会减少使用的内存,但需要在 spout 和 bolt 中实现更多逻辑。

这两种方法都有缺点,每个 acked tuple 都会给新引入的 bolt 带来额外的消息(因此会增加网络流量)。对于第二种方法,您似乎只需要为之前失败的元组向螺栓发送“确认”消息。但是,您不知道哪些元组失败了,哪些没有。如果你想摆脱这种网络开销,你可以在KafkaSpout 中引入第二个HashMap 来缓冲失败消息的ID。因此,只有在成功重放失败的元组时,您才能发送“ack”消息。当然,这第三种方法使要实现的逻辑更加复杂。

如果不对KafkaSpout 进行某种程度的修改,我看不到您的问题的解决方案。我个人会修补KafkaSpout 或使用第三种方法,在KafkaSpout 子类和螺栓中使用HashMap(因为与前两种解决方案相比,它消耗的内存很少,并且不会在网络上增加很多额外的负载)。

【讨论】:

Spout中的fail()方法是单线程调用的吗?我只是想确定是否需要 ConcurrentHashMap 来跟踪 msgIds-> errorCnt 还是简单的 HashMap 可以。谢谢 nextTuple()ack()fail() 由单个线程调用。使用HashMap 就足够了。有关详细信息,请参见此处:***.com/questions/32547935/… 另一件事,如果我有 N 个 spout,特定 msgId 的 fail() 方法是否会在 SAME 服务器/Spout 上调用? 我从未验证过,但我强烈认为是的。

以上是关于Storm Kafka Spout 上的最大元组重播数的主要内容,如果未能解决你的问题,请参考以下文章

使用 Kafka Spout 的 Kafka Storm 集成

使用 Kafka Spout 的 Apache Storm 给出错误:IllegalStateException

Storm Ui错误kafka spout,不使用HDP

jstorm在使用kafka作为spout的时候多线程问题

storm问题记录 python 不断向kafka中写消息,spout做为消费者从kafka中读消息并emit给bolt,但是部分消息没有得到bolt的处理

Kafka和Storm环境下如何实现多租户?