Kafkalistener SpringBoot 故障保护

Posted

技术标签:

【中文标题】Kafkalistener SpringBoot 故障保护【英文标题】:Kafkalistener SpringBoot failsafe 【发布时间】:2021-09-23 04:44:06 【问题描述】:

我正在以 KafkaListener 作为我的客户运行 spring boot。问题是我们如何从失败的 kafka 配置中恢复并避免应用程序以Process finished with exit code 0 停止。 一个不正确配置的例子例如不正确的端点 URL。如果无法访问 Kafka 服务器,则将适用相同的情况。所以在任何情况下,KafkaListner 进程都不应该杀死服务器。

 @Bean
open fun consumerFactory(): ConsumerFactory<String, String> 
    val deserializer = JsonDeserializer<Thing>()
    deserializer.addTrustedPackages("de.data.Thing")

    val props: MutableMap<String, Any> = HashMap()
    val serverUrl = server.substringBefore(":")
    props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
    props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
    props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
    props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
    props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
    props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
    props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
            "username=\"\$ConnectionString\" " +
            "password=\"Endpoint=sb://$serverUrl/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"
    return DefaultKafkaConsumerFactory(props,
            StringDeserializer(), StringDeserializer())




@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>? 
    val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>()
    factory.consumerFactory = consumerFactory()
    factory.setMessageConverter(BytesJsonMessageConverter())
    return factory


 @KafkaListener(topics = ["topic"],
        groupId = "group",
        containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
                        record: ConsumerRecord<String, String>,
                        @Headers headers: MessageHeaders) 

    ....


 

org.springframework.context.ApplicationContextException: 无法启动 bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';嵌套异常是 org.apache.kafka.common.KafkaException: Failed to construction kafka consumer 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) 在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) 在 org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) 在 org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) 在 org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) 在 org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) 在 org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) 在 org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) 在 org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) 在 org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:315) 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) 在 de.x.ServerAppKt.main(ServerApp.kt:11) 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.base/java.lang.reflect.Method.invoke(Method.java:566) 在 org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) 引起:org.apache.kafka.common.KafkaException: 无法构造kafka消费者 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302) 在 org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) 在 org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) 在 org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) 在 org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) 在 org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ...省略了19个常用框架 引起:org.apache.kafka.common.config.ConfigException:在 bootstrap.servers 中没有给出可解析的引导 URL 在 org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) 在 org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737) ...省略了33个常用框架

【问题讨论】:

你希望它做什么?如果配置错误,那就是错误的——它怎么可能从错误的配置中恢复? 配置在语法上是正确的。但它无法启动KafkaListener进程,因为服务器的url错误或无法访问。在这种情况下,我希望 KafkaListener 进程不会使整个服务器崩溃。因为我的服务器也完成了其他任务。 【参考方案1】:

如果代理刚刚关闭,应用程序将正常启动(对于 2.3.4 之前的版本,您必须在容器属性上将 missingTopicsFatal 设置为 false(从那时起默认为 false)。

在...中没有给出可解析的引导 URL

这是致命的——无法恢复。

但是,您可以在@KafkaListener 或容器工厂上设置autoStartup=false

这将阻止 Spring 在应用程序初始化期间尝试启动容器。

然后您可以在 try/catch 块中自己启动容器。

【讨论】:

以上是关于Kafkalistener SpringBoot 故障保护的主要内容,如果未能解决你的问题,请参考以下文章

浅谈@KafkaListener工作流程

Kafka中@KafkaListener如何动态指定多个topic

Spring @KafkaListener 和并发

@KafkaListener、@StreamListener 和 @ServiceActivator 的区别?

如何为@KafkaListener 编写单元测试?

@KafkaListener每次都从头开始阅读