Zookeeper原理解析

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper原理解析相关的知识,希望对你有一定的参考价值。

参考技术A 微信公众号: Spark大数据

一、Zookeeper介绍

ZooKeeper是一种为分布式应用所设计的高可用、高性能且一致的开源协调服务,它提供了一项基本服务: 分布式锁服务 。

分布式应用可以基于它实现更高级的服务,实现诸如同步服务、配置维护和集群管理或者命名的服务。Zookeeper服务自身组成一个集群,2n+1个(奇数)服务允许n个失效,集群内一半以上机器可用,Zookeeper就可用。

假设 3台机器组成的集群,可以有允许一台失效,如果有2台失效,这个集群就不可用,1<1.5,一般的搭建zookeeper集群时,以奇数台机器来搭建。目的:是为了提高容错能允许多损失一台。

1.1 数据模型

1)ZooKeeper本质上是一个 分布式的小文件存储系统 ;

2)Zookeeper表现为一个分层的文件系统目录树结构(不同于文件系统的是,节点可以有自己的数据,而文件系统中的目录节点只有子节点), 每个节点可以存少量的数据(1M左右) 。

3)每个节点称做一个ZNode。 每个ZNode都可以通过其路径唯一标识 。

4)ZooKeeper中的 每个节点存储的数据要被原子性的操作 。也就是说读操作将获取与节点相关的所有数据,写操作也将替换掉节点的所有数据。

5)在zookeeper创建顺序节点(create -s ),节点路径后加编号,这个计数对于此节点的父节点来说是唯一的。

/app/

/s100000000001

/s100000000002

6)ZooKeeper中的节点有两种,分别为 临时节点和永久节点 。节点的类型在创建时即被确定,并且不能改变。

① 临时节点 :在客户端用create -e创建,该节点的生命周期依赖于创建它们的会话。一旦会话(Session)结束,临时节点将被自动删除,当然可以也可以手动删除。虽然每个临时的Znode都会绑定到一个客户端会话,但他们对所有的客户端还是可见的。另外,**ZooKeeper的临时节点不允许拥有子节点。

② 永久节点 :在客户端用create 创建,该节点的生命周期不依赖于会话,并且只有在客户端显示执行删除操作的时候,他们才能被删除。

7) 客户端可以给节点设置watch,我们称之为监视器 。当节点状态发生改变时(Znode的增、删、改)将会触发watch所对应的操作。当watch被触发时,ZooKeeper将会向客户端发送且仅发送一条通知。

分布式锁

zookeeper 是高可用协调流程图

1.2 zookeepr角色介绍

领导者(leader) ,负责进行投票的发起和决议,更新系统状态(数据同步),发送心跳。

学习者(learner) ,包括跟随者(follower)和观察者(observer)。

跟随者(follower) ,用于接受客户端请求、向客户端返回结果,在选主过程中参与投票。

观察者(Observer) ,可以接受客户端请求,会把请求转发给leader, 但observer不参加投票过程,只同步leader的状态 ,observer的目的是为了扩展系统,提高读取速度。

1)leader失效后会在follower中重新选举新的leader

2)每个follower都和leader有连接,接受leader的数据更新操作

3)客户端可以连接到每个server,每个server的数据完全相同

4)每个节点的服务Server,记录事务日志和快照到持久存储

1.3 工作原理

Zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议。 Zab协议有两种模式 ,它们分别是恢复模式(选主)和广播模式(同步)。

恢复模式: 当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,恢复模式不接受客户端请求,当领导者被选举出来,且大多数Server完成了和leader的状态同步以后,恢复模式就结束了。状态同步保证了leader和Server具有相同的系统状态。

广播模式: 一旦Leader已经和多数的Follower进行了状态同步后,他就可以开始广播消息了,即进入广播状态。这时候当一个Server加入ZooKeeper服务中,它会在恢复模式下启动,发现Leader,并和Leader进行状态同步。待到同步结束,它也参与消息广播。ZooKeeper的广播状态一直到Leader崩溃了或者Leader失去了大部分的Followers支持。

1.4 Zookeeper节点数据操作流程

(1)写操作

1)在Client向Follwer 或 Observer 发出一个写的请求;

2)Follwer 或 Observer 把请求发送给Leader;

