etcd Backend存储引擎实现原理
Posted 惜暮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了etcd Backend存储引擎实现原理相关的知识,希望对你有一定的参考价值。
文章目录
etcd KV存储层原理
要理解etcd存储层的原理,需要有一些背景:
- 对于读操作,不需要走Leader的共识算法,直接通过读事务读取存储层的KV;
- 对于写操作,首先需要通过raft集群的共识算法,共识算法达成一致之后,会apply写操作日志,也就是进入etcd的KV存储层
- etcd的KV存储层是基于BoltDB这个KV存储的嵌入式数据库,BoltDB本身已经实现了事务的隔离性、原子性、持久化、一致性,并提供了并发的单写+多读。但是BoltDB的写是基于COW(copy on write)技术的,这会影响写性能的吞吐量,此外BoltDB也提供了batch写事务以提升写事务吞吐。etcd的
mvcc/backend
层是基于 BoltDB 提供的并发1写+N读能力做了优化以提升写吞吐。
etcd KV存储层分为两部分,一部分是etcd实现的基于revision的MVCC层(https://github.com/etcd-io/etcd/tree/master/server/mvcc);另一部分是通过 backend 这一后端设计(https://github.com/etcd-io/etcd/tree/master/server/mvcc/backend),封装了KV存储引擎的细节,为上层提供了一个统一的接口。
Backend 存储引擎
Backend封装了一层通用的存储接口:
type Backend interface
// ReadTx returns a read transaction. It is replaced by ConcurrentReadTx in the main data path, see #10523.
ReadTx() ReadTx
BatchTx() BatchTx
// ConcurrentReadTx returns a non-blocking read transaction.
ConcurrentReadTx() ReadTx
Snapshot() Snapshot
Hash(ignores map[IgnoreKey]struct) (uint32, error)
// Size returns the current size of the backend physically allocated.
// The backend can hold DB space that is not utilized at the moment,
// since it can conduct pre-allocation or spare unused space for recycling.
// Use SizeInUse() instead for the actual DB size.
Size() int64
// SizeInUse returns the current size of the backend logically in use.
// Since the backend can manage free space in a non-byte unit such as
// number of pages, the returned value can be not exactly accurate in bytes.
SizeInUse() int64
// OpenReadTxN returns the number of currently open read transactions in the backend.
OpenReadTxN() int64
Defrag() error
ForceCommit()
Close() error
这里只解释最核心主流程,重点关注ReadTx() ReadTx
和 BatchTx() BatchTx
接口:
ReadTx() ReadTx
创建了一个读事务;BatchTx() BatchTx
创建了一个读写事务,这里的batch的含义也就是写事务的批量提交意思。
etcd默认是基于 BoltDB存储实现了Backend接口。backend结构体定义如下:
// backend是etcd基于BoltDB实现的KV存储
// BoltDB 天然支持一写多读并发事务;
// backend 基于BoltDB封装了事务,提升了并发读的性能
type backend struct
// size and commits are used with atomic operations so they must be
// 64-bit aligned, otherwise 32-bit tests will crash
// size is the number of bytes allocated in the backend
size int64
// sizeInUse is the number of bytes actually used in the backend
sizeInUse int64
// commits counts number of commits since start
commits int64
// openReadTxN is the number of currently open read transactions in the backend
openReadTxN int64
// 这里的锁也是隔离下面的db对象;
// 正常的创建bolt.DB事务只需要读锁;
// 但是做 defrag 时候需要写锁隔离
mu sync.RWMutex
db *bolt.DB
// 默认100ms
batchInterval time.Duration
// 默认defaultBatchLimit = 10000
batchLimit int
// backend 执行写事务的对象
batchTx *batchTxBuffered
// backend 执行只读事务的对象
readTx *readTx
stopc chan struct
donec chan struct
lg *zap.Logger
从封装的对象就可以看出,backend是使用 *bolt.DB
作为存储引擎。
读事务ReadTX
Backend接口返回读事务的有两个接口:
ReadTx() ReadTx
:ConcurrentReadTx() ReadTx
: 在主流程中都是使用的这个并发读事务。
ReadTx接口定义:
type ReadTx interface
Lock()
Unlock()
RLock()
RUnlock()
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
不难看出外部对读事务的使用流程:
ReadTx.RLock()
ReadTx.UnsafeRange()
ReadTx.RUnlock()
并发读事务ConcurrentReadTx
// ConcurrentReadTx creates and returns a new ReadTx, which:
// A) creates and keeps a copy of backend.readTx.txReadBuffer,
// B) references the boltdb read Tx (and its bucket cache) of current batch interval.
func (b *backend) ConcurrentReadTx() ReadTx
// 这里需要读 readTx 的buffer, 所以需要读锁
// 这里的锁占用时间是很低的
b.readTx.RLock()
defer b.readTx.RUnlock()
// prevent boltdb read Tx from been rolled back until store read Tx is done. Needs to be called when holding readTx.RLock().
// 用于 ConcurrentReadTx 的,增加一个并发的ReadTx
// 在concurrentReadTx.RUnlock()时候会释放这个waiter信号
b.readTx.txWg.Add(1)
// TODO: might want to copy the read buffer lazily - create copy when A) end of a write transaction B) end of a batch interval.
return &concurrentReadTx
baseReadTx: baseReadTx
// copy一份backend的readTx.buf, 这样就可以不用持有readTx.mu对buffer的保护,从而提升读的性能
// 这里就是空间换时间(锁的竞争)
buf: b.readTx.buf.unsafeCopy(),
txMu: b.readTx.txMu,
tx: b.readTx.tx,
buckets: b.readTx.buckets,
txWg: b.readTx.txWg,
,
backend通过ConcurrentReadTx()接口创建了一个并发读事务,创建流程表明了几个很重要的点:
- 这里需要持有backend.readTx的读锁,创建完concurrentReadTx对象之后读锁会很快释放,整体对读锁的占用非常轻
- 新创建的concurrentReadTx对象会对backend.readTx里面的缓存做一次全拷贝,(这里的缓存与batch批量提交有关,后面将batchTx会讲),这样读事务对buf的访问就是独立的,在一个concurrentReadTx内不需要任何加锁。
看一下concurrentReadTx的一些实现:
type concurrentReadTx struct
baseReadTx
func (rt *concurrentReadTx) Lock()
func (rt *concurrentReadTx) Unlock()
// RLock is no-op. concurrentReadTx does not need to be locked after it is created.
func (rt *concurrentReadTx) RLock()
// RUnlock signals the end of concurrentReadTx.
func (rt *concurrentReadTx) RUnlock() rt.txWg.Done()
可以看到,就是因为concurrentReadTx前面的一次buf的拷贝操作,所以每个concurrentReadTx的加锁全部是空操作。
这里我们对比非并发的readTx的实现:
// Base type for readTx and concurrentReadTx to eliminate duplicate functions between these
// baseReadTx的访问是并发的,所以需要读写锁来保护。
type baseReadTx struct
// mu protects accesses to the txReadBuffer
// 写事务执行End时候,需要获取这个写锁,然后把写事务的更新写到 baseReadTx 的buffer里面;
// 创建 concurrentReadTx 时候,需要获取读锁,因为需要拷贝buffer
mu sync.RWMutex
buf txReadBuffer
// TODO: group and encapsulate txMu, tx, buckets, txWg, as they share the same lifecycle.
// txMu protects accesses to buckets and tx on Range requests.
// 这个读写锁是保护下面的tx和buckets
txMu *sync.RWMutex
tx *bolt.Tx
buckets map[string]*bolt.Bucket
// txWg protects tx from being rolled back at the end of a batch interval until all reads using this tx are done.
txWg *sync.WaitGroup
type readTx struct
baseReadTx
func (rt *readTx) Lock() rt.mu.Lock()
func (rt *readTx) Unlock() rt.mu.Unlock()
func (rt *readTx) RLock() rt.mu.RLock()
func (rt *readTx) RUnlock() rt.mu.RUnlock()
从上面对比可以看到,readTx的加锁过程,是真正意义的加锁,因为并发访问readTx的buf的时候,是需要加锁的。
有关concurrentReadTx
优化的设计可以参考相关PR的设计文档:etcd: Fully Concurrent Reads Design Proposal
这里的优化是针对一些大range查询场景:比如某些大range的查询在占用读事务ReadTx的读锁之后,会占用较长时间。但是对于batchTx这个写事务来说,每次写事务ReadTx提交需要更新读事务ReadTx的buf,所以这里需要竞争ReadTx的读写锁的写锁,但是写锁的获取会一直被读锁block住,因为前面的读锁一直得不到释放,这个场景下会造成写事务超时。针对这个场景,这里提出了concurrentReadTx
的优化,通过拷贝的形式,将读锁的占用缩小到创建concurrentReadTx
,这样无论读使用的range有多大,都不会block 读锁的竞争,因为每个concurrentReadTx
的buf都是基于拷贝私有的。
读事务的range查询
不管是concurrenctReadTx
还是readTx
,基本的range操作都是基于其继承的baseReadTx
实现的:
func (baseReadTx *baseReadTx) UnsafeRange(bucketName, key, endKey []byte, limit int64) ([][]byte, [][]byte)
if endKey == nil
// forbid duplicates for single keys
limit = 1
if limit <= 0
limit = math.MaxInt64
if limit > 1 && !bytes.Equal(bucketName, safeRangeBucket)
panic("do not use unsafeRange on non-keys bucket")
// tips: 优先找:local buffer cache
// 读缓存的时候不需要加锁,因为这里
keys, vals := baseReadTx.buf.Range(bucketName, key, endKey, limit)
if int64(len(keys)) == limit
return keys, vals
// find/cache bucket
// tips: 然后找持久化存储
bn := string(bucketName)
baseReadTx.txMu.RLock()
bucket, ok := baseReadTx.buckets[bn]
baseReadTx.txMu.RUnlock()
lockHeld := false
if !ok
baseReadTx.txMu.Lock()
lockHeld = true
bucket = baseReadTx.tx.Bucket(bucketName)
baseReadTx.buckets[bn] = bucket
// ignore missing bucket since may have been created in this batch
if bucket == nil
if lockHeld
baseReadTx.txMu.Unlock()
return keys, vals
if !lockHeld
baseReadTx.txMu.Lock()
lockHeld = true
c := bucket.Cursor()
baseReadTx.txMu.Unlock()
// 从持久化存储里面查找剩下的kv
k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys)))
return append(k2, keys...), append(v2, vals...)
Range的流程也很好理解:
- 首先从baseReadTx的buf里面查询,如果从buf里面已经拿到了足够的KV(入参里面有限制range查询的最大数量),那么就直接返回拿到的KVs;
- 如果buf里面的KV不足以满足要求,那么这里就会利用 BoltDB的读事务接口去BoltDB 里面查询KV,然后返回。
读写事务 batchTx
Backend接口BatchTx() BatchTx
返回的实际上就是backend.batchTx。 还是先看接口定义:
type BatchTx interface
ReadTx
UnsafeCreateBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte)
// Commit commits a previous tx and begins a new writable one.
Commit()
// CommitAndStop commits the previous tx and does not create a new one.
CommitAndStop()
除了batchTx还有一个带缓存的batchTxBuffered,默认也是使用batchTxBuffered:
type batchTxBuffered struct
batchTx
buf txWriteBuffer
// batchTx是写事务的封装
// 这里的访问都需要提前获取排他锁
type batchTx struct
// 这里的锁是保证 batchTx 的排他,也就是单线程
// 所以对下面属性的访问是都是在锁的保护下。
sync.Mutex
tx *bolt.Tx
backend *backend
pending int
这里的读写事务之所以叫batch开头,是因为etcd存储层有做一定的优化,默认情况下,写事务不是执行结束后,不会马上提交到BoltDB数据库,而是通过一个默认定时100ms的ticker去触发批量提交,以提升写的吞吐量。但是上层对写事务结束之后需要能够马上读到,所以这里有一个buffer去缓存提交到BoltDB之前的数据,当然这里的buffer需要和读事务的buf一致,所以也存在一个回写到读事务的buf的过程。
这里我们基于使用BatchTx最基本的流程来分析代码:
BatchTx.Lock()
BatchTx.UnsafePut()
BatchTx.UnLock()
(1)首先加锁:
func (t *batchTx) Lock()
t.Mutex.Lock()
前面说过,etcd的存储层对写是单线程的,所以这里也就是通过Mutex这个排他锁,保证了batchTx的事务的排他性。
(2)Put操作:
func (t *batchTxBuffered) UnsafePut(bucketName []byte, key []byte, value []byte)
t.batchTx.UnsafePut(bucketName, key, value)
t.buf.put(bucketName, key, value)
// UnsafePut must be called holding the lock on the tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte)
t.unsafePut(bucketName, key, value, false)
func (t *batchTx) unsafePut(bucketName []byte, key []byte, value []byte, seq bool)
bucket := t.tx.Bucket(bucketName)
if bucket == nil
t.backend.lg.Fatal(
"failed to find a bucket",
zap.String("bucket-name", string(bucketName)),
)
if seq
// it is useful to increase fill percent when the workloads are mostly append-only.
// this can delay the page split and reduce space usage.
bucket.FillPercent = 0.9
if err := bucket.Put(key, value); err != nil
t.backend.lg.Fatal(
"failed to write to a bucket",
zap.String("bucket-name", string(bucketName)),
zap.Error(err),
)
t.pending++
这里要注意的点是:
- 我们实际上是调用batchTxBuffered的
UnsafePut
函数; - 这里会基于BoltDB 的写事务将put的KV写到BoltDB(还没有commit)
- 此外最重要的是这里会同步的写batchTxBuffered的缓存buf
(3)最后释放排它锁
func (t *batchTxBuffered) Unlock()
// 表示当前 batch 已经有写事务结束但是还没有提交
if t.pending != 0
// 获取读事务的写锁,然后把写事务的KV更新同步到读事务的缓存Buffer中
t.backend.readTx.Lock() // blocks txReadBuffer for writing.
t.buf.writeback(&t.backend.readTx.buf)
t.backend.readTx.Unlock()
// 如果当前的写事务的更新操作的次数已经超过了betch的上限,就将当前pending的
// 所有写操作一次提交到BoltDB数据库
if t.pending >= t.backend.batchLimit
t.commit(false)
t.batchTx.Unlock()
由于batchTxBuffered的实现使用了buf,对于上次来说,调用了UnLock就表示写操作已经执行成功,所以之后的读事务能够马上读取到最新数据。所以这里必须要把写事务的buf刷新回读事务中。
需要注意的是:
t.backend.readTx.Lock()
因为会写读事务的buf,所以这里会获取读事务的写锁。但是如果此时如果存在range特别大的读事务正在执行又没有concurrenctReadTx, 那么读锁就不会被释放,倒是这里获取读事务的阻塞过长,而导致写事务超时。这里也是为什么会有concurrenctReadTx
这个优化的原因。t.buf.writeback(&t.backend.readTx.buf)
就是实际执行把写事务中的buf写回读事务中的buf的逻辑。- 还有一点,backend 里面也有一些保护操作,如果积累pending的写事务超过了限制也会触发写事务的批量触发,实际执行写事务提交的就是函数:
batchTxBuffered.commit()
batchTx的批量提交
前面我们说了 backend 对于写事务的优化是通过buffer + 批量提交来实现的。
这里回到创建 backend实例的地方:
func newBackend(bcfg BackendConfig) *backend
......
// 通过mmap打开BoltDB
db, err := bolt.Open(bcfg.Path, 0600, bopts)
if err != nil
bcfg.Logger.Panic("failed to open database", zap.String("path", bcfg.Path), zap.Error(err))
// In future, may want to make buffering optional for low-concurrency systems
// or dynamically swap between buffered/non-buffered depending on workload.
b := &backend
db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
readTx: &readTx
baseReadTx: baseReadTx
buf: txReadBuffer
txBuffer: txBuffermake(map[string]*bucketBuffer),
,
buckets: make(map[string]*bolt.Bucket),
txWg: new(sync.WaitGroup),
txMu: new(sync.RWMutex),
,
,
stopc: make(chan struct),
donec: make(chan struct),
lg: bcfg.Logger,
b.batchTx = newBatchTxBuffered(b)
go b.run()
return b
上面初始化过程有几点需要注意:
- 创建readTx和batchTx实例时候,其内部的tx并没有被初始化;
newBatchTxBuffered()
函数里面会通过一次Commit()操作,来重置readTx和batchTx内部的BoltDB的tx实例。
实现异步批量提交就在 go b.run()
里面做的:
func (b *backend) run()
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for
select
case <-t.C:
case <-b.stopc:
// 关闭了Backend才会触发
b.batchTx.CommitAndStop()
return
// 默认每100ms执行一次自动提交
if b.batchTx.safePending() != 0
// 如果当前周期内存在pending的写事务就提交事务
b.batchTx.Commit()
t.Reset(b.batchInterval)
这里的逻辑很清晰了,就是一个timer定时默认100ms执行一次batchTx.Commit()
,当然如果当前100ms周期内没有写事务,也就不会执行提交了。
具体我们看下实现:
func (t *batchTxBuffered) Commit()
t.Lock()
t.commit(false)
t.Unlock()
func (t *batchTxBuffered) commit(stop bool)
// all read txs must be closed to acquire boltdb commit rwlock
// 这里是个读写锁,会等待所有读锁关闭,然后竞争到独占的写锁
// 写事务的提交需要等所有读事务的锁释放
t.backend.readTx.Lock()
t.unsafeCommit(stop)
// 释放读事务的写锁
t.backend.readTx.Unlock()
func (t *batchTxBuffered) unsafeCommit(stop bool)
// 处理读事务
if t.backend.readTx.tx != nil
// wait all store read transactions using the current boltdb tx to finish,
Flannel配置详解