批处理模式下 Spring Boot kafka 中的错误处理

Posted

技术标签:

【中文标题】批处理模式下 Spring Boot kafka 中的错误处理【英文标题】:Error handling in SpringBoot kafka in Batch mode 【发布时间】:2021-08-31 12:55:40 【问题描述】:

我想弄清楚是否有任何方法可以在 Spring Boot Kafka 中以批处理模式发送死信主题中的失败记录。 我不想让记录重复发送,因为它是批量消耗的,而且很少有已经处理的。 我看到了spring-kafka consumer batch error handling with spring boot version 2.3.7 这个链接

我想到了一个用例来停止容器并在不使用 DLT 的情况下重新启动,但重复问题将再次出现在批处理模式下。

@Garry Russel 能否提供一个用于批量错误处理的小代码。

【问题讨论】:

【参考方案1】:

RetryingBatchErrorHandler 是在 spring-kafka 2.5 版(Boot 2.3 附带)中添加的。

侦听器必须抛出异常以指示批处理中的哪条记录失败(完整记录或列表中的索引)。

提交失败记录之前的记录偏移量,失败记录可以重试和/或发送到死信主题。

见https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh

那里有一个小例子。

RetryingBatchErrorHandler 是在 2.3.7 中添加的,但它会将整个批次发送到死信主题,这通常不是您想要的(因此我们添加了 RetryingBatchErrorHandler)。

【讨论】:

感谢加里,我现在可以完全理解了。虽然我仍然坚持如何在代码中正确配置它:(( 。这是否足以在配置级别进行设置 factory.setBatchErrorHandler(new RecoveringBatchErrorHandler()); 您可以简单地将错误处理程序定义为@Bean,Spring Boot 会自动将其连接到自动配置的工厂。如果您要定义自己的工厂,那么只需将其添加到那里即可。您很可能希望在错误处理程序中配置一个恢复器(例如DeadLetterPublishingRecoverer)和一个BackOff。默认值(如您评论中的那个)已由容器连接。它将重试 9 次,两次尝试之间没有后退。 感谢 Gary,它彻底消除了我的疑虑:))。 我问了一个新问题。如果您能对此 ***.com/questions/67992900/… 有所了解,将会很有帮助

以上是关于批处理模式下 Spring Boot kafka 中的错误处理的主要内容,如果未能解决你的问题,请参考以下文章

spring boot kafka 在将 testcontainers 与 kafka、zookeeper、模式注册表一起使用时因“代理可能不可用”而失败

Spring Boot 之 Kafka

项目系统中使用Spring boot集成kafka业务实现系统处理消费实例

如何在没有kafka服务器的情况下运行spring boot

总结kafka的consumer消费能力很低的情况下的处理方案

Kafka学习--spring boot 整合kafka