Spring Integration 与 Kafka 自动配置问题
Posted
技术标签:
【中文标题】Spring Integration 与 Kafka 自动配置问题【英文标题】:Spring Integration with Kafka auto configuration issue 【发布时间】:2017-07-30 22:03:12 【问题描述】:我正在尝试配置 Spring Boot 应用程序以使用 Kafka 消息。添加后:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>
进入我的依赖项并使用@EnableKafka
和@KafkaListener(topics = "some-topic")
注释,我收到以下错误:
...
Caused by: org.springframework.beans.factory.NoSuchBeanDefinitionException: No bean named 'kafkaListenerContainerFactory' available
然后我添加以下配置:
@Bean
public Map<String, Object> consumerConfigs()
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return propsMap;
@Bean
public ConsumerFactory<String, String> consumerFactory()
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory()
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
错误消失了。但是,我认为我应该能够使用 spring.kafka.listener.*
属性自动配置它,正如文档所建议的那样。
如果我不能,我想使用自动连接的KafkaProperties
。但是,为了能够使用它,我添加:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-autoconfigure -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
<version>1.5.2.RELEASE</version>
</dependency>
然后就可以导入了。当我尝试如下使用它时:
@Autowired
private KafkaProperties kafkaProperties;
在我的方法中:
return kafkaProperties.buildConsumerProperties();
我收到以下错误:
Caused by: java.lang.ClassNotFoundException: org.springframework.boot.context.annotation.DeterminableImports
.
我认为这是一个 Maven 依赖问题。
所以我的问题是:
-
是否可以在不创建
@Bean
s 而仅使用 application.properties
的情况下配置 Kafka 配置?
如果没有,我如何跳过手动创建所需的Map
对象,而直接使用kafkaProperties.buildConsumerProperties()
而不会出现上述错误(第二个)?
【问题讨论】:
【参考方案1】:转到http://start.spring.io并选择Kafka;他将为您构建一个项目框架,一切准备就绪
波姆:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>kafkademo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafkademo</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
道具...
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=myGroup
应用程序...
@SpringBootApplication
public class KafkademoApplication
public static void main(String[] args) throws Exception
ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);
System.out.println("Hit enter to terminate");
System.in.read();
context.close();
@KafkaListener(topics="so8400in")
public void listen(String in)
System.out.println(in);
编辑
我把监听器改成
@KafkaListener(topics="so8400in")
public void listen(String in)
System.out.println("Received message:" + in);
然后发送此消息...
$ kafka-console-producer --broker-list localhost:9092 --topic so8400in
test message for so42703487
...在控制台上...
Received message:test message for so42703487
如果您需要进一步证明,我可以在 GitHub 上发布该项目。
【讨论】:
这正是我的意思,这个配置是行不通的,除了上面提到的例外。 在我的回答中使用 pom 对我来说效果很好。如您所见,不需要@Bean
s。
您可以尝试使用@EnableKafka
注释吗?我相信您无法使用上述应用程序接收消息。
引导在自动配置过程中添加@EnableKafka
。见here - 我可以向你保证上面的作品(我写的时候测试过)。
请看下面我的回答。我知道它有效,但这不是我问题的答案。无论如何,我都感谢您的帮助。【参考方案2】:
正如我所怀疑的那样,原来是 Maven 问题。基本上,我从事具有以下结构的多模块项目:
────parent
├───parent.pom
├───module1
| └───module1.pom
└───module2
└───module2.pom
我的parent.pom
有另一个parent
元素是:
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.2.RELEASE</version>
</parent>
基本上将上面的parent
的parent
替换为:
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<type>pom</type>
<version>1.5.2.RELEASE</version>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
按照here 的建议解决了所有问题(自动配置和能够使用KafkaProperties
)。
【讨论】:
以上是关于Spring Integration 与 Kafka 自动配置问题的主要内容,如果未能解决你的问题,请参考以下文章
Spring Integration 与 Kafka 自动配置问题
Spring Integration DSL 过滤器与带有单个收件人和 DefaultOutputToParentFlow 的 RouteToRecipients
spring integration:如何从 Spring Controller 调用 Spring Integration?