绍圣--kafka之消费者(八)
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了绍圣--kafka之消费者(八)相关的知识,希望对你有一定的参考价值。
参考技术A 前面重点说了消费组的再平衡,等待同步,稳定状态。还有一种状态是离开状态。消费者离开消费组的情况:消费者应用程序关闭或者消费者不订阅主题了。此时协调者就不需要再在消费组中管理此消费者了。
离开消费组的方法:1,消费者取消心跳任务。2,发送离开组请求。
协调者在处理离开组请求时,首先移除心跳检测,然后将消费者从消费组元数据中移除。因为要操作消费组元数据,所以需要对消费组元数据进行加锁。
消费组状态是准备再平衡
准备再平衡时,一定有延迟操作对象存在,并且还不能完成。这时如果有消费者选择离开,有可能会使延迟操作对象完成,所以每次处理离开请求都会去尝试完成延迟操作对象。
消费组状态是等待同步
等待同步说明延迟操作已经完成,消费者已经存在于消费组中,并且已经收到了加入组响应。这时主消费者正在分配分区,这时如果有消费者要离开,那么原本分配给这个离开的消费者的分区就没有意义了。所以在等待同步状态下处理离开请求,要改变消费组的状态为准备再平衡。让其他消费者重新发送加入组请求。
消费组状态是稳定
稳定状态下的操作和等待同步一样。同样原本分配给这个离开的消费者的分区必须分给消费组中其他的消费者。所以要改变消费组的状态为准备再平衡。让其他消费者重新发送加入组请求。
在协调者把消费组的状态变为准备再平衡的时候,会创建一个延迟操作对象。这个延迟操作对象会等待消费组中所有的消费者都重新发送加入组请求后才能完成。这里就存在一个问题,如果某一个消费者一直不重新发送加入组请求,那么导致延迟操作对象一直都不会完成,协调者就一直不会发送加入组响应给消费者。所以必须设置一个超时时间,让过了超时时间后,延迟操作对象还不能完成,就强制完成。
延迟的加入组操作对象,会选择消费组中所有消费者会话超时时间的最大值,作为延迟操作的超时时间。在过了超时时间后延迟操作对象会被强制完成。在完成延迟操作时,协调者会找出那些没有在规定时间内重新发送加入组请求的消费者,将它们从消费组中移除。因为在完成延迟加入组操作对象时,会发送加入组响应给消费组中所有的消费者,所以要在事先移除掉超时未发送请求的消费者。
协调者返回加入组响应给消费者后,都会立即完成本次的延迟心跳,并创建下一次延迟心跳(针对消费组中所有消费者都会完成延迟心跳)。延迟心跳是用来对各个消费者的监控,检查消费者是否存活,它的超时时间是消费者的会话超时时间。延迟的心跳操作对象什么时候完成?外部依赖条件是协调者和消费者之间网络通信,不管是协调者处理消费者的各种请求,还是协调者发送给消费者的响应,都会去完成延迟心跳,并创建下一次的延迟心跳。消费组的一次再平衡操作过程中,协调者只会创建一个延迟的加入操作对象,并且会为每一个消费者都保存一个延迟心跳对象。延迟心跳的创建是在协调者发送加入组响应给消费者后,就会为每个消费者创建一个延迟心跳。消费者收到加入组响应后,应该在会话时间内及时发送同步组请求给协调者,因为这个时候在协调者侧已经创建了延迟心跳用来监控消费者,如果没有及时发送,那么协调者就会认为消费者故障,从而让消费者离开消费组(按照离开消费组请求的逻辑处理)。
在处理同步组请求时,有多个地方的调用可以去本次完成延迟心跳和创建下一次的延迟心跳。
1,状态为等待同步,设置消费者元数据的回调方法后调用。针对一个非主消费者。
2,状态为稳定,在发送同步组响应给消费者后调用。针对一个消费者(发送同步请求不及时,但是未超时的非主消费者)。
3,状态为等待同步,收到主消费者的同步组请求,给每个消费者发送同步组响应后调用。针对消费组里面的所有已经发送了同步组请求的消费者。
延迟心跳的尝试完成方法的判断条件是:消费者是否存活。判断消费者是否存活有三种条件,只要满足其中一种,就认为消费者是存活的。
1,消费者元数据中的awaitingJoinCallback回调方法不为空。
2,消费者元数据中的awaitingSyncCallback回调方法不为空。
3,消费者最近的心跳时间加上会话超时时间大于下一次心跳截止时间。
延迟心跳的截止时间是在创建延迟心跳时指定的,延迟心跳的创建是在完成上一次的延迟心跳操作之后创建下一次的延迟心跳。在完成了上一次的延迟心跳后,协调者会计算出下一次延迟心跳的截止时间,并创建新的延迟心跳,延迟心跳创建后,和延迟的加入一样,都会马上尝试去完成这个延迟心跳,但是如果是刚刚创建的延迟心跳就尝试去完成是不会完成的。因为刚刚创建的延迟心跳的截止时间等于最新的时间加上会话超时时间。所以不会完成。
有三个地方会去完成延迟心跳并创建下一次的延迟心跳:
1,协调者返回加入组响应给每个消费者后。
2,协调者处理消费者的同步组请求设置回调方法时。
3,协调者返回同步组响应给每个消费者后。
每次创建新的延迟心跳都会计算最新的截止时间,如果没有在下一次心跳截止时间之前完成延迟心跳并创建下一次的延迟心跳,那么延迟心跳就会超时,对应的消费者就 可能 被协调者从消费组中移除(协调者创建的每一个延迟心跳都和消费者一一对应,只要消费者存活,都对应延迟缓存中的一个延迟心跳)。
为什么说过了超时时间可能被协调者清除喃?因为还有其他的两个条件awaitingJoinCallback和awaitingSyncCallback,只要满足这两个条件其中的一个,就算是超时了也会认为消费者存活。为什么需要这样设计喃?设想一下这个场景,在等待同步状态下,有三个消费者(C1,C2,C3;C3是主消费者)。C1的新延迟心跳的截止时间为10秒,C2的新延迟心跳的截止时间为20秒,C3的新延迟心跳的截止时间为60秒。C1和C2都发送同步组请求设置回调方法:awaitingSyncCallback,完成旧的延迟心跳并创建了新的延迟心跳:C1的新延迟心跳的截止时间为20秒,C2的新延迟心跳的截止时间为40秒,这个时候只会完成C1和C2的延迟心跳,C3的旧延迟心跳还存在截止时间为60秒,C3由于自身原因在分区分配时花费了比较久的时候,在45秒的时候才发送同步组请求,在这个时间点上按理说C1和C2早就应该超时被移除消费组了,如果被移除就是纯粹的误杀。C1和C2其实这个时候正在等到协调者的同步组响应。所以如果awaitingSyncCallback不为空的话,就算是超时了,要认为消费者存活,上面的场景在收到C3主消费者的同步组请求后,返回同步组响应给所有的消费者,这是完成C1,C2,C3的延迟心跳并计算出下一次延迟心跳的截止时间创建新的延迟心跳。
协调者在处理消费者的加入组请求时,会设置awaitingJoinCallback回调方法。但是设置之后不会去调用完成延迟心跳和创建下一次的延迟心跳。这里假如:C1和C2已经在消费组里面了,必定也会有与之对应的延迟心跳,下一次心跳的截止时间时间为:C1:10秒,C2:20秒。新的消费者C3发送了加入组请求,那么C1和C2必须在心跳的截止时间内重新发送加入组请求,C1马上就发送了加入组请求,但是延迟加入操作对象不能完成,所以不会发送加入组响应给客户端就不能去完成延迟心跳,10秒后,C1的延迟心跳就超时了,按理说C1会被协调者移除消费组,但是由于协调者在处理C1的加入组请求是设置了awaitingJoinCallback回调方法,所以C1不会被移除,认为是存活的。C2在15秒的时候发送了加入组请求,延迟加入操作可以完成,返回加入组响应给三个消费者,并更新下次心跳的截止时间为:C1:25秒,C2:35秒,C1:75秒。
当消费组的状态变为稳定后,每个消费者都需要重新发送心跳给协调者。
为什么在处理加入组请求时,不去完成延迟心跳喃?按理说消费者能发送加入组请求,就代表消费者存活?
消费者能发送加入组请求是能代表消费者存活,但是现在协调者的处理再平衡状态的时候,认为消费组是不稳定的,在消费组不稳定的时候去设置心跳没有多大意义(通过前面的分析也知道,消费组在再平衡状态中,多个消费者会发送多次的加入组请求,消费组才会最终稳定)。所以在发送加入组响应给消费者后去完成心跳这个时候消费组中的所有消费者都发送了加入组请求,这样用心跳去管理消费者才有意义。所以在处理加入组请求时,不去完成延迟心跳是个很不错的设计。
参考资料:
Kafka技术内幕:图文详解Kafka源码设计与实现
Kafka二十四Kafka优化之顺序消费的实现
Kafka优化之顺序消费的实现
- ⽣产者:保证消息按顺序发送,且消息不丢失
- 使⽤同步的发送
- ack设置成⾮0的值
- 开启重试
- 消费者:主题只能设置⼀个分区,消费组中只能有⼀个消费者
- kafka的顺序消费使⽤场景不多,因为牺牲掉了性能,但是⽐如rocketmq在这⼀块有专⻔的功能已设计好。
以上是关于绍圣--kafka之消费者(八)的主要内容,如果未能解决你的问题,请参考以下文章