Kafka 消费者默认组 ID
Posted
技术标签:
【中文标题】Kafka 消费者默认组 ID【英文标题】:Kafka Consumer default Group Id 【发布时间】:2017-08-24 10:13:50 【问题描述】:我正在使用 Apache Kafka 及其 Java 客户端,我发现消息在属于同一组的不同 Kafka 消费者之间进行负载平衡(即共享相同的组 ID)。
在我的应用程序中,我需要所有消费者阅读所有消息。
所以我有几个问题:
如果我没有在 Consumer Properties 中设置任何组 id,那么 Kafka Consumer 会被赋予什么组 id?
是否有一个默认值?
客户端每次都创建一个随机值吗?
我是否需要为每个消费者创建不同的 id 以确保每个消费者都能收到所有消息?
编辑: 谢谢你的回答。
你是对的:如果没有设置消费者组 id,Kafka 应该抱怨。
但是,我发现如果 group id 为 null,Java 客户端将其设置为空字符串 "" 以避免出现问题。 所以显然这是我正在寻找的默认值。
让我的所有消费者感到惊讶,即使我没有设置他们的 groupId(所以他们都使用 groupId == "")似乎收到了生产者写的所有消息。
我仍然无法解释:有什么建议吗?
【问题讨论】:
这可能与您的分区数有关。你有多少个分区,你使用了多少个消费者?事实上,当多个消费者订阅一个主题并属于同一个消费者组时,组中的每个消费者都会收到来自该主题中不同分区子集的消息。 我为每个主题使用一个分区,所以所有消费者都从同一个分区接收消息:/ 【参考方案1】:如果我没有在 Consumer Properties 中设置任何组 id,那么 Kafka Consumer 会被赋予什么组 id?
kafka 消费者不会有任何消费者组。相反,您会收到此错误:The configured groupId is invalid
是否有单一的默认值?
是的,可以查看kafka的consumer.properties
文件供参考。默认消费者组 id 为:group.id=test-consumer-group
客户端是否每次都创建一个随机值?
不,对于从 Kafka 0.9.0.x 消费者开始的 Java 客户端,groupId 似乎是强制性的。你可以参考这个JIRA:https://issues.apache.org/jira/browse/KAFKA-2648
我是否需要为每个消费者创建不同的 id 以确保每个消费者都能收到所有消息?
是的,如果所有消费者使用相同的组 id,主题中的消息将在这些消费者之间分发。换句话说,每个消费者都会得到一个不重叠的消息子集。在同一组中拥有更多的消费者会增加并行度和消费的整体吞吐量。另一方面,如果每个消费者都在自己的组中,则每个消费者都将获得所有消息的完整副本。
【讨论】:
【参考方案2】:不想重复其他答案,只想指出一点:您实际上并不需要一个消费者组来消费所有消息。 Kafka Consumer
API(假设我们正在处理 Java API)同时具有 subscribe()
和 assign()
方法。如果您希望所有消费者在没有负载平衡的情况下接收所有消息(本质上是消费者组的用途),您可以在所有消费者上调用 assign()
,将主题的所有分区传递给它,可选地后跟 seek()
到设置偏移量;这样您的消费者将获得所有消息。
这样,Kafka 将不会管理分区分配,也不会保留偏移量——消费者负责所有这些。根据您的用例,与为每个消费者设置一个消费者组相比,这可能是一种更好的方法。
【讨论】:
【参考方案3】:我也有同样的问题。并花了一些时间研究这个问题。
项目spring-cloud-stream
将检查您是否为消费者设置了组ID。如果没有,spring-cloud-stream
将创建一个随机值作为组 ID。
请参考KafkaMessageChannelBinder.
类中的方法createConsumerEndpoint
【讨论】:
【参考方案4】:如果不设置group.id,消费主题数据会报错。
org.apache.kafka.common.errors.InvalidGroupIdException: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - (Re-)joining group
22:08:14.132 [testAuto-kafka-consumer-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Sending JoinGroup (group_id=,session_timeout=15000,rebalance_timeout=300000,member_id=,protocol_type=consumer,group_protocols=[protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=18 cap=18]]) to coordinator bogon:9092 (id: 2147483647 rack: null)
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Attempt to join group failed due to fatal error: The configured groupId is invalid
22:08:14.132 [testAuto-kafka-consumer-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Container exception
【讨论】:
【参考方案5】:根据 KIP-289,默认 group.id 已“改进”,自 kafka 客户端版本 2.2.0 起,默认 group.id 为 null
。
KIP-289: Improve the default group id behavior in KafkaConsumer.
在我看来,当使用assign
时,您可以放弃group.id
,将其保留为空,这样就没有可用的偏移量了。
【讨论】:
【参考方案6】:检查 groupId 来自- @KafkaListener(topics = "$kafka.topic", groupId = "groupIdName")
步骤-> 转到 Kafka 文件夹
打开config文件夹
打开consumer.properties
更改组 ID
group.id=groupIdName
【讨论】:
以上是关于Kafka 消费者默认组 ID的主要内容,如果未能解决你的问题,请参考以下文章