kafka消费相同消费组问题

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka消费相同消费组问题相关的知识,希望对你有一定的参考价值。

参考技术A failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records

1、有消费者宕机下线。消费者并不一定需要真正下线,例如遇到长时间的GC、网络延迟导致消费者长时间未向 GroupCoordinator 发送心跳等情况时,GroupCoordinator 会认为消费者已经下线。
修改参数

2、 kafkaConsumer.assign() 点对点消费方式 和 subscribe()订阅消费方式 ,使用了相同的消费组,也就是他们group id 相同时,group coordinator无法识别具有相同消费组group id的consumer,直接抛异常 CommitFailedException
如果是这种情况,提示的解决方法 max.poll.interval.ms,max.poll.record 都没用,无法解决,只能修改消费组id

Kafka消费者组再均衡问题

在Kafka中,当有新消费者加入或者订阅的topic数发生变化时,会触发Rebalance(再均衡:在同一个消费者组当中,分区的所有权从一个消费者转移到另外一个消费者)机制,Rebalance顾名思义就是重新均衡消费者消费。Rebalance的过程如下:

第一步:所有成员都向coordinator发送请求,请求入组。一旦所有成员都发送了请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader。

第二步:leader开始分配消费方案,指明具体哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案发给coordinator。coordinator接收到分配方案之后会把方案发给各个consumer,这样组内的所有成员就都知道自己应该消费哪些分区了。

所以对于Rebalance来说,Coordinator起着至关重要的作用,那么怎么查看消费者对应的Coordinator呢,我们知道某个消费者组对应__consumer_offsets中的哪个Partation是通过hash计算出来的:partation=hash("test_group_1")%50=28,表示test_group_1这个消费者组属于28号partation,通过命令:

 

./kafka-topics.sh --zookeeper 192.168.33.11:2181 --describe --topic __consumer_offsets

可以找到28号Partation所对应的信息:

技术分享图片

从而可以知道coordinator对应的broker为1

 

在Rebalance期间,消费者会出现无法读取消息,造成整个消费者群组一段时间内不可用,假设现在消费者组当中有A,代码逻辑执行10s,如果消费者组在消费的过程中consumer B加入到了该消费者组,并且B的代码逻辑执行20s,那么当A处理完后先进入Rebalance状态等待,只有当B也处理完后,A和B才真正通过Rebalance重新分配,这样显然A在等待的过程中浪费了资源。

消费者A:

 1 """
 2 consumer_rebalance_a.py a消费者
 3 """
 4 import pickle
 5 import uuid
 6 import time
 7 from kafka import KafkaConsumer
 8 from kafka.structs import TopicPartition, OffsetAndMetadata
 9 from kafka import ConsumerRebalanceListener
