etcd Backend存储引擎实现原理

Posted 惜暮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了etcd Backend存储引擎实现原理相关的知识,希望对你有一定的参考价值。

文章目录

etcd KV存储层原理

要理解etcd存储层的原理,需要有一些背景:

  1. 对于读操作,不需要走Leader的共识算法,直接通过读事务读取存储层的KV;
  2. 对于写操作,首先需要通过raft集群的共识算法,共识算法达成一致之后,会apply写操作日志,也就是进入etcd的KV存储层
  3. etcd的KV存储层是基于BoltDB这个KV存储的嵌入式数据库,BoltDB本身已经实现了事务的隔离性、原子性、持久化、一致性,并提供了并发的单写+多读。但是BoltDB的写是基于COW(copy on write)技术的,这会影响写性能的吞吐量,此外BoltDB也提供了batch写事务以提升写事务吞吐。etcd的 mvcc/backend 层是基于 BoltDB 提供的并发1写+N读能力做了优化以提升写吞吐。

etcd KV存储层分为两部分,一部分是etcd实现的基于revision的MVCC层(https://github.com/etcd-io/etcd/tree/master/server/mvcc);另一部分是通过 backend 这一后端设计(https://github.com/etcd-io/etcd/tree/master/server/mvcc/backend),封装了KV存储引擎的细节,为上层提供了一个统一的接口。

Backend 存储引擎

Backend封装了一层通用的存储接口:

type Backend interface 
	// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
	ReadTx() ReadTx
	BatchTx() BatchTx
	// ConcurrentReadTx returns a non-blocking read transaction.
	ConcurrentReadTx() ReadTx

	Snapshot() Snapshot
	Hash(ignores map[IgnoreKey]struct) (uint32, error)
	// Size returns the current size of the backend physically allocated.
	// The backend can hold DB space that is not utilized at the moment,
	// since it can conduct pre-allocation or spare unused space for recycling.
	// Use SizeInUse() instead for the actual DB size.
	Size() int64
	// SizeInUse returns the current size of the backend logically in use.
	// Since the backend can manage free space in a non-byte unit such as
	// number of pages, the returned value can be not exactly accurate in bytes.
	SizeInUse() int64
	// OpenReadTxN returns the number of currently open read transactions in the backend.
	OpenReadTxN() int64
	Defrag() error
	ForceCommit()
	Close() error

这里只解释最核心主流程,重点关注ReadTx() ReadTxBatchTx() BatchTx接口:

  • ReadTx() ReadTx 创建了一个读事务;
  • BatchTx() BatchTx 创建了一个读写事务,这里的batch的含义也就是写事务的批量提交意思。

etcd默认是基于 BoltDB存储实现了Backend接口。backend结构体定义如下:

// backend是etcd基于BoltDB实现的KV存储
// BoltDB 天然支持一写多读并发事务;
// backend 基于BoltDB封装了事务,提升了并发读的性能
type backend struct 
	// size and commits are used with atomic operations so they must be
	// 64-bit aligned, otherwise 32-bit tests will crash

	// size is the number of bytes allocated in the backend
	size int64
	// sizeInUse is the number of bytes actually used in the backend
	sizeInUse int64
	// commits counts number of commits since start
	commits int64
	// openReadTxN is the number of currently open read transactions in the backend
	openReadTxN int64

	// 这里的锁也是隔离下面的db对象;
	// 正常的创建bolt.DB事务只需要读锁;
	// 但是做 defrag 时候需要写锁隔离
	mu sync.RWMutex
	db *bolt.DB

	// 默认100ms
	batchInterval time.Duration
	// 默认defaultBatchLimit    = 10000
	batchLimit int
	// backend 执行写事务的对象
	batchTx *batchTxBuffered

	// backend 执行只读事务的对象
	readTx *readTx

	stopc chan struct
	donec chan struct

	lg *zap.Logger

从封装的对象就可以看出,backend是使用 *bolt.DB 作为存储引擎。

读事务ReadTX

Backend接口返回读事务的有两个接口:

  • ReadTx() ReadTx:
  • ConcurrentReadTx() ReadTx: 在主流程中都是使用的这个并发读事务。

ReadTx接口定义:

type ReadTx interface 
	Lock()
	Unlock()
	RLock()
	RUnlock()

	UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
	UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error

不难看出外部对读事务的使用流程:

ReadTx.RLock()
ReadTx.UnsafeRange()
ReadTx.RUnlock()

并发读事务ConcurrentReadTx

// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
func (b *backend) ConcurrentReadTx() ReadTx 
	// 这里需要读 readTx 的buffer, 所以需要读锁
	// 这里的锁占用时间是很低的
	b.readTx.RLock()
	defer b.readTx.RUnlock()
	// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
	// 用于 ConcurrentReadTx 的,增加一个并发的ReadTx
	// 在concurrentReadTx.RUnlock()时候会释放这个waiter信号
	b.readTx.txWg.Add(1)
	// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
	return &concurrentReadTx
		baseReadTx: baseReadTx
			// copy一份backend的readTx.buf, 这样就可以不用持有readTx.mu对buffer的保护,从而提升读的性能
			// 这里就是空间换时间(锁的竞争)
			buf:     b.readTx.buf.unsafeCopy(),
			txMu:    b.readTx.txMu,
			tx:      b.readTx.tx,
			buckets: b.readTx.buckets,
			txWg:    b.readTx.txWg,
		,
	

backend通过ConcurrentReadTx()接口创建了一个并发读事务,创建流程表明了几个很重要的点:

  • 这里需要持有backend.readTx的读锁,创建完concurrentReadTx对象之后读锁会很快释放,整体对读锁的占用非常轻
  • 新创建的concurrentReadTx对象会对backend.readTx里面的缓存做一次全拷贝,(这里的缓存与batch批量提交有关,后面将batchTx会讲),这样读事务对buf的访问就是独立的,在一个concurrentReadTx内不需要任何加锁。

看一下concurrentReadTx的一些实现:

type concurrentReadTx struct 
	baseReadTx


func (rt *concurrentReadTx) Lock()   
func (rt *concurrentReadTx) Unlock() 

// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
func (rt *concurrentReadTx) RLock() 

// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock()  rt.txWg.Done() 

可以看到,就是因为concurrentReadTx前面的一次buf的拷贝操作,所以每个concurrentReadTx的加锁全部是空操作。

这里我们对比非并发的readTx的实现:

// Base type for readTx and concurrentReadTx to eliminate duplicate functions between these
// baseReadTx的访问是并发的,所以需要读写锁来保护。
type baseReadTx struct 
	// mu protects accesses to the txReadBuffer
	// 写事务执行End时候,需要获取这个写锁,然后把写事务的更新写到 baseReadTx 的buffer里面;
	// 创建 concurrentReadTx 时候,需要获取读锁,因为需要拷贝buffer
	mu  sync.RWMutex
	buf txReadBuffer

	// TODO: group and encapsulate txMu, tx, buckets, txWg, as they share the same lifecycle.
	// txMu protects accesses to buckets and tx on Range requests.
	// 这个读写锁是保护下面的tx和buckets
	txMu    *sync.RWMutex
	tx      *bolt.Tx
	buckets map[string]*bolt.Bucket
	// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
	txWg *sync.WaitGroup


type readTx struct 
	baseReadTx


func (rt *readTx) Lock()     rt.mu.Lock() 
func (rt *readTx) Unlock()   rt.mu.Unlock() 
func (rt *readTx) RLock()    rt.mu.RLock() 
func (rt *readTx) RUnlock()  rt.mu.RUnlock() 

从上面对比可以看到,readTx的加锁过程,是真正意义的加锁,因为并发访问readTx的buf的时候,是需要加锁的。

有关concurrentReadTx 优化的设计可以参考相关PR的设计文档:etcd: Fully Concurrent Reads Design Proposal

这里的优化是针对一些大range查询场景:比如某些大range的查询在占用读事务ReadTx的读锁之后,会占用较长时间。但是对于batchTx这个写事务来说,每次写事务ReadTx提交需要更新读事务ReadTx的buf,所以这里需要竞争ReadTx的读写锁的写锁,但是写锁的获取会一直被读锁block住,因为前面的读锁一直得不到释放,这个场景下会造成写事务超时。针对这个场景,这里提出了concurrentReadTx 的优化,通过拷贝的形式,将读锁的占用缩小到创建concurrentReadTx ,这样无论读使用的range有多大,都不会block 读锁的竞争,因为每个concurrentReadTx 的buf都是基于拷贝私有的。

读事务的range查询

不管是concurrenctReadTx还是readTx,基本的range操作都是基于其继承的baseReadTx实现的:

func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte) 
	if endKey == nil 
		// forbid duplicates for single keys
		limit = 1
	
	if limit <= 0 
		limit = math.MaxInt64
	
	if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket) 
		panic("do not use unsafeRange on non-keys bucket")
	

	// tips: 优先找:local buffer cache
	// 读缓存的时候不需要加锁,因为这里
	keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
	if int64(len(keys)) == limit 
		return keys, vals
	

	// find/cache bucket
	// tips: 然后找持久化存储
	bn := string(bucketName)
	baseReadTx.txMu.RLock()
	bucket, ok := baseReadTx.buckets[bn]
	baseReadTx.txMu.RUnlock()
	lockHeld := false
	if !ok 
		baseReadTx.txMu.Lock()
		lockHeld = true
		bucket = baseReadTx.tx.Bucket(bucketName)
		baseReadTx.buckets[bn] = bucket
	

	// ignore missing bucket since may have been created in this batch
	if bucket == nil 
		if lockHeld 
			baseReadTx.txMu.Unlock()
		
		return keys, vals
	
	if !lockHeld 
		baseReadTx.txMu.Lock()
		lockHeld = true
	
	c := bucket.Cursor()
	baseReadTx.txMu.Unlock()

	// 从持久化存储里面查找剩下的kv
	k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
	return append(k2, keys...), append(v2, vals...)

