批处理模式下 Spring Boot kafka 中的错误处理
Posted
技术标签:
【中文标题】批处理模式下 Spring Boot kafka 中的错误处理【英文标题】:Error handling in SpringBoot kafka in Batch mode 【发布时间】:2021-08-31 12:55:40 【问题描述】:我想弄清楚是否有任何方法可以在 Spring Boot Kafka 中以批处理模式发送死信主题中的失败记录。 我不想让记录重复发送,因为它是批量消耗的,而且很少有已经处理的。 我看到了spring-kafka consumer batch error handling with spring boot version 2.3.7 这个链接
我想到了一个用例来停止容器并在不使用 DLT 的情况下重新启动,但重复问题将再次出现在批处理模式下。
@Garry Russel 能否提供一个用于批量错误处理的小代码。
【问题讨论】:
【参考方案1】:RetryingBatchErrorHandler
是在 spring-kafka 2.5 版(Boot 2.3 附带)中添加的。
侦听器必须抛出异常以指示批处理中的哪条记录失败(完整记录或列表中的索引)。
提交失败记录之前的记录偏移量,失败记录可以重试和/或发送到死信主题。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh
那里有一个小例子。
RetryingBatchErrorHandler
是在 2.3.7 中添加的,但它会将整个批次发送到死信主题,这通常不是您想要的(因此我们添加了 RetryingBatchErrorHandler
)。
【讨论】:
感谢加里,我现在可以完全理解了。虽然我仍然坚持如何在代码中正确配置它:(( 。这是否足以在配置级别进行设置 factory.setBatchErrorHandler(new RecoveringBatchErrorHandler()); 您可以简单地将错误处理程序定义为@Bean
,Spring Boot 会自动将其连接到自动配置的工厂。如果您要定义自己的工厂,那么只需将其添加到那里即可。您很可能希望在错误处理程序中配置一个恢复器(例如DeadLetterPublishingRecoverer
)和一个BackOff
。默认值(如您评论中的那个)已由容器连接。它将重试 9 次,两次尝试之间没有后退。
感谢 Gary,它彻底消除了我的疑虑:))。
我问了一个新问题。如果您能对此 ***.com/questions/67992900/… 有所了解,将会很有帮助以上是关于批处理模式下 Spring Boot kafka 中的错误处理的主要内容,如果未能解决你的问题,请参考以下文章
spring boot kafka 在将 testcontainers 与 kafka、zookeeper、模式注册表一起使用时因“代理可能不可用”而失败
项目系统中使用Spring boot集成kafka业务实现系统处理消费实例
如何在没有kafka服务器的情况下运行spring boot