Kafka offset管理
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka offset管理相关的知识,希望对你有一定的参考价值。
参考技术AKafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息。
Offset记录着下一条将要发送给Consumer的消息的序号。
Offset从语义上来看拥有两种:Current Offset和Committed Offset。
Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll消息时,都能够收到不重复的消息。
Committed Offset保存在Broker上,它表示Consumer已经确认消费过的消息的序号。主要通过 commitSync 和 commitAsync
API来操作。举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用 consumer.commitAsync() 或 consumer.commitSync() 来提交Committed Offset,那么此时Committed Offset依旧是0。
Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的过程中,一个partition被分配给了一个Consumer,那么这个Consumer该从什么位置开始消费消息呢?答案就是Committed Offset。另外,如果一个Consumer消费了5条消息(poll并且成功commitSync)之后宕机了,重新启动之后它仍然能够从第6条消息开始消费,因为Committed Offset已经被Kafka记录为5。
总结一下,Current Offset是针对Consumer的poll过程的,它可以保证每次poll都返回不重复的消息;而Committed Offset是用于Consumer Rebalance过程的,它能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。
在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/group/offsets/topic/partition]目录中(zookeeper其实并不适合进行大批量的读写操作,尤其是写操作)。而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。
Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。
Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及 Consumer Rebalance 。
对于每一个Consumer Group,Group Coordinator都会存储以下信息:
Consumer Group如何确定自己的coordinator是谁呢? 简单来说分为两步:
由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以groupid-topic-partition -> offset的方式保存。如图所示:
Kafka在保存Offset的时候,实际上是将Consumer Group和partition对应的offset以消息的方式保存在__consumers_offsets这个topic中 。
__consumers_offsets默认拥有50个partition,可以通过
的方式来查询某个Consumer Group的offset信息保存在__consumers_offsets的哪个partition中。下图展示了__consumers_offsets中保存的offset消息的格式:
如图所示,一条offset消息的格式为groupid-topic-partition -> offset。因此consumer poll消息时,已知groupid和topic,又通过Coordinator分配partition的方式获得了对应的partition,自然能够通过Coordinator查找__consumers_offsets的方式获得最新的offset了。
前面我们已经描述过offset的存储模型,它是按照 groupid-topic-partition -> offset 的方式存储的。然而Kafka只提供了根据offset读取消息的模型,并不支持根据key读取消息的方式。那么Kafka是如何支持Offset的查询呢?
答案就是Offsets Cache!!
如图所示,Consumer提交offset时,Kafka Offset Manager会首先追加一条条新的commit消息到__consumers_offsets topic中,然后更新对应的缓存。读取offset时从缓存中读取,而不是直接读取__consumers_offsets这个topic。
我们已经知道,Kafka使用 groupid-topic-partition -> offset *的消息格式,将Offset信息存储在__consumers_offsets topic中。请看下面一个例子:
如图,对于audit-consumer这个Consumer Group来说,上面的存储了两条具有相同key的记录: PageViewEvent-0 -> 240 和 PageViewEvent-0 -> 323 。事实上,这就是一种无用的冗余。因为对于一个partition来说,我们实际上只需要它当前最新的Offsets。因此这条旧的 PageViewEvent-0 -> 240 记录事实上是无用的。
为了消除这样的过期数据,Kafka为__consumers_offsets topic设置了Log Compaction功能。Log Compaction意味着对于有相同key的的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,可以开启Kafka的Log Compaction功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。
这张图片生动的阐述了Log Compaction的过程:
下图阐释了__consumers_offsets topic中的数据在Log Compaction下的变化:
auto.offset.reset 表示如果Kafka中没有存储对应的offset信息的话(有可能offset信息被删除),消费者从何处开始消费消息。它拥有三个可选值:
看一下下面两个场景:
Kafka相关的offset管理/重平衡/高可用
参考技术A kafka 组成 :consumer/producter/brokerkafka和zk关系:zk存放kafka元信息,比如 topic - partition - replication- brokerIp,数据结构如树状,一层层的结构,叶子节点是brokerIp
思考:老版本的kafka,消费的偏移量也是放在zk的,新版本已经放在kafka的broker中,为什么要迁移?kafka如何记录消息偏移?消息偏移的信息存放在哪儿?
1、目前kafka支持自动提交偏移,比如设置配置 auto.commit.enable、auto.commit.interval.ms;以及手动设置偏移
2、消息偏移信息存放在kafka内部的一个主题中 __consumer_offsets ,默认的分区数和副本配置是 offset.topic.num.partition、offset.topic.replication.factor,默认值分别是50和3;
3、__consumer_offsets记录三类消息:
1、group创建,记录元信息
2、消费者消费偏移
消息的结构是key-value 格式,key 值是:<consumer group id + topic + partition number> ,value 是 partition的偏移,以及一些其他后续处理的信息,如果时间戳
3、墓碑消息,当group下的所有的consumer下线 并且 group的消费偏移都已经删除,会发送墓碑消息,会将group下的所有的消费偏移信息全部删除
4、无论是自动提交偏移还是手动提交偏移都会有大量的记录产生,长时间会占满磁盘,kafka是如何处理的?
kafka采用Compact机制,多条同名的key,只保留最新的偏移信息;因为key同名代表某个消费分组在某个topic下的partition的偏移,保留一份即可
1、kafka 分区副本分为leader/follower 类似与mysql的主从,但是不同的是kafka leader承接所有的读写请求,follower注意任务是数据冗余和选主,不接受consumer的读写。kafka为什么要这样设计?
> 分区实现负载;如果kafka想实现负载,可以使用多分区机制,比如kafka集群有5台机器,每台机器一个broker,主题W并发高想充分利用5台服务器,可以采用 5个partition,3个replication,producer采用默认的发送机制即round robin,那么5个分区所在的5台服务器的负载是均衡的
> 优势:read my write 读自己的写,在同一个broker中进行读写,不像mysql存在数据延迟情况; 单掉读,不会出现数据读取不一致的情况,比如follower A 和 B 同步的进度不一致,consumer开始从followerA读取,因为宕机重启后从B读取,数据不一致。
2、副本leader宕机,如何在consistent 和 avaliable 做选择?
通过ISR(In Sync Replication)保存的是同步leader的副本组,进入ISR的标准是根据落后leader的时间(这点不像mysql故障转移是根据落后的位置),落后的时间根据 replica.lag.max.ms设置,如果在范围内,则ISA扩容加入新成员,否则缩容,减少成员;在ISR的follwer在leader宕机时可以参与选主,保证可用性,但是如果ISR没有follwer成员,说明所有的副本都落后leader的规定时间差,那么会导致不可用,这是保证CAP的C;
那么如何牺牲Consistent而确保Avaliable呢?
可以通过参数unclean.leader.election.enable=true,表示落后leader的follwer可以参与选主,保证Avaliable但是缺点是会导致数据丢失,所以要在C/A中权衡。
1、生产者 压缩,但是没有 zero copy,因为要校验,过滤掉不满足要求的消息
2、生产者顺序写磁盘 ; 采用 page cache,保证Linxu安全
3、消费者顺序读,采用 page cache,实现zero copy
4、为什么page cache可以提高吞吐,它是什么,有什么机制?
- 背景:kafka只针对已提交的消息做可靠性保证,【已提交的消息】在broker和producer都有自己的定义
0、topic定义partition的副本数 replication.factor > 1,数据冗余,保证partition高可用
1、producer如何保证数据不丢失:
producer 开启重试策略,比如: 配置 retries属性 > 0
producer acks=all, 生产者定义已提交的语意,所有的replication都成功写入才算成功
2、broker 如何避免数据不丢失:
- broker 开启 unclean.leader.election.enable =false ,禁止数据落后的replication 竞选leader,避免数据丢失
- brokder 开启 min.insync.replicas >1 ,表示消息同步几份副本才算已提交,注意 producer acks=all定义的已提交泛语意(比如之前3个replication 宕机2个,那么提交1个就算已提交),而min.iinsync.replicas = number(一般配置大于1即可) 定义的是已提交的下限。 在有replication downtime场景时作用凸显。尽量不要把min.insync.replicas的值配置成 replication.factor 副本数,因为一旦有副本downtime,broker将不可用,一般采用 replication.factor = min.insync.replicas +1
3、消费者测如何避免数据不丢失:
- 先消费再手动commit; 避免先commit后消费
1、at most once,至多一次,如果发送失败,会丢失消息
2、at lease once,至少一次,如果出现网络抖动,发送成功了,但是因为网络抖动的原因一直没有收到响应消息,再次发送,会产生多条消息
3、exactly once
exactly once 实现
1、生产者配置enable.idempotent,实现幂等姓性生产者,实现消息只发送一次,局限是只针对单个partition / 单次会话(实现原理是,生产者会给消息增加唯一字段,broker根据唯一字段去重)
2、如何实现多个partition/多次会话的exactly once?
采用事物性的producter,大概需要三个步骤:
1、enable.idemponent 开启幂等性producer;
2、为producer配置transaction.id;
3、开启事物,实现多paritition消息要么全部成功,要么全部失败。
需要注意的是:消费者默认的消费隔离级别是: RU,需要配置isolation level 为RC
五、如何避免消息重复带来的影响?
消息幂等性,比如采用幂等性生产者、再比如消费者根据消息唯一id做消费记录,在消费消息之前做判断(消费重复性解决了,但是性能下降了,需要做具体的权衡)
1、什么情况会出现rebalence?
新增/减少consumer
新增/减少topic,比如正则表达式订阅主题,新增的主题满足正则表达式
新增/减少分区,比如修改topic ,增加/减少分区
2、rebalence的有什么优势和劣势?
优势:消费者消费分区负载均衡
劣势:rebalence时所有的消费者会被占停,如果集群的consumer group下的consumer数过多,rebalence时间会很长;现在的rebalence与partition会重新打乱,会重现发起connnect,这个缺点后续的kafka肯定会优化掉
3、如何避免rebalance
避免coordinate对consumer对下线误判, 合理对设置 session.timeout.ms (如果consumer在指定的会话时间没有发送心跳包,coordinator会认为consumer下线,引起rebalance); 解决思路: 设置 session.timeout.ms 是 heartheat.interval.ms 的倍数,一般数 10:1
coodinator认为消费者消费能力不足引起的 rebalance,例如:consumer在 max.poll.interval.ms 取出N条数据,再下一个间隔时间仍然没有处理完毕。解决思路,根据consumer实际的情况设置 max.poll.interval.ms 参数
4、细聊rebalance的过程
背景:kafka的consumer group在coodinator有四种状态: empty、rebalance、rebalanceComplete、stable;
当consumer group 下没有consuner(比如consumer全部下线)时处于empty状态,如果empty状态超过一定当阀值,consumer group之前当位移信息会变成expire offset,coodinator会定期清理expire offset,所以当group当consumer全部下线超过一定的时间,消费偏移丢失,会出现重头开始消费情况!!!!当触发rebalance当条件时,group 会处于rebalance状态,rebalance完成进入rebalanceComplete状态
rebalance过程:触发rebalance条件时broker会像组内所有当成员响应 rebalance 心跳信息,接着所有成员会进入下面两个环节 JoinGroup 、GroupSync
JoinGroup:组的所有成员会向coodinator上报自己订阅的主题信息,并将第一个上报的consumer当作consumer leader(后面会由它来做分配),coordinator收集所有的成员要订阅的主题后,会把消息通知给consumer leader,做完分配后,发送给coodinatoer
GroupSync: 组成员将自己要订阅的信息同步至coodinator后进入GroupSync阶段,consumer会定时向coordinator发送GroupSync心跳信息包,coodinator将consumer leader分配的信息同步至组内成员,JoinGroup结束
总结:从上面可以看到,如果consumer数较少时,rebalance还是很快的,Stop The Wrold时间很短,但是当consumer数很多时,consumer不工作时间长,rebalance弊端就凸显出来了。
思考:针对某个topic,给consumer group 配置多少consumer合适?
1、假如topic 有20个partition,在consumer group下有30个consumer的时候,有效的consumer会是多少?
2、假如某个topic,partition的并发很高,积压很多消息,消费者的能力小于生产者,该如何处理?
以上是关于Kafka offset管理的主要内容,如果未能解决你的问题,请参考以下文章
Spark Streaming Kafka 偏移量 Offset 管理
kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance
Spark Kafka 基于Direct自己管理offset