自己动手写数据库:并发管理组件lock_table的原理和实现

Posted tyler_download

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自己动手写数据库:并发管理组件lock_table的原理和实现相关的知识,希望对你有一定的参考价值。

在前面章节,我们描述的并发控制的一些基本原理。其中一个重要原则就是“序列化”,也就数据库引擎要对交易提交的请求进行调度,调度的结果要使得每个交易就好像独占了引擎那样。要实现这样的效果就必须进行相应的加锁。但是加锁必然会降低高并发的效率,因此改进办法是实现两种锁,一种是互斥锁,他用于保证区块写入的安全性,一个区块加锁后其他任何操作,无论是读还是写,都不能执行,必须要等到互斥锁释放。另一种是共享锁,他运行多个读操作同时进行,但是不允许执行写操作,必须等到共享锁全部释放后才可以。

本节的目的就在于如何实现两种锁机制。尽管go语言提供了很多并发机制,他也有共享锁和互斥锁,但还不足以满足一个数据库的并发要求,这也是我们需要进行相应设计的原因。我们所设计的锁机制作用的对象是区块,因此我们需要一种对应机制,不同的区块要对应不同的锁,因此我们要实现一个类似map的对象,他也称为LockTable。

他要实现的目的有,一,让不同的区块对应不同的锁,因此引擎在读写区块1时,不影响对区块2的操作。第二,他要有超时回退机制,当引擎通过他读写某个区块,发现长时间得不到操作所需要的锁时,这个时候极有可能发生了死锁,因此他要能检测此种情况并实现操作的回滚。下面我们看看代码的实现,首先在tx模块中添加新文件命名为lock_table.go,然后先输入如下代码:

package tx

import (
	"errors"
	fm "file_manager"
	"sync"
	"time"
)

const (
	MAX_WAITING_TIME = 3 //3用于测试,在正式使用时设置为10
)

func NewLockTable() *LockTable 
	/*
		如果给定blk对应的值为-1,表明有互斥锁,如果大于0表明有相应数量的共享锁加在对应区块上,
		如果是0则表示没有锁
	*/
	lock_table := &LockTable
		lock_map:    make(map[*fm.BlockId]int64),
		notify_chan: make(map[*fm.BlockId]chan struct),
		notify_wg:   make(map[*fm.BlockId]*sync.WaitGroup),
	

	return lock_table

接下来我们要实现这样的功能,假设有3个线程编号分别为1,2,3,第一个线程要写入区块1,第二,三个线程要读取区块1,如果线程1先获得了互斥锁,那么线程2,3就必须挂起,等待线程1释放锁。如果在时间MAX_WAITING_TIME范围内线程1完成操作释放了互斥锁,那么线程2,3就能被唤醒,他们将获得共享锁,然后同时读取区块1的数据。如果在MAX_WAITING_TIME所表达的时间范围内依然无法读取区块1,那么他们就需要唤醒,然后放弃读取操作。

确切的说这里需要实现WaitGivenTimeOut功能,当调用这个函数时,对应的线程会在给定时间段内挂起,一旦超时后才被唤醒。同时我们还需要实现NotifyAll接口,一旦某个线程调用这个函数后,所有因为执行WaitGivenTimeOut而被挂起的线程都要被唤醒,然后往下执行。

如果了解go语言的同学可能知道,sync包中有个Cond类,他的Wait接口能实现调用线程挂起功能,同时他对应的Broadcast能实现唤醒所有因为调用Wait而挂起的线程。但是问题在于Wait接口不能实现挂起特定时间,因此一旦调用该接口后,必须等待其他线程调用Signal接口或是Broadcast接口才能实现唤醒。

由此我们要自己实现WaitGivenTimeOut对应的功能,相关代码如下:

type LockTable struct 
	lock_map    map[*fm.BlockId]int64           //将锁和区块对应起来
	notify_chan map[*fm.BlockId]chan struct   //用于实现超时回退的管道
	notify_wg   map[*fm.BlockId]*sync.WaitGroup //用于实现唤醒通知
	method_lock sync.Mutex                      //实现方法调用的线程安全,相当于java的synchronize关键字


func (l *LockTable) waitGivenTimeOut(blk *fm.BlockId) 
	wg, ok := l.notify_wg[blk]
	if !ok 
		var new_wg sync.WaitGroup
		l.notify_wg[blk] = &new_wg
		wg = &new_wg
	
	wg.Add(1)
	defer wg.Done()
    l.method_lock.Unlock() //挂起前释放方法锁
	select 
	case <-time.After(MAX_WAITING_TIME * time.Second):
	case <-l.notify_chan[blk]:
	
	l.method_lock.Lock() //唤起后加上方法锁


