spring 总是需要 KafkaTemplate 吗?

Posted

技术标签:

【中文标题】spring 总是需要 KafkaTemplate 吗?【英文标题】:Does spring always require KafkaTemplate? 【发布时间】:2022-01-19 05:46:05 【问题描述】:

问题:springboot 是否总是需要创建 KafkaTemplate 类型的 bean? 下面的详细信息/堆栈跟踪/代码库,请告诉我我做错了什么。谢谢

    我一直在向一个 Spring Boot 项目的主题发布消息 为了创建回调机制,我使用了 org.apache.kafka.clients.producer.KafkaProducer.send(ProducerRecord, Callback) 来发送消息并创建回调 我这样做的原因是因为在使用 KafkaTemplate 时,listenablefuture 仅在失败时提供异常(并且我想在我的所有用例中将回调注册为单独的可重用类) 但是,当我没有定义 KafkaTemplate 类型的 bean 并出现以下错误时,spring 无法启动
原因:org.springframework.beans.factory.UnsatisfiedDependencyException:在类路径资源[org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]中定义的名称为“kafkaTemplate”的bean创建错误:通过方法“kafkaTemplate”表达的依赖关系不满足参数0;嵌套异常是 org.springframework.beans.factory.NoSuchBeanDefinitionException:没有“org.springframework.kafka.core.ProducerFactory”类型的合格 bean 可用:预计至少有 1 个有资格作为自动装配候选者的 bean。依赖注释: 在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:800) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:541) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1352) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1195) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:582) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:542) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:333) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:208) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:276) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1380) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.DefaultListableBeanFactory$DependencyObjectProvider.getIfUnique(DefaultListableBeanFactory.java:2063) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration.(KafkaAnnotationDrivenConfiguration.java:90) ~[spring-boot-autoconfigure-2.4.12.jar:2.4.12] 在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) ~[na:na] 在 java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ~[na:na] 在 java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) ~[na:na] 在 java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490) ~[na:na] 在 org.springframework.beans.BeanUtils.instantiateClass(BeanUtils.java:211) ~[spring-beans-5.3.12.jar:5.3.12] ...省略了22个常用框架 原因:org.springframework.beans.factory.NoSuchBeanDefinitionException:没有可用的“org.springframework.kafka.core.ProducerFactory”类型的合格bean:预计至少有1个有资格作为自动装配候选者的bean。依赖注释: 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.raiseNoMatchingBeanFound(DefaultListableBeanFactory.java:1790) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1346) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1300) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:887) ~[spring-beans-5.3.12.jar:5.3.12] 在 org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:791) ~[spring-beans-5.3.12.jar:5.3.12] ...省略了40个常用框架

我的 Kafka 配置如下

  @Configuration
public class KafkaEventConfig 

    private final KafkaProperties kafkaProperties;

    @Value("$client.id")
    private String clientId;


    @Value("$topic.movie.name")
    private String movieTopicName;
    
    @Value("$retry.backoff.ms")
    private int retryBackoffMilliseconds;

    @Value("$request.timeout.ms")
    private int requestTimeoutMilliseconds;

    public KafkaEventConfig(KafkaProperties kafkaProperties) 
        this.kafkaProperties = kafkaProperties;
    

    @Bean
    public ProducerFactory<String, Movie> producerFactory() 
        Map<String, Object> props = kafkaProperties.buildProducerProperties();
        populateCommonProperties(props);
        return new DefaultKafkaProducerFactory<>(props);
    

    private void populateCommonProperties(Map<String, Object> props) 
        props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, retryBackoffMilliseconds);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMilliseconds);
    
    
    @Bean
    public KafkaProducer<String, Movie> movieKafkaProducer() 
        return new KafkaProducer<String, Movie>(producerFactory().getConfigurationProperties());
    

    @Bean
    public KafkaProducerMonitor kafkaProducerMonitor(KafkaProducer<String, Movie> kafkaProducer,
            MeterRegistry registry) 
        return new KafkaProducerMonitor(kafkaProducer, registry, Tags.of("topic", movieTopicName));
    

我的 Kafka 回调如下

       @Slf4j 
public class KafkaProducerCallBack<K, V> implements Callback 

    private ProducerRecord<K, V> producerRecord;

    public KafkaProducerCallBack(ProducerRecord<K, V> producerRecord) 

        this.producerRecord = producerRecord;
    

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) 
        String topicName= metadata.topic();
        long offset= metadata.offset();
        
        if (exception != null) 

            log.error("Failed to produce message [] to topic  with exception ", producerRecord, topicName, exception);
        

        else 

            log.info("Sucessfully published message [] to topic  to offset ", producerRecord, topicName , offset);
            
        

    


