Kafka相关的offset管理/重平衡/高可用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka相关的offset管理/重平衡/高可用相关的知识,希望对你有一定的参考价值。

参考技术A kafka 组成 :consumer/producter/broker

kafka和zk关系:zk存放kafka元信息,比如 topic - partition - replication- brokerIp,数据结构如树状,一层层的结构,叶子节点是brokerIp

思考:老版本的kafka,消费的偏移量也是放在zk的,新版本已经放在kafka的broker中,为什么要迁移?kafka如何记录消息偏移?消息偏移的信息存放在哪儿?

1、目前kafka支持自动提交偏移,比如设置配置 auto.commit.enable、auto.commit.interval.ms;以及手动设置偏移

2、消息偏移信息存放在kafka内部的一个主题中 __consumer_offsets ,默认的分区数和副本配置是 offset.topic.num.partition、offset.topic.replication.factor,默认值分别是50和3;

3、__consumer_offsets记录三类消息:

    1、group创建,记录元信息

    2、消费者消费偏移

        消息的结构是key-value 格式,key 值是:<consumer group id + topic + partition number> ,value 是 partition的偏移,以及一些其他后续处理的信息,如果时间戳

    3、墓碑消息,当group下的所有的consumer下线 并且 group的消费偏移都已经删除,会发送墓碑消息,会将group下的所有的消费偏移信息全部删除

4、无论是自动提交偏移还是手动提交偏移都会有大量的记录产生,长时间会占满磁盘,kafka是如何处理的?

    kafka采用Compact机制,多条同名的key,只保留最新的偏移信息;因为key同名代表某个消费分组在某个topic下的partition的偏移,保留一份即可

1、kafka 分区副本分为leader/follower 类似与mysql的主从,但是不同的是kafka leader承接所有的读写请求,follower注意任务是数据冗余和选主,不接受consumer的读写。kafka为什么要这样设计?

> 分区实现负载;如果kafka想实现负载,可以使用多分区机制,比如kafka集群有5台机器,每台机器一个broker,主题W并发高想充分利用5台服务器,可以采用 5个partition,3个replication,producer采用默认的发送机制即round robin,那么5个分区所在的5台服务器的负载是均衡的

> 优势:read my write 读自己的写,在同一个broker中进行读写,不像mysql存在数据延迟情况; 单掉读,不会出现数据读取不一致的情况,比如follower A 和 B 同步的进度不一致,consumer开始从followerA读取,因为宕机重启后从B读取,数据不一致。

2、副本leader宕机,如何在consistent 和 avaliable 做选择?

通过ISR(In Sync Replication)保存的是同步leader的副本组,进入ISR的标准是根据落后leader的时间(这点不像mysql故障转移是根据落后的位置),落后的时间根据 replica.lag.max.ms设置,如果在范围内,则ISA扩容加入新成员,否则缩容,减少成员;在ISR的follwer在leader宕机时可以参与选主,保证可用性,但是如果ISR没有follwer成员,说明所有的副本都落后leader的规定时间差,那么会导致不可用,这是保证CAP的C;

那么如何牺牲Consistent而确保Avaliable呢? 

可以通过参数unclean.leader.election.enable=true,表示落后leader的follwer可以参与选主,保证Avaliable但是缺点是会导致数据丢失,所以要在C/A中权衡。

1、生产者 压缩,但是没有 zero copy,因为要校验,过滤掉不满足要求的消息

2、生产者顺序写磁盘 ; 采用 page cache,保证Linxu安全

3、消费者顺序读,采用 page cache,实现zero copy

4、为什么page cache可以提高吞吐,它是什么,有什么机制?

- 背景:kafka只针对已提交的消息做可靠性保证,【已提交的消息】在broker和producer都有自己的定义

0、topic定义partition的副本数 replication.factor > 1,数据冗余,保证partition高可用

1、producer如何保证数据不丢失:

    producer 开启重试策略,比如: 配置 retries属性 > 0

    producer acks=all, 生产者定义已提交的语意,所有的replication都成功写入才算成功

2、broker 如何避免数据不丢失:

    - broker 开启 unclean.leader.election.enable =false ,禁止数据落后的replication 竞选leader,避免数据丢失

    - brokder 开启 min.insync.replicas >1 ,表示消息同步几份副本才算已提交,注意 producer acks=all定义的已提交泛语意(比如之前3个replication 宕机2个,那么提交1个就算已提交),而min.iinsync.replicas = number(一般配置大于1即可) 定义的是已提交的下限。 在有replication downtime场景时作用凸显。尽量不要把min.insync.replicas的值配置成 replication.factor 副本数,因为一旦有副本downtime,broker将不可用,一般采用 replication.factor = min.insync.replicas +1

