tidwall之raft-wal源码分析

Posted 一名半路出家的后台技术人

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了tidwall之raft-wal源码分析相关的知识,希望对你有一定的参考价值。

1. tidwall/wal模块简要介绍

在分析tidwall/raft-wal的源码前,我们先简要介绍一下tidwall/wal模块:raft-wal内部正是基于tidwall/wal实现的一套持久化方案。

关于tidwall/wal的详细分析,可以参考之前的文章。

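2. tidwall/raft-wal的持久化数据存储结构

raft-wal以raft日志的Index作为tidwall/wal中条目的索引。每条日志按 Type(1字节) + Term(uvarint) + len(Data)(uvarint) + Data + len(Extensions)(uvarint) + Extensions 的格式编码后写入,具体见下文appendLog与GetLog的实现。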

3.源码分析

所谓的持久化,无非就是几个接口:

  • 初始化

  • 写入

  • 读取

  • 删除

下面逐一进行分析。

3.1 核心数据结构介绍

 
   
   
 
  1. // LogStore is a write ahead Raft log

  2. type LogStore struct {

  3. mu sync.Mutex

  4. // wal的日志对象

  5. log *wal.Log

  6. buf []byte

  7. batch wal.Batch

  8. }

 
   
   
 
  1. // Log entries are replicated to all members of the Raft cluster

  2. // and form the heart of the replicated state machine.

  3. type Log struct {

  4. // Index holds the index of the log entry.

  5. Index uint64


  6. // Term holds the election term of the log entry.

  7. Term uint64


  8. // Type holds the type of the log entry.

  9. Type LogType


  10. // Data holds the log entry's type-specific data.

  11. Data []byte


  12. // Extensions holds an opaque byte slice of information for middleware. It

  13. // is up to the client of the library to properly modify this as it adds

  14. // layers and remove those layers when appropriate. This value is a part of

  15. // the log, so very large values could cause timing issues.

  16. //

  17. // N.B. It is _up to the client_ to handle upgrade paths. For instance if

  18. // using this with go-raftchunking, the client should ensure that all Raft

  19. // peers are using a version that can handle that extension before ever

  20. // actually triggering chunking behavior. It is sometimes sufficient to

  21. // ensure that non-leaders are upgraded first, then the current leader is

  22. // upgraded, but a leader changeover during this process could lead to

  23. // trouble, so gating extension behavior via some flag in the client

  24. // program is also a good idea.

  25. Extensions []byte

  26. }

3.2 Read、Write、DeleteRange源码分析

3.2.1 初始化、基础接口

 
   
   
 
  1. var _ raft.LogStore = &LogStore{}


  // Options configures Open. Passing a nil *Options to Open selects the
  // defaults.
  type Options struct {
  	// NoSync skips the fsync that otherwise follows each write. Writes are
  	// less durable with it set: a server crash may lose recent data. The
  	// zero value (false) keeps fsync enabled.
  	NoSync bool
  }


  8. // Open the Raft log

  9. func Open(path string, opts *Options) (*LogStore, error) {

  10. s := new(LogStore)

  11. // 默认选项

  12. wopts := *wal.DefaultOptions

  13. if opts != nil {

  14. wopts.NoSync = opts.NoSync

  15. }

  16. // opts.LogFormat = wal.JSON

  17. var err error

  18. s.log, err = wal.Open(path, &wopts)

  19. if err != nil {

  20. return nil, err

  21. }

  22. return s, nil

  23. }


  24. // Close the Raft log

  25. func (s *LogStore) Close() error {

  26. s.mu.Lock()

  27. defer s.mu.Unlock()

  28. return s.log.Close()

  29. }


  30. // FirstIndex returns the first known index from the Raft log.

  31. func (s *LogStore) FirstIndex() (uint64, error) {

  32. s.mu.Lock()

  33. defer s.mu.Unlock()

  34. return s.log.FirstIndex()

  35. }


  36. // LastIndex returns the last known index from the Raft log.

  37. func (s *LogStore) LastIndex() (uint64, error) {

  38. s.mu.Lock()

  39. defer s.mu.Unlock()

  40. return s.log.LastIndex()

  41. }

3.2.2 Read读取日志接口

 
   
   
 
  1. // GetLog is used to retrieve a log from FastLogDB at a given index.

  2. // 根据index读取日志

  3. func (s *LogStore) GetLog(index uint64, log *raft.Log) error {

  4. s.mu.Lock()

  5. defer s.mu.Unlock()

  6. data, err := s.log.Read(index)

  7. if err != nil {

  8. if err == wal.ErrNotFound {

  9. return raft.ErrLogNotFound

  10. }

  11. return err

  12. }

  13. // 设置index

  14. log.Index = index

  15. if len(data) == 0 {

  16. return wal.ErrCorrupt

  17. }

  18. // 日志读取解析的格式:Type+Term+len(Data)+Data+len(Ext)+Ext

  19. // 读取1个字节的type

  20. log.Type = raft.LogType(data[0])

  21. // 偏移data

  22. data = data[1:]

  23. var n int

  24. // 读取varint term

  25. log.Term, n = binary.Uvarint(data)

  26. if n <= 0 {

  27. return wal.ErrCorrupt

  28. }

  29. // 偏移data

  30. data = data[n:]

  31. // 读取数据长度

  32. size, n := binary.Uvarint(data)

  33. if n <= 0 {

  34. return wal.ErrCorrupt

  35. }


  36. // 偏移数据

  37. data = data[n:]

  38. if uint64(len(data)) < size {

  39. return wal.ErrCorrupt

  40. }

  41. // 读取数据

  42. log.Data = data[:size]

  43. // 偏移数据

  44. data = data[size:]

  45. // 读取扩展数据长度

  46. size, n = binary.Uvarint(data)

  47. if n <= 0 {

  48. return wal.ErrCorrupt

  49. }

  50. // 偏移数据

  51. data = data[n:]

  52. if uint64(len(data)) < size {

  53. return wal.ErrCorrupt

  54. }

  55. // 读取偏移数据内容

  56. log.Extensions = data[:size]

  57. // 偏移数据

  58. data = data[size:]

  59. if len(data) > 0 {

  60. return wal.ErrCorrupt

  61. }

  62. return nil

  63. }


  // appendUvarint appends x to dst in the unsigned varint form produced by
  // binary.PutUvarint and returns the extended slice.
  func appendUvarint(dst []byte, x uint64) []byte {
  	var scratch [binary.MaxVarintLen64]byte
  	n := binary.PutUvarint(scratch[:], x)
  	return append(dst, scratch[:n]...)
  }

3.2.3 Write写入日志接口

 
   
   
 
  1. // StoreLog is used to store a single raft log

  2. func (s *LogStore) StoreLog(log *raft.Log) error {

  3. s.mu.Lock()

  4. defer s.mu.Unlock()

  5. s.buf = s.buf[:0]

  6. s.buf = appendLog(s.buf, log)

  7. return s.log.Write(log.Index, s.buf)

  8. }


  9. // StoreLogs is used to store a set of raft logs

  10. func (s *LogStore) StoreLogs(logs []*raft.Log) error {

  11. s.mu.Lock()

  12. defer s.mu.Unlock()

  13. s.batch.Clear()

  14. for _, log := range logs {

  15. s.buf = s.buf[:0]

  16. s.buf = appendLog(s.buf, log)

  17. s.batch.Write(log.Index, s.buf)

  18. }

  19. return s.log.WriteBatch(&s.batch)

  20. }

  21. // 日志写入的格式:Type+Term+len(Data)+Data+len(Ext)+Ext

  22. func appendLog(dst []byte, log *raft.Log) []byte {

  23. dst = append(dst, byte(log.Type))

  24. dst = appendUvarint(dst, log.Term)

  25. dst = appendUvarint(dst, uint64(len(log.Data)))

  26. dst = append(dst, log.Data...)

  27. dst = appendUvarint(dst, uint64(len(log.Extensions)))

  28. dst = append(dst, log.Extensions...)

  29. return dst

  30. }

3.2.4 DeleteRange删除日志接口

 
   
   
 
  1. // DeleteRange is used to delete logs within a given range inclusively.

  2. func (s *LogStore) DeleteRange(min, max uint64) error {

  3. s.mu.Lock()

  4. defer s.mu.Unlock()

  5. first, err := s.log.FirstIndex()

  6. if err != nil {

  7. return err

  8. }

  9. last, err := s.log.LastIndex()

  10. if err != nil {

  11. return err

  12. }

  13. if min == first {

  14. // 删除包括max在内的之前的数据

  15. if err := s.log.TruncateFront(max + 1); err != nil {

  16. return err

  17. }

  18. } else if max == last {

  19. // 删除包括min在内之后的数据

  20. if err := s.log.TruncateBack(min - 1); err != nil {

  21. return err

  22. }

  23. } else {

  24. return wal.ErrOutOfRange

  25. }

  26. return nil

  27. }


  28. // Sync performs an fsync on the log. This is not necessary when the

  29. // durability is set to High.

  30. func (s *LogStore) Sync() {

  31. s.mu.Lock()

  32. defer s.mu.Unlock()

  33. s.log.Sync()

  34. }

4.相关资料


以上是关于tidwall之raft-wal源码分析的主要内容,如果未能解决你的问题,请参考以下文章

Android 插件化VirtualApp 源码分析 ( 目前的 API 现状 | 安装应用源码分析 | 安装按钮执行的操作 | 返回到 HomeActivity 执行的操作 )(代码片段)

Android 逆向整体加固脱壳 ( DEX 优化流程分析 | DexPrepare.cpp 中 dvmOptimizeDexFile() 方法分析 | /bin/dexopt 源码分析 )(代码片段)

Android 事件分发事件分发源码分析 ( Activity 中各层级的事件传递 | Activity -> PhoneWindow -> DecorView -> ViewGroup )(代码片段)

ArrayPool 源码解读之 byte[] 也能池化?

ArrayPool 源码解读之 byte[] 也能池化?

《Docker 源码分析》全球首发啦!