由于消费者速度较慢,Kafka 重新平衡主题中的数据
Posted
技术标签:
【中文标题】由于消费者速度较慢,Kafka 重新平衡主题中的数据【英文标题】:Kafka rebalance the data in a topic due to slow(er) consumer 【发布时间】:2018-10-03 20:20:06 【问题描述】:例如,假设我有一个包含 4 个分区的主题。我向这个主题发送了 4k 条消息。每个分区获得 1k 条消息。由于外部因素,3 个消费者分别处理了他们所有的 1k 条消息。但是,第 4 个分区只能处理 200 条消息,剩下 800 条消息需要处理。有没有一种机制可以让我“重新平衡”主题中的数据,比如给分区 4s 数据的分区 1-3 200 留下所有具有 200 条消息的分区一个进程?
我不是在寻找向消费者组添加额外节点并让 kafka 平衡分区的方法。
添加了重新分配分区的输出:
当前分区副本分配
"version": 1,
"partitions": [
"topic": "MyTopic",
"partition": 0,
"replicas": [
0
],
"log_\ndirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 1,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 4,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 3,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"p\nartition": 2,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 5,
"replicas": [
0
],
"log_dirs": [
"any"
]
]
建议的分区重新分配配置
"version": 1,
"partitions": [
"topic": "MyTopic",
"partition": 3,
"replicas": [
0
],
"log_ dirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 0,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 5,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 2,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"p artition": 4,
"replicas": [
0
],
"log_dirs": [
"any"
]
,
"topic": "MyTopic",
"partition": 1,
"replicas": [
0
],
"log_dirs": [
"any"
]
]
【问题讨论】:
我可以知道您使用的存储类型吗?我的意思是 RAID/JBOD? @PhaniKumarYadavilli 只是带有 JBOD 的机器。 你能生成输出并将其添加到问题中吗 ./kafka-reassign-partitions.sh \ --zookeeper list-of-zookeeper-nodes \ --broker-list '1,2,3' \ --topics-to-move-json-file topic.json \ --generate @PhaniKumarYadavilli 我只有 1 个经纪人。我不想将负载分配到多个代理上,而是重新分配已在某个主题的分区中生成的消息。基于此,我不确定最终会做什么。如果我误解了它的作用,请随时纠正我。 kafka-reassign-partitions.sh 重新平衡分区之间的负载。根据您的集群设置,您只能提供一个代理。 【参考方案1】:在产生消息时分配分区。它们永远不会在分区之间自动移动。通常,对于每个分区,可以有多个消费者(具有不同的消费者组 id)以不同的速度消费,因此代理无法根据消费者(组)的慢度在分区之间移动消息。不过,您可以尝试一些方法:
更多分区,希望负载分配更公平(您可以拥有比消费者更多的分区) 让生产者在每条消息上显式设置分区,以在分区之间产生一个消费者可以更好地应对的分布 让消费者监控他们的滞后并在落后时主动取消订阅分区,以便让其他消费者承担负载。【讨论】:
好的,感谢您的建议。您的第一个子弹可能是此时处理此问题的最有效方法。这些消息正在处理调用可能需要几秒钟或几分钟才能处理的外部程序。没有真正的方法可以判断特定消息是否需要很长时间才能处理。子弹 #3 也是如此。本质上它只是随机的运气。如果我的生产者发送 100 条消息,其中 10 条消息的处理速度可能会“慢”100 倍。如果其中一个分区幸运地获得了 2 或 3 个,那么它们将远远落后于其他分区。 @Michal 能否详细说明第三点,如何实现?【参考方案2】:你可以做几件事来提高性能
增加分区数 增加使用分区的消费者组。第一个将重新平衡分区上的负载,第二个将增加分区上的并行度以快速消耗消息。
我希望这会有所帮助。可以参考这个链接了解更多
https://xyu.io/2016/02/29/balancing-kafka-on-jbod/
Kafka 消费者是消费者群体的一部分。一个组中有一个或多个消费者。每个分区被分配给一个消费者。
如果您的消费者多于分区,那么您的一些消费者将处于空闲状态。如果您的分区多于消费者,则可能会将多个分区分配给单个消费者。
每当有新的消费者加入时,就会启动重新平衡,并为新消费者分配一些以前分配给其他消费者的分区。
例如,如果有 20 个分区都被一个消费者消费,而另一个消费者加入,则会进行重新平衡。
在重新平衡期间,消费者组“暂停”。
【讨论】:
以上是关于由于消费者速度较慢,Kafka 重新平衡主题中的数据的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spring-kafka 在 GC/消费者重新平衡时清理 Kafka Metric 计量器