我可以设置 Kafka Stream 消费者 group.id 吗?

Posted

技术标签:

【中文标题】我可以设置 Kafka Stream 消费者 group.id 吗?【英文标题】:Can I set Kafka Stream consumer group.id? 【发布时间】:2019-08-09 16:58:56 【问题描述】:

我正在将 Kafka Stream 库用于流式应用程序。 我想设置 kafka 消费者组 ID。 然后,我将 Kafka 流配置如下所示。



    streamsCopnfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JoinTestApp");
    streamsCopnfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "JonTestClientId1");
    streamsCopnfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    streamsCopnfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstreapServer);
    streamsCopnfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsCopnfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
    streamsCopnfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsCopnfiguration.put(StreamsConfig.consumerPrefix("group.id"), "groupId1");
//    streamsCopnfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId1");

但是,我的控制台日志 kafka 消费者配置未设置 group.id。 只需流式传输应用程序 ID..

2019-03-19 17:17:03,206 [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [....]
check.crcs = true
client.dns.lookup = default
client.id = JonTestClientId111-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = JoinTestApp
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false

我可以设置 kafka 流消费者 group.id 吗??

【问题讨论】:

【参考方案1】:

不,您不能。为此,您需要在 Kafka Streams 中使用 application.id

Kafka Streams application.id 用于在各个地方隔离应用程序使用的资源与其他资源。

application.id 用作 Kafka 消费者 group.id 进行协调。这就是为什么你不能明确设置group.id

来自Kafka Streams Official documentation,application.id也用于以下地方:

 - As the default Kafka consumer and producer client.id prefix
 - As the name of the subdirectory in the state directory (cf. state.dir)
 - As the prefix of internal Kafka topic names

【讨论】:

是的......正如我在回答中提到的......你不能为kafka流明确设置group.id。对于 Kafka Streams,application.id 将是 group.id

以上是关于我可以设置 Kafka Stream 消费者 group.id 吗?的主要内容,如果未能解决你的问题,请参考以下文章

kafka之消费超时死循环

Spring Cloud Stream Kafka 消费者模式

spring-cloud-stream kafka 消费者并发

如何在默认情况下从 Kafka Spring Cloud Stream 消费并消费由 Confluent API 生成的 Kafka 消息?

SpringCloud(28)——Stream重复消费与持久化

使用 Spring Cloud Stream Kafka Binder 批量使用带有密钥的 Kafka 消息