Tendermint Consensus Analysis

Posted by 小圣.


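Overview

Tendermint's consensus algorithm can be viewed as PoS + BFT. Before the BFT algorithm confirms a block, Tendermint first uses a PoS-based selection to elect a Proposer from the Validators. The Proposer then makes a proposal, and finally the BFT algorithm produces the block. Tendermint's consensus protocol uses a gossip protocol to spread messages.

There are two ways to make a node a Validator; for details see: https://docs.tendermint.com/master/nodes/validators.html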

round-robin

Electing the proposer from the Validators uses a round-robin protocol. This article explains the round-robin protocol well: https://zhuanlan.zhihu.com/p/84962067
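As a rough illustration of the idea, the following is a minimal sketch of weighted round-robin selection, not Tendermint's actual implementation. The validator type and the nextProposer function are hypothetical names introduced here. The sketch assumes that ties go to the validator listed first and it leaves out the priority centering and scaling that a production implementation would need.

```go
package main

import "fmt"

// validator is a hypothetical, simplified stand-in for a validator entry.
type validator struct {
	name     string
	power    int64 // voting power (stake)
	priority int64 // accumulated proposer priority
}

// nextProposer runs one step of weighted round-robin: every validator gains
// priority equal to its voting power, the validator with the highest priority
// is chosen, and the winner's priority is reduced by the total voting power.
func nextProposer(vals []*validator) *validator {
	var total int64
	for _, v := range vals {
		v.priority += v.power
		total += v.power
	}
	best := vals[0]
	for _, v := range vals[1:] {
		if v.priority > best.priority { // ties keep the earlier validator (assumption)
			best = v
		}
	}
	best.priority -= total
	return best
}

func main() {
	vals := []*validator{
		{name: "A", power: 3},
		{name: "B", power: 1},
		{name: "C", power: 1},
	}
	for round := 0; round < 5; round++ {
		fmt.Print(nextProposer(vals).name, " ")
	}
	fmt.Println()
}
```

Over any window of five consecutive selections in this example, A proposes three times and B and C once each, which matches their share of the voting power.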

round-based

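Confirming a block at a given height uses a round-based protocol made up of five steps: NewHeight, Propose, Prevote, Precommit, and Commit.

Propose, Prevote, and Precommit together are called a round. Confirming one block at a given height may take several rounds. More than one round is needed in the following cases:

  1. The designated proposer node is offline
  2. The block proposed by the proposer is invalid
  3. The proposed block was not broadcast in time
  4. The proposal block is valid, but not enough nodes received the corresponding +2/3 of prevotes in time for the Precommit step
  5. The proposal block is valid and enough nodes received +2/3 of prevotes, but not enough nodes received +2/3 of precommits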
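The "+2/3" in the cases above means strictly more than two thirds of the total voting power. A minimal sketch of that check follows; hasTwoThirdsMajority is a hypothetical helper name, and the sketch ignores integer overflow for very large voting powers.

```go
package main

import "fmt"

// hasTwoThirdsMajority reports whether votedPower is strictly greater than
// two thirds of totalPower. Multiplying instead of dividing keeps the
// comparison exact in integer arithmetic.
func hasTwoThirdsMajority(votedPower, totalPower int64) bool {
	return votedPower*3 > totalPower*2
}

func main() {
	fmt.Println(hasTwoThirdsMajority(66, 100)) // false
	fmt.Println(hasTwoThirdsMajority(67, 100)) // true
	fmt.Println(hasTwoThirdsMajority(2, 3))    // false: exactly 2/3 is not enough
}
```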

The round-based process is as follows:

                         +-------------------------------------+
                         v                                     |(Wait til `CommitTime+timeoutCommit`)
                   +-----------+                         +-----+-----+
      +----------> |  Propose  +--------------+          | NewHeight |
      |            +-----------+              |          +-----------+
      |                                       |                ^
      |(Else, after timeoutPrecommit)         v                |
+-----+-----+                           +-----------+          |
| Precommit |  <------------------------+  Prevote  |          |
+-----+-----+                           +-----------+          |
      |(When +2/3 Precommits for block found)                  |
      v                                                        |
+--------------------------------------------------------------------+
|  Commit                                                            |
|                                                                    |
|  * Set CommitTime = now;                                           |
|  * Wait for block, then stage/save/commit block;                   |
+--------------------------------------------------------------------+
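The diagram can be read as a small state machine. The sketch below encodes only the arrows drawn in it. The step and event names are hypothetical and chosen for this illustration, and real consensus code has more transitions (for example round skipping) than are shown here.

```go
package main

import "fmt"

type step int

const (
	newHeight step = iota
	propose
	prevote
	precommit
	commit
)

type event int

const (
	startRound         event = iota // CommitTime+timeoutCommit has passed
	proposalOrTimeout               // a complete proposal arrived, or the propose timeout fired
	prevotesSeen                    // +2/3 prevotes were collected, or the prevote timeout fired
	precommitsForBlock              // +2/3 precommits for one block were found
	timeoutPrecommit                // no block gathered +2/3 precommits in time
	blockCommitted                  // the block was saved and applied
)

// next returns the step that follows s on event e, and whether the round
// number increases (a new round at the same height).
func next(s step, e event) (step, bool) {
	switch {
	case s == newHeight && e == startRound:
		return propose, false
	case s == propose && e == proposalOrTimeout:
		return prevote, false
	case s == prevote && e == prevotesSeen:
		return precommit, false
	case s == precommit && e == precommitsForBlock:
		return commit, false
	case s == precommit && e == timeoutPrecommit:
		return propose, true // same height, round+1
	case s == commit && e == blockCommitted:
		return newHeight, false
	}
	return s, false // the event is not relevant in this step
}

func main() {
	s := newHeight
	for _, e := range []event{startRound, proposalOrTimeout, prevotesSeen, timeoutPrecommit} {
		var newRound bool
		s, newRound = next(s, e)
		fmt.Println("step:", s, "new round:", newRound)
	}
}
```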

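That is the overall flow of Tendermint's consensus algorithm. The details are discussed in the source code analysis below.

This article describes the consensus in detail and also explains the important locking mechanism: https://www.odaily.com/post/5134145

Source Code Analysis

Tendermint's consensus logic is implemented mainly in the file tendermint/consensus/state.go.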

NewState

func NewState(
	config *cfg.ConsensusConfig,
	state sm.State,
	blockExec *sm.BlockExecutor,
	blockStore sm.BlockStore,
	txNotifier txNotifier,
	evpool evidencePool,
	options ...StateOption,
) *State {
	cs := &State{
		config:           config,
		blockExec:        blockExec,
		blockStore:       blockStore,
		txNotifier:       txNotifier,
		peerMsgQueue:     make(chan msgInfo, msgQueueSize),
		internalMsgQueue: make(chan msgInfo, msgQueueSize),
		timeoutTicker:    NewTimeoutTicker(),
		statsMsgQueue:    make(chan msgInfo, msgQueueSize),
		done:             make(chan struct{}),
		doWALCatchup:     true,
		wal:              nilWAL{},
		evpool:           evpool,
		evsw:             tmevents.NewEventSwitch(),
		metrics:          NopMetrics(),
	}

	// Set default functions; they can be overridden before the reactor starts
	cs.decideProposal = cs.defaultDecideProposal
	cs.doPrevote = cs.defaultDoPrevote
	cs.setProposal = cs.defaultSetProposal

	// We have no votes, so reconstruct LastCommit from SeenCommit.
	if state.LastBlockHeight > 0 {
		cs.reconstructLastCommit(state)
	}

	cs.updateToState(state)

	// NOTE: we do not call scheduleRound0 yet, we do that upon Start()

	cs.BaseService = *service.NewBaseService(nil, "State", cs)
	for _, option := range options {
		option(cs)
	}

	return cs
}

OnStart

OnStart loads the latest state via the WAL and starts the timeout ticker and the message-receiving goroutine.

func (cs *State) OnStart() error {
	...
	...
	...

	// Double Signing Risk Reduction
	if err := cs.checkDoubleSigningRisk(cs.Height); err != nil {
		return err
	}

	// Start the goroutine that receives messages
	go cs.receiveRoutine(0)

	// schedule the first round!
	// use GetRoundState so we don't race the receiveRoutine for access
	cs.scheduleRound0(cs.GetRoundState())

	return nil
}

receiveRoutine

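This function is the important one: it handles the messages that can cause state transitions. A timeout message, a completed proposal, and more than 2/3 of the votes can each trigger a state transition.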
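Stripped of the WAL writes and panic recovery, the core of such a routine is one goroutine that selects over several channels and handles one event at a time. The following is a simplified sketch of that pattern only. The message type and the run function are hypothetical and are not taken from the Tendermint code base.

```go
package main

import (
	"fmt"
	"time"
)

// message is a hypothetical stand-in for a consensus message.
type message struct{ from, body string }

// run handles events from three sources, one at a time, until quit is closed.
// It returns a log of what it handled so the behaviour can be inspected.
func run(peer, internal <-chan message, timeout <-chan time.Time, quit <-chan struct{}) []string {
	var handled []string
	for {
		select {
		case m := <-peer: // messages from other nodes
			handled = append(handled, "peer:"+m.body)
		case m := <-internal: // messages this node sent to itself
			handled = append(handled, "internal:"+m.body)
		case <-timeout: // a step timed out
			handled = append(handled, "timeout")
		case <-quit:
			return handled
		}
	}
}

func main() {
	peer := make(chan message)
	internal := make(chan message)
	quit := make(chan struct{})
	done := make(chan []string)

	// A nil timeout channel is never ready, so no timeouts fire in this demo.
	go func() { done <- run(peer, internal, nil, quit) }()

	peer <- message{"peer1", "proposal"}
	internal <- message{"", "vote"}
	close(quit)
	fmt.Println(<-done)
}
```

Because all events pass through a single loop, the handlers never run concurrently with each other, which is why the state they touch needs no extra locking inside the loop.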

func (cs *State) receiveRoutine(maxSteps int) {
	onExit := func(cs *State) {
		// NOTE: the internalMsgQueue may have signed messages from our
		// priv_val that haven't hit the WAL, but its ok because
		// priv_val tracks LastSig

		// close wal now that we're done writing to it
		if err := cs.wal.Stop(); err != nil {
			cs.Logger.Error("failed trying to stop WAL", "error", err)
		}

		cs.wal.Wait()
		close(cs.done)
	}

	defer func() {
		if r := recover(); r != nil {
			cs.Logger.Error("CONSENSUS FAILURE!!!", "err", r, "stack", string(debug.Stack()))
			// stop gracefully
			//
			// NOTE: We most probably shouldn't be running any further when there is
			// some unexpected panic. Some unknown error happened, and so we don't
			// know if that will result in the validator signing an invalid thing. It
			// might be worthwhile to explore a mechanism for manual resuming via
			// some console or secure RPC system, but for now, halting the chain upon
			// unexpected consensus bugs sounds like the better option.
			onExit(cs)
		}
	}()

	for {
		if maxSteps > 0 {
			if cs.nSteps >= maxSteps {
				cs.Logger.Debug("reached max steps; exiting receive routine")
				cs.nSteps = 0
				return
			}
		}

		rs := cs.RoundState
		var mi msgInfo

		select {
		// TxsAvailable fires when valid transactions have been added to the mempool
		case <-cs.txNotifier.TxsAvailable():
			cs.handleTxsAvailable()
		// Channel for messages from peers
		case mi = <-cs.peerMsgQueue:
			if err := cs.wal.Write(mi); err != nil {
				cs.Logger.Error("failed writing to WAL", "err", err)
			}

			// Handle proposal, block part, and vote messages
			cs.handleMsg(mi)

		// Handle internal messages
		case mi = <-cs.internalMsgQueue:
			err := cs.wal.WriteSync(mi) // NOTE: fsync
			if err != nil {
				panic(fmt.Sprintf(
					"failed to write %v msg to consensus WAL due to %v; check your file system and restart the node",
					mi, err,
				))
			}

			if _, ok := mi.Msg.(*VoteMessage); ok {
				// we actually want to simulate failing during
				// the previous WriteSync, but this isn't easy to do.
				// Equivalent would be to fail here and manually remove
				// some bytes from the end of the wal.
				fail.Fail() // XXX
			}

			// handles proposals, block parts, votes
			cs.handleMsg(mi)
		// Handle timeout messages
		case ti := <-cs.timeoutTicker.Chan(): // tockChan:
			if err := cs.wal.Write(ti); err != nil {
				cs.Logger.Error("failed writing to WAL", "err", err)
			}

			// if the timeout is relevant to the rs
			// go to the next step
			cs.handleTimeout(ti, rs)

		case <-cs.Quit():
			onExit(cs)
			return
		}
	}
}

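Once receiveRoutine is running, the node waits for events that drive it into the state-transition functions, and consensus can proceed.
(Figure: the official consensus flow diagram.)
For a single node, the complete consensus flow through the code is:

  1. First, enter enterNewRound
  2. From enterNewRound, enter enterPropose
  3. In enterPropose, the node checks whether it is a validator. With only one node it always is, so it enters defaultDecideProposal
  4. In defaultDecideProposal, send the proposal and the blockPartMsg messages to internalMsgQueue
  5. The message arrives on internalMsgQueue and goes to handleMsg, which calls addProposalBlockPart
  6. addProposalBlockPart eventually leads to enterPrevote
  7. enterPrevote calls defaultDoPrevote, which signs a vote for the proposal and sends it to internalMsgQueue
  8. handleMsg processes the received message and calls tryAddVote
  9. tryAddVote verifies the vote and, once more than two thirds is reached, enters enterPrevoteWait
  10. When the timer fires, move from enterPrevoteWait to enterPrecommit
  11. enterPrecommit signs the proposal again, this time as a precommit, and sends it to internalMsgQueue
  12. handleMsg processes the received message and calls tryAddVote
  13. tryAddVote verifies the vote and enters enterCommit. There are many cases to consider here when multiple nodes are involved.
  14. enterCommit persists the block and sends it to the ABCI application. Once the response comes back, this round of consensus is finished.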
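The steps above follow a recurring pattern: each handler finishes its work by putting a message on the internal queue, and that message triggers the next handler. The toy sketch below replays the single-node path in that style. It is only a trace generator under simplifying assumptions: the function names are recorded as plain strings, the prevote-wait timer is collapsed into a direct transition, and singleNodeTrace is a hypothetical name.

```go
package main

import (
	"fmt"
	"strings"
)

// singleNodeTrace replays the single-validator path. Each case does its
// "work" (recording function names) and then enqueues the message that
// triggers the next handler.
func singleNodeTrace() []string {
	queue := make(chan string, 4) // stands in for internalMsgQueue
	trace := []string{"enterNewRound", "enterPropose", "defaultDecideProposal"}
	queue <- "proposal"
	queue <- "blockPart"

	for {
		switch <-queue {
		case "proposal":
			trace = append(trace, "handleMsg(proposal)")
		case "blockPart":
			trace = append(trace, "addProposalBlockPart", "enterPrevote", "defaultDoPrevote")
			queue <- "prevote"
		case "prevote":
			// The real code waits for a timer in enterPrevoteWait before moving on.
			trace = append(trace, "tryAddVote(prevote)", "enterPrevoteWait", "enterPrecommit")
			queue <- "precommit"
		case "precommit":
			trace = append(trace, "tryAddVote(precommit)", "enterCommit")
			return trace
		}
	}
}

func main() {
	fmt.Println(strings.Join(singleNodeTrace(), " -> "))
}
```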

Because there is a lot of code, the less important parts are omitted. Only the code that matters for the consensus flow is listed here.

enterNewRound

func (cs *State) enterNewRound(height int64, round int32) {
	logger := cs.Logger.With("height", height, "round", round)

	// Validate the requested height/round against the current state
	if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != cstypes.RoundStepNewHeight) {
		logger.Debug(
			"entering new round with invalid args",
			"current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step),
		)
		return
	}
	// Sanity check: log if the scheduled start time is still in the future
	if now := tmtime.Now(); cs.StartTime.After(now) {
		logger.Debug("need to set a buffer and log message here for sanity", "start_time", cs.StartTime, "now", now)
	}

	logger.Debug("entering new round", "current", fmt.Sprintf("%v/%v/%v", cs.Height, cs.Round, cs.Step))

	// If we are moving to a later round, advance the proposer priority on a copy of the validator set
	validators := cs.Validators
	if cs.Round < round {
		validators = validators.Copy()
		validators.IncrementProposerPriority(tmmath.SafeSubInt32(round, cs.Round))
	}

	// Start the new round
	cs.updateRoundStep(round, cstypes.RoundStepNewRound)
	cs.Validators = validators
	if round == 0 {
		// We've already reset these upon new height,
		// and meanwhile we might have received a proposal
		// for round 0.
	} else {
		logger.Debug("resetting proposal info")
		cs.Proposal = nil
		cs.ProposalBlock = nil
		cs.ProposalBlockParts = nil
	}

	cs.Votes.SetRound(tmmath.SafeAddInt32(round, 1)) // also track next round (round+1) to allow round-skipping
	cs.TriggeredTimeoutPrecommit = false

	if err := cs.eventBus.PublishEventNewRound(cs.NewRoundEvent()); err != nil {
		cs.Logger.Error("failed publishing new round", "err", err)
	}

	cs.metrics.Rounds.Set(float64(round))

	// Before entering round 0, wait for transactions to become available in the mempool.
	// If the last block changed the app hash, we need an empty proof block and enter enterPropose immediately.
	waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
	if waitForTxs {
		if cs.config.CreateEmptyBlocksInterval > 0 {
			cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
				cstypes.RoundStepNewRound)
		}
	} else {
		// Enter enterPropose
		cs.enterPropose(height, round)
	}
}

enterPropose

func (cs *State) enterPropose(height int64, round int32) {

	...
	...

	// Check this node's validator key
	if cs.privValidatorPubKey == nil {
		// If this node is a validator & proposer in the current round, it will
		// miss the opportunity to create a block.
		logger.Error("propose step; empty priv validator public key", "err", errPubKeyIsNotSet)
		return
	}

	address := cs.privValidatorPubKey.Address()

	// if not a validator, we're done
	if !cs.Validators.HasAddress(address) {
		logger.Debug("node is not a validator", "addr", address, "vals", cs.Validators)
		return
	}

	// Check whether this node is the proposer; if so, start preparing the proposal
	if cs.isProposer(address) {
		logger.Debug(
			"propose step; our turn to propose",
			"proposer", address,
		)

		cs.decideProposal(height, round)
	} else {
		logger.Debug(
			"propose step; not our turn to propose",
			"proposer", cs.Validators.GetProposer().Address,
		)
	}
}

decideProposal

func (cs *State) defaultDecideProposal(height int64, round int32) {

	...
	...

	// Create the proposal
	propBlockID := types.BlockID{Hash: block.Hash(), PartSetHeader: blockParts.Header()}
	proposal := types.NewProposal(height, round, cs.ValidRound, propBlockID)
	p := proposal.ToProto()

	// Wait at most as long as we would wait for a proposal
	ctx, cancel := context.WithTimeout(context.TODO(), cs.config.TimeoutPropose)
	defer cancel()
	// Sign the proposal
	if err := cs.privValidator.SignProposal(ctx, cs.state.ChainID, p); err == nil {
		proposal.Signature = p.Signature

		// Send the data to the internal message channel via sendInternalMessage.
		// receiveRoutine has already been started and is waiting on that channel.
		cs.sendInternalMessage(msgInfo{&ProposalMessage{proposal}, ""})

		for i := 0; i < int(blockParts.Total()); i++ {
			part := blockParts.GetPart(i)
			cs.sendInternalMessage(msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""})
		}

		cs.Logger.Debug("signed proposal", "height", height, "round", round, "proposal", proposal)
	} else if !cs.replayMode {
		cs.Logger.Error("propose step; failed signing proposal", "height", height, "round", round, "err", err)
	}
}

addProposalBlockPart

func (cs *State) addProposalBlockPart(msg *BlockPartMessage, peerID p2p.NodeID) (added bool, err error) {

		...
		...

		if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
			// Move onto the next step
			cs.enterPrevote(height, cs.Round)
			if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
				cs.enterPrecommit(height, cs.Round)
			}
		} else if cs.Step == cstypes.RoundStepCommit {
			// If we're waiting on the proposal block...
			cs.tryFinalizeCommit(height)
		}

		return added, nil
	} // closes a block opened in the omitted code above

	return added, nil
}
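The proposer sends the block as a series of parts, and the receiving side collects them until the block is complete before moving on. The sketch below illustrates that split-and-collect idea with plain byte slices. The splitIntoParts function and the partSet type are hypothetical and much simpler than Tendermint's real part set, which also carries Merkle proofs for each part.

```go
package main

import (
	"bytes"
	"fmt"
)

// splitIntoParts cuts data into chunks of at most partSize bytes.
// partSize must be greater than zero.
func splitIntoParts(data []byte, partSize int) [][]byte {
	var parts [][]byte
	for start := 0; start < len(data); start += partSize {
		end := start + partSize
		if end > len(data) {
			end = len(data)
		}
		parts = append(parts, data[start:end])
	}
	return parts
}

// partSet collects parts by index until all of them have arrived.
type partSet struct {
	parts [][]byte
	have  []bool
	count int
}

func newPartSet(total int) *partSet {
	return &partSet{parts: make([][]byte, total), have: make([]bool, total)}
}

// addPart stores a part and reports whether it was newly added.
// Out-of-range indexes and duplicates are rejected.
func (ps *partSet) addPart(index int, data []byte) bool {
	if index < 0 || index >= len(ps.parts) || ps.have[index] {
		return false
	}
	ps.parts[index], ps.have[index] = data, true
	ps.count++
	return true
}

// isComplete reports whether every part has been received.
func (ps *partSet) isComplete() bool { return ps.count == len(ps.parts) }

// assemble joins the parts back into the original bytes.
func (ps *partSet) assemble() []byte { return bytes.Join(ps.parts, nil) }

func main() {
	block := []byte("example block bytes")
	parts := splitIntoParts(block, 8)
	ps := newPartSet(len(parts))
	for i := len(parts) - 1; i >= 0; i-- { // parts may arrive in any order
		ps.addPart(i, parts[i])
		fmt.Println("added part", i, "complete:", ps.isComplete())
	}
	fmt.Println("reassembled correctly:", bytes.Equal(ps.assemble(), block))
}
```

Only when isComplete returns true would a node in this sketch have the full block and be able to vote on it, which mirrors the isProposalComplete check in the excerpt above.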

signAddVote

addProposalBlockPart leads into enterPrevote, which in turn calls doPrevote. The default implementation of doPrevote is defaultDoPrevote, which calls signAddVote to sign a vote for the Proposal and send it to internalMsgQueue.

func (cs *State) signAddVote(msgType tmproto.SignedMsgType, hash ...
