手撸golang etcd raft协议之8

Posted ioly

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撸golang etcd raft协议之8相关的知识,希望对你有一定的参考价值。

手撸golang etcd raft协议之8

缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?

Raft算法把这个问题分解成了四个子问题:
1. 领袖选举(leader election)
2. 日志复制(log replication)
3. 安全性(safety)
4. 成员关系变化(membership changes)

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 8)

  • 简化rpc连接管理器tRaftClientService的实现
  • 剥离IRaftLSM的内部支持接口到iRaftStateContext接口
  • 完成Candidate状态的处理逻辑

设计

  • IRaftLSM:raft有限状态机接口
  • iRaftStateContext:提供状态模式下的上下文支持
  • tCandidateState:Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。
  • tRaftClient:管理到指定raft节点的rpc连接
  • tRaftClientService:管理当前节点到其他raft节点的rpc连接

IRaftLSM.go

raft有限状态机接口

package lsm

import (
    "learning/gooop/etcd/raft/rpc"
)

// IRaftLSM is the raft finite state machine interface. It combines the raft
// RPC methods (rpc.IRaftRPC) with the state-context support methods
// (iRaftStateContext) and adds access to an IRaftState.
type IRaftLSM interface {
    rpc.IRaftRPC
    iRaftStateContext

    // State returns an IRaftState (presumably the currently active state;
    // the implementation is not visible here).
    State() IRaftState
}

iRaftStateContext.go

提供状态模式下的上下文支持

package lsm

import (
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc/client"
    "learning/gooop/etcd/raft/store"
)

// iRaftStateContext is the context handed to state implementations under the
// state pattern. It gives a state access to the node configuration, the log
// store, the rpc client service, and a callback for switching to a new state.
type iRaftStateContext interface {
    // Config returns the raft configuration.
    Config() config.IRaftConfig
    // Store returns the log store.
    Store() store.ILogStore
    // HandleStateChanged is called by a state to hand control to the given
    // next state.
    HandleStateChanged(state IRaftState)
    // RaftClientService returns the service used to call other raft nodes.
    RaftClientService() client.IRaftClientService
}

tCandidateState.go

Candidate(候选人)状态的实现。基于事件驱动的逻辑编排,基于读写分离的字段管理。

package lsm

import (
    "learning/gooop/etcd/raft/roles"
    "learning/gooop/etcd/raft/rpc"
    "learning/gooop/etcd/raft/timeout"
    "sync"
    "time"
)

// tCandidateState represents a node in the raft Candidate role. It embeds
// tEventDrivenModel; the "update:" note on each field lists the places
// (init or event names) where that field is written.
type tCandidateState struct {
    tEventDrivenModel

    context    iRaftStateContext
    mInitOnce  sync.Once
    mStartOnce sync.Once

    // mTerm is the candidate's current term.
    // update: init / ceAskingForVote
    mTerm int64

    // mVotedTerm is the term of the most recent vote given by this node.
    // update: ceInit / ceAskingForVote / ceVoteToCandidate
    mVotedTerm int64

    // mVotedCandidateID is the ID of the candidate this node last voted for.
    // update: ceInit / ceAskingForVote / ceVoteToCandidate
    mVotedCandidateID string

    // mVotedTimestamp is the time of the last vote, in Unix nanoseconds.
    // update: ceInit / ceAskingForVote / ceVoteToCandidate
    mVotedTimestamp int64

    // mTicketCount is the set of node IDs that granted a vote in the current
    // round; it is guarded by mTicketMutex once ceInit has run.
    // update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
    mTicketCount map[string]bool
    mTicketMutex *sync.Mutex

    // mDisposedFlag is set once the candidate state has been abandoned.
    // update: ceInit / ceDisposing
    mDisposedFlag bool
}

