Spring Kafka - 手动确认

Posted

技术标签:

【中文标题】Spring Kafka - 手动确认【英文标题】:Spring Kafka - Manual acknowledgement 【发布时间】:2020-06-29 00:00:41 【问题描述】:

我有一个 spring-boot 应用程序,它侦听 Kafka 流并将记录发送到某个服务以进行进一步处理。该服务有时可能会失败。 cmets 中提到了异常情况。截至目前,我自己模拟了服务成功和异常场景。

监听代码:

@Autowired
PlanitService service

@KafkaListener(
        topics = "$app.topic",
        groupId = "notifGrp", 
        containerFactory = "storeKafkaListener")
public void processStoreNotify(StoreNotify store) throws RefrigAlarmNotifyException
       service.planitStoreNotification(store);

       // Some other logic which throws custom exception
       // RefrigAlarmNotifyException


    

消费者工厂配置如下:

@Bean
    public ConsumerFactory<String, StoreNotify> storeConsumerFactory() 
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getConsumerBootstrapServers());
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "notifGrp");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        try (ErrorHandlingDeserializer2<String> headerErrorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
                new StringDeserializer());
                ErrorHandlingDeserializer2<StoreNotify> errorHandlingDeserializer = new ErrorHandlingDeserializer2<>(
                        new JsonDeserializer<>(StoreNotify.class, objectMapper()))) 
            return new DefaultKafkaConsumerFactory<>(config, headerErrorHandlingDeserializer,
                    errorHandlingDeserializer);
        
    

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StoreNotify> storeKafkaListener() 
        ConcurrentKafkaListenerContainerFactory<String, StoreNotify> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(storeConsumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.RECORD);
        //factory.setMessageConverter(new ByteArrayJsonMessageConverter());     

        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
                (r, e) -> 

                    LOGGER.error("Exception is of type: ", e);
                    if (e instanceof RestClientException) 
                        LOGGER.error("RestClientException while processing  ", r.value(), e);
                        return new TopicPartition(storeDeadLtrTopic, r.partition());
                    
                    else 
                        LOGGER.error("Generic exception while processing  ", r.value(), e);
                        return new TopicPartition(storeErrorTopic, r.partition());
                    
                );
        factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 0L)));
        return factory;
    

由于 REST 服务正在抛出 RestClientException,它应该进入上面提到的 if 块。关于 FixedBackOff,我不希望 SeekToCurrentErrorHandler 进行重试处理,所以我将第二个参数作为 0l 传递。我只是希望它发送具有指定主题的记录。如果我错了,请纠正我 异常堆栈跟踪是

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void com.demo.ran.consumer.StoreKafkaConsumer.processStoreNotifMessage(com.demo.ran.model.StoreNotify) throws com.demo.ran.exception.RefrigAlarmNotifyException' threw exception; nested exception is org.springframework.web.client.RestClientException: Service exception; nested exception is org.springframework.web.client.RestClientException: Service exception
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1742) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1730) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1647) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1577) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1485) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1235) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:985) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:905) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_241]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_241]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_241]
Caused by: org.springframework.web.client.RestClientException: Service exception
    at com.demo.ran.service.PlanitService.planitStoreNotification(PlanitService.java:53) ~[classes/:na]
    at com.demo.ran.consumer.StoreKafkaConsumer.processStoreNotifMessage(StoreKafkaConsumer.java:48) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_241]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_241]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_241]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_241]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120) ~[spring-messaging-5.2.3.RELEASE.jar:5.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:326) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1696) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1679) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1634) ~[spring-kafka-2.3.5.RELEASE.jar:2.3.5.RELEASE]
    ... 8 common frames omitted

【问题讨论】:

【参考方案1】:

您不需要为此用例使用手动确认;只需配置一个SeekToCurrentErrorHandler 并将异常抛出到容器中;它将丢弃未处理的记录,执行搜索并重新传递失败的消息。

见the documentation。

您可以使用DeadLetterPublishingRecoverer 配置错误处理程序,可用于在重试几次后将记录发送到死信主题。

您可以配置哪些异常是可重试的。

         catch (Exception exception) 
            LOGGER.error("Exception while calling the service  ", exception);
            // Ignore the record
        

你不能像那样“吃掉”异常,让它传播到容器中。

使用手动确认时,必须将Acknowledgment作为参数添加并确认。

【讨论】:

感谢您的信息。我将通过这种方法。知道为什么现有代码不起作用。在上面的代码中,我忘记将 Acknowledgement 作为参数,但在实际代码中它就在那里。 由于您正在捕获异常,因此容器对此一无所知。即使您抛出异常,默认错误处理程序也只会记录异常并继续。 是的,这很有道理。 见the documentation。如果添加SeekToCurrentErrorHandler,您可以配置 1) 根据异常类型重试多少次,以及 2) 在进行下一次传递尝试之前延迟多长时间。这允许您控制哪些异常是致命的并且不应重试。默认情况下,有 10 次交付尝试,没有回退。默认情况下,某些异常被认为是致命的。再次,请参阅参考手册。当重试用尽时,您可以记录或发送到另一个主题。 ***异常是ListenerExecutionFailedException你需要if (e.getCause() instanceof RestClientException)

以上是关于Spring Kafka - 手动确认的主要内容,如果未能解决你的问题,请参考以下文章

Spring Kafka 再平衡说明

Spring AMQP 源码分析 06 - 手动消息确认

设置手动确认 SQS 消息的 Spring Cloud AWS 问题

Spring Cloud Stream功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT不可用。

kafka 提交offset

如何配置RabbitMQ(在Spring Boot 2.x中),以便手动确认工作