Source Code Analysis of the tidwall/wal Library

Posted by 一名半路出家的后台技术人


While doing technical research for an infrastructure project a while back, I came across tidwall/wal, a persistent write-ahead log (WAL) implementation. The library is short but well crafted, and its internal data organization in particular is worth studying. Below is a source-level walkthrough of its core features.

1. Data storage architecture of tidwall/wal

The figure below breaks down the core data organization and storage structure of the tidwall/wal library; it has to be said, the design is remarkably efficient and clever.

2. Mind map of the tidwall/wal code

3. Core data structures of tidwall/wal

3.1 The Options configuration

 
   
   
 
```go
// Options for Log
type Options struct {
	// NoSync disables fsync after writes. This is less durable and puts the
	// log at risk of data loss when there's a server crash.
	NoSync bool
	// SegmentSize of each segment. This is just a target value, actual size
	// may differ. Default is 20 MB.
	SegmentSize int
	// LogFormat is the format of the log files. Default is Binary.
	LogFormat LogFormat
	// SegmentCacheSize is the maximum number of segments that will be held in
	// memory for caching. Increasing this value may enhance performance for
	// concurrent read operations. Default is 1
	SegmentCacheSize int
	// NoCopy allows for the Read() operation to return the raw underlying data
	// slice. This is an optimization to help minimize allocations. When this
	// option is set, do not modify the returned data because it may affect
	// other Read calls. Default false
	NoCopy bool
}

// DefaultOptions for Open().
var DefaultOptions = &Options{
	NoSync:           false,    // Fsync after every write
	SegmentSize:      20971520, // 20 MB log segment files.
	LogFormat:        Binary,   // Binary format is small and fast.
	SegmentCacheSize: 2,        // Number of cached in-memory segments
	NoCopy:           false,    // Make a new copy of data for every Read call.
}
```

3.2 The Log struct

 
   
   
 
```go
// Log represents a write ahead log
type Log struct {
	mu         sync.RWMutex
	path       string      // absolute path to log directory
	opts       Options     // log options
	closed     bool        // log is closed
	corrupt    bool        // log may be corrupt
	segments   []*segment  // all known log segments
	firstIndex uint64      // index of the first entry in log
	lastIndex  uint64      // index of the last entry in log
	sfile      *os.File    // tail segment file handle
	wbatch     Batch       // reusable write batch
	scache     tinylru.LRU // segment entries cache
}
```

3.3 The segment struct

 
   
   
 
```go
// segment represents a single segment file.
type segment struct {
	path  string // path of segment file
	index uint64 // first index of segment
	ebuf  []byte // cached entries buffer (the segment's actual data)
	epos  []bpos // cached entries positions in buffer (the index into ebuf)
	// For a log entry at position i in epos, the start and end offsets of
	// its bytes are start = epos[i].pos and end = epos[i].end, so the
	// entry's data is data = ebuf[start:end].
}

type bpos struct {
	pos int // byte position
	end int // one byte past pos
}
```

3.4 The Batch and batchEntry structs

 
   
   
 
```go
// Batch of entries. Used to write multiple entries at once using WriteBatch().
type Batch struct {
	entries []batchEntry
	datas   []byte
}

type batchEntry struct {
	index uint64
	size  int
}

// Write an entry to the batch
func (b *Batch) Write(index uint64, data []byte) {
	b.entries = append(b.entries, batchEntry{index, len(data)})
	b.datas = append(b.datas, data...)
}

// Clear the batch for reuse.
func (b *Batch) Clear() {
	b.entries = b.entries[:0]
	b.datas = b.datas[:0]
}
```
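Notice that Batch keeps all payloads in one flat datas slice, while each batchEntry records only its size. A consumer recovers entry boundaries by repeatedly slicing size bytes off the front, which is exactly what writeBatch does in section 5.2. A minimal standalone sketch of that convention:

```go
package main

import "fmt"

type batchEntry struct {
	index uint64
	size  int
}

func main() {
	// A batch with two entries packed into one flat byte slice,
	// mirroring Batch.entries / Batch.datas.
	entries := []batchEntry{{index: 1, size: 3}, {index: 2, size: 5}}
	datas := []byte("foohello")

	// Consume the flat buffer the same way writeBatch does:
	// take entries[i].size bytes, then advance past them.
	for _, e := range entries {
		data := datas[:e.size]
		fmt.Printf("index=%d data=%s\n", e.index, data)
		datas = datas[e.size:]
	}
}
```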

4. Source analysis of the Open function

 
   
   
 
```go
// Open a new write ahead log
func Open(path string, opts *Options) (*Log, error) {
	if opts == nil {
		opts = DefaultOptions
	}
	if opts.SegmentCacheSize <= 0 {
		opts.SegmentCacheSize = DefaultOptions.SegmentCacheSize
	}
	if opts.SegmentSize <= 0 {
		opts.SegmentSize = DefaultOptions.SegmentSize
	}
	var err error
	path, err = abs(path)
	if err != nil {
		return nil, err
	}
	// Initialize the Log and size the segment cache.
	l := &Log{path: path, opts: *opts}
	l.scache.Resize(l.opts.SegmentCacheSize)
	if err := os.MkdirAll(path, 0777); err != nil {
		return nil, err
	}
	// Load any previously written data.
	if err := l.load(); err != nil {
		return nil, err
	}
	return l, nil
}
```
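Before drilling into load(), here is what opening a log looks like from the caller's side: a minimal sketch, assuming the import path github.com/tidwall/wal and an arbitrary directory name:

```go
package main

import (
	"log"

	"github.com/tidwall/wal"
)

func main() {
	// nil options fall back to DefaultOptions (binary format, fsync on).
	w, err := wal.Open("mylog", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()
}
```

Open delegates the heavy lifting to load():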

 
   
   
 
```go
// load all the segments. This operation also cleans up any START/END segments.
func (l *Log) load() error {
	// List every file in the log directory.
	fis, err := ioutil.ReadDir(l.path)
	if err != nil {
		return err
	}
	startIdx := -1
	endIdx := -1
	// Iterate over the directory entries.
	for _, fi := range fis {
		name := fi.Name()
		if fi.IsDir() || len(name) < 20 {
			// Not a valid segment file.
			continue
		}
		index, err := strconv.ParseUint(name[:20], 10, 64)
		if err != nil || index == 0 {
			continue
		}
		isStart := len(name) == 26 && strings.HasSuffix(name, ".START")
		isEnd := len(name) == 24 && strings.HasSuffix(name, ".END")
		if len(name) == 20 || isStart || isEnd {
			if isStart {
				// startIdx keeps the largest match, so keep overwriting it.
				startIdx = len(l.segments)
			} else if isEnd && endIdx == -1 {
				// endIdx keeps the smallest match.
				endIdx = len(l.segments)
			}
			// Record every segment first; cleanup happens below.
			l.segments = append(l.segments, &segment{
				index: index,
				path:  filepath.Join(l.path, name),
			})
		}
	}
	// Nothing was loaded from disk: initialize a brand-new log.
	if len(l.segments) == 0 {
		// Create a new log
		l.segments = append(l.segments, &segment{
			index: 1,
			path:  filepath.Join(l.path, segmentName(1)),
		})
		l.firstIndex = 1
		l.lastIndex = 0
		l.sfile, err = os.Create(l.segments[0].path)
		return err
	}
	// Existing data was found. If a truncation was interrupted, finish it
	// first. Open existing log. Clean up log if START or END segments exist.
	if startIdx != -1 {
		if endIdx != -1 {
			// There should not be a START and END at the same time
			return ErrCorrupt
		}
		// Delete all files leading up to START
		for i := 0; i < startIdx; i++ {
			if err := os.Remove(l.segments[i].path); err != nil {
				return err
			}
		}
		l.segments = append([]*segment{}, l.segments[startIdx:]...)
		// Rename the START segment
		orgPath := l.segments[0].path
		finalPath := orgPath[:len(orgPath)-len(".START")]
		err := os.Rename(orgPath, finalPath)
		if err != nil {
			return err
		}
		l.segments[0].path = finalPath
	}
	// Delete from the back towards the END segment.
	if endIdx != -1 {
		// Delete all files following END
		for i := len(l.segments) - 1; i > endIdx; i-- {
			if err := os.Remove(l.segments[i].path); err != nil {
				return err
			}
		}
		l.segments = append([]*segment{}, l.segments[:endIdx+1]...)
		// l.segments[len(l.segments)-2].index: the last regular segment, not yet removed
		// l.segments[len(l.segments)-1].index: the segment with the .END suffix
		if len(l.segments) > 1 && l.segments[len(l.segments)-2].index ==
			l.segments[len(l.segments)-1].index {
			// remove the segment prior to the END segment because it shares
			// the same starting index.
			l.segments[len(l.segments)-2] = l.segments[len(l.segments)-1]
			// Shrink the slice by one.
			l.segments = l.segments[:len(l.segments)-1]
		}
		// Rename the END segment
		orgPath := l.segments[len(l.segments)-1].path
		finalPath := orgPath[:len(orgPath)-len(".END")]
		err := os.Rename(orgPath, finalPath)
		if err != nil {
			return err
		}
		l.segments[len(l.segments)-1].path = finalPath
	}
	l.firstIndex = l.segments[0].index
	// Open the last segment for appending, so writes go straight to the tail.
	lseg := l.segments[len(l.segments)-1]
	l.sfile, err = os.OpenFile(lseg.path, os.O_WRONLY, 0666)
	if err != nil {
		return err
	}
	if _, err := l.sfile.Seek(0, 2); err != nil {
		return err
	}
	// Load the last segment's entries.
	if err := l.loadSegmentEntries(lseg); err != nil {
		return err
	}
	// Update lastIndex.
	l.lastIndex = lseg.index + uint64(len(lseg.epos)) - 1
	return nil
}
```
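The 20-character file names that load() parses are produced by segmentName (shown in section 7.2), and the 26- and 24-character length checks correspond to the .START and .END suffixes. A standalone sketch of the naming scheme:

```go
package main

import "fmt"

// segmentName mirrors the library's file naming: a zero-padded,
// 20-character decimal index, so names sort lexically.
func segmentName(index uint64) string {
	return fmt.Sprintf("%020d", index)
}

func main() {
	fmt.Println(segmentName(1))            // 00000000000000000001
	fmt.Println(segmentName(1) + ".START") // 26 chars: a pending front truncation
	fmt.Println(segmentName(1) + ".END")   // 24 chars: a pending back truncation
}
```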

5. How tidwall/wal writes entries

5.1 The Write and WriteBatch flow:

  1. Acquire the write lock, then check whether the log is corrupt or closed.

  2. For Write, the single entry is staged in the internal reusable wbatch, and the work is delegated to writeBatch(&l.wbatch).

  3. writeBatch first validates the incoming indexes: every entry must continue the sequence exactly (lastIndex+1, lastIndex+2, ...); any mismatch returns ErrOutOfOrder.

  4. All writes are appends to the tail segment. The loop walks the batch's entries, slices each entry's data out of the flat datas buffer using its recorded size, appends the encoded entry to the segment's ebuf, and records its position in epos.

  5. Note that after every entry, the code checks whether the current segment has reached the SegmentSize threshold. If it has, the pending bytes are flushed to disk and cycle() is called: the file is synced and closed, the old segment is pushed into the cache, a new segment file is created (recording its first index), and writing continues there.

  6. Entries are written in one of two formats, binary or JSON. In JSON format, the index and data are combined into a JSON object and written one entry per line.

  7. Once all entries are written (and fsynced unless NoSync is set), the lock is released. A usage sketch follows this list.
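A minimal usage sketch of the two write paths, assuming the import path github.com/tidwall/wal and a fresh log directory:

```go
package main

import (
	"log"

	"github.com/tidwall/wal"
)

func main() {
	w, err := wal.Open("mylog", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	// Single write: the index must be exactly lastIndex+1.
	if err := w.Write(1, []byte("first entry")); err != nil {
		log.Fatal(err)
	}

	// Batched writes: the indexes must continue the sequence,
	// or writeBatch returns ErrOutOfOrder.
	b := new(wal.Batch)
	b.Write(2, []byte("second entry"))
	b.Write(3, []byte("third entry"))
	if err := w.WriteBatch(b); err != nil {
		log.Fatal(err)
	}
}
```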

5.2 Source analysis

 
   
   
 
```go
// Write an entry to the log.
func (l *Log) Write(index uint64, data []byte) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	// Sanity checks.
	if l.corrupt {
		return ErrCorrupt
	} else if l.closed {
		return ErrClosed
	}
	l.wbatch.Clear()
	// Stage the single entry in the internal reusable batch.
	l.wbatch.Write(index, data)
	return l.writeBatch(&l.wbatch)
}

// WriteBatch writes the entries in the batch to the log in the order that they
// were added to the batch. The batch is cleared upon a successful return.
func (l *Log) WriteBatch(b *Batch) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.corrupt {
		return ErrCorrupt
	} else if l.closed {
		return ErrClosed
	}
	if len(b.entries) == 0 {
		return nil
	}
	return l.writeBatch(b)
}

func (l *Log) writeBatch(b *Batch) error {
	// check that all indexes in batch are sane
	for i := 0; i < len(b.entries); i++ {
		if b.entries[i].index != l.lastIndex+uint64(i+1) {
			return ErrOutOfOrder
		}
	}

	// load the tail segment
	s := l.segments[len(l.segments)-1]
	// If the tail segment is already full, cycle to a fresh segment
	// before writing anything.
	if len(s.ebuf) > l.opts.SegmentSize {
		// tail segment has reached capacity. Close it and create a new one.
		if err := l.cycle(); err != nil {
			return err
		}
		s = l.segments[len(l.segments)-1]
	}

	mark := len(s.ebuf)
	datas := b.datas
	for i := 0; i < len(b.entries); i++ {
		// 1. Slice this entry's data off the front of the flat buffer
		// using its recorded size (pairs with step 4 below).
		data := datas[:b.entries[i].size]

		var epos bpos
		// Same encoding as a single Write(index, data).
		s.ebuf, epos = l.appendEntry(s.ebuf, b.entries[i].index, data)
		s.epos = append(s.epos, epos)
		// After each entry, check whether the segment has reached
		// SegmentSize; if so, flush the pending bytes and cycle.
		if len(s.ebuf) >= l.opts.SegmentSize {
			// segment has reached capacity, cycle now
			if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
				return err
			}
			l.lastIndex = b.entries[i].index
			if err := l.cycle(); err != nil {
				return err
			}
			s = l.segments[len(l.segments)-1]
			mark = 0
		}

		// 4. Advance the flat buffer past this entry's bytes (pairs with step 1).
		datas = datas[b.entries[i].size:]
	}

	// After encoding the whole batch, write the remainder to the file in one call.
	if len(s.ebuf)-mark > 0 {
		if _, err := l.sfile.Write(s.ebuf[mark:]); err != nil {
			return err
		}
		l.lastIndex = b.entries[len(b.entries)-1].index
	}

	// Fsync unless NoSync is set.
	if !l.opts.NoSync {
		if err := l.sfile.Sync(); err != nil {
			return err
		}
	}

	// Clear the batch for reuse.
	b.Clear()
	return nil
}
```

 
   
   
 
```go
// Cycle the old segment for a new segment.
func (l *Log) cycle() error {
	if err := l.sfile.Sync(); err != nil {
		return err
	}
	if err := l.sfile.Close(); err != nil {
		return err
	}
	// cache the previous segment
	l.pushCache(len(l.segments) - 1)
	s := &segment{
		index: l.lastIndex + 1,
		path:  filepath.Join(l.path, segmentName(l.lastIndex+1)),
	}
	var err error
	l.sfile, err = os.Create(s.path)
	if err != nil {
		return err
	}
	l.segments = append(l.segments, s)
	return nil
}
```

 
   
   
 
```go
func (l *Log) appendEntry(dst []byte, index uint64, data []byte) (out []byte,
	epos bpos) {
	if l.opts.LogFormat == JSON {
		return appendJSONEntry(dst, index, data)
	}
	return appendBinaryEntry(dst, data)
}

func appendJSONEntry(dst []byte, index uint64, data []byte) (out []byte,
	epos bpos) {
	// {"index":number,"data":string}
	mark := len(dst)
	dst = append(dst, `{"index":"`...)
	dst = strconv.AppendUint(dst, index, 10)
	dst = append(dst, `","data":`...)
	dst = appendJSONData(dst, data)
	dst = append(dst, '}', '\n')
	return dst, bpos{mark, len(dst)}
}

func appendJSONData(dst []byte, s []byte) []byte {
	if utf8.Valid(s) {
		b, _ := json.Marshal(*(*string)(unsafe.Pointer(&s)))
		dst = append(dst, '"', '+')
		return append(dst, b[1:]...)
	}
	dst = append(dst, '"', '$')
	dst = append(dst, base64.URLEncoding.EncodeToString(s)...)
	return append(dst, '"')
}

func appendBinaryEntry(dst []byte, data []byte) (out []byte, epos bpos) {
	// data_size + data
	pos := len(dst)
	dst = appendUvarint(dst, uint64(len(data)))
	dst = append(dst, data...)
	return dst, bpos{pos, len(dst)}
}

func appendUvarint(dst []byte, x uint64) []byte {
	var buf [10]byte
	n := binary.PutUvarint(buf[:], x)
	dst = append(dst, buf[:n]...)
	return dst
}
```
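To see the binary layout concretely, here is a standalone sketch that mirrors appendBinaryEntry; the helper below is a simplified re-implementation for illustration, not the library's code:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// appendBinaryEntry mirrors the library's binary layout: a uvarint
// length prefix followed by the raw data bytes.
func appendBinaryEntry(dst []byte, data []byte) []byte {
	var buf [10]byte
	n := binary.PutUvarint(buf[:], uint64(len(data)))
	dst = append(dst, buf[:n]...)
	return append(dst, data...)
}

func main() {
	entry := appendBinaryEntry(nil, []byte("hello"))
	fmt.Printf("% x\n", entry) // 05 68 65 6c 6c 6f: one size byte, then "hello"
}
```

In JSON format, the same entry at index 1 would be written as the line `{"index":"1","data":"+hello"}`; when the data is not valid UTF-8, a `$` prefix with a base64 payload is used instead.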

6. How tidwall/wal reads entries

6.1 The Read flow:

  1. Acquire the read lock, check whether the log is corrupt or closed, and validate that the index lies within [firstIndex, lastIndex].

  2. Then load the segment that holds the index:

  3. First check whether the index falls in the tail segment; if so, return that segment directly.

    1. Next, look through the segment cache; a hit there is also returned immediately.

    2. Reaching this step means the segment exists only on disk and must be loaded: binary-search the segment list for the one the index hits (segment file names are ordered, so a bsearch applies), rebuild the segment from its file (restoring both the data ebuf and the index epos), and push the rebuilt segment into the cache.

  4. With the segment in hand, the entry's position is computed directly from the requested index and the segment's first index, and the bytes are read out of ebuf. JSON entries are decoded before returning; binary entries are returned directly (copied unless NoCopy is set).

  5. Finally the lock is released. A usage sketch follows this list.
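A minimal read sketch, assuming the import path github.com/tidwall/wal and a fresh log directory; it also shows the NoCopy trade-off mentioned in step 4:

```go
package main

import (
	"fmt"
	"log"

	"github.com/tidwall/wal"
)

func main() {
	// NoCopy returns slices that alias the segment's internal buffer,
	// saving an allocation per read; the trade-off is that the caller
	// must treat the returned bytes as read-only.
	w, err := wal.Open("mylog", &wal.Options{NoCopy: true})
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	if err := w.Write(1, []byte("first entry")); err != nil {
		log.Fatal(err)
	}
	data, err := w.Read(1)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s\n", data) // "first entry"; do not modify in place
}
```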

6.2 Source analysis

 
   
   
 
```go
// Read an entry from the log. Returns a byte slice containing the data entry.
func (l *Log) Read(index uint64) (data []byte, err error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	if l.corrupt {
		return nil, ErrCorrupt
	} else if l.closed {
		return nil, ErrClosed
	}
	// The index must fall within firstIndex..lastIndex.
	if index == 0 || index < l.firstIndex || index > l.lastIndex {
		return nil, ErrNotFound
	}
	// Load the segment that holds the index.
	s, err := l.loadSegment(index)
	if err != nil {
		return nil, err
	}
	// Locate the entry via its epos record, then slice it out of ebuf.
	epos := s.epos[index-s.index]
	edata := s.ebuf[epos.pos:epos.end]
	if l.opts.LogFormat == JSON {
		return readJSON(edata)
	}
	// binary read
	// Read the length prefix first...
	size, n := binary.Uvarint(edata)
	if n <= 0 {
		return nil, ErrCorrupt
	}
	if uint64(len(edata)-n) < size {
		return nil, ErrCorrupt
	}
	// ...then the data itself.
	if l.opts.NoCopy {
		data = edata[n : uint64(n)+size]
	} else {
		data = make([]byte, size)
		copy(data, edata[n:])
	}
	return data, nil
}

//go:noinline
func readJSON(edata []byte) ([]byte, error) {
	var data []byte
	s := gjson.Get(*(*string)(unsafe.Pointer(&edata)), "data").String()
	if len(s) > 0 && s[0] == '$' {
		var err error
		data, err = base64.URLEncoding.DecodeString(s[1:])
		if err != nil {
			return nil, ErrCorrupt
		}
	} else if len(s) > 0 && s[0] == '+' {
		data = make([]byte, len(s[1:]))
		copy(data, s[1:])
	} else {
		return nil, ErrCorrupt
	}
	return data, nil
}
```
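readJSON relies on the prefix convention set by appendJSONData: '+' marks a plain UTF-8 payload and '$' marks base64-encoded binary. The standalone sketch below re-implements that convention with encoding/base64 directly; decodeJSONData is an illustrative helper, not the library's code, which goes through gjson:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// decodeJSONData mirrors readJSON's prefix convention: '+' marks a
// plain UTF-8 payload, '$' marks base64-encoded binary.
func decodeJSONData(s string) ([]byte, error) {
	switch {
	case len(s) > 0 && s[0] == '+':
		return []byte(s[1:]), nil
	case len(s) > 0 && s[0] == '$':
		return base64.URLEncoding.DecodeString(s[1:])
	default:
		return nil, fmt.Errorf("corrupt data field")
	}
}

func main() {
	d1, _ := decodeJSONData("+hello")
	d2, _ := decodeJSONData("$" + base64.URLEncoding.EncodeToString([]byte{0xff, 0x00}))
	fmt.Printf("%s\n", d1)  // hello
	fmt.Printf("% x\n", d2) // ff 00
}
```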

 
   
   
 
```go
// loadSegment loads the segment entries into memory, pushes it to the front
// of the lru cache, and returns it.
func (l *Log) loadSegment(index uint64) (*segment, error) {
	// check the last segment first.
	lseg := l.segments[len(l.segments)-1]
	if index >= lseg.index {
		return lseg, nil
	}

	// check the most recent cached segment
	var rseg *segment
	l.scache.Range(func(_, v interface{}) bool {
		s := v.(*segment)
		if index >= s.index && index < s.index+uint64(len(s.epos)) {
			rseg = s
		}
		return false
	})
	if rseg != nil {
		return rseg, nil
	}
	// Neither strategy hit, so fall back to disk: first locate which
	// segment the index belongs to, then read the segment file and
	// rebuild both its data (ebuf) and its index (epos).
	// find in the segment array
	idx := l.findSegment(index)
	s := l.segments[idx]
	if len(s.epos) == 0 {
		// load the entries from the segment file
		if err := l.loadSegmentEntries(s); err != nil {
			return nil, err
		}
	}
	// push the segment to the front of the cache
	l.pushCache(idx)
	return s, nil
}

// findSegment performs a bsearch on the segments.
// Segment first-indexes (and file names) are sorted, so binary search applies.
func (l *Log) findSegment(index uint64) int {
	i, j := 0, len(l.segments)
	for i < j {
		h := i + (j-i)/2
		if index >= l.segments[h].index {
			i = h + 1
		} else {
			j = h
		}
	}
	return i - 1
}

func (l *Log) loadSegmentEntries(s *segment) error {
	data, err := ioutil.ReadFile(s.path)
	if err != nil {
		return err
	}
	ebuf := data
	var epos []bpos
	var pos int
	// Walk the raw bytes and rebuild the epos index.
	for exidx := s.index; len(data) > 0; exidx++ {
		var n int
		if l.opts.LogFormat == JSON {
			n, err = loadNextJSONEntry(data)
		} else {
			n, err = loadNextBinaryEntry(data)
		}
		if err != nil {
			return err
		}
		data = data[n:]
		epos = append(epos, bpos{pos, pos + n})
		pos += n
	}
	s.ebuf = ebuf
	s.epos = epos
	return nil
}
```
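The binary search in findSegment returns the position of the last segment whose first index is less than or equal to the requested index. A standalone sketch of the same logic over a plain slice of first indexes:

```go
package main

import "fmt"

// findSegment mirrors the library's bsearch over the sorted first
// indexes of the segments: it returns the position of the last
// segment whose first index is <= the requested index.
func findSegment(firstIndexes []uint64, index uint64) int {
	i, j := 0, len(firstIndexes)
	for i < j {
		h := i + (j-i)/2
		if index >= firstIndexes[h] {
			i = h + 1
		} else {
			j = h
		}
	}
	return i - 1
}

func main() {
	// Segments starting at entries 1, 100, and 250.
	firsts := []uint64{1, 100, 250}
	fmt.Println(findSegment(firsts, 1))   // 0
	fmt.Println(findSegment(firsts, 99))  // 0
	fmt.Println(findSegment(firsts, 100)) // 1
	fmt.Println(findSegment(firsts, 300)) // 2
}
```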

 
   
   
 
```go
func loadNextJSONEntry(data []byte) (n int, err error) {
	// {"index":number,"data":string}
	idx := bytes.IndexByte(data, '\n')
	if idx == -1 {
		return 0, ErrCorrupt
	}
	line := data[:idx]
	dres := gjson.Get(*(*string)(unsafe.Pointer(&line)), "data")
	if dres.Type != gjson.String {
		return 0, ErrCorrupt
	}
	return idx + 1, nil
}

func loadNextBinaryEntry(data []byte) (n int, err error) {
	// data_size + data
	size, n := binary.Uvarint(data)
	if n <= 0 {
		return 0, ErrCorrupt
	}
	if uint64(len(data)-n) < size {
		return 0, ErrCorrupt
	}
	return n + int(size), nil
}
```

7. How tidwall/wal deletes entries

  • TruncateFront(index): removes all entries before the given index; that index becomes the first entry.

  • TruncateBack(index): removes all entries after the given index; that index becomes the last entry. (A usage sketch of both follows.)
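A minimal usage sketch of both truncation methods, assuming the import path github.com/tidwall/wal and a fresh log directory:

```go
package main

import (
	"log"

	"github.com/tidwall/wal"
)

func main() {
	w, err := wal.Open("mylog", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer w.Close()

	for i := uint64(1); i <= 100; i++ {
		if err := w.Write(i, []byte("entry")); err != nil {
			log.Fatal(err)
		}
	}
	// Keep only entries 40..60: entry 40 becomes firstIndex
	// and entry 60 becomes lastIndex.
	if err := w.TruncateFront(40); err != nil {
		log.Fatal(err)
	}
	if err := w.TruncateBack(60); err != nil {
		log.Fatal(err)
	}
}
```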

7.1 The TruncateFront flow

This method removes all data before the given index, making that index the new firstIndex.

  1. Find segIdx, the position of the segment that holds the index.

  2. Load that segment.

  3. Write everything from the index onward into a temporary file (TEMP); once the write is synced, rename it to segmentName(index)+".START".

  4. Delete every segment file up to and including segIdx; once that completes, rename the .START file back to the plain segment name.

  5. When the truncation touches the tail segment, the tail file handle must be closed first and reopened afterwards, seeking to the end so that appends resume at the correct offset.

  6. Finally, update the segments slice and firstIndex, and clear the cache.

7.2 TruncateFront source analysis

 
   
   
 
```go
// segmentName returns a 20-byte textual representation of an index
// for lexical ordering. This is used for the file names of log segments.
func segmentName(index uint64) string {
	return fmt.Sprintf("%020d", index)
}

// TruncateFront truncates the front of the log by removing all entries that
// are before the provided `index`. In other words the entry at
// `index` becomes the first entry in the log.
func (l *Log) TruncateFront(index uint64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.corrupt {
		return ErrCorrupt
	} else if l.closed {
		return ErrClosed
	}
	return l.truncateFront(index)
}

func (l *Log) truncateFront(index uint64) (err error) {
	if index == 0 || l.lastIndex == 0 ||
		index < l.firstIndex || index > l.lastIndex {
		return ErrOutOfRange
	}
	if index == l.firstIndex {
		// nothing to truncate
		return nil
	}
	// Locate segIdx, the segment that holds the index. Its surviving data
	// is saved first; afterwards every segment file up to and including
	// segIdx is deleted.
	segIdx := l.findSegment(index)
	var s *segment
	s, err = l.loadSegment(index)
	if err != nil {
		return err
	}
	epos := s.epos[index-s.index:]
	ebuf := s.ebuf[epos[0].pos:]
	// Create a temp file that contains the truncated segment.
	tempName := filepath.Join(l.path, "TEMP")
	// Save everything from the index onward into the TEMP file.
	err = func() error {
		f, err := os.Create(tempName)
		if err != nil {
			return err
		}
		defer f.Close()
		if _, err := f.Write(ebuf); err != nil {
			return err
		}
		if err := f.Sync(); err != nil {
			return err
		}
		return f.Close()
	}()
	// Rename the TEMP file to its START file name.
	startName := filepath.Join(l.path, segmentName(index)+".START")
	if err = os.Rename(tempName, startName); err != nil {
		return err
	}
	// The log was truncated but still needs some file cleanup. Any errors
	// following this message will not cause an on-disk data corruption, but
	// may cause an inconsistency with the current program, so we'll return
	// ErrCorrupt so that the user can attempt a recovery by calling Close()
	// followed by Open().
	defer func() {
		if v := recover(); v != nil {
			err = ErrCorrupt
			l.corrupt = true
		}
	}()
	// When the truncation touches the tail segment...
	if segIdx == len(l.segments)-1 {
		// Close the tail segment file
		if err = l.sfile.Close(); err != nil {
			return err
		}
	}
	// Delete truncated segment files.
	for i := 0; i <= segIdx; i++ {
		if err = os.Remove(l.segments[i].path); err != nil {
			return err
		}
	}
	// Rename the START file to the final truncated segment name.
	newName := filepath.Join(l.path, segmentName(index))
	if err = os.Rename(startName, newName); err != nil {
		return err
	}
	s.path = newName
	s.index = index
	if segIdx == len(l.segments)-1 {
		// Reopen the tail segment file
		if l.sfile, err = os.OpenFile(newName, os.O_WRONLY, 0666); err != nil {
			return err
		}
		var n int64
		if n, err = l.sfile.Seek(0, 2); err != nil {
			return err
		}
		if n != int64(len(ebuf)) {
			err = errors.New("invalid seek")
			return err
		}
		// Reload all entries of the rebuilt segment.
		if err = l.loadSegmentEntries(s); err != nil {
			return err
		}
	}
	// Update the segments slice and firstIndex, then drop stale cache entries.
	l.segments = append([]*segment{}, l.segments[segIdx:]...)
	l.firstIndex = index
	l.clearCache()
	return nil
}
```

7.3 The TruncateBack flow

This method removes all entries after the given index, making that index the new lastIndex.

  1. Record segIdx, the position of the segment that holds the index.

  2. Load that segment and write its data up to and including the index into a temporary file (TEMP); once the write completes, rename the file to segmentName(s.index)+".END".

  3. Delete every segment file from segIdx to the tail.

  4. Rename the .END file back to the plain segment name.

  5. Reopen that file and seek to its end so that subsequent writes append correctly.

  6. Update segments and lastIndex, clear the cache, and finally reload the segment's entries from its file.

7.4 TruncateBack source analysis

 
   
   
 
```go
// TruncateBack truncates the back of the log by removing all entries that
// are after the provided `index`. In other words the entry at `index`
// becomes the last entry in the log.
func (l *Log) TruncateBack(index uint64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.corrupt {
		return ErrCorrupt
	} else if l.closed {
		return ErrClosed
	}
	return l.truncateBack(index)
}

func (l *Log) truncateBack(index uint64) (err error) {
	if index == 0 || l.lastIndex == 0 ||
		index < l.firstIndex || index > l.lastIndex {
		return ErrOutOfRange
	}
	if index == l.lastIndex {
		// nothing to truncate
		return nil
	}
	// Locate segIdx, the segment that holds the index. Its surviving data
	// is saved to a temp file, then every file from segIdx to the tail
	// is removed.
	segIdx := l.findSegment(index)
	var s *segment
	s, err = l.loadSegment(index)
	if err != nil {
		return err
	}
	// The data up to and including the index.
	epos := s.epos[:index-s.index+1]
	ebuf := s.ebuf[:epos[len(epos)-1].end]
	// Create a temp file that contains the truncated segment.
	tempName := filepath.Join(l.path, "TEMP")
	// Save the surviving data into the TEMP file.
	err = func() error {
		f, err := os.Create(tempName)
		if err != nil {
			return err
		}
		defer f.Close()
		if _, err := f.Write(ebuf); err != nil {
			return err
		}
		if err := f.Sync(); err != nil {
			return err
		}
		return f.Close()
	}()
	// Rename the TEMP file to its END file name.
	endName := filepath.Join(l.path, segmentName(s.index)+".END")
	if err = os.Rename(tempName, endName); err != nil {
		return err
	}
	// The log was truncated but still needs some file cleanup. Any errors
	// following this message will not cause an on-disk data corruption, but
	// may cause an inconsistency with the current program, so we'll return
	// ErrCorrupt so that the user can attempt a recovery by calling Close()
	// followed by Open().
	defer func() {
		if v := recover(); v != nil {
			err = ErrCorrupt
			l.corrupt = true
		}
	}()

	// Close the tail segment file
	if err = l.sfile.Close(); err != nil {
		return err
	}
	// Delete truncated segment files, from segIdx to the end.
	for i := segIdx; i < len(l.segments); i++ {
		if err = os.Remove(l.segments[i].path); err != nil {
			return err
		}
	}
	// Rename the END file to the final truncated segment name.
	newName := filepath.Join(l.path, segmentName(s.index))
	if err = os.Rename(endName, newName); err != nil {
		return err
	}
	// Reopen the tail segment file...
	if l.sfile, err = os.OpenFile(newName, os.O_WRONLY, 0666); err != nil {
		return err
	}
	var n int64
	// ...and seek to the end so writes can continue appending.
	n, err = l.sfile.Seek(0, 2)
	if err != nil {
		return err
	}
	if n != int64(len(ebuf)) {
		err = errors.New("invalid seek")
		return err
	}
	s.path = newName
	l.segments = append([]*segment{}, l.segments[:segIdx+1]...)
	l.lastIndex = index
	l.clearCache()
	// Reload the surviving entries of this segment.
	if err = l.loadSegmentEntries(s); err != nil {
		return err
	}
	return nil
}
```