// Event names raised and handled by tCandidateState.
const (
    // ceInit is raised once from init().
    // args: none
    ceInit = "candidate.init"

    // ceStart is the start event; intended trigger: Start().
    // args: none
    ceStart = "candidate.Start"

    // ceElectionTimeout is raised by the timer goroutine started in
    // whenAskingForVoteThenWatchElectionTimeout().
    // args: none
    ceElectionTimeout = "candidate.ElectionTimeout"

    // ceLeaderAnnounced is raised by Heartbeat() / AppendLog() when a leader
    // with an acceptable term contacts this node.
    // args: none
    ceLeaderAnnounced = "candidate.LeaderAnnounced"

    // ceVoteToCandidate is raised by RequestVote() when a vote is granted.
    // args: *rpc.RequestVoteCmd
    ceVoteToCandidate = "candidate.VoteToCandidate"

    // ceDisposing is raised by whenLeaderAnnouncedThenSwitchToFollower() and
    // whenWinningTheVoteThenSwitchToLeader() just before the state switch.
    // args: none
    ceDisposing = "candidate.Disposing"

    // ceAskingForVote is raised by beginAskForVote().
    // args: none
    ceAskingForVote = "candidate.AskingForVote"

    // ceReceiveTicket is raised by handleRequestVoteOK().
    // args: peerID string
    ceReceiveTicket = "candidate.ReceiveTicket"

    // ceWinningTheVote is raised by whenReceiveTicketThenCheckTicketCount().
    // args: none
    ceWinningTheVote = "candidate.ceWinningTheVote"
)

// newCandidateState builds a candidate state bound to ctx, starting from the
// given term, and returns it as an IRaftState.
func newCandidateState(ctx iRaftStateContext, term int64) IRaftState {
    candidate := &tCandidateState{}
    candidate.init(ctx, term)
    return candidate
}

// init stores the context and starting term, installs the event handlers and
// raises ceInit. The whole sequence is guarded by mInitOnce, so repeated calls
// have no further effect.
func (me *tCandidateState) init(ctx iRaftStateContext, term int64) {
    setup := func() {
        me.context, me.mTerm = ctx, term
        me.initEventHandlers()

        me.raise(ceInit)
    }
    me.mInitOnce.Do(setup)
}

// initEventHandlers registers every event handler of the candidate state.
// The field-maintaining ("write only") hooks are installed first, followed by
// the "read only" hooks that drive the workflow.
func (me *tCandidateState) initEventHandlers() {
    // write only logic: one installer per maintained field
    fieldInstallers := []func(){
        me.hookEventsForTerm,
        me.hookEventsForVotedTerm,
        me.hookEventsForVotedCandidateID,
        me.hookEventsForVotedTimestamp,
        me.hookEventsForTicketCount,
        me.hookEventsForDisposedFlag,
    }
    for _, install := range fieldInstallers {
        install()
    }

    // read only logic: event -> reaction
    me.hook(ceStart, me.whenStartThenAskForVote)
    me.hook(ceAskingForVote, me.whenAskingForVoteThenWatchElectionTimeout)
    me.hook(ceReceiveTicket, me.whenReceiveTicketThenCheckTicketCount)
    me.hook(ceElectionTimeout, me.whenElectionTimeoutThenAskForVoteAgain)
    me.hook(ceWinningTheVote, me.whenWinningTheVoteThenSwitchToLeader)
    me.hook(ceLeaderAnnounced, me.whenLeaderAnnouncedThenSwitchToFollower)
}

// hookEventsForTerm maintains field: mTerm
// update: ceAskingForVote (each new round of voting advances the term by one)
func (me *tCandidateState) hookEventsForTerm() {
    bumpTerm := func(_ string, _ ...interface{}) {
        me.mTerm += 1
    }
    me.hook(ceAskingForVote, bumpTerm)
}

// hookEventsForVotedTerm maintains field: mVotedTerm
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedTerm() {
    // On init and on every new round the candidate votes for itself, so the
    // voted term follows its own term.
    voteForOwnTerm := func(_ string, _ ...interface{}) {
        me.mVotedTerm = me.mTerm
    }

    // After granting a vote to another candidate, remember that request's term.
    voteForRequestTerm := func(_ string, args ...interface{}) {
        request := args[0].(*rpc.RequestVoteCmd)
        me.mVotedTerm = request.Term
    }

    me.hook(ceInit, voteForOwnTerm)
    me.hook(ceAskingForVote, voteForOwnTerm)
    me.hook(ceVoteToCandidate, voteForRequestTerm)
}

// hookEventsForVotedCandidateID maintains field: mVotedCandidateID
// update: ceInit / ceAskingForVote / ceVoteToCandidate
func (me *tCandidateState) hookEventsForVotedCandidateID() {
    // On init and on every new round the candidate votes for itself.
    voteForSelf := func(_ string, _ ...interface{}) {
        me.mVotedCandidateID = me.context.Config().ID()
    }

    // After granting a vote, remember which candidate received it.
    voteForRequester := func(_ string, args ...interface{}) {
        request := args[0].(*rpc.RequestVoteCmd)
        me.mVotedCandidateID = request.CandidateID
    }

    me.hook(ceInit, voteForSelf)
    me.hook(ceAskingForVote, voteForSelf)
    me.hook(ceVoteToCandidate, voteForRequester)
}

