Spring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录

Posted

技术标签:

【中文标题】Spring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录【英文标题】:Spring Kafka - "ErrorHandler threw an exception" and lost some records 【发布时间】:2022-01-18 15:05:50 【问题描述】:

Consumer一次轮询2条记录,即:

    @Bean
    ConsumerFactory<String, String> consumerFactory() 
        Map<String, Object> config = Map.of(
                BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                GROUP_ID_CONFIG, "my-consumers",
                AUTO_OFFSET_RESET_CONFIG, "earliest",
                MAX_POLL_RECORDS_CONFIG, 2);

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new StringDeserializer());
    

ErrorHandler 可能无法处理错误记录:

class MyListenerErrorHandler implements ContainerAwareErrorHandler 

    @Override
    public void handle(Exception thrownException,
                       List<ConsumerRecord<?, ?>> records,
                       Consumer<?, ?> consumer,
                       MessageListenerContainer container) 
        simulateBugInErrorHandling(records.get(0));
        skipFailedRecord(); // seek offset+1, which never happens
    

    private void simulateBugInErrorHandling(ConsumerRecord<?, ?> record) 
        throw new NullPointerException(
                "DB transaction failed when saving info about failure on offset = " + record.offset());
    

那么这样的场景是可能的:

    主题获得 3 条记录 Consumer 一次轮询 2 条记录 MessageListener 由于负载错误,无法处理第一条记录 ErrorHandler 无法处理失败并且自身抛出异常,例如由于一些临时问题 第三条记录得到处理 从不处理第二条记录(从不输入MessageListener

ErrorHandler在上述情况下抛出异常时,如何确保没有未处理的记录?

我的目标是实现有延迟的有状态重试逻辑,但为简洁起见,我省略了负责跟踪失败记录和延迟重试的代码。


我希望在ErrorHandler 引发异常之后,不应该跳过整批记录。但确实如此。

    这是正确的行为吗? 我是否应该手动处理使用 Spring/Kafka 默认值的提交? 我应该使用不同的ErrorHandlerhandle 方法吗? (我需要访问Container 来创建pause() 用于延迟重试逻辑;不能使用Thread.sleep()

不知何故相关问题:https://github.com/spring-projects/spring-kafka/issues/1265

完整代码:https://github.com/ptomaszek/spring-kafka-error-handler

【问题讨论】:

【参考方案1】:

消费者必须重新定位(使用搜索),以便在失败后重新获取记录。

使用DefaultErrorHandler(2.8.x 及更高版本)或SeekToCurrentErrorHandler 与早期版本。

您可以添加重试选项和恢复器来处理失败的记录;默认情况下它只是被记录。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#default-eh

https://docs.spring.io/spring-kafka/docs/2.7.x/reference/html/#seek-to-current

在抛出任何异常之前,您需要先进行搜索(或在 finally 块中);如果错误处理程序抛出异常,容器不会提交偏移量。

Kafka 维护 2 个偏移量——当前提交的偏移量和当前位置(在消费者启动时设置为提交的偏移量)。下一次轮询总是返回最后一次轮询之后的下一条记录。除非执行了搜索。

默认错误处理程序会捕获恢复器抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。见SeekUtils.doSeeks()

【讨论】:

我需要一个自定义的ErrorHandler。但是,它可以在使用seek 重新定位之前引发异常。然后似乎与records 的最新记录的偏移量无论如何都会与下一个poll 一起提交。最后,失败的记录(以及他第一包 records 中的剩余记录)永远不会被重新处理 您需要先进行搜索(或在 finally 块中),然后才能引发任何异常;它不提交偏移量; Kafka 维护 2 个偏移量——当前提交的偏移量和当前位置(在消费者启动时设置为提交的偏移量)。下一次轮询总是返回最后一次轮询之后的下一条记录。除非执行搜索。默认错误处理程序捕获恢复器抛出的任何异常,并确保当前(和后续)记录将由下一次轮询返回。见SeekUtils.doSeeks()

以上是关于Spring Kafka - “ErrorHandler 抛出异常”并丢失了一些记录的主要内容,如果未能解决你的问题,请参考以下文章

Spring Kafka 和 Spring Integration Kafka 的区别

kafka:spring集成 kafka(springboot集成客户端集成)

Spring生态研习:Spring-kafka

解决 spring boot 访问 docker kafka 失败

spring boot引入kafka

用 spring 管理 Kafka 主题