区块链共识之Paxos算法理解与实战
Posted HEX区块链技术营
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了区块链共识之Paxos算法理解与实战相关的知识,希望对你有一定的参考价值。
0.前言
本文记录笔者学习和理解区块链共识算法Paxos的点滴,文章比较长,需要耐心来细细琢磨,笔者也是苦战了一个周末才对此有那么一点初步了解,有问题的地方请不吝斧正!
1.初始是阅读本文后续内容的基础,概念性的东西叙述不多,干货干货干货在后面的代码实战。但有提供我认为优秀的帖子以供参考理解。前面这些Paxos基本的理解是2.代码设计和3.实战流程的基础!
1.初识
相关概念
Paxos 问题是指分布式的系统中存在故障(fault),但不存在恶意(corrupt)节点场景(即 可能消息丢失或重复,但无错误消息)下的共识达成(Consensus)问题。
Paxos最早是 Leslie Lamport 用 Paxon 岛的故事模型来进行描述而命名。故事背景是古希腊 Paxon 岛上的多个法官在一个大厅内对一个议案进行表决,如何达成统一 的结果。他们之间通过服务人员来传递纸条,但法官可能离开或进入大厅,服务人员可能偷 懒去睡觉。
Paxos 是第一个被证明的共识算法,其原理基于两阶段提交并进行扩展。 作为现在共识算法设计的鼻祖,以最初论文的难懂(算法本身并不复杂)出名。算法中将节点分为三种类型:
proposer:提出一个提案,等待大家批准为结案。往往是客户端担任该角色;
acceptor:负责对提案进行投票。往往是服务端担任该角色;一般需要至少3个且节点个数为奇数,因为Paxos算法最终要产生一个大多数决策者都同意的提议。
learner:被告知结案结果,并与之统一,不参与投票过程。可能为客户端或服务端。
paxos算法的两阶段
prepare 阶段:
1. Proposer 选择一个提案编号 n,然后向acceptor的某个超过半数的子成员发送编号为n的 prepare 请求;
2. Acceptor 收到 prepare 消息后,如果提案的编号n大于该acceptor已经回复的所有 prepare 请求的编号,则 Acceptor 将自己上次已经批准的最大编号提案回复给 Proposer,并承诺不再回复小于 n 的提案;
commit阶段:
1. 当一个 Proposer 收到了半数以上的 Acceptors 对 prepare 的回复后,就进入批准阶段。它要向回复 prepare 请求的 Acceptors 发送 accept 请求,包括编号 n 和根据 prepare阶段 决定的 value。这里的value是所有响应中编号最大的提案的value(如果根据 prepare 没有已经接受的 value,那么它可以自由决定 value)。
2. 在不违背自己向其他 Proposer 的承诺的前提下,Acceptor 收到 accept 请求后即接受这个请求。即如果acceptor收到这个针对n提案的accept请求,只要该acceptor尚未对编号大于n的prepare请求做出过响应,它就可以通过这个提案。
他山之石
Paxos算法初次接触听上去确实有点晦涩难懂,这里有一篇贴子我觉得不错。贴出来可以参考:
通过现实世界描述Paxos算法
通过实例来理解paxos算法
Paxos算法原理与推导
另外,wiki对Paxos的描述也是比较不错和权威的参考资料。
有了以上对Paxos算法的理解,我们才能进行下一步:自己编程实现Paxos算法。
2.代码实战
流程理解
Paxos算法核心的两个角色便是Proposer(提议者)和Acceptor(决策者),因此也必须围绕这两个对象进行算法架构的设计。
Paxos算法流程
Proposer行为分析
1.0 向所有Acceptor发出一个提议(proposal);
2.0 如果收到一个拒绝信息(reject),尝试重新发送被拒绝的提议;
2.1 如果收到一个Acceptor的承诺回应(agree),用一个标志(agreeCount)来计数给了自己承诺的Acceptor个数。当agreeCount超过Acceptor总数的一半时,表示有大多数Acceptor承诺将接受这个提议,需要将自己的提议状态置为承诺接受状态(agreed)。同时,还要通知其他Proposer我这个提议已经得到大多数Acceptor承诺会接受。
3.0 提议为承诺接受状态(agreed)时,Proposer需要再向Acceptor集合发送一个接受提议的确认请求,我们称该请求为Accept请求。
3.1 发出Accept请求后会收到Acceptor的回复,如果收到接受信息(Accept),用一个标志(acceptCount)来计数接受自己提议的Acceptor个数。同样当acceptCount超过半数时,表示大多数Acceptor接受了这个提议,需要将提议状态由承诺接受状态(agreed)置为接受状态(acceptd)。
同时,还要通知其他Proposer我这个提议已经得到大多数Acceptor接受。
以上1,2属于Paxos算法的Prepare阶段,3属于Accept阶段。
Acceptor行为分析
1.0 当Acceptor收到一个提议后,判断提议版本号是否大于自身保存的提议版本。
1.0 如果小于自身表示曾经已经给过别的Proposer承诺,发送一个拒绝消息(reject),表示自己拒绝给当前Proposer任何承诺。
1.1 反之,则替换自身保存的提议版本号并给当前Proposer发送一个承诺回应(agree),表示将承诺接受他的提议。同时,将自身状态置为已经给了某个Proposer承诺(agree)。
2.0 Acceptor收到一个Proposer的编号为N的Accept请求,只要该Acceptor之前不曾承诺编号M(M>N)的其他Proposer提议,那么他就接受该提案。同时,将自身状态置为已接受某个Proposer提议,并通知所有Proposer这个消息。
以上1属于Paxos算法的Prepare阶段,2属于Accept阶段。
以上行为分析针对本次Paxos算法编程实战!!!
类的设计
Paxos算法解决的是分布式系统一致性的问题,我们通过端口号在一台计算机上模拟多个节点。
毋庸置疑,我们分别需要一个Proposer类和Acceptor类。
PaxoProposer 提议者类
Proposer的作用是提出一个提议并发送给Acceptor,所以他本身必须知道所有的Acceptor,同时有些时候要跟其他Proposer通讯,所以也需要知道所有的Proposer(见init方法)。
基本的开始结束接口(start, stop)
在判断提议是否被大多数Acceptor承诺接受或最终接受,我们需要设定一个判定条件(getQuorumCount)
当提议被承诺接受或最终接受时需要通知其他Proposer(notifyProposer)
发送消息(提议或Accept请求)给Acceptor(sendMsg);接收来自Acceptor的消息(recvMsg)
为了方便调试,我们可能需要知道整个过程请求提议的历史记录(getHistory)
自己的提议最终被Acceptor接受的个数(getNumAccepted)
清楚Paxos算法流程后,我们发现假设有两个Proposer依次提出编号递增的提议,最终会陷入死循环使得Paxos算法无法保证活性。所以,一般的做法是选取一个主Proposer作为领导,只有领导才能提出提议(setPrimary)。
Proposer类的一个难点在于提议发出后的各种状态转变与对应数据的处理。从提议发出到提议被接受整个过程,提议的状态是在不断地变化,但最终总会到达一个终止态。对于这种情况的处理,状态机注定是一个不错的选择。由于这里有点复杂我们将提议功能单独拿出来抽象为一个Proposer的协议类PaxoProposerProtocol。
-由于各个节点收发消息是并行的,这里对消息的检测需要用到线程。这里HeartbeatListener来监听消息,HeartbeatSender用来发送消息。
""" @author: chaors @file: PaxoProposer.py @time: 2018/04/14 10:50 @desc: 提议者"""classPaxoProposer:#心跳监听类classHeartbeatListener(threading.Thread):pass#定时发送类classHeartbeatSender(threading.Thread):pass#初始化def__init__(self, port, proposers=None, acceptors=None):pass#开始defstart(self):pass#停止defstop(self):pass#设置是否为领导者defsetPrimary(self, isPrimary):pass#获取支持所有提议者的决策者defgetGroup(self):pass#获取所有提议者defgetProposers(self):pass#获取所有决策者defgetAcceptors(self):pass#提议被承诺接受或最终接受的条件必须满足:获得1/2以上的Acceptor支持defgetQuorumCount(self):pass#获取本地记录数据defgetInstanceValue(self, instanceID):pass#获取历史记录defgetHistory(self):pass#获取提议同意的数量defgetNumAccepted(self):pass#通知其他提议者defnotifyProposer(self, protocol, msg):pass#新的提议defnewProposal(self, value, instance=None):pass# 发送消息defsendMsg(self, msg):pass# 接收消息defrecvMsg(self, msg):pass
PaxoProposerProtocol类
用来提交一个提议,并用于提交提议后各种状态的处理。
定义一些状态来表示当前Proposoer提议的各种状态
发起提议(propose)
状态机处理(doTranition)
""" @author: chaors @file: PaxoProposerProtocol.py @time: 2018/04/14 10:50 @desc: 提议者协议"""classPaxoProposerProtocol:#常量 STATE_UNDEFIND = -1#提议协议未定义 STATE_PROPOSED = 0#提议类型 STATE_REJECTED = 1#拒绝状态 提议被拒绝 STATE_AGREED = 2#提议被承诺接受 Prepare阶段获取大多数Acceptor承诺后的协议状态 STATE_ACCEPTED = 3#提议被接受 STATE_UNACCEPTED = 4#提议未被拒绝def__init__(self, proposer):pass#发起提议defpropose(self, value, pID, instanceID):pass#状态过渡 根据状态机运行defdoTranition(self, msg):pass
PaxoAcceptor类
决策者,对Proposer提出的提议和Accept请求做出回应。
和Proposer类似的接口不再赘述。
需要对比Proposer发来的提议版本(getHighestProposal)
""" @author: chaors @file: PaxoAcceptor.py @time: 2018/04/14 10:50 @desc: 决策者"""classPaxoAcceptor:def__init__(self, port, proposers):pass#开始defstart(self):pass#停止defstop(self):pass#失败deffail(self):pass#恢复defrecover(self):pass#发送消息defsendMsg(self, msg):pass#接收消息defrecvMsg(self, msg):pass#通知客户端defnotifyClient(self, protocol, msg):pass#获取本地记录数据defgetInstanceValue(self, instanceID):pass#获取最高同意建议defgetHighestProposal(self, instanceID):pass
PaxoAcceptorProtocol类
决策者协议,用来处理Proposer提出的提议,并同样使用状态机来处理自身各种状态。
""" @author: chaors @file: PaxoAcceptorProtocol.py @time: 2018/04/14 10:50 @desc: 决策者协议"""from Message import Message #协议依赖消息classPaxoAcceptorProtocol:#常量 STATE_UNDEFIND = -1#协议未定义 STATE_PROPOSAL_RECEIVED = 0#收到消息 STATE_PROPOSAL_REJECTED = 1#拒绝链接,网络不通可能 STATE_PROPOSAL_AGREED = 2#承诺将接受该提议 针对Proposer的PROPOSED请求 STATE_PROPOSAL_ACCEPTED = 3#接受该协议 针对Proposer的Accept请求 STATE_PROPOSAL_UNACCEPTED = 4#拒绝请求def__init__(self, client):pass#收到提议defrecvProposal(self, msg):pass#过渡defdoTranition(self, msg):pass#通知客户端defnotifyClient(self, msg):pass
Message类
Proposer和Acceptor的角色都有了,还差一个他们之间传递的消息类。这个消息有以下几种:
Proposer发出的提议请求
Proposer发出的Accept请求
Acceptor对提议请求的拒绝
Acceptor对提议请求的承诺
Acceptor对Accept请求的接受
Acceptor对Accept请求的不接受
外部(Client)发给Proposer的提议
作为对消息的回复消息
定时的心跳信息,用来同步提议
""" @author: chaors @file: Message.py @time: 2018/04/14 09:31 @desc: 消息传递类"""classMessage:#常量 MSG_ACCEPTOR_AGREE = 0#Acceptor对提议请求的承诺 MSG_ACCEPTOR_ACCEPT = 1#Acceptor对Accept请求的接受 MSG_ACCEPTOR_REJECT = 2#Acceptor对提议请求的拒绝 MSG_ACCEPTOR_UNACCEPT = 3#Acceptor对Accept请求的不接受 MSG_ACCEPT = 4#Proposer发出的Accept请求 MSG_PROPOSE = 5#Proposer发出的提议请求 MSG_EXT_PROPOSE = 6#外部(Client)发给Proposer的提议 MSG_HEARTBEAT = 7#定时的心跳信息,用来同步提议def__init__(self, cmd=None):#消息初始化有个状态pass#对某个消息的回复消息defcopyAsReply(self, msg):pass
InstanceRecord类
提议被抽象在协议里,在系统达到一致性之前,Proposer可能尝试提交多次协议信息(包含提议)。在Proposer和Acceptor之间都需要保存所有的提议记录,所以两者都有一个InstanceRecord实例数组。
对于Proposer,InstanceRecord实例数组保存的是提交过的所有提议记录,并且会随着提议状态的改变更新记录状态(包括协议和记录的值)的值。
对于Acceptor,InstanceRecord实例数组保存的是Acceptor接收的Proposer提议请求,并随着提议版本的改变而更新。Acceptor给出承诺(agree)的条件是提议版本大于当前InstanceRecord里的协议版本;Acceptor接受提议(accept)的条件是当前Accept请求版本号比之前给出承诺的的提议版本号大。
协议,包含了每次请求的协议信息(protocols)
最高版本,当前所有提交的请求的最高版本(highestID)
记录值,该次请求的值
""" @author: chaors @file: InstanceRecord.py @time: 2018/04/14 10:31 @desc: 本地记录类,记录决策者,提议者之间协议"""import threading, socket, pickle, queue,random# InstanceRecord本地记录类,决策者,提议者之间协议from PaxoProposerProtocol import PaxoProposerProtocolclassInstanceRecord():def__init__(self): self.protocols = {} #协议字典self.highestID = (-1, -1) #最高版本(提议版本,端口号)self.value = None #提议值#增加协议defaddProtocol(self, protocol): self.protocols[protocol.proposalID] = protocol #取得版本最高的协议 假设端口较大的Proposer为领导,优先承诺 端口相同时取版本号较大的if protocol.proposalID[1] > self.highestID[1] or (protocol.proposalID[1] == self.highestID[1] and protocol.proposalID[0] > self.highestID[0]): self.highestID = protocol.proposalID #抓取协议defgetProtocol(self, protocolID): returnself.protocols[protocolID] #清理协议defcleanProtocols(self): keys = self.protocols.keys() #取得所有可以#遍历删除协议for key inkeys: protocol = self.protocols[key] if protocol.state == PaxoProposerProtocol.STATE_ACCEPTED: print("Deleting protocol") del self.protocols[key] #删除协议
MessagePump类
消息的结构是有了,但是它是怎么在节点(Proposer和Acceptor)之间传递的呢。这里我们封装一个基于Socket传递消息的网络类。这里接收消息需要借助一个线程,我们在构造一个接收消息的辅助类。
这里的只是不属于Paxos算法重点,就不赘述了。直接上代码。
Paxos_MainTest Paxos算法测试
""" @author: chaors @file: paxo_testMain.py @time: 2018/04/14 17:50 @desc: Paxos算法测试用例"""import threading, socket, pickle, queue,randomimport timefrom MessagePump import MessagePumpfrom Message import Messagefrom InstanceRecord import InstanceRecordfrom PaxoProposer import PaxoProposerfrom PaxoProposerProtocol import PaxoProposerProtocolfrom PaxoAcceptorProtocol import PaxoAcceptorProtocolfrom PaxoAcceptor import PaxoAcceptorif __name__ == '__main__': #Acceptor数量 numclients = 5#实例化决策者数组,决策者节点端口号为65520-65525 acceptors = [PaxoAcceptor(port, [56321, 56322]) for port in range(65520, 65520 + numclients)] #实例化提议者,端口号分别56321,56322 对应的决策者为acceptors proposer1 = PaxoProposer(56321, [56321, 56322], [acceptor.port for acceptor in acceptors]) proposer2 = PaxoProposer(56322, [56321, 56322], [acceptor.port for acceptor in acceptors]) #启动提议者提议程序 proposer1.start() proposer1.setPrimary(True) proposer2.setPrimary(True) proposer2.start() #启动决策者决策程序for acceptor in acceptors: acceptor.start() #模拟网络中两个节点宕机 acceptors[0].fail() acceptors[1].fail() #利用Socket机制发送提议给决策者 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) start = time.time() for i in range(1000): m = Message(Message.MSG_EXT_PROPOSE) m.value = 0 + i m.to = 56322 bytes = pickle.dumps(m) s.sendto(bytes, ("localhost", m.to)) # if i == 2 or i == 30:# print(leader2.getInstanceValue(1))#当提议被999个决策者接受时结束整个提议程序while proposer1.getNumAccepted() < 999: print(u"休眠1秒--被接受: %d" % proposer1.getNumAccepted()) time.sleep(1) end = time.time() print(u"休眠10秒") time.sleep(10) print(u"结束领导者") proposer1.stop() proposer2.stop() print(u"结束客户端") for acceptor in acceptors: acceptor.stop() print(u"领导者1 历史记录: %s" % proposer1.getHistory()) print(u"领导者 2 历史记录: %s " % proposer2.getHistory()) print(u"一共用了%d 秒" % (end - start))
3.通过代码进一步了解Paox算法处理逻辑
上面已经完成了基本代码的架构,详细源码稍后我会上传到github。
接下来,我们通过一个简单的测试用例来再一次更深入地从代码层面理解Paxos算法的处理逻辑。
我们运行paxo_testMain代码,事先我在关键步骤处都打了断点。这样就可以完整地从代码角度看一次Paxos算法两个阶段的运行,也能直观地观察到各个步骤的代码处理逻辑。
!!!阅读说明:
1.x 对应Paxos算法Prepare阶段
2.x 对应Paxos算法Commit阶段
废话少说上代码
1.0 [start---1.0]Proposer收到一个消息,消息类型为一个提议。首先判断自身是否为领导者,如果是创建协议
start---1.0
1.1 [start---1.1]Proposer借助自身的PaxoProposerProtocol实例发起一个提议请求
start---1.1_1
start---1.1_2有于消息是并发执行的,这里协议的状态改变需要等到提议请求发送发送给所有的Acceptor后才会执行到这,所以start---1.1_1断点之后可能不会是这个断点
1.2 [start---1.2]Acceptor收到一个消息,消息类型为提议。然后借助AcceptorProtocol实例处理提议。
start---1.2
1.3[start---1.3] AcceptorProtocol收到一个提议,判断提议版本回复Proposer承诺接受消息或拒绝消息
start---1.3
1.4[start---1.4]Proposer收到一个消息,类型为Acceptor的承诺(MSG_ACCEPTOR_AGREE)。既然不是Proposer最终要的接受提议的结果,转给ProposerPropotocal(当前消息的记录ID(instanceID)对应的协议)状态机处理。
start---1.4
[start---1.5] and [start---2.0]
--[start---1.5] ProposerPropotocal状态机函数收到一个MSG_ACCEPTOR_AGREE消息,此时表示新增加一个Acceptor承诺会接受我的请求。
-- [start---2.0_1]该条件下的代码会不断执行,直到许诺Proposer的数量超过半数,表示Prepare阶段基本结束。此时协议状态更新为协议被承诺接受(STATE_AGREED)。此时的Proposer向Acceptor集合发送Accept请求,请求Acceptor确认他们的许诺。
-- [start---2.0_2]同时,向其他Proposer广播该消息,使得其他Proposer知道Prepare阶段哪个提议获得的承诺最多。这样,在Commit阶段,他们可能通过改变提议来使系统尽快达到一致性。
start---1.5 & 2.0
2.1[start---2.1]Acceptor收到一个消息,类型为来自Proposer的Accept请求。借助AcceptorPropotal处理该消息。
start---2.1
2.2[start---2.2]
-- [start---2.2_1] AcceptorPropotal收到Proposer发出的Accept请求。按Paxos算法思想,这里需要判断请求版本号,当且仅当Acceptor之前承诺过的提议版本号最大值小于Accept请求版本号才会接受该Proposer提议。这里我们借助“先入为主”的思想简化问题,只要这时候协议状态为STATE_PROPOSAL_AGREED,就给所有Proposer广播消息表示自己确认接受该Proposer提议。
-- [start---2.2_2] AcceptorPropotal通知Acceptor更新InstanceRecord的值,到此时已有一个提议被一个Acceptor最终接受。
start---2.2
2.3[start---2.3]Proposer收到一个消息,类型为Acceptor确认接受提议(MSG_ACCEPTOR_ACCEPT),根据该消息更新Proposer的InstanceRecord(新建record,追加协议等)。并将消息交给ProposerProtocol状态机处理。
start---2.3
2.4[start---2.4]
[start---2.4_1] -- ProposerProtocol收到一个Acceptor的最终确认消息(MSG_ACCEPTOR_ACCEPT),此时表示新增一个Acceptor最终接受了我的提议。但此时的协议状态仍然是STATE_AGREED状态,因为一个提议最终被系统接受必须先被超半数的Acceptor节点确认接受。
[start---2.4_2] STATE_AGREED条件下的代码会不断执行,直到最终接受提议的Acceptor超过半数。这时,协议状态由STATE_AGREED状态更新为最终被系统确认状态(STATE_ACCEPTED)。最后,当前Proposer更新自己的InstanceRecord记录。当然,这里也有一种可能是被超过半数节点不接受,那么同样其他Proposer节点必有一个节点提议被接受。
start---2.4
2.5[start---2.5] Proposer更新InstanceRecord记录,如果协议最终被大多数Acceptor拒绝则尝试重新提议(步骤回到1.1)。
start---2.5
以上是关于区块链共识之Paxos算法理解与实战的主要内容,如果未能解决你的问题,请参考以下文章