Spring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录
Posted
技术标签:
【中文标题】Spring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录【英文标题】:Spring Kafka - "ErrorHandler threw an exception" and lost some records 【发布时间】:2022-01-18 15:05:50 【问题描述】:让Consumer
一次轮询2条记录,即:
@Bean
ConsumerFactory<String, String> consumerFactory()
Map<String, Object> config = Map.of(
BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
GROUP_ID_CONFIG, "my-consumers",
AUTO_OFFSET_RESET_CONFIG, "earliest",
MAX_POLL_RECORDS_CONFIG, 2);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
和ErrorHandler
可能无法处理错误记录:
class MyListenerErrorHandler implements ContainerAwareErrorHandler
@Override
public void handle(Exception thrownException,
List<ConsumerRecord<?, ?>> records,
Consumer<?, ?> consumer,
MessageListenerContainer container)
simulateBugInErrorHandling(records.get(0));
skipFailedRecord(); // seek offset+1, which never happens
private void simulateBugInErrorHandling(ConsumerRecord<?, ?> record)
throw new NullPointerException(
"DB transaction failed when saving info about failure on offset = " + record.offset());
那么这样的场景是可能的:
-
主题获得 3 条记录
Consumer
一次轮询 2 条记录
MessageListener
由于负载错误,无法处理第一条记录
ErrorHandler
无法处理失败并且自身抛出异常,例如由于一些临时问题
第三条记录得到处理
从不处理第二条记录(从不输入MessageListener
)
当ErrorHandler
在上述情况下抛出异常时,如何确保没有未处理的记录?
我的目标是实现有延迟的有状态重试逻辑,但为简洁起见,我省略了负责跟踪失败记录和延迟重试的代码。
我希望在ErrorHandler
引发异常之后,不应该跳过整批记录。但确实如此。
-
这是正确的行为吗?
我是否应该手动处理使用 Spring/Kafka 默认值的提交?
我应该使用不同的
ErrorHandler
或handle
方法吗? (我需要访问Container
来创建pause()
用于延迟重试逻辑;不能使用Thread.sleep()
)
不知何故相关问题:https://github.com/spring-projects/spring-kafka/issues/1265
完整代码:https://github.com/ptomaszek/spring-kafka-error-handler
【问题讨论】:
【参考方案1】:消费者必须重新定位(使用搜索),以便在失败后重新获取记录。
使用DefaultErrorHandler
(2.8.x 及更高版本)或SeekToCurrentErrorHandler
与早期版本。
您可以添加重试选项和恢复器来处理失败的记录;默认情况下它只是被记录。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh
https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current
在抛出任何异常之前,您需要先进行搜索(或在 finally 块中);如果错误处理程序抛出异常,容器不会提交偏移量。
Kafka 维护 2 个偏移量——当前提交的偏移量和当前位置(在消费者启动时设置为提交的偏移量)。下一次轮询总是返回最后一次轮询之后的下一条记录。除非执行了搜索。
默认错误处理程序会捕获恢复器抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。见SeekUtils.doSeeks()
。
【讨论】:
我需要一个自定义的ErrorHandler
。但是,它可以在使用seek
重新定位之前引发异常。然后似乎与records
的最新记录的偏移量无论如何都会与下一个poll
一起提交。最后,失败的记录(以及他第一包 records
中的剩余记录)永远不会被重新处理
您需要先进行搜索(或在 finally 块中),然后才能引发任何异常;它不提交偏移量; Kafka 维护 2 个偏移量——当前提交的偏移量和当前位置(在消费者启动时设置为提交的偏移量)。下一次轮询总是返回最后一次轮询之后的下一条记录。除非执行搜索。默认错误处理程序捕获恢复器抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。见SeekUtils.doSeeks()
。以上是关于Spring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录的主要内容,如果未能解决你的问题,请参考以下文章
Spring Kafka 和 Spring Integration Kafka 的区别
kafka:spring集成 kafka(springboot集成客户端集成)