使用 Spring Cloud Stream Kafka Binder 时无法设置 groupId 和 clientId
Posted
技术标签:
【中文标题】使用 Spring Cloud Stream Kafka Binder 时无法设置 groupId 和 clientId【英文标题】:Unable To Set groupId and clientId when using Spring Cloud Stream Kafka Binder 【发布时间】:2020-06-11 21:22:04 【问题描述】:我在处理 Spring Cloud Stream Kafka Binder 时遇到了严重问题。 Spring Cloud 3.0.2.RELEASE 的配置设置存在很多歧义和一致性问题。我一直在尝试为 Kafka 主题设置组 id 和客户端 id,但是尽管尝试了各种不同的组合,我还是无法正确配置组 id。
文档声称我们应该能够通过配置以下设置之一来设置组 ID 和客户端 ID: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#binding-properties
spring.cloud.stream.default.group
spring.cloud.stream.default.consumer.group
spring.cloud.stream.kafka.default.consumer.group
spring.cloud.stream.bindings.<channelName>.group
以上配置均不适用于为生产者设置客户端 ID 或为消费者设置组 ID。我得到的唯一进展是通过完全不同的配置设置客户端 ID。
spring.kafka.client-id
spring.kafka.admin.client-id
spring.kafka.producer.client-id
在使用这些设置设置成功设置客户端 ID 后,我尝试为消费者设置组 ID,但令人惊讶的是不起作用。
spring.kafka.group-id <---- does not exist as a property, but tried this anyway
spring.kafka.consumer.group-id
编辑:这是应用程序设置。
Application.java
@SpringBootApplication
@EnableSwagger2
public class Application
public static void main(String[] args)
SpringApplication.run(Application.class, args);
@Bean
public Docket swaggerApi()
return new Docket(DocumentationType.SWAGGER_2)
.select()
.apis(RequestHandlerSelectors.any())
.paths(regex("^(?!.*error).+$"))
.build()
.pathMapping("/");
应用程序.yaml
spring:
cloud:
stream:
bindings:
MyKafkaTopicBinderChannel:
destination: MyKafkaTopic
group: MyServiceGroup
default:
producer:
useNativeEncoding: on
consumer:
useNativeEncoding: on
contentType: application/*+avro
kafka:
binder:
brokers: some.broker.io
producer-properties:
key:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema:
registry:
url: some.registry.io
consumer-properties:
key:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
value:
deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema:
registry.url: some.registry.io
specific:
avro:
reader: true
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>io.some.org</groupId>
<artifactId>MyService</artifactId>
<version>1.0.0</version>
<name>chatbotApi</name>
<description>Spring Boot Service</description>
<properties>
<java.version>11</java.version>
<gson.version>2.8.6</gson.version>
<springfox.version>2.9.2</springfox.version>
<swagger-annotations.version>1.6.0</swagger-annotations.version>
<swagger-models.version>1.6.0</swagger-models.version>
<jackson-datatype-jsr310.version>2.10.2</jackson-datatype-jsr310.version>
<avro.version>1.9.2</avro.version>
<avro-maven-plugin.version>1.9.2</avro-maven-plugin.version>
<confluent.kafka.version>5.4.0</confluent.kafka.version>
<kafka-clients.version>2.4.0</kafka-clients.version>
<spring-cloud.version>3.0.2.RELEASE</spring-cloud.version>
</properties>
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>$spring-cloud.version</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>$gson.version</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>$jackson-datatype-jsr310.version</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>$springfox.version</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</exclusion>
<exclusion>
<groupId>io.swagger</groupId>
<artifactId>swagger-models</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>$springfox.version</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
<version>$swagger-annotations.version</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-models</artifactId>
<version>$swagger-models.version</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>$confluent.kafka.version</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>$confluent.kafka.version</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-streams-avro-serde</artifactId>
<version>$confluent.kafka.version</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>$kafka-clients.version</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>$avro.version</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>$avro.version</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<version>$spring-cloud.version</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<excludes>
<exclude>local.yaml</exclude>
<exclude>avro/*</exclude>
</excludes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>$avro-maven-plugin.version</version>
<executions>
<execution>
<id>schemas</id>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
<goal>protocol</goal>
<goal>idl-protocol</goal>
</goals>
<configuration>
<sourceDirectory>$project.basedir/src/main/resources/avro/</sourceDirectory>
<outputDirectory>$project.basedir/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>$java.version</source>
<target>$java.version</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
SpringIntegrationService.java
@Component
@EnableBinding(SpringIntegrationService.KafkaTopicBindings.class)
public class SpringIntegrationService
private static Logger logger = LoggerFactory.getLogger(SpringIntegrationService.class);
private MessageChannel someChannel;
public interface KafkaTopicBindings
String MY_KAFKA_TOPIC_BINDER_CHANNEL = "MyKafkaTopicBinderChannel";
@Output(KafkaTopicBindings.MY_KAFKA_TOPIC_BINDER_CHANNEL)
MessageChannel someChannel();
public SpringIntegrationService(KafkaTopicBindings bindings)
this.someChannel = bindings.someChannel();
@ServiceActivator(inputChannel = "entry.kafka")
public boolean entryKafka(Message<someChannel> msg)
logger.info("entryKafka(): Payload: ", msg.getPayload());
try
return someChannel.send(MessageBuilder.withPayload(msg.getPayload())
.setHeader(KafkaHeaders.MESSAGE_KEY, "Some Key").build());
catch (Exception e)
logger.warn("entryKafka(): Failed to send message onto someChannel topic", e);
return false;
【问题讨论】:
我也遇到了同样的问题,解决了吗? 【参考方案1】:这是一个repo,我尝试更新blog 中提到的应用程序。清理了配置中的一些内容并更新了示例以使用新的功能模型。我验证了这行得通。你能在你的最后运行它并与你的设置进行比较吗?如果您可以将此样本作为报告任何潜在问题的一种手段,这将有助于我们进一步为您提供帮助。
【讨论】:
我已经尝试了 repo 中的代码,它实际上运行良好。不过,根据@Oleg Zhurakousky 的评论,如果您尝试将 Spring Cloud Stream Binder Kafka 与 Spring Integration 结合使用,那么该配置似乎与反应式编程模型完美配合将无法正常工作。 对于我之后的其他人,不要将 Spring Integration 与 Spring Cloud Stream 一起使用:cloud.spring.io/spring-cloud-static/spring-cloud-stream/…【参考方案2】:我不确定什么对您不起作用,如果没有看到您提供的完整样本,就不可能说。所以这里有一个简单的例子,我们可以作为起点:
@SpringBootApplication
public class SimpleStreamApplication
public static void main(String[] args) throws Exception
SpringApplication.run(SimpleStreamApplication.class,
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.group=uppercase.group");
@Bean
public Function<String, String> uppercase()
return v -> v.toUpperCase();
这将导致正确创建/分配消费者组。你是说上面的方法不适合你吗?
【讨论】:
我现在还没有更新帖子。我也在关注指南:baeldung.com/spring-cloud-stream-kafka-avro-confluent 作为 spring-cloud-stream 的初学者,可以考虑切换到functional programming model。如您所见,通过删除几乎所有对注释编程模型的引用,我们几乎弃用了您当前使用的方法。此外,我为您提供了一个最简单的示例来让您开始行动,同时您正在参考一些博客,这会造成一些脱节,因为我不确定您到底想从这种互动中获得什么。 抱歉,我不知道 Baeldung 指南中的内容是一种已弃用的方法。我开始使用 Baeldung 的指南,因为它更详细地说明了如何启动和运行。我还注意到@sobychacko 提供的清理后的代码使用了响应式库。我确实喜欢这些库,但我害怕依赖目前不稳定的反应式数据库驱动程序版本。我将切换到新的范式。 我错过了添加 function.definition。这篇文章有帮助。谢谢:)以上是关于使用 Spring Cloud Stream Kafka Binder 时无法设置 groupId 和 clientId的主要内容,如果未能解决你的问题,请参考以下文章
spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server的区别
随手记录关于spring-cloud-starter-eureka-server 和 spring-cloud-starter-netflix-eureka-server
`spring-cloud-starter-eureka-server`和`spring-cloud-starter-netflix-eureka-server`之间的区别
使用 spring-boot:1.5.1 和 spring-cloud-stream 时无法启动 bean 'inputBindingLifecycle'