kafka如何平衡分区负载?
Posted
技术标签:
【中文标题】kafka如何平衡分区负载?【英文标题】:How kafka balances partitions load? 【发布时间】:2017-03-25 18:32:51 【问题描述】:我遇到了一个关于 kafka 负载平衡的问题。所以,我创建了一个有 10 个分区的主题并创建了 2 个消费者。 10 个分区被划分并分配给这些消费者(第一个分区 5 个,第二个分区 5 个),它工作正常。有时第一个消费者工作,有时第二个。
但是在某一时刻,我们可能会遇到这样一种情况,例如第二个消费者收到一条消息,并且需要时间(例如 10 分钟)来处理这条消息。
那么,我的问题是 kafka 将如何决定将消息存储到哪个分区?
我认为在这种情况下轮询不是一个好主意,因为在第二个消费者完成长时间工作之前,不会处理由第二个消费者处理的分区中的消息。
更新!
根据@Milan Baran 的回答,负载在生产者端是平衡的。但是在这种情况下,即使我们提供了一个自定义的Partitioner
实现,同样的问题是存储在分区中的消息被分配给正在做长期工作的消费者,直到这个消费者完成了它的长期工作。
可能,其他地方还有额外的负载均衡器?
【问题讨论】:
你为什么不运行更多的消费者?如果您启动 10 个消费者,那么当一个消费者长时间忙碌时,您还有 9 个消费者在处理其他 9 个分区中的数据。 但是10的分区中的所有消息直到忙时才被处理。这就是问题 能否编写一个自定义分区器,将长时间运行的消息放入一个专用分区,以便所有小的消息都可以分区并在剩余的分区中处理? 【参考方案1】:应该使用哪个分区不是由kafka决定的,而是由发送消息的生产者决定。看https://kafka.apache.org/documentation#producerconfigs
您可以提供一个分区器类来决定选择哪个分区。
partitioner.class 实现 Partitioner 的 Partitioner 类 界面。 org.apache.kafka.clients.producer.internals.DefaultPartitioner
有DefaultPartitioner策略的描述
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
【讨论】:
你的回答很好,但我认为还有一些额外的地方可以平衡负载。根据您的回答,似乎无法处理存储在分区中的消息,该分区分配给正在执行长期工作的消费者。 长期工作是什么意思?您是否为消费者设置了不同的 group.id?或者您在消费者读取的同一线程中处理消息。例如,您将收到 fast_msg、fast_msg、fast_msg、slow_msg、fast_msg 并且您被困在 slow_msg 并且无法获得下一个 fast_msg?如果是这种情况,请使用并行处理这些消息或 akka-streams。 使用不同的线程来接收和处理消息会导致额外的问题:我有slow_msg fast_msg1 fast_msg2 fast_msg3,我可以面对slow_msg仍在处理但fast_msg1和fast_msg2成功处理然后jvm的情况崩溃。重启后,我的消费者将指向 fast_msg3,slow_msg 将丢失。 但这是一个应用程序问题,而不是 kafka 问题。当您确定主题一致性时,您可以关闭自动提交和手动提交偏移量。但是,是的,这些缓慢的信息是你的瓶颈,很难摆脱它们。他们这么慢有什么意义?【参考方案2】:看来你需要的是一个QUEUE。 ONE 分区由 MULTIPLE 消费者使用。每个消费者从分区中获取一条记录,对其进行处理,然后获取另一条记录。如果一个消费者花费太多时间来处理记录,其他消费者仍然可以从分区中获取(不同的)记录。
但是,Kafka 确实不支持这一点。每个分区只能被一个消费者组中的一个消费者消费。
总之,你需要别的东西来实现目标,比如RabbitMQ。
【讨论】:
【参考方案3】:感谢大家的帮助。但我找到了我的问题的答案。所以首先,kafka 负载均衡的地方至少有 3 个:
-
将分区分配给消费者“循环”或“范围”
使用算法。这可以通过设置
partition.assignment.strategy
财产。默认使用范围。
在生产者级别可以应用选择分区的策略
存储消息。可以通过partitioner.class
完成
我的问题的答案。如果一个消费者处理消息
很长一段时间,kafka都认为这个消费者已经死了,
在另一个消费者之间重新分配分区。所以当时间长了
工作由消费者完成,没有分配分区。什么时候
消费者完成长时间的工作分区将被分配到
再说一遍。并且不会有任何消息处于待处理状态。
【讨论】:
以上是关于kafka如何平衡分区负载?的主要内容,如果未能解决你的问题,请参考以下文章
Kafka之副本信息Leader 选举流程故障处理细节分区副本分配手动调整分区副本存储Leader Partition 负载平衡增加副本文件存储机制文件清理策略高效读写数据