Kafka3.x核心速查手册二客户端使用篇-2分组消费机制

Posted roykingw

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka3.x核心速查手册二客户端使用篇-2分组消费机制相关的知识,希望对你有一定的参考价值。

渔与鱼: Kafka的HighLevel API的重要目的就是想要简化客户端的使用方式,所以对于API的使用,尽量熟练就可以了。对于其他重要的属性,都可以通过源码中的描述去学习,并且可以设计一些场景去进行验证。其重点,是要逐步在脑海之中建立一个Message在Kafka集群中进行流转的基础模型。

​ 其实Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐,那才是Kafka最为精妙的地方。但是要理解那些复杂的问题,都是需要建立在这个基础模型基础上的。

1、分组消费机制

​ 在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。与之配合的还有一个可选的GROUP_INSTANCE_ID_CONFIG属性。他们的描述是这样的:

 	public static final String GROUP_ID_CONFIG = "group.id";
    public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

    public static final String GROUP_INSTANCE_ID_CONFIG = "group.instance.id";
    public static final String GROUP_INSTANCE_ID_DOC = "A unique identifier of the consumer instance provided by the end user. "
                                                       + "Only non-empty strings are permitted. If set, the consumer is treated as a static member, "
                                                       + "which means that only one instance with this ID is allowed in the consumer group at any time. "
                                                       + "This can be used in combination with a larger session timeout to avoid group rebalances caused by transient unavailability "
                                                       + "(e.g. process restarts). If not set, the consumer will join the group as a dynamic member, which is the traditional behavior.";

​ 从这段描述中看到,对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。这个分组消费机制简单描述就是这样的:

​ 生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。这就是消费者组的作用。

​ 与之相关的还有Offset偏移量。这个偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中,可以看到消费者组的Offset记录情况。

[oper@worker1 bin]$ ./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

​ 这个Offset偏移量,需要消费者处理完成后主动向Kafka的Broker提交。提交完成后,Broker就会更新消费进度,表示这个消息已经被这个消费者组处理完了。但是如果消费者没有提交Offset,Broker就会认为这个消息还没有被处理过,就会重新往对应的消费者组进行推送,不过这次,一般会尽量推送给同一个消费者组当中的其他消费者实例。

​ 在示例当中,是通过业务端主动调用Consumer的commitAsync方法或者commitSync方法主动提交的,Kafka中自然也提供了自动提交Offset的方式。使用自动提交,只需要在Comsumer中配置ENABLE_AUTO_COMMIT_CONFIG属性即可。

	public static final String ENABLE_AUTO_COMMIT_CONFIG = "enable.auto.commit";
    private static final String ENABLE_AUTO_COMMIT_DOC = "If true the consumer's offset will be periodically committed in the background.";

渔与鱼: 从这里可以看到,Offet是Kafka进行消息推送控制的关键之处。但是,这么关键的Offset数据,却只能由"不靠谱"的消费者进行推进,这显然是不够安全的。如果你有兴趣自己观察,会发现在Consumer中,实际上也提供了AUTO_OFFSET_RESET_CONFIG参数,来指定服务端的Offset不靠谱时如何进行后续消费。

​ 但是,这种策略显然还是不够安全的。如果客户端乱来,比如网络抽风,或者Broker端的Offset数据出现错误,比如服务崩溃,那就会带来很多问题。所以,在很多对数据敏感的实际项目中,都会考虑将Offset管理从Broker端移出,另外采用Redis或者其他存储来管理Offset。这些涉及到Kakfa很多底层运行数据的功能,简单的HighLevel API就无法做到了。需要用到一些更底层的LowLevel API。如果你感兴趣,会在后续章节中补充这部分内容。

以上是关于Kafka3.x核心速查手册二客户端使用篇-2分组消费机制的主要内容,如果未能解决你的问题,请参考以下文章

Kafka3.x核心速查手册二客户端使用篇-5发送应答机制

Kafka3.x核心速查手册二客户端使用篇-5发送应答机制

Kafka3.x核心速查手册二客户端使用篇-5发送应答机制

Kafka3.x核心速查手册二客户端使用篇-3消息序列化机制

Kafka3.x核心速查手册二客户端使用篇-3消息序列化机制

Kafka3.x核心速查手册二客户端使用篇-4消息路由机制