3)Leader接收到以后向所有follower发起提案;

4)Follwer收到提案后执行写操作,然后把操作结果发送给Leader;

5)当多数follower返回提案结果后,leader会commit该提议,通知其他Follower 和 Observer 同步信息;

6)Follwer 或Observer把请求结果返回给Client。

(2)读操作

1)在Client向Follwer 或 Observer 发出一个读的请求;

2)Follwer 或 Observer 把请求结果返回给Client;

1.5 主要特点

最终一致性 :client不论连接到哪个Server,展示给它都是同一个视图,这是zookeeper最重要的特性;

可靠性 :具有简单、健壮、良好的性能,如果消息被某一台服务器接受,那么它将被所有的服务器接受;

实时性 :Zookeeper保证客户端将在一个时间间隔范围内获得服务器的更新信息,或者服务器失效的信息。但由于网络延时等原因,Zookeeper不能保证两个客户端能同时得到刚更新的数据,如果需要最新数据,应该在读数据之前调用sync()接口;

等待无关(wait-free) :慢的或者失效的client,不得干预快速的client的请求,使得每个client都能有效的等待;

原子性 :更新只能成功或者失败,没有中间状态;

顺序性 :按照客户端发送请求的顺序更新数据。

1.6 zookeepr应用场景

1.6.1 数据发布与订阅

发布与订阅即所谓的配置管理,顾名思义就是将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。

应用配置集中到节点上,应用启动时主动获取,并在节点上注册一个watcher,每次配置更新都会通知到应用。

1.6.2 命名空间服务  分布式命名服务,创建一个节点后,节点的路径就是全局唯一的,可以作为全局名称使用。

1.6.3 分布式通知/协调

不同的系统都监听同一个节点,一旦有了更新,另一个系统能够收到通知。

1.6.4 分布式锁

Zookeeper能保证数据的强一致性,用户任何时候都可以相信集群中每个节点的数据都是相同的。锁的两种体现方式:

(1)保持独占

一个用户创建一个节点作为锁,另一个用户检测该节点,如果存在,代表别的用户已经锁住,如果不存在,则可以创建一个节点,代表拥有一个锁。

(2)控制时序

有一个节点作为父节点,其底下是带有编号的子节点,所有要获取锁的用户,需要在父节点下创建带有编号的子节点,编号最小的会持有锁;当最我号的节点被删除后,锁被释放,再重新找最我号的节点来持有锁,这样保证了全局有序。

1.6.5 集群管理

每个加入集群的机器都创建一个节点,写入自己的状态。监控父节点的用户会收到通知,进行相应的处理。离开时删除节点,监控父节点的用户同样会收到通知。

zookeeper原理解析-选举

1)QuorumPeerMain加载

       

Zookeeper集群启动的入口类是QuorumPeerMain来加载配置启动QuorumPeer线程。首先我们来看下QuorumPeer, 谷歌翻译quorum是法定人数,定额的意思, peer是对等的意思,那么QuorumPeer中quorum代表的意思就是每个zookeeper集群启动的时候集群中zookeeper服务数量就已经确定了,在每个zookeeper的配置文件中配置集群中的所有机器

server.1=127.0.0.1:2886:3886 

server.2=127.0.0.1:2887:3887 

server.3=127.0.0.1:2888:3888

事实上着也确定zookeeper在运行中是不能动态扩容的,必须停下服务修改配置才可以;QuorumPeer中peer代表就是集中每个zookeeper角色是对等的没有主从之分,每个zookeeper服务都可以成为leader, follower,observer。

 

1.      QuorumPeerMain加载

1)  QuorumPeerConfig读取配置文件,如下面的zoo.cfg文件

tickTime=2000

initLimit=10

syncLimit=5

dataDir=D:/workspace_zookeeper/data1

clientPort=2181

server.1=127.0.0.1:2886:3886 

server.2=127.0.0.1:2887:3887 

server.3=127.0.0.1:2888:3888

如上配置中每个server.X记录代表集群中的一个服务,QuorumPeerConfig会构建一个QuorumServer对象,其中的server.X中的X代表zookpeer的sid,每个zookeeper都会编辑自己的sid在dataDir目下的myid文件中,sid标记每个服务,在快速选举中起作用

技术分享

 