func (l *LockTable) notifyAll(blk *fm.BlockId) 
	go func() 
		//等待所有线程返回后再重新设置channel
		l.notify_wg[blk].Wait()
		l.notify_chan[blk] = make(chan struct)
	()

	close(l.notify_chan[blk])

函数waitGivenTimeOut需要输入模块对应的BlockId对象。notify_wg是一个map对象,他将WaitGoup和特定的区块号对应起来,如果有多个线程要访问同一个区块,那么对应的WaitGroup就会执行Add(1)操作。接下来的Select 语句用于实现线程挂起,第一个case用于将线程挂起给定时间,第二个case用于将线程唤醒。

假设线程2,3因为执行waitGivenTimeOut函数而被挂起,那么这两个线程会因为两种情况会被重新唤起,第一种情况就是超时,也就是time.After对应的管道会启动,从而将两个线程唤起。第二种情况是线程1调用了notifyAll,注意到这里关闭了对应区块的管道,于是select语句中第二个case得到执行,于是挂起的线程被唤醒。注意到notifyAll还启动了一个线程,他的作用是等待给定区块对应的WaitGroup能完成,当l.notify_wg[blk].Wait返回后,那意味着所有挂起的线程都完成了唤醒操作,这时他就重新给区块对应的管道重新赋值,以便于下次使用。

并发设计难度很大,也很容易出错,上面的做法可能存在一些问题,以后发现时我们再进行修改。接下来我们需要设计互斥锁和共享锁,互斥锁对应的接口为XLock, 共享锁对应的接口为SLock:


func (l *LockTable) initWaitingOnBlk(blk *fm.BlockId) 
	_, ok := l.notify_chan[blk]
	if !ok 
		l.notify_chan[blk] = make(chan struct)
	

	_, ok = l.notify_wg[blk]
	if !ok 
		l.notify_wg[blk] = &sync.WaitGroup
	


func (l *LockTable) SLock(blk *fm.BlockId) error 
	l.method_lock.Lock()
	defer l.method_lock.Unlock()
    l.initWaitingOnBlk(blk)
	start := time.Now()
	for l.hasXlock(blk) && !l.waitingTooLong(start) 
		l.waitGivenTimeOut(blk)
	
	//如果等待过长时间,有可能是产生了死锁
	if l.hasXlock(blk) 
		return errors.New("SLock Exception: XLock on given blk")
	

	val := l.getLockVal(blk)
	l.lock_map[blk] = val + 1
	return nil


func (l *LockTable) XLock(blk *fm.BlockId) error 
	l.method_lock.Lock()
	defer l.method_lock.Unlock()
    l.initWaitingOnBlk(blk)
	start := time.Now()
	for l.hasOtherSLocks(blk) && !l.waitingTooLong(start) 
		l.waitGivenTimeOut(blk)
	

	if l.hasOtherSLocks(blk) 
		return errors.New("XLock error: SLock on given blk")
	

	//-1表示区块被加上互斥锁
	l.lock_map[blk] = -1

	return nil


func (l *LockTable) UnLock(blk *fm.BlockId) 
	l.method_lock.Lock()
	defer l.method_lock.Unlock()

	val := l.getLockVal(blk)
	if val >= 1 
		l.lock_map[blk] = val - 1
	 else 
		delete(l.lock_map, blk)
		//通知所有等待给定区块的线程从Wait中恢复
		l.notifyAll(blk)
	


func (l *LockTable) hasXlock(blk *fm.BlockId) bool 
	return l.getLockVal(blk) < 0


func (l *LockTable) hasOtherSLocks(blk *fm.BlockId) bool 
	return l.getLockVal(blk) >= 1


func (l *LockTable) waitingTooLong(start time.Time) bool 
	elapsed := time.Since(start).Seconds()
	if elapsed >= MAX_WAITING_TIME 
		return true
	

	return false


func (l *LockTable) getLockVal(blk *fm.BlockId) int64 
	val, ok := l.lock_map[blk]
	if !ok 
		return 0
	

	return val

在上面的代码实现中,需要注意的是,在调用XLock或是SLock时,首先需要判断给定区块是否被加上其他锁,也就是调用XLock时需要判断区块是否已经被加了共享锁,调用SLock时判断区块是否已经被加了互斥锁。

