Spring Kafka SeekToCurrentErrorHandler 找出失败的记录

Posted

技术标签:

【中文标题】Spring Kafka SeekToCurrentErrorHandler 找出失败的记录【英文标题】:Spring Kafka SeekToCurrentErrorHandler Find Out Which Record Has Failed 【发布时间】:2019-02-01 12:02:59 【问题描述】:

我已经使用KafkaHandler 实现了一个 Kafka 消费者。我的消费者应该消费事件,然后为每个事件向其他服务发送一个 REST 请求。我只想在该 REST 服务关闭时重试。否则,我可以忽略失败的事件。

我的容器工厂配置如下:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent>
  kafkaListenerContainerFactory() 

  ConcurrentKafkaListenerContainerFactory<String, MyCustomEvent> factory =
    new ConcurrentKafkaListenerContainerFactory<>();

  factory.setConsumerFactory(consumerFactory());
  factory.setStatefulRetry(true);
  factory.setRetryTemplate(retryTemplate());
  factory.setConcurrency(3);

  ContainerProperties containerProperties = factory.getContainerProperties();
  containerProperties.setAckOnError(false);
  containerProperties.setAckMode(AckMode.RECORD);
  containerProperties.setErrorHandler(new SeekToCurrentErrorHandler());

  return factory;

我使用ExceptionClassifierRetryPolicy 来设置异常和相应的重试策略。

重试后一切正常。当我收到ConnectException 时它会重试,当我收到IllegalArgumentException 时它会忽略。

然而,在IllegalArgumentException 场景中,SeekToCurrentErrorHandler 会回溯到未处理的偏移量(因为它会回溯包括失败消息在内的未处理消息),最终会立即重试失败的消息。消费者不断来回重试百万次。

如果我有机会了解SeekToCurrentErrorHandler 中的哪条记录失败,那么我将创建SeekToCurrentErrorHandler 的自定义实现来检查失败的消息是否可重试(通过使用thrownException 字段)。如果它不可重试,那么我会将其从records 列表中删除以进行查找。

关于如何实现此功能的任何想法?

注意:enable.auto.commit 设置为 falseauto.offset.reset 设置为 earliest

谢谢!

【问题讨论】:

为什么将 auto.offset.reset 设置为最早?我认为它应该在 PROD env 上设置为最新。 我只有 1 个消费组,所以没关系。您可以查看this 以获得更好的解释。 【参考方案1】:

自 Spring 以来就有一个 FailedRecordTracker 用于 Apache Kafka 2.2(尚未发布):

https://docs.spring.io/spring-kafka/docs/2.2.0.M2/reference/html/whats-new-part.html#_listener_container_changes

从 2.2 版开始,SeekToCurrentErrorHandler 现在可以恢复(跳过)一直失败的记录。默认情况下,在 10 次失败后,将记录失败的记录 (ERROR)。您可以使用自定义恢复器 (BiConsumer) 和/或最大故障数来配置处理程序。

SeekToCurrentErrorHandler errorHandler =
    new SeekToCurrentErrorHandler((record, exception) -> 
          // recover after 3 failures - e.g. send to a dead-letter topic
          , 3);

因此,您只需将FailedRecordTrackerSeekToCurrentErrorHandler 源代码从master 复制/粘贴到您的项目中,您将拥有您正在寻找的功能:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/FailedRecordTracker.java

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekToCurrentErrorHandler.java

【讨论】:

谢谢@ArtemBilan,这看起来很有希望用于不可重试的错误。如果异常是不可重试异常之一,我可以将最大重试计数设置为 1 并记录失败(或将其发送到 DLT)。可重试的异常如何?我希望它们永远重试,所以我不想为它们设置最大失败限制。 在我的情况下,消费者的数量等于分区的数量。所以,现在,我假设records 列表的第一条记录将具有该主题的最小偏移量。如果异常不可重试,我会将其从记录列表中删除。 貌似spring kafka有个bug,如果并发级别小于分区数,consumer会永远重试消息。 看起来你在谈论这个:***.com/questions/57889424/… 删除偏移量不会给您带来任何价值:下一个成功的更大数字仍然会将所有内容提交到偏移量存储中。您可以考虑为ConcurrentKafkaListenerContainerFactory配置一个recoveryCallback,以便在排气后做一些逻辑。

以上是关于Spring Kafka SeekToCurrentErrorHandler 找出失败的记录的主要内容,如果未能解决你的问题,请参考以下文章

Spring Kafka 和 Spring Integration Kafka 的区别

kafka:spring集成 kafka(springboot集成客户端集成)

Spring生态研习:Spring-kafka

解决 spring boot 访问 docker kafka 失败

spring boot引入kafka

用 spring 管理 Kafka 主题