Range的流程也很好理解:

  1. 首先从baseReadTx的buf里面查询,如果从buf里面已经拿到了足够的KV(入参里面有限制range查询的最大数量),那么就直接返回拿到的KVs;
  2. 如果buf里面的KV不足以满足要求,那么这里就会利用 BoltDB的读事务接口去BoltDB 里面查询KV,然后返回。

读写事务 batchTx

Backend接口BatchTx() BatchTx 返回的实际上就是backend.batchTx。 还是先看接口定义:

type BatchTx interface 
	ReadTx
	UnsafeCreateBucket(name []byte)
	UnsafePut(bucketName []byte, key []byte, value []byte)
	UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
	UnsafeDelete(bucketName []byte, key []byte)
	// Commit commits a previous tx and begins a new writable one.
	Commit()
	// CommitAndStop commits the previous tx and does not create a new one.
	CommitAndStop()

除了batchTx还有一个带缓存的batchTxBuffered,默认也是使用batchTxBuffered:

type batchTxBuffered struct 
	batchTx
	buf txWriteBuffer

// batchTx是写事务的封装
// 这里的访问都需要提前获取排他锁
type batchTx struct 
	// 这里的锁是保证 batchTx 的排他,也就是单线程
	// 所以对下面属性的访问是都是在锁的保护下。
	sync.Mutex
	tx      *bolt.Tx
	backend *backend

	pending int

