Kafka 消费者默认组 ID

Posted

技术标签:

【中文标题】Kafka 消费者默认组 ID【英文标题】:Kafka Consumer default Group Id 【发布时间】:2017-08-24 10:13:50 【问题描述】:

我正在使用 Apache Kafka 及其 Java 客户端,我发现消息在属于同一组的不同 Kafka 消费者之间进行负载平衡(即共享相同的组 ID)。

在我的应用程序中,我需要所有消费者阅读所有消息。

所以我有几个问题:

如果我没有在 Consumer Properties 中设置任何组 id,那么 Kafka Consumer 会被赋予什么组 id?

是否有一个默认值?

客户端每次都创建一个随机值吗?

我是否需要为每个消费者创建不同的 id 以确保每个消费者都能收到所有消息?

编辑: 谢谢你的回答。

你是对的:如果没有设置消费者组 id,Kafka 应该抱怨。

但是,我发现如果 group id 为 null,Java 客户端将其设置为空字符串 "" 以避免出现问题。 所以显然这是我正在寻找的默认值。

让我的所有消费者感到惊讶,即使我没有设置他们的 groupId(所以他们都使用 groupId == "")似乎收到了生产者写的所有消息。

我仍然无法解释:有什么建议吗?

【问题讨论】:

这可能与您的分区数有关。你有多少个分区,你使用了多少个消费者?事实上,当多个消费者订阅一个主题并属于同一个消费者组时,组中的每个消费者都会收到来自该主题中不同分区子集的消息。 我为每个主题使用一个分区,所以所有消费者都从同一个分区接收消息:/ 【参考方案1】:

如果我没有在 Consumer Properties 中设置任何组 id,那么 Kafka Consumer 会被赋予什么组 id?

kafka 消费者不会有任何消费者组。相反,您会收到此错误:The configured groupId is invalid

是否有单一的默认值?

是的,可以查看kafka的consumer.properties文件供参考。默认消费者组 id 为:group.id=test-consumer-group

客户端是否每次都创建一个随机值?

不,对于从 Kafka 0.9.0.x 消费者开始的 Java 客户端,groupId 似乎是强制性的。你可以参考这个JIRA:https://issues.apache.org/jira/browse/KAFKA-2648

我是否需要为每个消费者创建不同的 id 以确保每个消费者都能收到所有消息?

是的,如果所有消费者使用相同的组 id,主题中的消息将在这些消费者之间分发。换句话说,每个消费者都会得到一个不重叠的消息子集。在同一组中拥有更多的消费者会增加并行度和消费的整体吞吐量。另一方面,如果每个消费者都在自己的组中,则每个消费者都将获得所有消息的完整副本。

【讨论】:

【参考方案2】:

不想重复其他答案,只想指出一点:您实际上并不需要一个消费者组来消费所有消息。 Kafka Consumer API(假设我们正在处理 Java API)同时具有 subscribe()assign() 方法。如果您希望所有消费者在没有负载平衡的情况下接收所有消息(本质上是消费者组的用途),您可以在所有消费者上调用 assign(),将主题的所有分区传递给它,可选地后跟 seek() 到设置偏移量;这样您的消费者将获得所有消息。

这样,Kafka 将不会管理分区分配,也不会保留偏移量——消费者负责所有这些。根据您的用例,与为每个消费者设置一个消费者组相比,这可能是一种更好的方法。

【讨论】:

【参考方案3】:

我也有同样的问题。并花了一些时间研究这个问题。 项目spring-cloud-stream 将检查您是否为消费者设置了组ID。如果没有,spring-cloud-stream 将创建一个随机值作为组 ID。 请参考KafkaMessageChannelBinder.类中的方法createConsumerEndpoint

【讨论】:

【参考方案4】:

如果不设置group.id,消费主题数据会报错。

org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group 
22:08:14.132 [testAuto-kafka-consumer-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup (group_id=,session_timeout=15000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]]) to coordinator bogon:9092 (id: 2147483647 rack: null)
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to join group  failed due to fatal error: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Container exception

【讨论】:

【参考方案5】:

根据 KIP-289,默认 group.id 已“改进”,自 kafka 客户端版本 2.2.0 起,默认 group.id 为 null

KIP-289: Improve the default group id behavior in KafkaConsumer.

在我看来,当使用assign 时,您可以放弃group.id,将其保留为空,这样就没有可用的偏移量了。

【讨论】:

【参考方案6】:

检查 groupId 来自- @KafkaListener(topics = "$kafka.topic", groupId = "groupIdName")

步骤-> 转到 Kafka 文件夹

打开config文件夹

打开consumer.properties

更改组 ID

group.id=groupIdName

【讨论】:

以上是关于Kafka 消费者默认组 ID的主要内容,如果未能解决你的问题,请参考以下文章

kafka 消费者组 ID 无法按预期工作

五 通过命令行了解 Kafka消费者组

Kafka -- 消费组到底是什么?

Kafka -- 消费组到底是什么?

Kafka核心技术与实战——15 | 消费者组到底是什么?

关于kafka消费者的命令