// hookEventsForVotedTimestamp maintains field: mVotedTimestamp
// update: ceInit / ceAskingForVote / ceVoteToCandidate
// Each of these events records the current time in Unix nanoseconds.
func (me *tCandidateState) hookEventsForVotedTimestamp() {
    stampNow := func(_ string, _ ...interface{}) {
        me.mVotedTimestamp = time.Now().UnixNano()
    }

    me.hook(ceInit, stampNow)            // initial vote for itself
    me.hook(ceAskingForVote, stampNow)   // vote for itself in a new round
    me.hook(ceVoteToCandidate, stampNow) // vote granted to another candidate
}

// hookEventsForTicketCount maintains fields: mTicketCount / mTicketMutex
// update: ceInit / ceAskingForVote / ceReceiveTicket / ceDisposing
func (me *tCandidateState) hookEventsForTicketCount() {
    // ownTicketOnly builds a fresh ticket set holding just this node's vote.
    ownTicketOnly := func() map[string]bool {
        return map[string]bool{me.context.Config().ID(): true}
    }

    // The mutex is created here, so this handler is the only one that runs
    // without taking the lock.
    me.hook(ceInit, func(_ string, _ ...interface{}) {
        me.mTicketMutex = new(sync.Mutex)
        me.mTicketCount = ownTicketOnly()
    })

    // A new round starts over with only the candidate's own ticket.
    me.hook(ceAskingForVote, func(_ string, _ ...interface{}) {
        me.mTicketMutex.Lock()
        defer me.mTicketMutex.Unlock()

        me.mTicketCount = ownTicketOnly()
    })

    // Record the peer that granted its vote; args[0] is the peer ID.
    me.hook(ceReceiveTicket, func(_ string, args ...interface{}) {
        voter := args[0].(string)

        me.mTicketMutex.Lock()
        defer me.mTicketMutex.Unlock()
        me.mTicketCount[voter] = true
    })

    // Drop every ticket when the candidate state is being abandoned.
    me.hook(ceDisposing, func(_ string, _ ...interface{}) {
        me.mTicketMutex.Lock()
        defer me.mTicketMutex.Unlock()
        me.mTicketCount = map[string]bool{}
    })
}


// hookEventsForDisposedFlag maintains field: mDisposedFlag
// update: ceInit (cleared) / ceDisposing (set)
func (me *tCandidateState) hookEventsForDisposedFlag() {
    // markDisposed returns a handler that stores the given value in the flag.
    markDisposed := func(disposed bool) func(string, ...interface{}) {
        return func(_ string, _ ...interface{}) {
            me.mDisposedFlag = disposed
        }
    }

    me.hook(ceInit, markDisposed(false))
    me.hook(ceDisposing, markDisposed(true))
}

// Heartbeat handles a leader heartbeat received while this node is a
// candidate.
//
// A heartbeat carrying a term lower than the candidate's own term is rejected
// with rpc.HBTermMismatch. Any other heartbeat is treated as the announcement
// of a legitimate leader: ceLeaderAnnounced is raised and ret.Code is set to
// rpc.HBOk. The returned error is always nil; the outcome is reported through
// ret.Code.
func (me *tCandidateState) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
    // Only a strictly older term marks a stale leader. A leader whose term
    // equals the candidate's term won the election this candidate is running
    // in and must be accepted (Raft: "at least as large as the candidate's
    // current term"); rejecting it would make the candidate time out and
    // start another election against a valid leader.
    if cmd.Term < me.mTerm {
        // bad leader
        ret.Code = rpc.HBTermMismatch
        return nil
    }

    // new leader
    me.raise(ceLeaderAnnounced)

    // return ok
    ret.Code = rpc.HBOk
    return nil
}

// AppendLog handles an append-log request received while this node is a
// candidate.
//
// A request carrying a term lower than the candidate's own term is rejected
// with rpc.ALTermMismatch. Any other request is treated as the announcement of
// a legitimate leader: ceLeaderAnnounced is raised, but the entry itself is
// not applied by the candidate, so ret.Code is set to rpc.ALInternalError.
// The returned error is always nil; the outcome is reported through ret.Code.
func (me *tCandidateState) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
    // Only a strictly older term marks a stale leader. A leader whose term
    // equals the candidate's term won the election this candidate is running
    // in and must be accepted (Raft: "at least as large as the candidate's
    // current term").
    if cmd.Term < me.mTerm {
        // bad leader
        ret.Code = rpc.ALTermMismatch
        return nil
    }

    // new leader
    me.raise(ceLeaderAnnounced)

    // ignore and return
    ret.Code = rpc.ALInternalError
    return nil
}

