Kafkalistener SpringBoot 故障保护
Posted
技术标签:
【中文标题】Kafkalistener SpringBoot 故障保护【英文标题】:Kafkalistener SpringBoot failsafe 【发布时间】:2021-09-23 04:44:06 【问题描述】:我正在以 KafkaListener 作为我的客户运行 spring boot。问题是我们如何从失败的 kafka 配置中恢复并避免应用程序以Process finished with exit code 0
停止。
一个不正确配置的例子例如不正确的端点 URL。如果无法访问 Kafka 服务器,则将适用相同的情况。所以在任何情况下,KafkaListner 进程都不应该杀死服务器。
@Bean
open fun consumerFactory(): ConsumerFactory<String, String>
val deserializer = JsonDeserializer<Thing>()
deserializer.addTrustedPackages("de.data.Thing")
val props: MutableMap<String, Any> = HashMap()
val serverUrl = server.substringBefore(":")
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = server
props[ConsumerConfig.GROUP_ID_CONFIG] = "group"
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = "SASL_SSL"
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
props[ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG] = false
props[SaslConfigs.SASL_MECHANISM] = "PLAIN"
props[SaslConfigs.SASL_JAAS_CONFIG] = "org.apache.kafka.common.security.plain.PlainLoginModule required " +
"username=\"\$ConnectionString\" " +
"password=\"Endpoint=sb://$serverUrl/;" +
"SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=$sharedSecret\";"
return DefaultKafkaConsumerFactory(props,
StringDeserializer(), StringDeserializer())
@Bean
open fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String>?
val factory: ConcurrentKafkaListenerContainerFactory<String, String> = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
factory.setMessageConverter(BytesJsonMessageConverter())
return factory
@KafkaListener(topics = ["topic"],
groupId = "group",
containerFactory = "kafkaListenerContainerFactory",
)
fun listenThingsChanged(@Payload thing: Thing,
record: ConsumerRecord<String, String>,
@Headers headers: MessageHeaders)
....
org.springframework.context.ApplicationContextException: 无法启动 bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';嵌套异常是 org.apache.kafka.common.KafkaException: Failed to construction kafka consumer 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) 在 org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) 在 org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) 在 org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) 在 org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) 在 org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895) 在 org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554) 在 org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143) 在 org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758) 在 org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750) 在 org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:315) 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:1237) 在 org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) 在 de.x.ServerAppKt.main(ServerApp.kt:11) 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 在 java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 在 java.base/java.lang.reflect.Method.invoke(Method.java:566) 在 org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) 引起:org.apache.kafka.common.KafkaException: 无法构造kafka消费者 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:825) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:631) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:340) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:308) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:293) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:267) 在 org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:241) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.(KafkaMessageListenerContainer.java:606) 在 org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:302) 在 org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) 在 org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204) 在 org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338) 在 org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312) 在 org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257) 在 org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ...省略了19个常用框架 引起:org.apache.kafka.common.config.ConfigException:在 bootstrap.servers 中没有给出可解析的引导 URL 在 org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) 在 org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) 在 org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:737) ...省略了33个常用框架
【问题讨论】:
你希望它做什么?如果配置错误,那就是错误的——它怎么可能从错误的配置中恢复? 配置在语法上是正确的。但它无法启动KafkaListener进程,因为服务器的url错误或无法访问。在这种情况下,我希望 KafkaListener 进程不会使整个服务器崩溃。因为我的服务器也完成了其他任务。 【参考方案1】:如果代理刚刚关闭,应用程序将正常启动(对于 2.3.4 之前的版本,您必须在容器属性上将 missingTopicsFatal
设置为 false(从那时起默认为 false)。
在...中没有给出可解析的引导 URL
这是致命的——无法恢复。
但是,您可以在@KafkaListener
或容器工厂上设置autoStartup=false
。
这将阻止 Spring 在应用程序初始化期间尝试启动容器。
然后您可以在 try/catch 块中自己启动容器。
【讨论】:
以上是关于Kafkalistener SpringBoot 故障保护的主要内容,如果未能解决你的问题,请参考以下文章
Kafka中@KafkaListener如何动态指定多个topic