自己动手写数据库:实现交易对象和恢复管理器

Posted tyler_download

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自己动手写数据库:实现交易对象和恢复管理器相关的知识,希望对你有一定的参考价值。

前面一节我们完成了用于实现系统恢复的日志,本节我们看看如何基于日志内容实现系统恢复。我们将设计一个系统恢复管理器,它在系统启动时读取日志内容,根据读到的日志对数据进行恢复,由于所谓“恢复”其实是交易的回滚,因此我们首先实现交易对象,前面为了测试方便,我们简单的提供了交易对象的几个简单接口,这里我们将实现一个逻辑完整的交易对象,只不过我们暂时忽略其并发管理逻辑,并发功能我们将在后面的章节进行实现。

首先我们先了解交易对象的基本结构:

这里我们先忽略并发管理,它将在后一节进行针对性的研究,我们首先实现Transation,BufferList,和RecoverMgr。前面章节中我们使用Buffer对象来实现数据写入缓存页面或者存入磁盘,而Transation其实是对Buffer提供接口的封装和调用,它除了支持数据读写功能外,还在此基础上提供了并发控制,恢复控制等功能,后面其他模块都必须通过Transation对象来实现数据的写入和读取,首先我们在interface.go中增加一个常量定义:

const (
	UINT64_LENGTH = 8
	END_OF_FILE = -1
)

接着增加一个buffer_list.go,它用来记录或快速查询当前被pin的内存页面,其内容如下:

package tx

import (
	fm "file_manager"
	"fmt"
	bm "buffer_manager"
)

type BufferList struct 
	buffers map[*fm.BlockId]*bm.Buffer 
	buffer_mgr *bm.BufferManager
	pins []*fm.BlockId


func NewBufferList(buffer_mgr *bm.BufferManager) *BufferList 
	buffer_list := &BufferList
		buffer_mgr : buffer_mgr,
		buffers: make(map[*fm.BlockId]*bm.Buffer ),
		pins: []fm.BlockId,
	

	return buffer_list


func (b *BufferList) get_buffer(blk *fm.BlockId) *bm.Buffer 
	buff, _ := b.buffers[blk]
	return buff 


func (b *BufferList) Pin(blk *fm.BlockId) error
	//一旦一个内存页被pin后,将其加入map进行追踪管理
	buff, err := b.buffer_mgr.Pin(blk)
	if err != nil 
		return err
	
    s.buffers[blk] = buff
	b.pins = append(b.pins, blk)
	return nil 


func (b *BufferList)Unpin(blk *fm.BlockId) 
	buffer, ok := b.buffers[blk]
	if !ok 
		return 
	

	b.buffer_mgr.Unpin(blk)
	for idx, pinned_blk := range b.pins 
		if pinned_blk == blk 
			b.pins = append(s.pins[:idx], s.pins[idx+1]...)
			break
		
	

	delete(s.buffers, blk)


func (b *BufferList)UnpinAll() 
	for _, blk in range b.pins 
		buffer := s.buffers[blk]
		s.buffer_mgr.Unpin(buffer)
	

    s.buffers = make(map[*fm.BlockId]*bm.Buffer)
    s.pins = make([]*fm.BlockId)

下面我们看看交易对象的实现,增加transation.go,添加代码如下:

package tx

import (
	fm "file_manager"
	"fmt"
	lg "log_manager"
	bm "buffer_manager"
	"sync"
	"errors"
)

var tx_num_mu sync.Mutex
next_tx_num := int32(0)

func NxtTxNum() int32
	tx_num_mu.Lock()
	defer tx_num_mu.Unlock()

	next_tx_num = next_tx_num + 1

	return next_tx_num


type  Transation struct
    //concur_mgr  ConcurrentMgr*
	//recovery_mgr RecorveryMgr* 
    file_manager *fm.FileManager
	log_manager *lg.LogManager 
	buffer_manager *bm.BufferManager
	my_buffers  *BufferList 
    tx_num      int32


func NewTransation(file_manager *fm.FileManager, log_manager *lg.LogManager, 
	buffer_manager bm.BufferManager) *Transation 
		tx := &Transation 
			//创建同步管理器
			//创建恢复管理器
			file_manager: file_manager,
			log_manager: log_manager,
			buffer_manager: buffer_manager,
			my_buffers: NewBufferList(buffer_manager), 
		

		return tx