如果区块已经被加上了其他锁,那么线程就会先挂起一段时间,例如某个线程调用SLock给区块1加上共享锁,此时他发现区块1已经被加上了互斥锁,那么他就会调用waitGivenTimeOut来挂起一段时间,超时后他会自动唤醒,然后再次判断互斥锁是否已经解除,如果还没解除,那么他需要返回一个错误,于是对应的交易就要放弃读取给定区块的内容。如果它在挂起期间,另一个线程已经完成了写入操作,于是就会调用notifyAll接口,然后该线程就会从挂起中恢复,接下来他就会对区块加上共享锁,然后读取区块的数据。

这里我们实现共享锁和互斥锁的机制很简单,我们使用一个map来实现。如果给定区块被加上互斥锁,那么该区块对应数值就是-1,如果被加上共享锁,那么对应的数值就是一个大于0的数,如果有三个线程针对同一区块获取共享锁,那么该区块对应的数值就是3,当给定区块数值对应为0时,表示没有锁加在给定区块上。

下面我们需要对上面实现的逻辑进行检测,首先要检验waitGivenTimeOut和notifyAll的正确性,测试用例这么做,首先创建区块1,然后启动4个线程,第一个线程先在区块1上获取互斥锁,接下来启动线程2,3,4,后面三个线程针对区块1获得共享锁,由于此时区块1已经被加上互斥锁,因此后面三个线程会被挂起给定时长MAX_WAITING_TIME,一旦超时后他们被唤醒,然后发现区块1依然被加上了互斥锁,于是返回错误,下面是测试用例的实现:


func TestRoutinesWithSLockTimeout(t *testing.T) 
	var err_array []error
	var err_array_lock sync.Mutex
	blk := fm.NewBlockId("testfile", 1)
	lock_table := NewLockTable()
	lock_table.XLock(blk)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ 
		go func() 
			wg.Add(1)
			defer wg.Done()
			err_array_lock.Lock()
			defer err_array_lock.Unlock()
			err := lock_table.SLock(blk)
			if err == nil 
				fmt.Println("access slock ok")
			
			err_array = append(err_array, err)
		()
	
	time.Sleep(1 * time.Second) //让线程都运行起来
	start := time.Now()
	wg.Wait()
	elapsed := time.Since(start).Seconds()
	require.Equal(t, elapsed >= MAX_WAITING_TIME, true)
	require.Equal(t, len(err_array), 3)
	for i := 0; i < 3; i++ 
		require.Equal(t, err_array[i], errors.New("SLock Exception: XLock on given blk"))
	

我们用如下命令执行上面的用例:

go test tx -run TestRoutinesWithSLockTimeout

运行后返回结果为:

ok      tx      9.753s

也就是说测试用例的执行是成功的。我们再看一个用例,线程1先获取互斥锁,然后启动3个线程去获取共享锁并进入挂起状态,线程1在挂起超时前释放互斥锁,调用notifyAll唤起所有挂起的线程,被唤起的线程都能获得共享锁并读取区块数据,代码如下:


func TestRoutinesWithSLockAfterXLockRelease(t *testing.T) 
	var err_array []error
	var err_array_lock sync.Mutex
	blk := fm.NewBlockId("testfile", 1)
	lock_table := NewLockTable()
	lock_table.XLock(blk)
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ 
		go func() 
			wg.Add(1)
			defer wg.Done()
			err_array_lock.Lock()
			defer err_array_lock.Unlock()
			err := lock_table.SLock(blk)
			if err == nil 
				fmt.Println("access slock ok")
			
			err_array = append(err_array, err)
		()
	
	time.Sleep(1 * time.Second) //让线程都运行起来
	lock_table.UnLock(blk)      //释放加在区块上的互斥锁
	start := time.Now()
	wg.Wait()
	elapsed := time.Since(start).Seconds()
	require.Equal(t, elapsed < MAX_WAITING_TIME, true)
	require.Equal(t, len(err_array), 3)
	for i := 0; i < 3; i++ 
		require.Nil(t, err_array[i]) //所有线程能获得共享锁然后读取数据
	

	require.Equal(t, lock_table.lock_map[blk], int64(3))

上面的用例经过多次运行均能通过。更多更详细的视频演示和讲解请参看b站,搜索Coding迪斯尼。

以上是关于自己动手写数据库:并发管理组件lock_table的原理和实现的主要内容,如果未能解决你的问题,请参考以下文章

自己动手写数据库:并发管理器的实现,以及并发交易流程的分析

自己动手写数据库:并发管理器的实现,以及并发交易流程的分析

自己动手写数据库:并发管理的基本原理

自己动手写数据库:并发管理的基本原理

自己动手写数据库:实现交易对象和恢复管理器

自己动手写数据库:视图元数据管理,统计元数据管理