Kafka offset管理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka offset管理相关的知识,希望对你有一定的参考价值。

参考技术A

Kafka中的每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序号,用于partition唯一标识一条消息。

Offset记录着下一条将要发送给Consumer的消息的序号。

Offset从语义上来看拥有两种:Current Offset和Committed Offset。

Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。例如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll消息时,都能够收到不重复的消息。

Committed Offset保存在Broker上,它表示Consumer已经确认消费过的消息的序号。主要通过 commitSync 和 commitAsync
API来操作。举个例子,Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用 consumer.commitAsync() 或 consumer.commitSync() 来提交Committed Offset,那么此时Committed Offset依旧是0。

Committed Offset主要用于Consumer Rebalance。在Consumer Rebalance的过程中,一个partition被分配给了一个Consumer,那么这个Consumer该从什么位置开始消费消息呢?答案就是Committed Offset。另外,如果一个Consumer消费了5条消息(poll并且成功commitSync)之后宕机了,重新启动之后它仍然能够从第6条消息开始消费,因为Committed Offset已经被Kafka记录为5。

总结一下,Current Offset是针对Consumer的poll过程的,它可以保证每次poll都返回不重复的消息;而Committed Offset是用于Consumer Rebalance过程的,它能够保证新的Consumer能够从正确的位置开始消费一个partition,从而避免重复消费。

在Kafka 0.9前,Committed Offset信息保存在zookeeper的[consumers/group/offsets/topic/partition]目录中(zookeeper其实并不适合进行大批量的读写操作,尤其是写操作)。而在0.9之后,所有的offset信息都保存在了Broker上的一个名为__consumer_offsets的topic中。

Kafka集群中offset的管理都是由Group Coordinator中的Offset Manager完成的。

Group Coordinator是运行在Kafka集群中每一个Broker内的一个进程。它主要负责Consumer Group的管理,Offset位移管理以及 Consumer Rebalance 。

对于每一个Consumer Group,Group Coordinator都会存储以下信息:

Consumer Group如何确定自己的coordinator是谁呢? 简单来说分为两步:

由于一个partition只能固定的交给一个消费者组中的一个消费者消费,因此Kafka保存offset时并不直接为每个消费者保存,而是以groupid-topic-partition -> offset的方式保存。如图所示:

Kafka在保存Offset的时候,实际上是将Consumer Group和partition对应的offset以消息的方式保存在__consumers_offsets这个topic中

__consumers_offsets默认拥有50个partition,可以通过

的方式来查询某个Consumer Group的offset信息保存在__consumers_offsets的哪个partition中。下图展示了__consumers_offsets中保存的offset消息的格式:

如图所示,一条offset消息的格式为groupid-topic-partition -> offset。因此consumer poll消息时,已知groupid和topic,又通过Coordinator分配partition的方式获得了对应的partition,自然能够通过Coordinator查找__consumers_offsets的方式获得最新的offset了。

前面我们已经描述过offset的存储模型,它是按照 groupid-topic-partition -> offset 的方式存储的。然而Kafka只提供了根据offset读取消息的模型,并不支持根据key读取消息的方式。那么Kafka是如何支持Offset的查询呢?

答案就是Offsets Cache!!

如图所示,Consumer提交offset时,Kafka Offset Manager会首先追加一条条新的commit消息到__consumers_offsets topic中,然后更新对应的缓存。读取offset时从缓存中读取,而不是直接读取__consumers_offsets这个topic。

我们已经知道,Kafka使用 groupid-topic-partition -> offset *的消息格式,将Offset信息存储在__consumers_offsets topic中。请看下面一个例子:

如图,对于audit-consumer这个Consumer Group来说,上面的存储了两条具有相同key的记录: PageViewEvent-0 -> 240 和 PageViewEvent-0 -> 323 。事实上,这就是一种无用的冗余。因为对于一个partition来说,我们实际上只需要它当前最新的Offsets。因此这条旧的 PageViewEvent-0 -> 240 记录事实上是无用的。

为了消除这样的过期数据,Kafka为__consumers_offsets topic设置了Log Compaction功能。Log Compaction意味着对于有相同key的的不同value值,只保留最后一个版本。如果应用只关心key对应的最新value值,可以开启Kafka的Log Compaction功能,Kafka会定期将相同key的消息进行合并,只保留最新的value值。

这张图片生动的阐述了Log Compaction的过程:

下图阐释了__consumers_offsets topic中的数据在Log Compaction下的变化:

auto.offset.reset 表示如果Kafka中没有存储对应的offset信息的话(有可能offset信息被删除),消费者从何处开始消费消息。它拥有三个可选值:

看一下下面两个场景:

Kafka相关的offset管理/重平衡/高可用

参考技术A kafka 组成 :consumer/producter/broker

kafka和zk关系:zk存放kafka元信息,比如 topic - partition - replication- brokerIp,数据结构如树状,一层层的结构,叶子节点是brokerIp

思考:老版本的kafka,消费的偏移量也是放在zk的,新版本已经放在kafka的broker中,为什么要迁移?kafka如何记录消息偏移?消息偏移的信息存放在哪儿?

1、目前kafka支持自动提交偏移,比如设置配置 auto.commit.enable、auto.commit.interval.ms;以及手动设置偏移

2、消息偏移信息存放在kafka内部的一个主题中 __consumer_offsets ,默认的分区数和副本配置是 offset.topic.num.partition、offset.topic.replication.factor,默认值分别是50和3;

3、__consumer_offsets记录三类消息:

    1、group创建,记录元信息

    2、消费者消费偏移

        消息的结构是key-value 格式,key 值是:<consumer group id + topic + partition number> ,value 是 partition的偏移,以及一些其他后续处理的信息,如果时间戳

    3、墓碑消息,当group下的所有的consumer下线 并且 group的消费偏移都已经删除,会发送墓碑消息,会将group下的所有的消费偏移信息全部删除

4、无论是自动提交偏移还是手动提交偏移都会有大量的记录产生,长时间会占满磁盘,kafka是如何处理的?

    kafka采用Compact机制,多条同名的key,只保留最新的偏移信息;因为key同名代表某个消费分组在某个topic下的partition的偏移,保留一份即可

1、kafka 分区副本分为leader/follower 类似与mysql的主从,但是不同的是kafka leader承接所有的读写请求,follower注意任务是数据冗余和选主,不接受consumer的读写。kafka为什么要这样设计?

> 分区实现负载;如果kafka想实现负载,可以使用多分区机制,比如kafka集群有5台机器,每台机器一个broker,主题W并发高想充分利用5台服务器,可以采用 5个partition,3个replication,producer采用默认的发送机制即round robin,那么5个分区所在的5台服务器的负载是均衡的

> 优势:read my write 读自己的写,在同一个broker中进行读写,不像mysql存在数据延迟情况; 单掉读,不会出现数据读取不一致的情况,比如follower A 和 B 同步的进度不一致,consumer开始从followerA读取,因为宕机重启后从B读取,数据不一致。

2、副本leader宕机,如何在consistent 和 avaliable 做选择?

通过ISR(In Sync Replication)保存的是同步leader的副本组,进入ISR的标准是根据落后leader的时间(这点不像mysql故障转移是根据落后的位置),落后的时间根据 replica.lag.max.ms设置,如果在范围内,则ISA扩容加入新成员,否则缩容,减少成员;在ISR的follwer在leader宕机时可以参与选主,保证可用性,但是如果ISR没有follwer成员,说明所有的副本都落后leader的规定时间差,那么会导致不可用,这是保证CAP的C;

那么如何牺牲Consistent而确保Avaliable呢? 

可以通过参数unclean.leader.election.enable=true,表示落后leader的follwer可以参与选主,保证Avaliable但是缺点是会导致数据丢失,所以要在C/A中权衡。

1、生产者 压缩,但是没有 zero copy,因为要校验,过滤掉不满足要求的消息

2、生产者顺序写磁盘 ; 采用 page cache,保证Linxu安全

3、消费者顺序读,采用 page cache,实现zero copy

4、为什么page cache可以提高吞吐,它是什么,有什么机制?

- 背景:kafka只针对已提交的消息做可靠性保证,【已提交的消息】在broker和producer都有自己的定义

0、topic定义partition的副本数 replication.factor > 1,数据冗余,保证partition高可用

1、producer如何保证数据不丢失:

    producer 开启重试策略,比如: 配置 retries属性 > 0

    producer acks=all, 生产者定义已提交的语意,所有的replication都成功写入才算成功

2、broker 如何避免数据不丢失:

    - broker 开启 unclean.leader.election.enable =false ,禁止数据落后的replication 竞选leader,避免数据丢失

    - brokder 开启 min.insync.replicas >1 ,表示消息同步几份副本才算已提交,注意 producer acks=all定义的已提交泛语意(比如之前3个replication 宕机2个,那么提交1个就算已提交),而min.iinsync.replicas = number(一般配置大于1即可) 定义的是已提交的下限。 在有replication downtime场景时作用凸显。尽量不要把min.insync.replicas的值配置成 replication.factor 副本数,因为一旦有副本downtime,broker将不可用,一般采用 replication.factor = min.insync.replicas +1