func (t *Transation)Commit() 
	//调用恢复管理器执行commit
	//t.recovery_mgr.Commit()

	r := fmt.Sprintf("transation %d  committed", t.tx_num)
	fmt.Println(r)
	//释放同步管理器
	t.my_buffers.UnpinAll()


func (t *Transation) Rollback() 
	//调用恢复管理器rollback
	//t.recovery_mft.Rollback()
	r := fmt.Sprintf("transation %d roll back", t.tx_num)
	//释放同步管理器
	t.my_buffers.UnpinAll()


func(t *Transation)Recover() 
	//系统启动时会在所有交易执行前执行该函数
	t.bm.FlushAll(t.tx_num)
	//调用回复管理器的recover接口
	//t.recovery_mgr.Recover()


func (t *Transation)Pin(blk *fm.BlockId) 
	t.my_buffers.Pin(blk)


func (t *Transation) Unpin(blk *fm.BlockId) 
	t.my_buffers.Unpin(blk)


func (t *Transation) buffer_no_exist(blk *fm.BlockId) error
	err_s := fmt.Sprintf("No buffer found for given blk : %d with file name: %s\\n", 
	blk.Number(), blk.FileName())
	err := errors.New(err_s)
	return err 


func (t *Transation) GetInt(blk *fm.BlockId, offset uint64) int64, error 
	//调用同步管理器加s锁
	//t.concur_mgr.Slock(blk)

	buff := t.my_buffers.get_buffer(blk)
	if buff == nil 
		return -1, t.buffer_no_exist(blk) 
	

	return int64(buff.Contents.GetInt(offset)), nil 


func(t *Transation) GetString(blk *fm.BlockId, offset uint64) string, error 
	//调用同步管理器加s锁
	//t.concur_mgr.Slock(blk)

	buff := t.my_buffers.get_buffer(blk)
	if buff == nil 
		return "", t.buffer_no_exist(blk) 
	

	return buff.Contents().GetString(offset), nil 


func (t *Transation) SetInt(blk *fm.BlockId, offset uint64, val int64, okToLog bool) error 
	//调用同步管理器加x锁
	//t.concur_mgr.Xlock(blk)

	buff := t.my_buffers.get_buffer(blk)
	if buff == nil 
		return t.buffer_no_exist(blk)
	

	lsn := 0
	if okToLog 
		//调用恢复管理器的SetInt方法
		//lsn = t.recovery_mgr.SetInt(buff, offset, val)
	

	p = buff.Contents()
	p.SetInt(offset, uint64(val))
	buff.SetModified(t.tx_num, lsn)


func (t *Transation) SetString(blk *fm.BlockId, offset uint64, val string, okToLog bool) error 
	//使用同步管理器加x锁
	//t.concur_mgr.Xlock(blk)
	
	buff := t.my_buffers.get_buffer(blk)
	if buff == nil 
		return t.buffer_no_exist(blk)
	

	lsn := 0
	if okToLog 
		//调用恢复管理器SetString方法
		//lsn = t.recovery_mgr.SetString(buff, offset, val)
	

	p := buff.Contents()
	p.SetString(offset, val)
	buff.SetModified(t.tx_num, lsn)


func (t *Transation) Size(file_name string) uint64 
	//调用同步管理器加锁
	//dummy_blk := fm.NewBlockId(file_name, END_OF_FILE)
	//t.concur_mgr.Slock(dummy_blk)
	return t.file_manager.Size(file_name)


func (t *Transation)Append(file_name string) *fm.BlockId
	//调用同步管理器加锁
	//dummy_blk := fm.NewBlockId(file_name, END_OF_FILE)
	//t.concur_mgr.Xlock(dummy_blk)
	blk, err := t.file_manager.Append(file_name)
	if err != nil 
		return nil 
	

	return blk 


func (t *Transation) BlockSize() uint64
	return t.file_manager.BlockSize()


func (t *Transation) AvailableBuffers() uint64
	return t.buffer_manager.Available()