// CommitLog handles a commit-log request received while this node is a
// candidate. The request is not processed: cmd is ignored, ret.Code is set to
// rpc.CLInternalError and nil is returned.
func (me *tCandidateState) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
    // ignore and return
    ret.Code = rpc.CLInternalError
    return nil
}

// RequestVote answers a vote request from another candidate. The decision is
// based on the request's term compared with the term of this node's last vote
// (mVotedTerm):
//
//   - older term: rpc.RVTermMismatch
//   - same term: rpc.RVOk if no vote is recorded or it already went to the
//     requester, otherwise rpc.RVVotedAnother
//   - newer term: the vote is granted (ceVoteToCandidate is raised with cmd and
//     rpc.RVOk is returned) when the requester's last log index is not behind
//     the local last committed index, otherwise rpc.RVLogMismatch
//
// The returned error is always nil; the outcome is reported through ret.Code.
func (me *tCandidateState) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
    switch {
    case cmd.Term < me.mVotedTerm:
        // request from an outdated term
        ret.Code = rpc.RVTermMismatch

    case cmd.Term == me.mVotedTerm:
        if me.mVotedCandidateID == "" || me.mVotedCandidateID == cmd.CandidateID {
            // no vote recorded, or the vote already went to this requester
            ret.Code = rpc.RVOk
        } else {
            // the vote for this term went to somebody else
            ret.Code = rpc.RVVotedAnother
        }

    default:
        // newer term: decide on the log position
        // NOTE(review): only the index is compared; cmd.LastLogTerm is not
        // consulted here — confirm this is intended.
        if cmd.LastLogIndex < me.context.Store().LastCommittedIndex() {
            // requester's log is behind
            ret.Code = rpc.RVLogMismatch
        } else {
            me.raise(ceVoteToCandidate, cmd)
            ret.Code = rpc.RVOk
        }
    }

    return nil
}

// Role reports the raft role of this state, which is always roles.Candidate.
func (me *tCandidateState) Role() roles.RaftRole {
    return roles.Candidate
}

// Start activates the candidate state by raising ceStart, which is the event
// the vote-requesting handler (whenStartThenAskForVote) is hooked to. The
// call is guarded by mStartOnce, so only the first invocation has an effect.
func (me *tCandidateState) Start() {
    me.mStartOnce.Do(func() {
        // Must be the candidate's own start event: no candidate handler is
        // hooked to any other start event, so raising something else would
        // leave the election never started.
        me.raise(ceStart)
    })
}

// whenLeaderAnnouncedThenSwitchToFollower reacts to ceLeaderAnnounced: it
// raises ceDisposing for this state, then hands a new follower state to the
// context.
func (me *tCandidateState) whenLeaderAnnouncedThenSwitchToFollower(_ string, _ ...interface{}) {
    me.raise(ceDisposing)

    follower := newFollowerState(me.context)
    me.context.HandleStateChanged(follower)
}

// whenElectionTimeoutThenAskForVoteAgain reacts to ceElectionTimeout by
// starting another round of vote requests.
func (me *tCandidateState) whenElectionTimeoutThenAskForVoteAgain(_ string, _ ...interface{}) {
    me.beginAskForVote()
}

// whenStartThenAskForVote reacts to ceStart by starting the first round of
// vote requests.
func (me *tCandidateState) whenStartThenAskForVote(_ string, _ ...interface{}) {
    me.beginAskForVote()
}

// beginAskForVote starts one round of voting. It raises ceAskingForVote,
// builds a single RequestVoteCmd from the node ID, the current term and the
// store's last committed index/term, and sends it to every other configured
// node, one goroutine per peer. A reply with rpc.RVOk and no error is passed
// to handleRequestVoteOK together with the term captured for this round.
func (me *tCandidateState) beginAskForVote() {
    // announce the new round first
    me.raise(ceAskingForVote)

    // build the request shared by all peers
    request := new(rpc.RequestVoteCmd)
    request.CandidateID = me.context.Config().ID()
    request.Term = me.mTerm

    logStore := me.context.Store()
    request.LastLogIndex = logStore.LastCommittedIndex()
    request.LastLogTerm = logStore.LastCommittedTerm()

    // remember which round the replies belong to
    roundTerm := me.mTerm

    // askPeer sends the request to one peer and reports a granted vote
    askPeer := func(peerID string) {
        reply := new(rpc.RequestVoteRet)
        err := me.context.RaftClientService().Using(peerID, func(client rpc.IRaftRPC) error {
            return client.RequestVote(request, reply)
        })
        if err != nil || reply.Code != rpc.RVOk {
            return
        }
        me.handleRequestVoteOK(peerID, roundTerm)
    }

    for _, peer := range me.context.Config().Nodes() {
        // never ask ourselves
        if peer.ID() == me.context.Config().ID() {
            continue
        }
        go askPeer(peer.ID())
    }
}