我是这样发布消息的

movieKafkaProducer.send(message, new KafkaProducerCallBack<String, Movie>(message));

请注意我在 KafkaEventConfig 中添加以下行的那一刻一切正常

@Bean
    public KafkaTemplate<String, Movie> movieKafkaTemplate() 
        return new KafkaTemplate<String, Movie>(producerFactory());
    

【问题讨论】:

不,它没有,但是你是手动配置 Kafka 的部分,Spring Boot 也会尝试做一些自动配置。如果禁用 Kafka 的自动配置,则不需要 KafkaTemplate。或者您利用自动配置并使用KafkaTemplate(可以由 Spring Boot 自动提供)来发送消息并简化您自己的配置。 谢谢,太棒了..这两个工作完美.. @EnableConfigurationProperties(KafkaProperties.class) @SpringBootApplication(exclude=KafkaAutoConfiguration.class) 【参考方案1】:

仔细查看您的异常堆栈跟踪会发现问题。

Error creating bean with name 'kafkaTemplate' defined in class path resource
 [org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.class]: 
Unsatisfied dependency expressed through method 'kafkaTemplate' parameter 0; nested exception is org.springframework.beans.factory.NoSuchBeanDefinitionException: No qualifying bean of type 'org.springframework.kafka.core.ProducerFactory' available: expected at least 1 bean which qualifies as autowire candidate. Dependency annotations: 

错误来自无法在 Spring Boot 中应用基于 Kafka 的自动配置。 KafkaAutoConfiguration 类期望并配置一些bean,如果找到某些bean,则退出。当您正在配置一些 bean 时,这将部分回退,因此无法自动配置 Kafka 类。

要修复,您可以排除KafkaAutoConfiguration。您可以在 @SpringBootApplication 注释中执行此操作,就像这样

@SpringBootApplication(exclude=KafkaAutoConfiguration.class

或者您可以利用自动配置,让 Spring Boot 进行配置,然后您使用提供的 KafkaTemplateProducerFactory 来做您想做的事情。

后者将简化您自己的配置。我对 Kafka 自动配置和您的用例知之甚少,无法提供更有用的代码 sn-p,但您应该能够自己弄清楚,或者只是排除 KafkaAutoConfiguration 并使用您现在拥有的.

【讨论】:

【参考方案2】:

除了@M.Deinum 提到的后者:

看看KafkaAutoConfiguration类:

    @Bean
    @ConditionalOnMissingBean(KafkaTemplate.class)
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
            ProducerListener<Object, Object> kafkaProducerListener,
            ObjectProvider<RecordMessageConverter> messageConverter) 
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactory);
        messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
        return kafkaTemplate;
    

如果您不创建自己的,Springboot 将为您创建一个 KafkaTemplate bean。这个自动配置的 bean 依赖于 ProducerFactory&lt;Object, Object&gt; bean,因为你声明了一个 ProducerFactory&lt;String, Movie&gt;。如您所见,类型不合适,这就是您收到错误的原因。


我这样做的原因是因为使用 KafkaTemplate 时可听的future 仅在失败时提供异常(并且我想在我的所有用例中将回调注册为一个单独的可重用类)

您的情况,您仍然可以获得使用KafkaTemplate 的优势。您可以实现自己的ProducerListener&lt;K, V&gt; 并将其绑定到您的KafkaTemple,而不是实现Callback。例如:

FullLoggingProducerListener.class

public class FullLoggingProducerListener<K, V> implements ProducerListener<K, V> 
    @Override
    public void onSuccess(ProducerRecord<K, V> record, RecordMetadata recordMetadata) 
        log.info("Successful!");
    

    @Override
    public void onError(ProducerRecord<K, V> record, @Nullable RecordMetadata recordMetadata, Exception exception) 
        log.error("Error!");
    

YourConfigration.class

    @Bean
    public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<String, Movie> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener) 
        KafkaTemplate<String, Movie> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
        kafkaTemplate.setProducerListener(kafkaProducerListener);
        return kafkaTemplate;
    

现在,每次您使用KafkaTemplate 发送记录时,您都会看到日志。

【讨论】:

以上是关于spring 总是需要 KafkaTemplate 吗?的主要内容,如果未能解决你的问题,请参考以下文章

spring-kafka整合:KafkaTemplate-kafka模板类介绍

Spring Kafka 异步发送调用块

Kafka问题 02KafkaTemplate 报错 Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected 问题解决

SpringBoot集成Kafka,实现简单的收发消息

spring-kafka整合:DefaultKafkaProducerFactory默认kafka生产者工厂介绍

我总是需要一个带有 Spring 集成的 Web 服务器吗?