由于交易对象在执行写入或读出时需要根据并发情况加相应的锁,而且它在写入数据时还需要调用恢复管理器记录写入状况以便未来执行恢复操作,但是并发管理器和恢复管理器目前尚未实现,因此我们在调用他们的接口时先注释掉。下面我们看看恢复管理器的实现,一旦完成恢复管理器的代码后,我们再将上面涉及到恢复管理器的注释进行返注释。

下面我们再看看恢复管理器的实现,添加recovery_mgr.go,添加代码如下:

package tx

import (
	bm "buffer_manager"
	fm "file_manager"
	lg "log_manager"
)

type RecoveryManager struct 
	log_manager    *lg.LogManager
	buffer_manager *bm.BufferManager
	tx             *Transation
	tx_num         int32


func NewRecoveryManager(tx *Transation, tx_num int32, log_manager *lg.LogManager,
	buffer_manager *bm.BufferManager) *RecoveryManager 
	recovery_mgr := &RecoveryManager
		tx:             tx,
		log_manager:    log_manager,
		buffer_manager: buffer_manager,
	

	p := fm.NewPageBySize(32)
	p.SetInt(0, uint64(START))
	p.SetInt(8, uint64(tx_num))
	start_record := NewStartRecord(p, log_manager)
	start_record.WriteToLog()

	return recovery_mgr


func (r *RecoveryManager) Commit() error 
	r.buffer_manager.FlushAll(r.tx_num)
	lsn, err := WriteCommitkRecordLog(r.log_manager, uint64(r.tx_num))
	if err != nil 
		return err
	

	r.log_manager.FlushByLSN(lsn)
	return nil


func (r *RecoveryManager) Rollback() error 
	r.doRollback()
	r.buffer_manager.FlushAll(r.tx_num)
	lsn, err := WriteRollBackLog(r.log_manager, uint64(r.tx_num))
	if err != nil 
		return err
	

	r.log_manager.FlushByLSN(lsn)
	return nil


func (r *RecoveryManager) Recover() error 
	r.doRecover()
	r.buffer_manager.FlushAll(r.tx_num)
	lsn, err := WriteCheckPointToLog(r.log_manager)
	if err != nil 
		return err
	

	r.log_manager.FlushByLSN(lsn)
	return nil


func (r *RecoveryManager) SetInt(buffer *bm.Buffer, offset uint64, new_val int64) (uint64, error) 
	old_val := buffer.Contents().GetInt(offset)
	blk := buffer.Block()
	buffer.Contents().SetInt(offset, uint64(new_val))
	return WriteSetIntLog(r.log_manager, uint64(r.tx_num), blk, offset, old_val)


func (r *RecoveryManager) SetString(buffer *bm.Buffer, offset uint64, new_val string) (uint64, error) 
	old_val := buffer.Contents().GetString(offset)
	blk := buffer.Block()
	buffer.Contents().SetString(offset, new_val)
	return WriteSetStringLog(r.log_manager, uint64(r.tx_num), blk, offset, old_val)


func (r *RecoveryManager) CreateLogRecord(bytes []byte) LogRecordInterface 
	p := fm.NewPageByBytes(bytes)
	switch RECORD_TYPE(p.GetInt(0)) 
	case CHECKPOINT:
		return NewCheckPointRecord()
	case START:
		return NewStartRecord(p, r.log_manager)
	case COMMIT:
		return NewCommitkRecordRecord(p)
	case ROLLBACK:
		return NewRollBackRecord(p)
	case SETINT:
		return NewSetIntRecord(p)
	case SETSTRING:
		return NewSetStringRecord(p)
	default:
		panic("Unknow log interface")
	


func (r *RecoveryManager) doRollback() 
	iter := r.log_manager.Iterator()
	for iter.HasNext() 
		rec := iter.Next()
		log_record := r.CreateLogRecord(rec)
		if log_record.TxNumber() == uint64(r.tx_num) 
			if log_record.Op() == START 
				return
			

			log_record.Undo(r.tx)
		
	