10 
11 consumer = KafkaConsumer(
12     bootstrap_servers=[192.168.33.11:9092],
13     group_id="test_group_1",
14     client_id="{}".format(str(uuid.uuid4())),
15     enable_auto_commit=False,
16     key_deserializer=lambda k: pickle.loads(k),
17     value_deserializer=lambda v: pickle.loads(v)
18 )
19 
20 # 用来记录最新的偏移量信息.
21 consumer_offsets = {}
22 
23 
24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
25     def on_partitions_revoked(self, revoked):
26         """
27         再均衡开始之前 下一轮poll之前触发
28         :param revoked:
29         :return:
30         """
31         print(再均衡开始之前被自动触发.)
32         print(revoked, type(revoked))
33         consumer.commit_async(offsets=consumer_offsets)
34 
35     def on_partitions_assigned(self, assigned):
36         """
37         再均衡完成之后  即将下一轮poll之前 触发
38         :param assigned:
39         :return:
40         """
41         print(在均衡完成之后自动触发.)
42         print(assigned, type(assigned))
43 
44 
45 consumer.subscribe(topics=(round_topic,), listener=MineConsumerRebalanceListener())
46 
47 
48 def _on_send_response(*args, **kwargs):
49     """
50     提交偏移量涉及回调函数
51     :param args: 
52     :param kwargs:
53     :return:
54     """
55     if isinstance(args[1], Exception):
56         print(偏移量提交异常. {}.format(args[1]))
57     else:
58         print(偏移量提交成功)
59 
60 
61 try:
62     start_time = time.time()
63     while True:
64         # 再均衡其实是在poll之前完成的
65         consumer_records_dict = consumer.poll(timeout_ms=100)
66 
67         # 处理逻辑.
68         for k, record_list in consumer_records_dict.items():
69             for record in record_list:
70                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
71                     record.topic, record.partition, record.offset, record.key, record.value)
72                 )
73 
74                 consumer_offsets[
75                     TopicPartition(record.topic, record.partition)
76                 ] = OffsetAndMetadata(
77                     record.offset + 1, metadata=偏移量.
78                 )
79 
80         try:
81             consumer.commit_async(callback=_on_send_response)
82             time.sleep(10)
83         except Exception as e:
84             print(commit failed, str(e))
85 
86 except Exception as e:
87     print(str(e))
88 finally:
89     try:
90         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
91         consumer.commit()
92         print("同步补救提交成功")
93     except Exception as e:
94         consumer.close()

 消费者B:

  1 """
  2 consumer b.py 消费者B
  3 """
  4 
  5 import pickle
  6 import uuid
  7 import time
  8 from kafka import KafkaConsumer
  9 from kafka.structs import TopicPartition, OffsetAndMetadata
 10 from kafka import ConsumerRebalanceListener
 11 
 12 consumer = KafkaConsumer(
 13     bootstrap_servers=[192.168.33.11:9092],
 14     group_id="test_group_1",
 15     client_id="{}".format(str(uuid.uuid4())),
 16     enable_auto_commit=False,  # 设置为手动提交偏移量.
 17     key_deserializer=lambda k: pickle.loads(k),
 18     value_deserializer=lambda v: pickle.loads(v)
 19 )
 20 
 21 consumer_offsets = {}  # 用来记录最新的偏移量信息.
 22 
 23 
 24 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
 25     def on_partitions_revoked(self, revoked):
 26         """
 27         再均衡开始之前 下一轮poll之前触发
 28         :param revoked:
 29         :return:
 30         """
 31         print(再均衡开始之前被自动触发.)
 32         print(revoked, type(revoked))
 33         consumer.commit_async(offsets=consumer_offsets)
 34 
 35     def on_partitions_assigned(self, assigned):
 36         """
 37         再均衡完成之后  即将下一轮poll之前 触发
 38         :param assigned:
 39         :return:
 40         """
 41 
 42         print(在均衡完成之后自动触发.)
 43         print(assigned, type(assigned))
 44 
 45 
 46 consumer.subscribe(topics=(round_topic,), listener=MineConsumerRebalanceListener())
 47 
 48 
 49 def _on_send_response(*args, **kwargs):
 50     """
 51     提交偏移量涉及回调函数
 52     :param args: 
 53     :param kwargs:
 54     :return:
 55     """
 56 
 57     if isinstance(args[1], Exception):
 58         print(偏移量提交异常. {}.format(args[1]))
 59     else:
 60         print(偏移量提交成功)
 61 
 62 
 63 try:
 64     start_time = time.time()
 65     while True:
 66         # 再均衡其实是在poll之前完成的
 67         consumer_records_dict = consumer.poll(timeout_ms=100)
 68 
 69         record_num = 0
 70         for key, record_list in consumer_records_dict.items():
 71             for record in record_list:
 72                 record_num += 1
 73         print("---->当前批次获取到的消息个数是:{}".format(record_num))
 74 
 75         # 处理逻辑.
 76         for k, record_list in consumer_records_dict.items():
 77             for record in record_list:
 78                 print("topic = {},partition = {},offset = {},key = {},value = {}".format(
 79                     record.topic, record.partition, record.offset, record.key, record.value)
 80                 )
 81 
 82                 consumer_offsets[
 83                     TopicPartition(record.topic, record.partition)
 84                 ] = OffsetAndMetadata(record.offset + 1, metadata=偏移量.)
 85 
 86         try:
 87             # 轮询一个batch 手动提交一次
 88             consumer.commit_async(callback=_on_send_response)
 89             time.sleep(20)
 90         except Exception as e:
 91             print(commit failed, str(e))
 92 
 93 except Exception as e:
 94     print(str(e))
 95 finally:
 96     try:
 97         # 同步提交偏移量,在消费者异常退出的时候再次提交偏移量,确保偏移量的提交.
 98         consumer.commit()
 99         print("同步补救提交成功")