这里的读写事务之所以叫batch开头,是因为etcd存储层有做一定的优化,默认情况下,写事务不是执行结束后,不会马上提交到BoltDB数据库,而是通过一个默认定时100ms的ticker去触发批量提交,以提升写的吞吐量。但是上层对写事务结束之后需要能够马上读到,所以这里有一个buffer去缓存提交到BoltDB之前的数据,当然这里的buffer需要和读事务的buf一致,所以也存在一个回写到读事务的buf的过程。

这里我们基于使用BatchTx最基本的流程来分析代码:

BatchTx.Lock()
BatchTx.UnsafePut()
BatchTx.UnLock()

(1)首先加锁:

func (t *batchTx) Lock() 
	t.Mutex.Lock()

前面说过,etcd的存储层对写是单线程的,所以这里也就是通过Mutex这个排他锁,保证了batchTx的事务的排他性。

(2)Put操作:

func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte) 
	t.batchTx.UnsafePut(bucketName, key, value)
	t.buf.put(bucketName, key, value)


// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) 
	t.unsafePut(bucketName, key, value, false)


func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool) 
	bucket := t.tx.Bucket(bucketName)
	if bucket == nil 
		t.backend.lg.Fatal(
			"failed to find a bucket",
			zap.String("bucket-name", string(bucketName)),
		)
	
	if seq 
		// it is useful to increase fill percent when the workloads are mostly append-only.
		// this can delay the page split and reduce space usage.
		bucket.FillPercent = 0.9
	
	if err := bucket.Put(key, value); err != nil 
		t.backend.lg.Fatal(
			"failed to write to a bucket",
			zap.String("bucket-name", string(bucketName)),
			zap.Error(err),
		)
	
	t.pending++

这里要注意的点是:

  • 我们实际上是调用batchTxBuffered的UnsafePut函数;
  • 这里会基于BoltDB 的写事务将put的KV写到BoltDB(还没有commit)
  • 此外最重要的是这里会同步的写batchTxBuffered的缓存buf

(3)最后释放排它锁