func (r *RecoveryManager) doRecover() 
	finishedTxs := make(map[uint64]bool)
	iter := r.log_manager.Iterator()
	for iter.HasNext() 
		rec := iter.Next()
		log_record := r.CreateLogRecord(rec)
		if log_record.Op() == CHECKPOINT 
			return
		
		if log_record.Op() == COMMIT || log_record.Op() == ROLLBACK 
			finishedTxs[log_record.TxNumber()] = true
		
		existed, _ := finishedTxs[log_record.TxNumber()]
		if existed 
			log_record.Undo(r.tx)
		
	


完成上面代码后,我们记得取消掉transation.go里面关于恢复管理器的注释,为了检测我们的代码基本逻辑是否正确,我们在main.go中拟写如下代码:

package main

import (
	//"encoding/binary"
	fm "file_manager"
	lm "log_manager"

	bmg "buffer_manager"

	"fmt"

	"tx"
)

func main() 
	file_manager, _ := fm.NewFileManager("txtest", 400)
	log_manager, _ := lm.NewLogManager(file_manager, "logfile")
	buffer_manager := bmg.NewBufferManager(file_manager, log_manager, 3)

	tx1 := tx.NewTransation(file_manager, log_manager, buffer_manager)
	blk := fm.NewBlockId("testfile", 1)
	tx1.Pin(blk)
	//设置log为false,因为一开始数据没有任何意义,因此不能进行日志记录
	tx1.SetInt(blk, 80, 1, false) 
	tx1.SetString(blk, 40, "one", false)
	tx1.Commit() //执行回滚操作后,数据会还原到这里写入的内容

	tx2 := tx.NewTransation(file_manager, log_manager, buffer_manager)
	tx2.Pin(blk)
	ival, _ := tx2.GetInt(blk, 80)
	sval, _ := tx2.GetString(blk, 40)
	fmt.Println("initial value at location 80 = ", ival)
	fmt.Println("initial value at location 40 = ", sval)
	new_ival := ival + 1
	new_sval := sval + "!"
	tx2.SetInt(blk, 80, new_ival, true)
	tx2.SetString(blk, 40, new_sval, true)
	tx2.Commit() //尝试写入新的数据

	tx3 := tx.NewTransation(file_manager, log_manager, buffer_manager)
	tx3.Pin(blk)
	ival, _ = tx3.GetInt(blk, 80)
	sval, _ = tx3.GetString(blk, 40)
	fmt.Println("new ivalue at location 80: ", ival)
	fmt.Println("new svalue at location 40: ", sval)
	tx3.SetInt(blk, 80, 999, true)
	ival, _ = tx3.GetInt(blk, 80)
	//写入数据后检查是否写入正确
	fmt.Println("pre-rollback ivalue at location 80: ", ival)
	tx3.RollBack() //执行回滚操作,并确定回滚到第一次写入内容

	tx4 := tx.NewTransation(file_manager, log_manager, buffer_manager)
	tx4.Pin(blk)
	ival, _ = tx4.GetInt(blk, 80)
	fmt.Println("post-rollback at location 80 = ", ival)
	tx4.Commit() //执行到这里时,输出内容应该与第一次写入内容相同


完成后我们执行上面代码得到输出如下:

从输出结果看,我们先在给定位置写入数值1,然后再写入数组2,999,最后调用RollBack()执行回滚,然后再取出同样位置的数据进行打印,通过输出可以看到,RollBack执行后,给定位置的数值变成了最开始我们输入的数值,如此看来,恢复管理器的基本逻辑应该是正确的。更详细的调试演示请在b站搜索Coding迪斯尼,更多干货:http://m.study.163.com/provider/7600199/index.htm?share=2&shareId=7600199,代码下载:
链接: https://pan.baidu.com/s/1rmNZngVOEnuCpjdGKshkKg 提取码: t772

以上是关于自己动手写数据库:实现交易对象和恢复管理器的主要内容,如果未能解决你的问题,请参考以下文章

自己动手写数据库:并发管理器的实现,以及并发交易流程的分析

自己动手写数据库:并发管理器的实现,以及并发交易流程的分析

自己动手写数据库:并发管理组件lock_table的原理和实现

自己动手写数据库:并发管理组件lock_table的原理和实现

自己动手写数据库:记录管理器的设计和实现

自己动手写区块链之交易