Spring Kafka SeekToCurrentErrorHandler 找出失败的记录
Posted
技术标签:
【中文标题】Spring Kafka SeekToCurrentErrorHandler 找出失败的记录【英文标题】:Spring Kafka SeekToCurrentErrorHandler Find Out Which Record Has Failed 【发布时间】:2019-02-01 12:02:59 【问题描述】:我已经使用KafkaHandler
实现了一个 Kafka 消费者。我的消费者应该消费事件,然后为每个事件向其他服务发送一个 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。
我的容器工厂配置如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
kafkaListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate());
factory.setConcurrency(3);
ContainerProperties containerProperties = factory.getContainerProperties();
containerProperties.setAckOnError(false);
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());
return factory;
我使用ExceptionClassifierRetryPolicy
来设置异常和相应的重试策略。
重试后一切正常。当我收到ConnectException
时它会重试,当我收到IllegalArgumentException
时它会忽略。
然而,在IllegalArgumentException
场景中,SeekToCurrentErrorHandler
会回溯到未处理的偏移量(因为它会回溯包括失败消息在内的未处理消息),最终会立即重试失败的消息。消费者不断来回重试百万次。
如果我有机会了解SeekToCurrentErrorHandler
中的哪条记录失败,那么我将创建SeekToCurrentErrorHandler
的自定义实现来检查失败的消息是否可重试(通过使用thrownException
字段)。如果它不可重试,那么我会将其从records
列表中删除以进行查找。
关于如何实现此功能的任何想法?
注意:enable.auto.commit
设置为 false
,auto.offset.reset
设置为 earliest
。
谢谢!
【问题讨论】:
为什么将 auto.offset.reset 设置为最早?我认为它应该在 PROD env 上设置为最新。 我只有 1 个消费组,所以没关系。您可以查看this 以获得更好的解释。 【参考方案1】:自 Spring 以来就有一个 FailedRecordTracker
用于 Apache Kafka 2.2
(尚未发布):
https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes
从 2.2 版开始,
SeekToCurrentErrorHandler
现在可以恢复(跳过)一直失败的记录。默认情况下,在 10 次失败后,将记录失败的记录 (ERROR)。您可以使用自定义恢复器 (BiConsumer
) 和/或最大故障数来配置处理程序。
SeekToCurrentErrorHandler errorHandler =
new SeekToCurrentErrorHandler((record, exception) ->
// recover after 3 failures - e.g. send to a dead-letter topic
, 3);
因此,您只需将FailedRecordTracker
和SeekToCurrentErrorHandler
源代码从master
复制/粘贴到您的项目中,您将拥有您正在寻找的功能:
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java
https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java
【讨论】:
谢谢@ArtemBilan,这看起来很有希望用于不可重试的错误。如果异常是不可重试异常之一,我可以将最大重试计数设置为 1 并记录失败(或将其发送到 DLT)。可重试的异常如何?我希望它们永远重试,所以我不想为它们设置最大失败限制。 在我的情况下,消费者的数量等于分区的数量。所以,现在,我假设records
列表的第一条记录将具有该主题的最小偏移量。如果异常不可重试,我会将其从记录列表中删除。
貌似spring kafka有个bug,如果并发级别小于分区数,consumer会永远重试消息。
看起来你在谈论这个:***.com/questions/57889424/…
删除偏移量不会给您带来任何价值:下一个成功的更大数字仍然会将所有内容提交到偏移量存储中。您可以考虑为ConcurrentKafkaListenerContainerFactory
配置一个recoveryCallback
,以便在排气后做一些逻辑。以上是关于Spring Kafka SeekToCurrentErrorHandler 找出失败的记录的主要内容,如果未能解决你的问题,请参考以下文章
Spring Kafka 和 Spring Integration Kafka 的区别
kafka:spring集成 kafka(springboot集成客户端集成)