func (t *batchTxBuffered) Unlock() 
	// 表示当前 batch 已经有写事务结束但是还没有提交
	if t.pending != 0 
		// 获取读事务的写锁,然后把写事务的KV更新同步到读事务的缓存Buffer中
		t.backend.readTx.Lock() // blocks txReadBuffer for writing.
		t.buf.writeback(&t.backend.readTx.buf)
		t.backend.readTx.Unlock()
		// 如果当前的写事务的更新操作的次数已经超过了betch的上限,就将当前pending的
		// 所有写操作一次提交到BoltDB数据库
		if t.pending >= t.backend.batchLimit 
			t.commit(false)
		
	
	t.batchTx.Unlock()

由于batchTxBuffered的实现使用了buf,对于上次来说,调用了UnLock就表示写操作已经执行成功,所以之后的读事务能够马上读取到最新数据。所以这里必须要把写事务的buf刷新回读事务中。

需要注意的是:

  1. t.backend.readTx.Lock() 因为会写读事务的buf,所以这里会获取读事务的写锁。但是如果此时如果存在range特别大的读事务正在执行又没有concurrenctReadTx, 那么读锁就不会被释放,倒是这里获取读事务的阻塞过长,而导致写事务超时。这里也是为什么会有concurrenctReadTx这个优化的原因。
  2. t.buf.writeback(&t.backend.readTx.buf)就是实际执行把写事务中的buf写回读事务中的buf的逻辑。
  3. 还有一点,backend 里面也有一些保护操作,如果积累pending的写事务超过了限制也会触发写事务的批量触发,实际执行写事务提交的就是函数:batchTxBuffered.commit()

batchTx的批量提交

前面我们说了 backend 对于写事务的优化是通过buffer + 批量提交来实现的。

这里回到创建 backend实例的地方:

func newBackend(bcfg BackendConfig) *backend 
	......
	// 通过mmap打开BoltDB
	db, err := bolt.Open(bcfg.Path, 0600, bopts)
	if err != nil 
		bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
	

	// In future, may want to make buffering optional for low-concurrency systems
	// or dynamically swap between buffered/non-buffered depending on workload.
	b := &backend
		db: db,

		batchInterval: bcfg.BatchInterval,
		batchLimit:    bcfg.BatchLimit,
		
		readTx: &readTx
			baseReadTx: baseReadTx
				buf: txReadBuffer
					txBuffer: txBuffermake(map[string]*bucketBuffer),
				,
				buckets: make(map[string]*bolt.Bucket),
				txWg:    new(sync.WaitGroup),
				txMu:    new(sync.RWMutex),
			,
		,

		stopc: make(chan struct),
		donec: make(chan struct),

		lg: bcfg.Logger,
	
	b.batchTx = newBatchTxBuffered(b)
	go b.run()
	return b

上面初始化过程有几点需要注意:

  • 创建readTx和batchTx实例时候,其内部的tx并没有被初始化;
  • newBatchTxBuffered() 函数里面会通过一次Commit()操作,来重置readTx和batchTx内部的BoltDB的tx实例。

实现异步批量提交就在 go b.run() 里面做的:

func (b *backend) run() 
	defer close(b.donec)
	t := time.NewTimer(b.batchInterval)
	defer t.Stop()
	for 
		select 
		case <-t.C:
		case <-b.stopc:
			// 关闭了Backend才会触发
			b.batchTx.CommitAndStop()
			return
		
		// 默认每100ms执行一次自动提交
		if b.batchTx.safePending() != 0 
			// 如果当前周期内存在pending的写事务就提交事务
			b.batchTx.Commit()
		
		t.Reset(b.batchInterval)
	

这里的逻辑很清晰了,就是一个timer定时默认100ms执行一次batchTx.Commit(),当然如果当前100ms周期内没有写事务,也就不会执行提交了。

具体我们看下实现:

func (t *batchTxBuffered) Commit() 
	t.Lock()
	t.commit(false)
	t.Unlock()


func (t *batchTxBuffered) commit(stop bool) 
	// all read txs must be closed to acquire boltdb commit rwlock
	// 这里是个读写锁,会等待所有读锁关闭,然后竞争到独占的写锁
	// 写事务的提交需要等所有读事务的锁释放
	t.backend.readTx.Lock()
	t.unsafeCommit(stop)
	// 释放读事务的写锁
	t.backend.readTx.Unlock()


func (t *batchTxBuffered) unsafeCommit(stop bool) 
	// 处理读事务
	if t.backend.readTx.tx != nil 
		// wait all store read transactions using the current boltdb tx to finish,
		Flannel配置详解

Flannel配置详解

技术分享| 基于 Etcd 的分布式锁实现原理及方案

etcd原理

etcd原理

confd+etcd实现高可用自动发现