100     except Exception as e:
101         consumer.close()

消费者A和消费者B是同一个消费者组(test_group_1)的两个消费者,用time.sleep的方式模拟执行时间,A:10s,B:20s;首先A开始消费,当B新加入消费者组的时候会触发Rebalance,可以通过实现再均衡监听器(RebalanceListener)中的on_partitions_revoked和on_partitions_assigned方法来查看再均衡触发前后的partition变化情况,依次启动消费者A和B之后:

消费者A:
再均衡开始之前被自动触发.
{TopicPartition(topic=round_topic, partition=0), TopicPartition(topic=round_topic, partition=1), TopicPartition(topic=round_topic, partition=2)} <class set>
<----------------------------------------
---------------------------------------->
在均衡完成之后自动触发.
{TopicPartition(topic=round_topic, partition=0), TopicPartition(topic=round_topic, partition=1)} <class set>
<----------------------------------------


消费者B:
再均衡开始之前被自动触发.
set() <class set>
<----------------------------------------
---------------------------------------->
在均衡完成之后自动触发.
{TopicPartition(topic=round_topic, partition=2)} <class set>
<----------------------------------------

在等待B的逻辑执行完后,A和B进入再均衡状态;再均衡前A处于partition 0、1、 2三个分区,B不占有任何partition;当再均衡结束后,A占有partition 0、1,B占有partition 2;然后A和B分别开始消费对应的partition。

在上述消费者A和B的代码中重写了RebalanceListener,主要是为了在发生再均衡之前提交最后一个已经处理记录的偏移量,因为再均衡时消费者将失去对一个分区的所有权,如果消费者已经消费了当前partition还没提交offset,这时候发生再均衡会使得消费者重新分配partition,可能使得同一个消息先后被两个消费者消费的情况,实现MineConsumerRebalanceListener再均衡前提交一次offset,确保每一个消费者在触发再均衡前提交最后一次offset:

 1 class MineConsumerRebalanceListener(ConsumerRebalanceListener):
 2     def on_partitions_revoked(self, revoked):
 3         """
 4         再均衡开始之前 下一轮poll之前触发
 5         :param revoked:
 6         :return:
 7         """
 8         print(再均衡开始之前被自动触发.)
 9         print(revoked, type(revoked))
10         consumer.commit_async(offsets=consumer_offsets)
11 
12     def on_partitions_assigned(self, assigned):
13         """
14         再均衡完成之后  即将下一轮poll之前 触发
15         :param assigned:
16         :return:
17         """
18 
19         print(在均衡完成之后自动触发.)
20         print(assigned, type(assigned))

 

再均衡发生的场景有以下几种:

1. 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了)
2. 订阅主题数发生变更,如果你使用了正则表达式的方式进行订阅,那么新建匹配正则表达式的topic就会触发rebalance
3. 订阅主题的分区数发生变更
鉴于触发再均衡后会造成资源浪费的问题,所以我们尽量不要触发再均衡

 






以上是关于kafka消费相同消费组问题的主要内容,如果未能解决你的问题,请参考以下文章

Kafka快速入门(Kafka消费者)

Kafka 消费者默认组 ID

kafka 如何决定单个消费者组中哪个消费者读取消息?

flink kafka消费者组ID不起作用

Kafka Consumer(消费者组)

Spark Streaming消费Kafka出现消费组rebalancing