kafka leader选举机制原理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka leader选举机制原理相关的知识,希望对你有一定的参考价值。

参考技术A

关注《大数据技术进阶》微信公众号,及时查看更多大数据技术干活!
kafka在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比Zookeeper Queue的方式更高效)通知需为此作出响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

1.Controller在Zookeeper注册Watch,一旦有Broker宕机(这是用宕机代表任何让系统认为其die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的znode会自动被删除,Zookeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker

2.Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition

3.对set_p中的每一个Partition

3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR

3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica (选举算法的实现类似于微软的PacificA) 。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。

3.3 将新的Leader,ISR和新的leader_epoch及controller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有其version在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1

4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。

1.若请求中controllerEpoch小于当前最新的controllerEpoch,则直接返回ErrorMapping.StaleControllerEpochCode。2.对于请求中partitionStateInfos中的每一个元素,即((topic, partitionId), partitionStateInfo):

2.1 若partitionStateInfo中的leader epoch大于当前ReplicManager中存储的(topic, partitionId)对应的partition的leader epoch,则:

2.1.1 若当前brokerid(或者说replica id)在partitionStateInfo中,则将该partition及partitionStateInfo存入一个名为partitionState的HashMap中

2.1.2否则说明该Broker不在该Partition分配的Replica list中,将该信息记录于log中2.2否则将相应的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中

3.筛选出partitionState中Leader与当前Broker ID相等的所有记录存入partitionsTobeLeader中,其它记录存入partitionsToBeFollower中。

4.若partitionsTobeLeader不为空,则对其执行makeLeaders方。

5.若partitionsToBeFollower不为空,则对其执行makeFollowers方法

6.若highwatermak线程还未启动,则将其启动,并将hwThreadInitialized设为true。

7.关闭所有Idle状态的Fetcher。

LeaderAndIsrRequest处理过程如下图所示

对于收到的LeaderAndIsrRequest,Broker主要通过ReplicaManager的becomeLeaderOrFollower处理,流程如下:

上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

1.等待ISR中的任一个Replica“活”过来,并且选它作为Leader

2.选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader

这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。 unclean.leader.election.enable 参数决定使用哪种方案,默认是true,采用第二种方案

参考:

KafkaLeader选举(broker /分区)

broker的leader:

Kafka集群Leader选举原理

我们知道Zookeeper集群中也有选举机制,是通过Paxos算法,通过不同节点向其他节点发送信息来投票选举出leader,但是Kafka的leader的选举就没有这么复杂了。 
Kafka的Leader选举是通过在zookeeper上创建/controller临时节点来实现leader选举,并在该节点中写入当前broker的信息 
{“version”:1,”brokerid”:1,”timestamp”:”1512018424988”} 
利用Zookeeper的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的broker即为leader,即先到先得原则,leader也就是集群中的controller,负责集群中所有大小事务。 
当leader和zookeeper失去连接时,临时节点会删除,而其他broker会监听该节点的变化,当节点删除时,其他broker会收到事件通知,重新发起leader选举。

分区的leader:

Kafka的Leader是什么

  首先Kafka会将接收到的消息分区(partition),每个主题(topic)的消息有不同的分区。这样一方面消息的存储就不会受到单一服务器存储空间大小的限制,另一方面消息的处理也可以在多个服务器上并行。
  其次为了保证高可用,每个分区都会有一定数量的副本(replica)。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。

 

 

 

但是,为了保证较高的处理效率,消息的读写都是在固定的一个副本上完成。这个副本就是所谓的Leader,而其他副本则是Follower。而Follower则会定期地到Leader上同步数据。

Leader选举

  如果某个分区所在的服务器除了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。显然,只有那些跟Leader保持同步的Follower才应该被选作新的Leader。
  Kafka会在Zookeeper上针对每个Topic维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。如果这个集合有增减,kafka会更新zookeeper上的记录。
  如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个副本作为新的Leader。
  显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个服务器不可用。

 

为什么不用少数服从多数的方法

少数服从多数是一种比较常见的一致性算法和Leader选举法。

它的含义是只有超过半数的副本同步了,系统才会认为数据已同步;

选择Leader时也是从超过半数的同步的副本中选择。

这种算法需要较高的冗余度。

譬如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。

而kafka的ISR集合方法,分别只需要两个和三个副本。

 

如果所有的ISR副本都失败了怎么办?

  此时有两种方法可选,一种是等待ISR集合中的副本复活,一种是选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中。这两种方法各有利弊,实际生产中按需选择。
  如果要等待ISR副本复活,虽然可以保证一致性,但可能需要很长时间。而如果选择立即可用的副本,则很可能该副本并不一致。

 

以上是关于kafka leader选举机制原理的主要内容,如果未能解决你的问题,请参考以下文章

kafka备份机制——zk选举leader,leader在broker里负责备份

leader 选举机制

Kafka中副本机制的设计和原理

Kafka 学习kafka 选举机制

Kafka之副本信息Leader 选举流程故障处理细节分区副本分配手动调整分区副本存储Leader Partition 负载平衡增加副本文件存储机制文件清理策略高效读写数据

8.8.ZooKeeper 原理和选举机制