// whenAskingForVoteThenWatchElectionTimeout reacts to ceAskingForVote by
// starting a goroutine that sleeps for a random election timeout. When it
// wakes up it does nothing if the state has been disposed or the term has
// changed since the goroutine was started; otherwise it raises
// ceElectionTimeout if fewer than a majority (n/2 + 1) of tickets were
// collected.
func (me *tCandidateState) whenAskingForVoteThenWatchElectionTimeout(_ string, _ ...interface{}) {
    watchedTerm := me.mTerm

    watchdog := func() {
        time.Sleep(timeout.RandElectionTimeout())

        // the round this watchdog belongs to is over
        stale := me.mDisposedFlag || me.mTerm != watchedTerm
        if stale {
            return
        }

        tickets := me.getTicketCount()
        majority := len(me.context.Config().Nodes())/2 + 1
        if tickets >= majority {
            return
        }
        me.raise(ceElectionTimeout)
    }

    go watchdog()
}


// handleRequestVoteOK is called when the peer identified by peerID granted its
// vote for the round started in the given term. The vote is counted (by
// raising ceReceiveTicket with peerID) only while the state is not disposed
// and the candidate is still in that term.
func (me *tCandidateState) handleRequestVoteOK(peerID string, term int64) {
    stillCurrent := !me.mDisposedFlag && me.mTerm == term
    if stillCurrent {
        me.raise(ceReceiveTicket, peerID)
    }
}


// whenReceiveTicketThenCheckTicketCount reacts to ceReceiveTicket: once the
// number of collected tickets reaches a majority (n/2 + 1) of the configured
// nodes, it raises ceWinningTheVote.
func (me *tCandidateState) whenReceiveTicketThenCheckTicketCount(_ string, _ ...interface{}) {
    tickets := me.getTicketCount()
    majority := len(me.context.Config().Nodes())/2 + 1
    if tickets < majority {
        // not enough votes yet
        return
    }

    // win the vote
    me.raise(ceWinningTheVote)
}

// getTicketCount returns the number of entries in mTicketCount, read while
// holding mTicketMutex.
func (me *tCandidateState) getTicketCount() int {
    me.mTicketMutex.Lock()
    count := len(me.mTicketCount)
    me.mTicketMutex.Unlock()

    return count
}

// whenWinningTheVoteThenSwitchToLeader reacts to ceWinningTheVote: it raises
// ceDisposing for this state, then hands a new leader state, created with the
// candidate's term, to the context.
func (me *tCandidateState) whenWinningTheVoteThenSwitchToLeader(_ string, _ ...interface{}) {
    me.raise(ceDisposing)

    leader := newLeaderState(me.context, me.mTerm)
    me.context.HandleStateChanged(leader)
}

tRaftClient.go

管理到指定raft节点的rpc连接

package client

import (
    "learning/gooop/etcd/raft/config"
    rrpc "learning/gooop/etcd/raft/rpc"
    "net/rpc"
)

// tRaftClient manages the rpc connection to one raft node. All RPC methods
// are delegated to the current connection state.
type tRaftClient struct {
    // cfg is the configuration of the remote node.
    cfg   config.IRaftNodeConfig
    // conn is the underlying net/rpc client; it is nil when the client was
    // created without a connection.
    conn  *rpc.Client
    // state is the current connection state that serves the RPC calls.
    state iClientState
}

// newRaftClient creates a client for the node described by cfg. conn may be
// nil, in which case the client starts out in the broken state.
func newRaftClient(cfg config.IRaftNodeConfig, conn *rpc.Client) IRaftClient {
    client := &tRaftClient{}
    client.init(cfg, conn)
    return client
}

// init stores the node configuration and connection, picks the initial state
// (connected when conn is non-nil, broken otherwise) and starts that state.
func (me *tRaftClient) init(cfg config.IRaftNodeConfig, conn *rpc.Client) {
    me.cfg, me.conn = cfg, conn

    switch conn {
    case nil:
        me.state = newBrokenState(me)
    default:
        me.state = newConnectedState(me)
    }

    me.state.Start()
}