server.X后值代表zookeeper服务地址ip以及选举其中zookeeper之间连接的端口

2)启动定时清理服务任务DatadirCleanupManager.start() 用来清除过期的txtLog和snapshot文件

3)构建QuorumPeer任务对象

3.1)设置供客户端连接的端口地址

3.2)设置txtLog和snapshot的操作对象FileTxnSnaplog

3.3)设置本Zookeeper的myid(sid)

3.4)设置客户端连接工厂

3.5)设置选举算法

3.6)各种时间设置等等

4)QuorumPeer.start()启动任务线程

2)选举流程

     

uorumPeer的run方法中主要用来进行选举,以及选举后进入各角色,角色被打破重新再进行选举,下图大体流程

技术分享

 

 

2.1.1基本概念

Logicalclock, currentEpoch,acceptedEpoch   epoch  zxid

2.2.2选举算法创建

         Zookeeper启动的时候,quorumPeer线程的start方法的调用startLeaderElection方法来创建选举算法, 创建过程如下:

1) 选举算法的创建之前先创建QuorumCnxManager,QuorumCnxManager利用tcp协议来进行leader选举,每一对server之间都会保持一个tcp链接,每个zookeeper会打开一个tcp/ipj监听集群中的其他server。

         1.1) receiveConnection(socket)方法用来建立连接

                   1.1.1) 读取请求连接zookeeperserver的sid(myid)

                   1.1.2)      对方sid小于自己sid, 立马关闭连接, 自己做为client向对方请求建立连接, 注意zookeeper 服务之间都是配置myid大的作为客户端连接myid小的作为服务器端。

                   1.1.3) 对方sid大于自己sid, 创建server端的发送线程任务和接收线程任务,并且启动任务准备发送和接收client端数据

1.2) connectOne(sid):本zookeeper作为客户端向sid(myid)的zookeeper发起链接请求

        1.2.1) 向对方zookeeper写入本机sid

                   1.2.2) 对方sid大于本机, 关闭socket链接,注意zookeeper服务之间都是配置myid大的作为客户端连接myid小的作为服务器端。

                   1.2.3) 对方sid小于本机,创建client端的发送线程任务和接收线程任务,并且启动任务准备发送和接收server端数据

         1.3) SendWorker: 如同其名字run方法从阻塞的发送队列中取数据发送

         1.4) RecvWorker: 如同其名字run方法将接收到数据放入阻塞队列中,供FastLeaderElection. WorkerReceiver线程任务消费

 

2) QuorumCnxManager.listener.start()

         Listener线程任务, 绑定地址开启ServerSocket服务,用来侦听其他server连接,进行集体间选举,投票, 数据同步。Server链接数有限,是基于bio的长连接。

         Socketclient = serverSocket.accept();

         receiveConnection(client);//处理建立连接请求

 

 

3) FastLeaderElection:创建选举对象

         3.1) 构建发送队列LinkedBlockingQueue<ToSend>sendqueue

         3.2) 构建接收队列LinekdBlockingQueue<Notification>recvqueue

         3.2) 创建消息处理对象Messenger, 它有WorkerReceiver和WorkerSender两个线程子类型对象组成, 这个两个线程对象的作用就如它的名字一样

         3.2.1) WorkerReceiver

         3.2.2) WorkerSender

 

 

2.2.3选举流程:

1) Logicalclock++

2) 初始化提案:

2.1)建议选举的leader自己proposedLeader=myid

2.2)proposedZxid=lastLoggedZxid datatree中记录的最后事务id

2.3)proposedEpoch=currentEpoch 当前本地文件currentEpoch中存储的值

技术分享

 

3)  向其他服务发送通知消息:

     遍历集群中的所有的server, 构建选举建议提案

           ToSend notmsg = newToSend(ToSend.mType.notification,

                    proposedLeader,

                    proposedZxid,

                    logicalclock,

                   QuorumPeer.ServerState.LOOKING,

                    sid,

                    proposedEpoch);

    将消息加入到发送队列中。

4)  LOOKING状态循环:

4.1)从接受队列中获取消息n

4.2) n为空

4.2.1) 发送队列为空,遍历集群中的所有的server, 构建选举建议提案加入发送队列