3、消费者测如何避免数据不丢失:

    - 先消费再手动commit; 避免先commit后消费

1、at most once,至多一次,如果发送失败,会丢失消息

2、at lease once,至少一次,如果出现网络抖动,发送成功了,但是因为网络抖动的原因一直没有收到响应消息,再次发送,会产生多条消息

3、exactly once

exactly once 实现

    1、生产者配置enable.idempotent,实现幂等姓性生产者,实现消息只发送一次,局限是只针对单个partition / 单次会话(实现原理是,生产者会给消息增加唯一字段,broker根据唯一字段去重)

    2、如何实现多个partition/多次会话的exactly once?

        采用事物性的producter,大概需要三个步骤:

        1、enable.idemponent 开启幂等性producer;

        2、为producer配置transaction.id;

        3、开启事物,实现多paritition消息要么全部成功,要么全部失败。

        需要注意的是:消费者默认的消费隔离级别是: RU,需要配置isolation level 为RC

五、如何避免消息重复带来的影响?

消息幂等性,比如采用幂等性生产者、再比如消费者根据消息唯一id做消费记录,在消费消息之前做判断(消费重复性解决了,但是性能下降了,需要做具体的权衡)

1、什么情况会出现rebalence?

新增/减少consumer

新增/减少topic,比如正则表达式订阅主题,新增的主题满足正则表达式

新增/减少分区,比如修改topic ,增加/减少分区

2、rebalence的有什么优势和劣势?

优势:消费者消费分区负载均衡

劣势:rebalence时所有的消费者会被占停,如果集群的consumer group下的consumer数过多,rebalence时间会很长;现在的rebalence与partition会重新打乱,会重现发起connnect,这个缺点后续的kafka肯定会优化掉

3、如何避免rebalance

避免coordinate对consumer对下线误判, 合理对设置 session.timeout.ms (如果consumer在指定的会话时间没有发送心跳包,coordinator会认为consumer下线,引起rebalance); 解决思路: 设置 session.timeout.ms 是 heartheat.interval.ms 的倍数,一般数 10:1

coodinator认为消费者消费能力不足引起的 rebalance,例如:consumer在 max.poll.interval.ms 取出N条数据,再下一个间隔时间仍然没有处理完毕。解决思路,根据consumer实际的情况设置 max.poll.interval.ms 参数

4、细聊rebalance的过程

背景:kafka的consumer group在coodinator有四种状态: empty、rebalance、rebalanceComplete、stable;

当consumer group 下没有consuner(比如consumer全部下线)时处于empty状态,如果empty状态超过一定当阀值,consumer group之前当位移信息会变成expire offset,coodinator会定期清理expire offset,所以当group当consumer全部下线超过一定的时间,消费偏移丢失,会出现重头开始消费情况!!!!当触发rebalance当条件时,group 会处于rebalance状态,rebalance完成进入rebalanceComplete状态

rebalance过程:触发rebalance条件时broker会像组内所有当成员响应 rebalance 心跳信息,接着所有成员会进入下面两个环节 JoinGroup 、GroupSync

JoinGroup:组的所有成员会向coodinator上报自己订阅的主题信息,并将第一个上报的consumer当作consumer leader(后面会由它来做分配),coordinator收集所有的成员要订阅的主题后,会把消息通知给consumer leader,做完分配后,发送给coodinatoer

GroupSync: 组成员将自己要订阅的信息同步至coodinator后进入GroupSync阶段,consumer会定时向coordinator发送GroupSync心跳信息包,coodinator将consumer leader分配的信息同步至组内成员,JoinGroup结束

总结:从上面可以看到,如果consumer数较少时,rebalance还是很快的,Stop The Wrold时间很短,但是当consumer数很多时,consumer不工作时间长,rebalance弊端就凸显出来了。

思考:针对某个topic,给consumer group 配置多少consumer合适?

1、假如topic 有20个partition,在consumer group下有30个consumer的时候,有效的consumer会是多少?

2、假如某个topic,partition的并发很高,积压很多消息,消费者的能力小于生产者,该如何处理?

以上是关于Kafka相关的offset管理/重平衡/高可用的主要内容,如果未能解决你的问题,请参考以下文章

Kafka重平衡机制

Kafka 学习笔记之 High Level Consumer相关参数

Kafka offset管理

kafka的offset相关知识

rocketMQ -- offset管理

kafka消费者状态检查—消费的offset是不是滞后/堆积