手撸golang etcd raft协议之4

Posted ioly

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手撸golang etcd raft协议之4相关的知识,希望对你有一定的参考价值。

手撸golang etcd raft协议之4

缘起

最近阅读 [云原生分布式存储基石:etcd深入解析] (杜军 , 2019.1)
本系列笔记拟采用golang练习之
gitee: https://gitee.com/ioly/learning.gooop

raft分布式一致性算法

分布式存储系统通常会通过维护多个副本来进行容错,
以提高系统的可用性。
这就引出了分布式存储系统的核心问题——如何保证多个副本的一致性?

Raft算法把问题分解成了领袖选举(leader election)、
日志复制(log replication)、安全性(safety)
和成员关系变化(membership changes)这几个子问题。

Raft算法的基本操作只需2种RPC即可完成。
RequestVote RPC是在选举过程中通过旧的Leader触发的,
AppendEntries RPC是领导人触发的,目的是向其他节点复制日志条目和发送心跳(heartbeat)。

目标

  • 根据raft协议,实现高可用分布式强一致的kv存储

子目标(Day 4)

  • 使用boltdb存储操作日志和kv键值数据

    • unstable存储桶:已收到未提交的日志,重启后清空
    • committed存储桶:已提交的日志
    • data存储桶:kv键值数据
    • meta存储桶:记录末次提交的index和term

设计

  • model/LogEntry: 日志条目
  • ICmd:操作指令接口
  • ICmdFactory:操作指令工厂
  • ILogStore:日志存储接口
  • tCmdBase:指令基类
  • PutCmd:put指令
  • DelCmd:del指令
  • tBoltDBStore:基于boltdb实现日志暂存,提交和应用

LogEntry.go

日志条目

package model

import "encoding/json"

type LogEntry struct {
    Tag       int
    Term      int64
    Index     int64
    PrevTerm  int64
    PrevIndex int64
    Command   []byte
}

func (me *LogEntry) Marshal() (error, []byte) {
    j, e := json.Marshal(me)
    if e != nil {
        return e, nil
    }
    return nil, j
}

func (me *LogEntry) Unmarshal(data []byte) error {
    return json.Unmarshal(data, me)
}

ICmd.go

操作指令接口

package store

import "github.com/boltdb/bolt"

type ICmd interface {
    Marshal() []byte
    Unmarshal(data []byte)
    Apply(tx *bolt.Tx) error
}

ICmdFactory.go

操作指令工厂

package store

import "fmt"

type ICmdFactory interface {
    OfTag(tag int) ICmd
    Tag(cmd ICmd) int
}

type tDefaultCmdFactory int

const gPutCmdTag = 1
const gDelCmdTag = 2

func (me *tDefaultCmdFactory) OfTag(tag int) ICmd {
    switch tag {
    case gPutCmdTag:
        return new(PutCmd)

    case gDelCmdTag:
        return new(DelCmd)
    }

    panic(fmt.Sprintf("unknown tag: %d", tag))
}

func (me *tDefaultCmdFactory) Tag(cmd ICmd) int {
    if _, ok := cmd.(*PutCmd); ok {
        return gPutCmdTag
    }

    if _, ok := cmd.(*DelCmd); ok {
        return gDelCmdTag
    }

    panic(fmt.Sprintf("unknown cmd: %v", cmd))
}

var gCmdFactory = new(tDefaultCmdFactory)

ILogStore.go

日志存储接口

package store

import "learning/gooop/etcd/raft/model"

type ILogStore interface {
    Term() int64
    Index() int64
    Append(entry *model.LogEntry) error
    Commit(index int64) error
}

tCmdBase.go

指令基类

package store

import "encoding/json"

type tCmdBase struct {
}

func (me *tCmdBase) Marshal() []byte {
    j, e := json.Marshal(me)
    if e != nil {
        return nil
    }
    return j
}

func (me *tCmdBase) Unmarshal(data []byte) {
    _ = json.Unmarshal(data, me)
}

PutCmd.go

put指令

package store

import "github.com/boltdb/bolt"

type PutCmd struct {
    tCmdBase

    Key   string
    Value []byte
}

func (me *PutCmd) Apply(tx *bolt.Tx) error {
    b := tx.Bucket(gDataBucket)
    return b.Put([]byte(me.Key), me.Value)
}

DelCmd.go

del指令

package store

import "github.com/boltdb/bolt"

type DelCmd struct {
    tCmdBase

    Key string
}

func (me *DelCmd) Apply(tx *bolt.Tx) error {
    b := tx.Bucket(gDataBucket)
    return b.Delete([]byte(me.Key))
}

tBoltDBStore.go

基于boltdb实现日志暂存,提交和应用

package store

import (
    "bytes"
    "encoding/binary"
    "errors"
    "github.com/boltdb/bolt"
    "learning/gooop/etcd/raft/model"
)