// Config returns the configuration of the remote node this client talks to.
func (me *tRaftClient) Config() config.IRaftNodeConfig {
    return me.cfg
}

// GetConn returns the current net/rpc connection, which may be nil.
func (me *tRaftClient) GetConn() *rpc.Client {
    return me.conn
}

// SetConn replaces the current net/rpc connection with conn. The previous
// connection is not closed here.
func (me *tRaftClient) SetConn(conn *rpc.Client) {
    me.conn = conn
}

// HandleStateChanged installs state as the client's current connection state
// and then calls its Start method.
func (me *tRaftClient) HandleStateChanged(state iClientState) {
    me.state = state
    state.Start()
}

// Heartbeat forwards the heartbeat call to the current connection state and
// returns its result.
func (me *tRaftClient) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
    return me.state.Heartbeat(cmd, ret)
}

// AppendLog forwards the append-log call to the current connection state and
// returns its result.
func (me *tRaftClient) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
    return me.state.AppendLog(cmd, ret)
}

// CommitLog forwards the commit-log call to the current connection state and
// returns its result.
func (me *tRaftClient) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
    return me.state.CommitLog(cmd, ret)
}

// RequestVote forwards the request-vote call to the current connection state
// and returns its result.
func (me *tRaftClient) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
    return me.state.RequestVote(cmd, ret)
}

// Ping forwards the ping call to the current connection state and returns its
// result.
func (me *tRaftClient) Ping(cmd *PingCmd, ret *PingRet) error {
    return me.state.Ping(cmd, ret)
}

tRaftClientService.go

管理当前节点到其他raft节点的rpc连接

package client

import (
    "errors"
    "learning/gooop/etcd/raft/config"
    "learning/gooop/etcd/raft/rpc"
    netrpc "net/rpc"
)

// tRaftClientService manages the rpc clients from the current node to the
// raft nodes listed in the configuration.
type tRaftClientService struct {
    // cfg is the raft configuration the clients were built from.
    cfg     config.IRaftConfig
    // clients maps a node ID to the client for that node; it is filled once
    // in init.
    clients map[string]IRaftClient
}

// NewRaftClientService creates the client service for the given raft
// configuration, creating one client per configured node.
func NewRaftClientService(cfg config.IRaftConfig) IRaftClientService {
    service := &tRaftClientService{}
    service.init(cfg)
    return service
}

// init stores the configuration and creates one client for every node
// returned by cfg.Nodes(), keyed by node ID.
// NOTE(review): no node is skipped here, so if Nodes() includes the local
// node a client to itself is created as well — confirm this is intended.
func (me *tRaftClientService) init(cfg config.IRaftConfig) {
    me.cfg = cfg

    byID := make(map[string]IRaftClient)
    for _, nodeCfg := range cfg.Nodes() {
        byID[nodeCfg.ID()] = me.createRaftClient(nodeCfg)
    }
    me.clients = byID
}


// createRaftClient dials the node's endpoint over tcp and wraps the result in
// a raft client. A failed dial is not reported to the caller: the client is
// then created with a nil connection.
func (me *tRaftClientService) createRaftClient(nodeCfg config.IRaftNodeConfig) IRaftClient {
    // dial to peer
    conn, err := netrpc.Dial("tcp", nodeCfg.Endpoint())
    if err != nil {
        // no usable connection
        conn = nil
    }
    return newRaftClient(nodeCfg, conn)
}

// Using looks up the client registered for peerID and runs action with it,
// returning whatever action returns. If no client is registered under peerID,
// action is not called and gErrorUnknownRaftPeer is returned.
func (me *tRaftClientService) Using(peerID string, action func(client rpc.IRaftRPC) error) error {
    peer, found := me.clients[peerID]
    if !found {
        return gErrorUnknownRaftPeer
    }
    return action(peer)
}


// gErrorUnknownRaftPeer is returned by Using when no client is registered for
// the requested peer ID.
var gErrorUnknownRaftPeer = errors.New("unknown raft peer")

(未完待续)

以上是关于手撸golang etcd raft协议之8的主要内容,如果未能解决你的问题,请参考以下文章

手撸golang etcd raft协议之4

手撸golang etcd raft协议之9,10

手撸golang 学etcd 手写raft协议之12 单元测试

手撸golang 学etcd 手写raft协议13 小结

etcd源码解读之raft协议实现

深入浅出etcd之raft实现