Kafka 中的 Consumer Id 和 Group Id:是啥让两个消费者相同
Posted
技术标签:
【中文标题】Kafka 中的 Consumer Id 和 Group Id:是啥让两个消费者相同【英文标题】:Consumer Id and Group Id in Kafka: what makes two consumers the sameKafka 中的 Consumer Id 和 Group Id:是什么让两个消费者相同 【发布时间】:2019-10-03 23:29:52 【问题描述】:我已经使用 Kafka 几个月了,我意识到一些核心概念对我来说还不是很清楚。我的疑问与consumerId、groupId 和offsets 之间的关系有关。在我们的应用程序中,我们需要 Kafka 使用发布 - 订阅范式工作,因此我们为每个消费者使用不同的组 ID,这些 ID 是随机生成的。
我以前认为设置auto.offset.reset = latest
我的消费者总是会收到他们还没有收到的消息,但最近我学会了that is not the case。这仅在消费者尚未提交偏移量时才有效。在任何其他情况下,消费者将继续接收偏移量大于其提交的最后偏移量的消息。
由于我总是使用随机组 id 创建新的消费者,我意识到我的消费者“没有记忆”,他们是新消费者,他们永远不会提交偏移量,所以auto.offset.reset = latest
策略将始终适用。这就是我的怀疑开始的地方。 假设以下场景:
-
我有两个客户端应用程序,A 和 B,每个都有一个使用者,以发布 - 订阅方式工作(因此,使用不同的组 ID)。两个消费者都订阅了主题
my-topic
。 auto.offset.reset
setting 对于两个消费者来说都是latest
。
一些生产者(或多个生产者)将消息 M1、M2 和 M3 发布到主题 my-topic
。
A 和 B 都接收 M1、M2 和 M3。
现在我关闭应用程序 B。
生产者产生消息 M4 和 M5。
应用程序 A 接收消息 M4 和 M5。
现在我重新启动应用程序 B。记住,groupId
是随机的,我没有设置任何消费者 ID,这意味着这是一个新消费者(对吗?)。应用程序 B 没有收到任何消息。
生产者发布消息 M6 和 M7。
应用程序 A 和 B 都接收消息 M6 和 M7。
所以,总结一下,如果我没记错的话,A 收到了所有消息,但 B 错过了 M4 和 M5。我用kafka-console-consumer.sh
试过这个,它的行为是这样的。
那么,如何让应用程序 B 在关闭时接收发布的消息?现在,如果我启动它时分配与最初启动时相同的 groupId,它将读取消息 M4 和 M5,但这是设置组 ID。是否也可以设置消费者 ID 并获得相同的行为?
或者换一种说法,重新启动同一个消费者是什么意思?如果两个消费者有相同的groupId、相同的consumerId,那么两个消费者是同一个消费者?
对了,consumerId和属性client.id是一样的吗?
【问题讨论】:
【参考方案1】:如果两个消费者具有相同的group.id
设置,则他们属于同一组。
我不完全确定您所说的 consumerId
是什么意思。从 Kafka 2.2 开始,consumer configurations 中不存在此类字段。
如果你说的是client.id
,这个设置没有任何功能影响,它只是用来标记请求,以便在需要时可以在代理的日志中匹配它们。
当您使用auto.offset.reset=latest
运行消费者时,如果不存在已提交的偏移量,消费者将从日志末尾重新开始消费。所以它只会接收启动后产生的消息。所以在你的场景中,你是对的,它永远不会收到 M4 和 M5。
如果你想消费所有的消息,你需要保持相同的group.id
。在这种情况下,auto.offset.reset
只会在消费者第一次启动时应用。这样,当您的消费者重新启动时,它将恢复到停止时的位置。
【讨论】:
我不是与“auto.offset.reset”政策相关的实体。它是基于组的吗?基于主题还是基于消费者? 如果没有消费者 ID,如何在同一个分区上拥有 2 个消费者?如果我想在没有消息重复的情况下对消费者进行故障转移,我只会看到复制消费者/客户端 ID 的选项......【参考方案2】:这是因为你设置了 auto.offset.reset = latest
在消费者未启动和运行期间发送的任何消息都不会被消费者处理。
所以 B 会错过这两条消息
【讨论】:
以上是关于Kafka 中的 Consumer Id 和 Group Id:是啥让两个消费者相同的主要内容,如果未能解决你的问题,请参考以下文章
kafka javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1(示
Kafka 中的 __consumer_offsets 和 _schema 主题有啥用?