type tBoltDBStore struct {
    file  string
    term  int64
    index int64

    db bolt.DB
}

func NewBoltStore(file string) (error, ILogStore) {
    db, err := bolt.Open(file, 0600, nil)
    if err != nil {
        return err, nil
    }

    store := new(tBoltDBStore)
    err = db.Update(func(tx *bolt.Tx) error {
        b, e := tx.CreateBucketIfNotExists(gMetaBucket)
        if e != nil {
            return e
        }

        v := b.Get(gKeyCommittedTerm)
        if v == nil {
            e = b.Put(gKeyCommittedTerm, int64ToBytes(gDefaultTerm))
            if e != nil {
                return e
            }
            store.term = gDefaultTerm

        } else {
            store.term = bytesToInt64(v)
        }

        v = b.Get(gKeyCommittedIndex)
        if v == nil {
            e = b.Put(gKeyCommittedIndex, int64ToBytes(gDefaultIndex))
            if e != nil {
                return e
            }
            store.index = gDefaultIndex

        } else {
            store.index = bytesToInt64(v)
        }

        b, e = tx.CreateBucketIfNotExists(gDataBucket)
        if e != nil {
            return e
        }

        e = tx.DeleteBucket(gUnstableBucket)
        if e != nil {
            return e
        }
        _, e = tx.CreateBucket(gUnstableBucket)
        if e != nil {
            return e
        }

        _, e = tx.CreateBucketIfNotExists(gCommittedBucket)
        if e != nil {
            return e
        }

        return nil
    })

    if err != nil {
        return err, nil
    }

    return nil, store
}

func int64ToBytes(i int64) []byte {
    buf := bytes.NewBuffer(make([]byte, 8))
    _ = binary.Write(buf, binary.BigEndian, i)
    return buf.Bytes()
}

func bytesToInt64(data []byte) int64 {
    var i int64
    buf := bytes.NewBuffer(data)
    _ = binary.Read(buf, binary.BigEndian, &i)
    return i
}

func (me *tBoltDBStore) Term() int64 {
    return me.term
}

func (me *tBoltDBStore) Index() int64 {
    return me.index
}

func (me *tBoltDBStore) Append(entry *model.LogEntry) error {
    cmd := gCmdFactory.OfTag(entry.Tag)
    cmd.Unmarshal(entry.Command)

    e, entryData := entry.Marshal()
    if e != nil {
        return e
    }

    return me.db.Update(func(tx *bolt.Tx) error {
        // save log to unstable
        b := tx.Bucket(gUnstableBucket)
        e = b.Put(int64ToBytes(entry.Index), entryData)
        if e != nil {
            return e
        }

        me.index = entry.Index
        me.term = entry.Term

        return nil
    })
}

func (me *tBoltDBStore) Commit(index int64) error {
    return me.db.Update(func(tx *bolt.Tx) error {
        // read unstable log
        ub := tx.Bucket(gUnstableBucket)
        k := int64ToBytes(index)
        data := ub.Get(k)
        if data == nil {
            return gErrorCommitLogNotFound
        }

        entry := new(model.LogEntry)
        e := entry.Unmarshal(data)
        if e != nil {
            return e
        }

        // apply cmd
        cmd := gCmdFactory.OfTag(entry.Tag)
        cmd.Unmarshal(entry.Command)
        e = cmd.Apply(tx)
        if e != nil {
            return e
        }

        // save to committed log
        cb := tx.Bucket(gCommittedBucket)
        e = cb.Put(k, data)
        if e != nil {
            return e
        }

        // update committed.index, committed.term
        mb := tx.Bucket(gMetaBucket)
        e = mb.Put(gKeyCommittedIndex, int64ToBytes(index))
        if e != nil {
            return e
        }

        e = mb.Put(gKeyCommittedTerm, int64ToBytes(entry.Term))
        if e != nil {
            return e
        }

        // del unstable.index
        e = ub.Delete(k)
        if e != nil {
            return e
        }

        return nil
    })
}

var gMetaBucket = []byte("meta")
var gUnstableBucket = []byte("unstable")
var gCommittedBucket = []byte("committed")
var gDataBucket = []byte("data")

var gKeyCommittedIndex = []byte("committed.index")
var gKeyCommittedTerm = []byte("committed.term")

var gDefaultTerm int64 = 0
var gDefaultIndex int64 = 0

var gErrorCommitLogNotFound = errors.New("committing log not found")

(未完待续)

以上是关于手撸golang etcd raft协议之4的主要内容,如果未能解决你的问题,请参考以下文章

手撸golang etcd raft协议之8

手撸golang etcd raft协议之9,10

手撸golang 学etcd 手写raft协议之12 单元测试

手撸golang 学ectd 手写raft协议13 小结

etcd源码解读之raft协议实现

深入浅出etcd之raft实现