4.2.1)发送队列不为空, connectAll  查看所有服务如果没有建立连接建立。

         4.3) n不为空, 判断服务状态

                   4.3.1) LOOKING: 接收的投票还没有选举出leader

4.3.1.1)若n.electionEpoch>logicalclock

接受到消息的选举时代大于本机计算器,更新本地logicalclock,

清空收到的投票集合,

然后将收到的消息n跟自己比较判断采用哪种提案,判断规则如下:相等的情况看下一条

newEpoch > curEpoch 

                                      newZxid > curZxid

                                      newId > curId

                            更新选举提案

                            向其他server发送选举提案消息,先加入发送队列异步发送

                   4.3.1.2) n.electionEpoch< logicalclock  对方选举时代过期,废弃,break重新循环

                   4.3.1.3) n.electionEpoch==  logicalclock 决策提案规则如下相等情况看下一条

newEpoch > curEpoch 

                                               newZxid> curZxid

                                               newId > curId

如果采用对方提案, 更新本机提案并向其他server发送选举提案消息,先加入发送队列异步发送

4.3.1.4) 将对方提案构建成投票对象vote,保存到接收投票集合, 到这里本机的提案信息应该是最新的了

4.3.1.5) termPredicate(recvset,new Vote(proposedLeader, proposedZxid, logicalLock, proposedEpoch)) 将接收到的提案跟自己的提案比较,大于一般set.size()>集群数/2

                   如果没有大于一半,跳出本次循环

如果大于一半,从接收队列取消息, 如果有消息判断投票提案有没有更新有更新将消息加入接收集合, 如果接收队列中没有新的消息那么根据最新投票设置本机角色

技术分享

 

                    4.3.2) OBSERVING:

                   选举过程,OBSERVING不参与,只打印debug日志

                   4.3.3) FOLLOWING 或者 LEADING

                   接收的投票信息已经有leader选举出了

                   4.3.3.1) n.electionEpoch== logicalclock 接收到消息选出了leader跟本机是在一个选举时代

                                     将消息加入接收的投票集合recvset

                                     判断recvset集合中是不是多数投个n消息指定的leader

                                     将消息加入一个外部选举的集合outofelection用来存放外部选举出leader的消息

                                     判断大多数outofelection集合中的follower是否跟随同一个leader 

2.2  OBSERVING 本机是观察者

2.2.1)建立跟leader server的socket链接

2.2.2) 向leader server注册本oberser服务, 注册过程除了向leader表明本server的角色外, 还会将自己的acceptedEpoch提交给leader, leader判断更新再向自己反馈最新的acceptedEpoch

2.2.3)从leader同步数据

2.2.4)启动while循环,接收leader指令

           PING: 接收心跳消息, 并将自己持有的session作为心跳内容返回

           PROPOSAL,COMMIT,UPTODATE: 记录日志,不做操作

           SYNC:观察者同步数据

REVALIDATE:  重新建立session

INFORM: 观察者通过这种消息类型来提交数据, (对于follower要先PROPOSAL在COMMIT)

 

2.3  FOLLOWING 本机处于跟随者状态

2.3.1)建立跟leader server的socket链接

2.3.2)向leaderserver注册本follower服务,注册过程除了向leader表明本server的角色外,还会将自己的acceptedEpoch提交给leader, leader判断更新再向自己反馈最新的acceptedEpoch

2.3.3) 做一次检查leader返回的epoch小于本机accpetedEpoch绝不应该出现的

2.3.4)从leader同步数据

2.3.5)启动while循环,接收leader指令

          PING:接收心跳消息, 并将自己持有的session作为心跳内容返回

              PROPOSAL:leader向follower提交了一个变动操作提案

              COMMIT:   leader向follower确认变动操作

              UPTODATE:  follower启动后不应该接受到这个操作

              REVALIDATE:  重新建立session

              SYNC: 同步数据

 

2.4  LEADING 本机处于领导状态

一但选举出leader如果本机就是leader,那么Quorumpeer.run进入LEADING分支进行作为leader角色的初始化及进入leader角色的工作,由Leader.lead()执行,下面我们来看看执行流程:

         2.4.1) loadData加载数据, zookeeperServer利用snapshot和txtLog恢复数据加载到内存

         2.4.2) 获取leader的epoch,lastProcessedZxid

         2.4.3) 构建LearnerCnxAcceptor线程任务并启动