3、消费者测如何避免数据不丢失:

    - 先消费再手动commit; 避免先commit后消费

1、at most once,至多一次,如果发送失败,会丢失消息

2、at lease once,至少一次,如果出现网络抖动,发送成功了,但是因为网络抖动的原因一直没有收到响应消息,再次发送,会产生多条消息

3、exactly once

exactly once 实现

    1、生产者配置enable.idempotent,实现幂等姓性生产者,实现消息只发送一次,局限是只针对单个partition / 单次会话(实现原理是,生产者会给消息增加唯一字段,broker根据唯一字段去重)

    2、如何实现多个partition/多次会话的exactly once?

        采用事物性的producter,大概需要三个步骤:

        1、enable.idemponent 开启幂等性producer;

        2、为producer配置transaction.id;

        3、开启事物,实现多paritition消息要么全部成功,要么全部失败。

        需要注意的是:消费者默认的消费隔离级别是: RU,需要配置isolation level 为RC

五、如何避免消息重复带来的影响?

消息幂等性,比如采用幂等性生产者、再比如消费者根据消息唯一id做消费记录,在消费消息之前做判断(消费重复性解决了,但是性能下降了,需要做具体的权衡)

1、什么情况会出现rebalence?

新增/减少consumer

新增/减少topic,比如正则表达式订阅主题,新增的主题满足正则表达式

新增/减少分区,比如修改topic ,增加/减少分区

2、rebalence的有什么优势和劣势?

优势:消费者消费分区负载均衡

劣势:rebalence时所有的消费者会被占停,如果集群的consumer group下的consumer数过多,rebalence时间会很长;现在的rebalence与partition会重新打乱,会重现发起connnect,这个缺点后续的kafka肯定会优化掉

3、如何避免rebalance

避免coordinate对consumer对下线误判, 合理对设置 session.timeout.ms (如果consumer在指定的会话时间没有发送心跳包,coordinator会认为consumer下线,引起rebalance); 解决思路: 设置 session.timeout.ms 是 heartheat.interval.ms 的倍数,一般数 10:1

coodinator认为消费者消费能力不足引起的 rebalance,例如:consumer在 max.poll.interval.ms 取出N条数据,再下一个间隔时间仍然没有处理完毕。解决思路,根据consumer实际的情况设置 max.poll.interval.ms 参数

4、细聊rebalance的过程

背景:kafka的consumer group在coodinator有四种状态: empty、rebalance、rebalanceComplete、stable;

当consumer group 下没有consuner(比如consumer全部下线)时处于empty状态,如果empty状态超过一定当阀值,consumer group之前当位移信息会变成expire offset,coodinator会定期清理expire offset,所以当group当consumer全部下线超过一定的时间,消费偏移丢失,会出现重头开始消费情况!!!!当触发rebalance当条件时,group 会处于rebalance状态,rebalance完成进入rebalanceComplete状态

rebalance过程:触发rebalance条件时broker会像组内所有当成员响应 rebalance 心跳信息,接着所有成员会进入下面两个环节 JoinGroup 、GroupSync

JoinGroup:组的所有成员会向coodinator上报自己订阅的主题信息,并将第一个上报的consumer当作consumer leader(后面会由它来做分配),coordinator收集所有的成员要订阅的主题后,会把消息通知给consumer leader,做完分配后,发送给coodinatoer

GroupSync: 组成员将自己要订阅的信息同步至coodinator后进入GroupSync阶段,consumer会定时向coordinator发送GroupSync心跳信息包,coodinator将consumer leader分配的信息同步至组内成员,JoinGroup结束

总结:从上面可以看到,如果consumer数较少时,rebalance还是很快的,Stop The Wrold时间很短,但是当consumer数很多时,consumer不工作时间长,rebalance弊端就凸显出来了。

思考:针对某个topic,给consumer group 配置多少consumer合适?

1、假如topic 有20个partition,在consumer group下有30个consumer的时候,有效的consumer会是多少?

2、假如某个topic,partition的并发很高,积压很多消息,消费者的能力小于生产者,该如何处理?

以上是关于Kafka offset管理的主要内容,如果未能解决你的问题,请参考以下文章

Kafka相关的offset管理/重平衡/高可用

Spark Streaming Kafka 偏移量 Offset 管理

kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance

Spark Kafka 基于Direct自己管理offset

经典篇 | Spark Streaming 中管理 Kafka Offsets 的几种方式

深入了解Kafka消费者的Offset管理