Kafka消费组rebalance原理

Posted xinyuan_java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka消费组rebalance原理相关的知识,希望对你有一定的参考价值。

消费者组是 Kafka 分布式消息处理的一个重要特征,用于管理消费者并促进扩展应用程序的能力。它们将任何一个主题的消费者组合在一起,并且主题内的分区被分配给这些消费者。当组的参与者发生变化时,消费者组rebalance可能由许多因素触发,这会导致在消费者之间重新分配分区。在rebalance期间,消息处理暂停,影响吞吐量。

在本文中,将介绍消费者组的角色、消费者组rebalance以及导致rebalance的触发器。详细说明了影响rebalance持续时间和触发rebalance时间的配置。在下一篇文章中,将介绍rebalance期间对应用程序消息处理的影响以及可以应用的rebalance策略。探讨了减少不必要的rebalance和减轻rebalance影响的选项。

◆消费群体

当应用程序实现了一个 Kafka 消费者来消费来自某个主题的消息时,该消费者属于一个消费者组。在消费者组中,消费者被分配主题分区以进行消费。组成员在代理端(broker)进行管理,分区分配在客户端进行管理。代理不知道资源是什么以及它们是如何在消费者之间分配的。这是 Kafka 客户端被视为"胖"客户端的一个很好的例子。

消费者配置了group.id ,因此具有相同group.id的任何其他消费者实例都将属于同一个消费者组。这有助于扩展消费者的能力,并且这与增加主题中的分区数量相结合提供了一种增加消息吞吐量的机制。

Group Coordinator 管理消费者组和消费者。这是一个位于代理端的 Kafka 组件。它将让一个消费者成为领导者,这将负责计算主题分区分配。这些将返回给 Group Coordinator,然后由 Group Coordinator 将分区分配给消费者。

给定一个应用程序实例,其中group.id为 'foo' 的消费者正在监听特定主题,并且该主题有六个分区,然后消费者将轮询所有六个分区中的消息。

图 1:单个消费者组和一个消费者

现在启动应用程序的第二个实例。因此,这将启动具有相同group.id的“foo”的第二个消费者实例。第二个消费者实例向 Group Coordinator 发送 JoinGroup 请求,并且在消费者组中重新分配分区以分散负载。消费者组中有两个成员,每个消费者实例分配三个分区。

图 2:具有两个消费者的单个消费者组“foo”

启动第三个应用程序,组协调器再次重新分配分区,每个消费者现在轮询来自两个分区的消息。

如果消费者实例多于分区,那么这些额外的消费者将不会分配任何分区。一个主题分区将永远只有一个消费者从给定的消费者组中收听它。所以一个由 5 个消费者组成的消费者组,监听一个具有 3 个分区的主题,将有 2 个空闲消费者。

如果一个消费者以不同的group.id配置启动(就像不同服务的情况一样),并且它正在侦听相同的主题,那么这将是一个单独的消费者组的一部分。它的分区分配独立于任何其他消费者组的分配。

图 3:两个消费者组 'foo' 和 'bar'

◆Rebalance触发器

发生消费者组rebalance的原因有多种。

  • 一个新的消费者加入一个消费者组

  • 一个现有的消费者离开一个消费者组

  • 代理认为一个消费者可能已经失败了

  • Consumer Grouop订阅的任意Topic出现分区数量的变化

  • 消费者调用unsubscrible()取消对某Topic的订阅

除此之外,任何其他重新分配资源的需求都将触发重新平衡。一个示例是创建一个主题,其中为消费者配置了与该主题名称匹配的模式订阅。

当一个新的消费者加入一个消费者组时,它会向代理上的组协调器发送一个 JoinGroup 请求。然后在组中的所有一个或多个消费者之间重新分配主题分区。同样,当消费者离开组时,它会通过 LeaveGroup 请求通知组协调器,该请求再次在剩余的消费者之间重新分配主题分区(如果有的话)。

当 Group Coordinator 在预期的时间范围内没有收到消费者的消息时,无论是心跳还是下一次 poll() 调用,它都会将消费者从组中驱逐,认为它可能已经失败。主题分区再次被重新分配给组中剩余的任何其他消费者。