LearnerCnxAcceptor用来侦听集群中的learner(follower&observer),并跟它们建立连接,leader跟每个learner的连接都会构建LearnerHandler线程任务来跟learner一对一单独交流

         2.4.4) 获取最新接收的选举时代epoch

这个是通过接收所有(这里也不是所有,是通过策略,默认大于一般, 否则阻塞等待)zookeeper集群的lastAcceptedEpoch比较出最大一个在加一得出,

2.4.5)根据最新epoch创建新的zxid,高32位是epoch,低32位是id计数

2.4.6)  构建leader, epoch信息的对象QuorumPacket,向集群中的learner发送, learner接收后epoch大于自己acceptedEpoch的更新learner的acceptedEpoch

                   将自己加入newLeaderProposal.ackSet集合

然后接收各个learner反馈(默认大于一半否则会阻塞等待)

                   设置currentEpoch,序列化到本地文件

2.4.7) leader通过learnerhandler与各个learner进行交互,在leader向各个learner发送newLeaderProposal后,各个learner反馈后, 各个learner就会根据最新的epoch,zxid来跟leader进行数据同步,同步完成后会向leader发送ack消息,leader接收到后将ack消息加入到newLeaderProposal.ackSet, 所以leader的下一步操作就是循环判断newLeaderProposal.ackSet个数是否超过一半

2.4.8) 启动while循环用来定时向learner发送心跳, 同时决策集群中活着的server是否大于一半, 如果不是退出循环会重新在选举

3)数据同步

 

1.      Zookeeper集群一旦选举leader后,  leader跟follower,observer之间会进行一些列的交互产生epoch,数据同步操作, 以及后续操作的投票处理决策等操作,这些交互过程其实挺复杂的,zookeeper的代码个人觉的没有整理的很清晰导致深入细节的时候看起来很费劲,下面我就对他们之间的交互做整体并尽可能细致的介绍

3.1)如下图leader.lead()方法创建并启动LearnerCnxAcceptor线程用来侦听follower.followLeader()或者observer.observerLeader()中通过connectToLeader(addr)来连接leader, 每个连接会构建一个LearnerHandler线程对象来专门负责处理。

 

技术分享

 

 

由于follower与observer的差别在于observer不参与选举投票其他都类似,所以下面我就以leader跟follower的交互进行讲解。

 

3.2)leader通过投票获取个sever的lastAccpetEpoch决策出最新的

3.2.1)leader构建LearnerCnxAcceptor,侦听learner链接

3.2.2)learner连接上leader,leader为对于learner构建LearnerHandler处理器

3.2.3)leader将自己sid加入connectingFollowers集合等待决策

            超过半数notifyall

            没有超过半数wait

3.2.4)learner向leader发送自己的lastAccpetEpoch, 如果learner的lastAccpetEpoch大于leader的则更新leader的为learner的lastAccpetEpoch+1, 将learner的sid加入connectingFollowers集合等待决策

           超过半数notifyall

             没有超过半数wait

注: 3.2.3和3.2.4多线程没有先后, 3.2.4会有很多

3.2.5)hold住,等待connectingFollowers超过半数,得到最新的epoch

3.2.6)向learner写入leaderinfo即,leader得到的做下epoch

3.2.7)learner读取leader信息, 更新自己的lastAccpetEpoch

             并向leader反馈更新信息

3.2.8)leader获取learner的反馈, 决策超过半数,返回最新的epoch

技术分享

 

上图是获取epoch的一个立体交互

 

3.3) 数据同步

leader决策出epoch后,zookeeper进入一个新的时代,zookeeper在对外接客之前需要跟leader进行数据同步,数据同步并不是独立组件,代码是紧接着上面获取epoch的。

3.3.1)

 

toBeApplied  commitLog outstandingProposals集合的关系区别

技术分享

 

以上是关于Zookeeper原理解析的主要内容,如果未能解决你的问题,请参考以下文章

zookeeper原理解析-数据存储

Zookeeper原理解析&使用场景详解

zookeeper原理解析-服务器端处理流程

zookeeper原理解析-客户端与服务器端交互

Hbase原理深入解析及集成Hadoop

Zookeeper全解析——Client端(转)