Tendermint Consensus Analysis
Posted by 小圣.
Preface: This article was compiled by the editors of 小常识网 (cha138.com). It introduces an analysis of Tendermint consensus and is meant as a useful reference.
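Overview
Tendermint's consensus algorithm can be viewed as PoS + BFT. Before a block is confirmed by the BFT algorithm, a Proposer is first selected from the Validators according to stake (PoS). The Proposer then makes a proposal, and finally the BFT algorithm produces the block. Consensus messages are propagated between nodes using a gossip protocol.
There are two ways to make a node a Validator; see: https://docs.tendermint.com/master/nodes/validators.html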
round-robin
Selecting the proposer from the Validators uses a round-robin protocol weighted by voting power. This article explains the round-robin protocol well: https://zhuanlan.zhihu.com/p/84962067
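Because selection is weighted by voting power, a validator with more stake is chosen proportionally more often. The sketch below shows the core priority update under simplifying assumptions: `validator` and `nextProposer` are hypothetical names, and the priority centering/rescaling and address-based tie-breaking of the real implementation are left out.

```go
package main

import "fmt"

// validator is a hypothetical, trimmed-down validator: only the fields used
// for proposer selection are kept.
type validator struct {
	name     string
	power    int64 // voting power (stake)
	priority int64 // accumulated proposer priority, kept across rounds
}

// nextProposer performs one step of weighted round-robin: every validator's
// priority grows by its voting power, the highest priority wins (in this
// sketch ties go to the earlier entry in the slice), and the winner's priority
// is reduced by the total voting power so the others can catch up.
func nextProposer(vals []*validator) *validator {
	var total int64
	for _, v := range vals {
		total += v.power
		v.priority += v.power
	}
	proposer := vals[0]
	for _, v := range vals[1:] {
		if v.priority > proposer.priority {
			proposer = v
		}
	}
	proposer.priority -= total
	return proposer
}

func main() {
	vals := []*validator{{name: "A", power: 3}, {name: "B", power: 1}}
	for round := 0; round < 4; round++ {
		fmt.Println("round", round, "proposer", nextProposer(vals).name)
	}
}
```

With powers 3 and 1, the four rounds pick A, A, B, A, so A proposes three times as often as B, matching the 3:1 stake ratio.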
round-based
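Committing a block at a given height uses a round-based protocol consisting of five steps: NewHeight, Propose, Prevote, Precommit and Commit.
Propose, Prevote and Precommit together form a round, and committing a block at one height may take more than one round. Multiple rounds are needed in cases such as:
- the designated proposer is offline
- the block proposed by the proposer is invalid
- the proposed block is not broadcast in time
- the proposal block is valid, but not enough nodes receive +2/3 prevotes for it in time for the Precommit step
- the proposal block is valid and enough nodes receive +2/3 prevotes, but not enough nodes receive +2/3 precommits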
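Several of these cases hinge on "+2/3", which means strictly more than two thirds of the total voting power, not of the number of nodes. A minimal sketch of that check, together with the matching fault bound (BFT safety holds while faulty voting power stays below one third); both helper names are hypothetical:

```go
package main

import "fmt"

// hasTwoThirdsMajority reports whether power is strictly more than 2/3 of
// total ("+2/3"). Cross-multiplying keeps the check exact with integers.
func hasTwoThirdsMajority(power, total int64) bool {
	return 3*power > 2*total
}

// maxFaulty returns the largest faulty voting power f with 3*f < total,
// the usual bound under which BFT consensus stays safe.
func maxFaulty(total int64) int64 {
	return (total - 1) / 3
}

func main() {
	fmt.Println(hasTwoThirdsMajority(7, 10)) // 21 > 20
	fmt.Println(hasTwoThirdsMajority(2, 3))  // exactly 2/3 is not enough
	fmt.Println(maxFaulty(4))
}
```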
The round-based process is as follows:
                         +-------------------------------------+
                         v                                     |(Wait til `CommitTime+timeoutCommit`)
                   +-----------+                         +-----+-----+
      +----------> |  Propose  +--------------+          | NewHeight |
      |            +-----------+              |          +-----------+
      |                                       |                ^
      |(Else, after timeoutPrecommit)         v                |
+-----+-----+                           +-----------+          |
| Precommit |  <------------------------+  Prevote  |          |
+-----+-----+                           +-----------+          |
      |(When +2/3 Precommits for block found)                  |
      v                                                        |
+--------------------------------------------------------------------+
|  Commit                                                            |
|                                                                    |
|  * Set CommitTime = now;                                           |
|  * Wait for block, then stage/save/commit block;                   |
+--------------------------------------------------------------------+
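The diagram can be read as a small state machine. The following sketch encodes only the transitions drawn above. The `step` type and `next` function are hypothetical simplifications: the real `cstypes.RoundStepType` also has NewRound and the PrevoteWait/PrecommitWait steps, and here timeouts are reduced to a boolean.

```go
package main

import "fmt"

// step is a hypothetical, simplified version of the round steps in the diagram.
type step int

const (
	NewHeight step = iota
	Propose
	Prevote
	Precommit
	Commit
)

func (s step) String() string {
	return [...]string{"NewHeight", "Propose", "Prevote", "Precommit", "Commit"}[s]
}

// next returns the step and round that follow s. commitQuorum reports whether
// +2/3 precommits for a block were collected in the Precommit step; without
// them the validator starts round+1 at Propose (after timeoutPrecommit).
func next(s step, round int, commitQuorum bool) (step, int) {
	switch s {
	case NewHeight:
		return Propose, 0
	case Propose:
		return Prevote, round
	case Prevote:
		return Precommit, round
	case Precommit:
		if commitQuorum {
			return Commit, round
		}
		return Propose, round + 1
	default: // Commit: wait for CommitTime+timeoutCommit, then a new height
		return NewHeight, 0
	}
}

func main() {
	// Hypothetical scenario: round 0 fails, +2/3 precommits arrive in round 1.
	s, round := NewHeight, 0
	for s != Commit {
		s, round = next(s, round, round == 1)
		fmt.Println(s, "round", round)
	}
}
```

In `main`, round 0 ends without +2/3 precommits, so the loop goes back to Propose in round 1 and commits there.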
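That is the overall flow of Tendermint's consensus algorithm; the details are discussed below while analyzing the source code.
This article describes the consensus in detail and also explains the important locking mechanism: https://www.odaily.com/post/5134145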
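The locking mechanism matters most when a validator chooses its prevote. A minimal sketch, modeled loosely on `defaultDoPrevote` (the exact rule varies between Tendermint versions). `roundState` and `decidePrevote` are hypothetical, blocks are represented by hash strings, and unlocking after a newer proof-of-lock is not modeled.

```go
package main

import "fmt"

// roundState is a hypothetical view of the fields the prevote decision uses.
// Blocks are identified by hash; the empty string stands for nil.
type roundState struct {
	lockedBlock   string // block this validator is locked on ("" = not locked)
	proposalBlock string // block proposed in this round ("" = not received)
	proposalValid bool   // whether proposalBlock passed validation
}

// decidePrevote returns the hash to prevote ("" = prevote nil): a locked
// validator keeps prevoting its locked block; otherwise it prevotes the
// proposal only if it was received and is valid.
func decidePrevote(rs roundState) string {
	if rs.lockedBlock != "" {
		return rs.lockedBlock
	}
	if rs.proposalBlock == "" || !rs.proposalValid {
		return ""
	}
	return rs.proposalBlock
}

func main() {
	locked := roundState{lockedBlock: "B1", proposalBlock: "B2", proposalValid: true}
	fmt.Printf("locked on B1, proposal B2 -> prevote %q\n", decidePrevote(locked))
	fresh := roundState{proposalBlock: "B2", proposalValid: true}
	fmt.Printf("not locked, valid B2 -> prevote %q\n", decidePrevote(fresh))
}
```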
Source Code Analysis
Tendermint's consensus is mainly implemented in tendermint/consensus/state.go
NewState
func NewState(
	config *cfg.ConsensusConfig,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	txNotifier txNotifier,
	evpool evidencePool,
	options ...StateOption,
) *State {
	cs := &State{
		config:           config,
		blockExec:        blockExec,
		blockStore:       blockStore,
		txNotifier:       txNotifier,
		peerMsgQueue:     make(chan msgInfo, msgQueueSize),
		internalMsgQueue: make(chan msgInfo, msgQueueSize),
		timeoutTicker:    NewTimeoutTicker(),
		statsMsgQueue:    make(chan msgInfo, msgQueueSize),
		done:             make(chan struct{}),
		doWALCatchup:     true,
		wal:              nilWAL{},
		evpool:           evpool,
		evsw:             tmevents.NewEventSwitch(),
		metrics:          NopMetrics(),
	}
	// Set the default implementations; they can be overridden before the reactor starts
	cs.decideProposal = cs.defaultDecideProposal
	cs.doPrevote = cs.defaultDoPrevote
	cs.setProposal = cs.defaultSetProposal
	// We have no votes, so reconstruct LastCommit from SeenCommit.
	if state.LastBlockHeight > 0 {
		cs.reconstructLastCommit(state)
	}
	cs.updateToState(state)
	// NOTE: we do not call scheduleRound0 yet, we do that upon Start()
	cs.BaseService = *service.NewBaseService(nil, "State", cs)
	for _, option := range options {
		option(cs)
	}
	return cs
}
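NewState finishes by applying `options ...StateOption` to the struct it has just built, which is Go's "functional options" pattern. A self-contained sketch of the pattern follows; `state`, `stateOption`, `withMetrics` and `withReplayMode` are hypothetical names, not Tendermint's API.

```go
package main

import "fmt"

// state is a hypothetical stand-in for consensus.State with two settings.
type state struct {
	metricsName string
	replayMode  bool
}

// stateOption has the same shape as StateOption: a function that adjusts the
// struct after the defaults have been filled in.
type stateOption func(*state)

// withMetrics and withReplayMode are hypothetical options for illustration.
func withMetrics(name string) stateOption {
	return func(s *state) { s.metricsName = name }
}

func withReplayMode() stateOption {
	return func(s *state) { s.replayMode = true }
}

func newState(options ...stateOption) *state {
	s := &state{metricsName: "nop"} // defaults first, like NopMetrics() in NewState
	for _, option := range options {
		option(s) // applied in order, so later options win
	}
	return s
}

func main() {
	fmt.Printf("%+v\n", *newState())
	fmt.Printf("%+v\n", *newState(withMetrics("prometheus"), withReplayMode()))
}
```

The constructor signature stays stable as new settings are added, and callers only mention the settings they want to change.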
OnStart
OnStart restores the latest state from the WAL and starts the timeout ticker and the message-receiving goroutine.
func (cs *State) OnStart() error {
	...
	...
	...
	// Double Signing Risk Reduction
	if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
		return err
	}
	// Start the goroutine that receives messages
	go cs.receiveRoutine(0)
	// schedule the first round!
	// use GetRoundState so we don't race the receiveRoutine for access
	cs.scheduleRound0(cs.GetRoundState())
	return nil
}
receiveRoutine
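This function is important: it handles the messages that can cause state transitions. Timeouts, a completed proposal, and more than 2/3 of the votes all trigger a state transition.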
func (cs *State) receiveRoutine(maxSteps int) {
	onExit := func(cs *State) {
		// NOTE: the internalMsgQueue may have signed messages from our
		// priv_val that haven't hit the WAL, but its ok because
		// priv_val tracks LastSig
		// close wal now that we're done writing to it
		if err := cs.wal.Stop(); err != nil {
			cs.Logger.Error("failed trying to stop WAL", "error", err)
		}
		cs.wal.Wait()
		close(cs.done)
	}
	defer func() {
		if r := recover(); r != nil {
			cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack()))
			// stop gracefully
			//
			// NOTE: We most probably shouldn't be running any further when there is
			// some unexpected panic. Some unknown error happened, and so we don't
			// know if that will result in the validator signing an invalid thing. It
			// might be worthwhile to explore a mechanism for manual resuming via
			// some console or secure RPC system, but for now, halting the chain upon
			// unexpected consensus bugs sounds like the better option.
			onExit(cs)
		}
	}()
	for {
		if maxSteps > 0 {
			if cs.nSteps >= maxSteps {
				cs.Logger.Debug("reached max steps; exiting receive routine")
				cs.nSteps = 0
				return
			}
		}
		rs := cs.RoundState
		var mi msgInfo
		select {
		// TxsAvailable fires once valid transactions have been added to the mempool
		case <-cs.txNotifier.TxsAvailable():
			cs.handleTxsAvailable()
		// Messages from peers
		case mi = <-cs.peerMsgQueue:
			if err := cs.wal.Write(mi); err != nil {
				cs.Logger.Error("failed writing to WAL", "err", err)
			}
			// Handle proposal, block part and vote messages
			cs.handleMsg(mi)
		// Messages generated by this node itself
		case mi = <-cs.internalMsgQueue:
			err := cs.wal.WriteSync(mi) // NOTE: fsync
			if err != nil {
				panic(fmt.Sprintf(
					"failed to write %v msg to consensus WAL due to %v; check your file system and restart the node",
					mi, err,
				))
			}
			if _, ok := mi.Msg.(*VoteMessage); ok {
				// we actually want to simulate failing during
				// the previous WriteSync, but this isn't easy to do.
				// Equivalent would be to fail here and manually remove
				// some bytes from the end of the wal.
				fail.Fail() // XXX
			}
			// handles proposals, block parts, votes
			cs.handleMsg(mi)
		// Timeout events
		case ti := <-cs.timeoutTicker.Chan(): // tockChan:
			if err := cs.wal.Write(ti); err != nil {
				cs.Logger.Error("failed writing to WAL", "err", err)
			}
			// if the timeout is relevant to the rs
			// go to the next step
			cs.handleTimeout(ti, rs)
		case <-cs.Quit():
			onExit(cs)
			return
		}
	}
}
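The key design point of receiveRoutine is that a single goroutine serializes every input through one `select`. A stripped-down sketch of that shape, assuming hypothetical `msg`/`timeoutInfo` types and reporting what it handled on a channel instead of mutating consensus state (no WAL, no real handlers):

```go
package main

import "fmt"

// msg and timeoutInfo are hypothetical stand-ins for msgInfo and timeoutInfo.
type msg struct{ kind string }
type timeoutInfo struct{ step string }

// receiveLoop copies the shape of receiveRoutine: one goroutine owns the
// consensus state and drains every input through a single select, so peer
// messages, our own messages and timeouts are handled one at a time.
func receiveLoop(peer, internal <-chan msg, timeouts <-chan timeoutInfo,
	quit <-chan struct{}, handled chan<- string) {
	for {
		select {
		case m := <-peer:
			handled <- "peer:" + m.kind // real code: wal.Write, then handleMsg
		case m := <-internal:
			handled <- "internal:" + m.kind // real code: wal.WriteSync (fsync), then handleMsg
		case t := <-timeouts:
			handled <- "timeout:" + t.step // real code: wal.Write, then handleTimeout
		case <-quit:
			close(handled) // real code: onExit stops the WAL and closes cs.done
			return
		}
	}
}

func main() {
	peer, internal := make(chan msg), make(chan msg)
	timeouts, quit := make(chan timeoutInfo), make(chan struct{})
	handled := make(chan string)
	go receiveLoop(peer, internal, timeouts, quit, handled)

	internal <- msg{kind: "proposal"}
	fmt.Println(<-handled)
	peer <- msg{kind: "vote"}
	fmt.Println(<-handled)
	timeouts <- timeoutInfo{step: "Propose"}
	fmt.Println(<-handled)
	close(quit)
	<-handled // returns once handled has been closed
}
```

Because only this goroutine touches the state, the handlers need no locks; other goroutines interact with it by sending messages.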
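Once the functions above are running, the node waits for the step-transition functions to be called and consensus proceeds.
The official consensus flowchart is as follows: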
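For a single node, the complete consensus flow through the code is:
- It first enters enterNewRound
- From enterNewRound it enters enterPropose
- In enterPropose, the node checks whether it is a validator (and the proposer). With only one node it always is, so it enters defaultDecideProposal
- defaultDecideProposal sends the proposal and the block part messages to internalMsgQueue
- The messages are read from internalMsgQueue and passed to handleMsg, which calls addProposalBlockPart
- addProposalBlockPart eventually enters enterPrevote
- enterPrevote calls defaultDoPrevote, which signs a prevote for the proposal and sends it to internalMsgQueue
- handleMsg processes the received message and calls tryAddVote
- tryAddVote checks that the vote is valid and, once more than two thirds is reached, enters enterPrevoteWait
- When the timer expires, enterPrevoteWait moves on to enterPrecommit
- enterPrecommit signs a precommit for the proposal and sends it to internalMsgQueue
- handleMsg processes the received message and calls tryAddVote
- tryAddVote checks that the vote is valid and enters enterCommit; this step involves many more cases when there are multiple nodes
- enterCommit finalizes and saves the block and sends it to the ABCI application; once the response comes back, this consensus instance is complete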
Because there is a lot of code, the less important parts are omitted; only the code that matters for the consensus flow is listed here.
enterNewRound
func (cs *State) enterNewRound(height int64, round int32) {
	logger := cs.Logger.With("height", height, "round", round)
	// Check the arguments against the current height/round/step
	if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
		logger.Debug(
			"entering new round with invalid args",
			"current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step),
		)
		return
	}
	// Sanity check: StartTime should not be in the future
	if now := tmtime.Now(); cs.StartTime.After(now) {
		logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)
	}
	logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))
	// When moving to a later round, advance proposer priority on a copy of the
	// validator set by the number of rounds elapsed, so the proposer rotates
	validators := cs.Validators
	if cs.Round < round {
		validators = validators.Copy()
		validators.IncrementProposerPriority(tmmath.SafeSubInt32(round, cs.Round))
	}
	// Start the new round
	cs.updateRoundStep(round, cstypes.RoundStepNewRound)
	cs.Validators = validators
	if round == 0 {
		// We've already reset these upon new height,
		// and meanwhile we might have received a proposal
		// for round 0.
	} else {
		logger.Debug("resetting proposal info")
		cs.Proposal = nil
		cs.ProposalBlock = nil
		cs.ProposalBlockParts = nil
	}
	cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping
	cs.TriggeredTimeoutPrecommit = false
	if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil {
		cs.Logger.Error("failed publishing new round", "err", err)
	}
	cs.metrics.Rounds.Set(float64(round))
	// Wait for txs to be available in the mempool before we enterPropose in
	// round 0. If the last block changed the app hash, we may need an empty
	// "proof" block, and enterPropose immediately.
	waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
	if waitForTxs {
		if cs.config.CreateEmptyBlocksInterval > 0 {
			cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
				cstypes.RoundStepNewRound)
		}
	} else {
		// Go straight to the Propose step
		cs.enterPropose(height, round)
	}
}
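The argument check and the waitForTxs decision in enterNewRound are plain boolean logic and can be tested in isolation. A sketch with hypothetical names (`roundStep`, `canEnterNewRound`, `shouldWaitForTxs`); the real code reads these values from `cs` and `cs.config`:

```go
package main

import "fmt"

// Hypothetical, trimmed-down step values and round state.
type roundStep int

const (
	stepNewHeight roundStep = iota
	stepNewRound
	stepPropose
)

type roundState struct {
	height int64
	round  int32
	step   roundStep
}

// canEnterNewRound mirrors the argument check at the top of enterNewRound:
// the height must match, the round may not go backwards, and the current
// round may only be entered again from the NewHeight step.
func canEnterNewRound(rs roundState, height int64, round int32) bool {
	if rs.height != height || round < rs.round {
		return false
	}
	return !(rs.round == round && rs.step != stepNewHeight)
}

// shouldWaitForTxs mirrors the waitForTxs expression: only round 0 waits for
// transactions, and only when no empty "proof" block is needed.
func shouldWaitForTxs(waitForTxsConfig bool, round int32, needProofBlock bool) bool {
	return waitForTxsConfig && round == 0 && !needProofBlock
}

func main() {
	rs := roundState{height: 5, round: 0, step: stepNewHeight}
	fmt.Println(canEnterNewRound(rs, 5, 0)) // first round of a new height
	rs.step = stepPropose
	fmt.Println(canEnterNewRound(rs, 5, 0)) // already past NewHeight in round 0
	fmt.Println(canEnterNewRound(rs, 5, 2)) // jumping ahead is allowed
	fmt.Println(shouldWaitForTxs(true, 0, false))
}
```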
enterPropose
func (cs *State) enterPropose(height int64, round int32) {
	...
	...
	// Make sure this node has a validator key before trying to propose
	if cs.privValidatorPubKey == nil {
		// If this node is a validator & proposer in the current round, it will
		// miss the opportunity to create a block.
		logger.Error("propose step; empty priv validator public key", "err", errPubKeyIsNotSet)
		return
	}
	address := cs.privValidatorPubKey.Address()
	// if not a validator, we're done
	if !cs.Validators.HasAddress(address) {
		logger.Debug("node is not a validator", "addr", address, "vals", cs.Validators)
		return
	}
	// If this node is the proposer for this round, prepare the proposal
	if cs.isProposer(address) {
		logger.Debug(
			"propose step; our turn to propose",
			"proposer", address,
		)
		cs.decideProposal(height, round)
	} else {
		logger.Debug(
			"propose step; not our turn to propose",
			"proposer", cs.Validators.GetProposer().Address,
		)
	}
}
decideProposal
func (cs *State) defaultDecideProposal(height int64, round int32) {
	...
	...
	// Make the proposal
	propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}
	proposal := types.NewProposal(height, round, cs.ValidRound, propBlockID)
	p := proposal.ToProto()
	// Bound the time spent signing the proposal by TimeoutPropose
	ctx, cancel := context.WithTimeout(context.TODO(), cs.config.TimeoutPropose)
	defer cancel()
	// Sign the proposal
	if err := cs.privValidator.SignProposal(ctx, cs.state.ChainID, p); err == nil {
		proposal.Signature = p.Signature
		// Send the proposal, then every block part, to internalMsgQueue via
		// sendInternalMessage; receiveRoutine reads them from that channel
		cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})
		for i := 0; i < int(blockParts.Total()); i++ {
			part := blockParts.GetPart(i)
			cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
		}
		cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
	} else if !cs.replayMode {
		cs.Logger.Error("propose step; failed signing proposal", "height", height, "round", round, "err", err)
	}
}
addProposalBlockPart
func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.NodeID) (added bool, err error) {
	...
	...
		if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
			// Move onto the next step
			cs.enterPrevote(height, cs.Round)
			if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
				cs.enterPrecommit(height, cs.Round)
			}
		} else if cs.Step == cstypes.RoundStepCommit {
			// If we're waiting on the proposal block...
			cs.tryFinalizeCommit(height)
		}
		return added, nil
	}
	return added, nil
}
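addProposalBlockPart only moves on to enterPrevote once the proposal is complete, which requires every block part to have arrived. A sketch of the bookkeeping behind that. `partSet` and its methods are hypothetical, and the real PartSet additionally verifies each part's Merkle proof against the PartSetHeader.

```go
package main

import "fmt"

// partSet is a hypothetical, trimmed-down version of the proposal's part set:
// it records which part indices arrived and is complete once all are in.
type partSet struct {
	received []bool
	count    int
}

func newPartSet(total int) *partSet {
	return &partSet{received: make([]bool, total)}
}

// addPart returns added=false for a duplicate part, similar in spirit to the
// (added, err) results of addProposalBlockPart.
func (ps *partSet) addPart(index int) (bool, error) {
	if index < 0 || index >= len(ps.received) {
		return false, fmt.Errorf("part index %d out of range [0, %d)", index, len(ps.received))
	}
	if ps.received[index] {
		return false, nil
	}
	ps.received[index] = true
	ps.count++
	return true, nil
}

// isComplete stands in for the completeness check that gates enterPrevote.
func (ps *partSet) isComplete() bool {
	return ps.count == len(ps.received)
}

func main() {
	ps := newPartSet(3)
	for _, i := range []int{0, 2, 2, 1} {
		added, err := ps.addPart(i)
		fmt.Println("part", i, "added:", added, "err:", err, "complete:", ps.isComplete())
	}
}
```

Parts can arrive out of order and duplicates are ignored; only the count of distinct indices decides completeness.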
signAddVote
addProposalBlockPart enters enterPrevote, which then calls doPrevote. The default implementation of doPrevote is defaultDoPrevote, which calls signAddVote to sign a prevote for the Proposal and send it to internalMsgQueue.
func (cs *State)