如果一个服务有多个订阅互斥主题但共享同一个group.id的消费者,那么任何一个消费者触发的rebalance仍然会影响组中的其他消费者。在以下场景中,消费者 A订阅了主题abc,而消费者 B订阅了主题def。他们在同一个消费者组foo中。如果消费者 A处理一个批次的时间过长并且超时,那么它将从消费者组中删除,从而触发rebalance。组中的所有分区分配都被撤销和重新分配,包括Consumer B的分配。

图 4:跨越主题的消费者组

当消费者 A最终完成其轮询并重新加入消费者组时,将触发进一步的rebalance,并且随着分区被撤销和重新分配,所有处理再次停止。因此,为收听不同主题的消费者定义单独的消费者组可能是谨慎的。例如[service]-[topic]-consumer-group。

◆Rebalance配置

◆概述

对于 Apache Java Kafka 客户端,以下是消费者的关键配置,这些配置会影响rebalance需要多长时间才能完成,以及何时消费者可能被代理视为失败,从而触发rebalance。

以下部分检查这些配置参数的影响。

◆心跳和会话超时

消费者定期向 Group Coordinator(位于 broker 上)发送心跳。这允许 Group Coordinator 监控组中消费者的健康状况。必须在session.timeout.ms内收到心跳,并根据 heartbeat.interval.ms 发送心跳。当 Group Coordinator 收到心跳时session.timeout.ms会重置,它会响应消费者,并且必须在此重置超时内接收下一个消费者心跳。

图 5:消费者的心跳

建议将heartbeat.interval.ms配置为不超过session.timeout.ms的三分之一。这确保了如果由于例如瞬态网络问题而丢失一两个心跳,则不会认为消费者失败。在此图中,有两个心跳丢失,但第三个在会话超时之前到达,因此 Group Coordinator 知道消费者仍然健康。

图 6:失败的心跳

如果消费者确实失败并停止心跳,那么一旦会话超时到期,它就会从消费者组中被逐出,从而导致消费者组rebalance。

图 7:消费者失败

◆轮询间隔

心跳在与主处理线程不同的线程上执行。消费者在主处理线程上轮询其主题分区,每次调用 poll() 都必须在配置的max.poll.interval.ms内发生。下图添加了消费者处理线程,显示了该线程的职责以及心跳线程的职责。

图 8:消费者心跳和轮询

对 poll() 的第一次调用,以及对 poll() 的任何调用,包括分区分配等更改,都会导致启动心跳线程。每个后续的 poll() 调用都会重新开始轮询时间,这样它就有这个完整的 max.poll.interval.ms可以在其中完成。

心跳线程检查消费者处理的状态,如果在轮询之间超过了max.poll.interval.ms,那么它会发送一个 LeaveGroup 请求而不是心跳。Group Coordinator 将消费者从消费者组中移除,从而触发rebalance。

图 9:消费者超过轮询间隔

当触发rebalance时,现有消费者将收到对其下一个“rebalance”心跳的响应。每个消费者在max.poll.interval.ms超时之前通过调用 poll() 重新加入组,因为这会触发对组协调器的 JoinGroup 请求。请注意,对于 Kafka Connect,为此提供了单独的超时,即rebalance.timeout.ms。

因此,配置max.poll.interval.ms需要仔细考虑。将其设置得太低,风险在于单个轮询中消耗的一批消息未及时处理,导致rebalance和重复消息传递。将间隔设置得太高,这意味着当消费者确实失败时,代理需要更长的时间才能意识到并重新分配消费者的分区。在此处理期间,分配给失败消费者的主题分区上的消息被卡住。

◆消费者健康

因此,有两个超时需要考虑,这与消费者何时被认为是健康的或失败并被逐出消费者组有关。如果主处理线程死亡,而心跳线程仍在运行,则通过超出max.poll.interval.ms来检测故障。如果整个应用程序死了,那么这将通过session.timeout.ms内没有收到心跳来检测。

max.poll.interval.ms本质上是消费者处理的主要健康检查。但是,通过在单独的线程上使用心跳检查,这意味着可以更快地检测到整个应用程序发生故障。

以上是关于Kafka消费组rebalance原理的主要内容,如果未能解决你的问题,请参考以下文章

Kafka消费者组再均衡问题

Kafka 0.11新功能介绍:空消费组延迟rebalance

聊聊 Kafka:如何避免消费组的 Rebalance

聊聊 Kafka:如何避免消费组的 Rebalance

聊聊 Kafka:如何避免消费组的 Rebalance

聊聊 Kafka:如何避免消费组的 Rebalance