Spring Integration 与 Kafka 自动配置问题

Posted

技术标签:

【中文标题】Spring Integration 与 Kafka 自动配置问题【英文标题】:Spring Integration with Kafka auto configuration issue 【发布时间】:2017-07-30 22:03:12 【问题描述】:

我正在尝试配置 Spring Boot 应用程序以使用 Kafka 消息。添加后:

<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.1.3.RELEASE</version>
</dependency>

进入我的依赖项并使用@EnableKafka@KafkaListener(topics = "some-topic") 注释,我收到以下错误:

...
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'kafkaListenerContainerFactory' available

然后我添加以下配置:

@Bean
public Map<String, Object> consumerConfigs() 

    Map<String, Object> propsMap = new HashMap<>();

    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return propsMap;


@Bean
public ConsumerFactory<String, String> consumerFactory() 

    return new DefaultKafkaConsumerFactory<>(consumerConfigs());


@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() 

    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;

错误消失了。但是,我认为我应该能够使用 spring.kafka.listener.* 属性自动配置它,正如文档所建议的那样。

如果我不能,我想使用自动连接的KafkaProperties。但是,为了能够使用它,我添加:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-autoconfigure</artifactId>
    <version>1.5.2.RELEASE</version>
</dependency>

然后就可以导入了。当我尝试如下使用它时:

@Autowired
private KafkaProperties kafkaProperties;

在我的方法中:

 return kafkaProperties.buildConsumerProperties();

我收到以下错误:

Caused by: java.lang.ClassNotFoundException: org.springframework.boot.context.annotation.DeterminableImports
.

我认为这是一个 Maven 依赖问题。

所以我的问题是:

    是否可以在不创建 @Beans 而仅使用 application.properties 的情况下配置 Kafka 配置? 如果没有,我如何跳过手动创建所需的Map 对象,而直接使用kafkaProperties.buildConsumerProperties() 而不会出现上述错误(第二个)?

【问题讨论】:

【参考方案1】:

转到http://start.spring.io并选择Kafka;他将为您构建一个项目框架,一切准备就绪

波姆:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>kafkademo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>kafkademo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

道具...

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=myGroup

应用程序...

@SpringBootApplication
public class KafkademoApplication 

    public static void main(String[] args) throws Exception 
        ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        context.close();
    

    @KafkaListener(topics="so8400in")
    public void listen(String in) 
        System.out.println(in);
    


编辑

我把监听器改成

@KafkaListener(topics="so8400in")
public void listen(String in) 
    System.out.println("Received message:" + in);

然后发送此消息...

$ kafka-console-producer --broker-list localhost:9092 --topic so8400in
test message for so42703487

...在控制台上...

Received message:test message for so42703487

如果您需要进一步证明,我可以在 GitHub 上发布该项目。

【讨论】:

这正是我的意思,这个配置是行不通的,除了上面提到的例外。 在我的回答中使用 pom 对我来说效果很好。如您所见,不需要@Beans。 您可以尝试使用@EnableKafka 注释吗?我相信您无法使用上述应用程序接收消息。 引导在自动配置过程中添加@EnableKafka。见here - 我可以向你保证上面的作品(我写的时候测试过)。 请看下面我的回答。我知道它有效,但这不是我问题的答案。无论如何,我都感谢您的帮助。【参考方案2】:

正如我所怀疑的那样,原来是 Maven 问题。基本上,我从事具有以下结构的多模块项目:

────parent
    ├───parent.pom
    ├───module1
    |   └───module1.pom
    └───module2
        └───module2.pom

我的parent.pom 有另一个parent 元素是:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.2.RELEASE</version>
</parent>

基本上将上面的parentparent 替换为:

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-dependencies</artifactId>
            <type>pom</type>
            <version>1.5.2.RELEASE</version>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

按照here 的建议解决了所有问题(自动配置和能够使用KafkaProperties)。

【讨论】:

以上是关于Spring Integration 与 Kafka 自动配置问题的主要内容,如果未能解决你的问题,请参考以下文章

Spring Integration 与 Kafka 自动配置问题

Spring Integration DSL 过滤器与带有单个收件人和 DefaultOutputToParentFlow 的 RouteToRecipients

Spring Integration 反应式流支持

spring integration:如何从 Spring Controller 调用 Spring Integration?

RabbitMQ与Spring集成配置

将消息发送到套接字端口并使用Spring Integration接收响应