自己动手写数据库:实现表扫描

Posted tyler_download

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自己动手写数据库:实现表扫描相关的知识,希望对你有一定的参考价值。

在上一节我们实现了记录管理,本节我们看看记录的读取实现,也就是所谓的表扫描。我们将实现一个名为TableScan的类,它把表的记录当做数组来读取,通过挪到一个记录指针来遍历表的记录,它的作用类似于cursor。我们实现这个类的目的是增加一层抽象,前面我们实现的RecordPage暴露很多底层信息,例如记录的数据格式,区块号等,我们通过这个类将这些底层信息隐藏起来,通过它来读写记录可以避开对区块号,存储页面等底层对象。

一条记录可以由两个信息来确定,分别是区块号,其用来表明记录信息存储在哪个区块,另一个是插槽号,用来查找记录的起始偏移,我们先用一个类RecordIdentifier将这些信息封装起来,我们先看看其接口定义,在interface.go中添加代码如下:

type RIDInterface interface 
	BlockNumber() int //记录所在的区块号
	Slot()  int //记录的插槽号

TableScan类用来遍历给定表的记录,它首先定位”当前目录“,然后通过其提供的接口可以实现读取上一条或下一条目录,我们先看看其要实现的接口,在interface.go中添加代码如下:

type TableScanInterface interface 
	Close()
	HasField(field_name string) bool
	BeforeFirst()             //将指针放在第一条记录前
	Next() bool               //读取下一条记录
	MoveToRid(r RIDInterface) //跳转到给定目录
	Insert()
	
	GetInt(field_name string) int 
	GetString(field_name string) string 
	SetInt(field_name string, val int)
	SetString(field_name string, val string)
	CurrentRID() RIDInterface
	Delete() 

可以看到TableScan类对应接口跟RecordPage很像,只不过它不直接处理交易对象(Tx),区块号等底层数据。下面我们看看TableScan类的具体实现,增加名为table_scan.go文件,增加内容如下:

package record_manager

import (
	fm "file_manager"
	"tx"
)

type TableScan struct 
	tx           *tx.Transation
	layout       LayoutInterface
	rp           RecordManagerInterface
	file_name    string
	current_slot int


func NewTableScan(tx *tx.Transation, table_name string, layout LayoutInterface) *TableScan 
	table_scan := &TableScan
		tx:        tx,
		layout:    layout,
		file_name: table_name + ".tbl",
	

	size, err := tx.Size(table_scan.file_name)
	if err != nil 
		panic(err)
	
	if size == 0 
		//如果文件为空,那么增加一个区块
		table_scan.MoveToNewBlock()
	 else 
		//先读取第一个区块
		table_scan.MoveToBlock(0)
	

	return table_scan


func (t *TableScan) Close() 
	if t.rp != nil 
		t.tx.UnPin(t.rp.Block())
	


func (t *TableScan) BeforeFirst() 
	t.MoveToBlock(0)


func (t *TableScan) Next() bool 
	/*
		如果在当前区块找不到给定有效记录则遍历后续区块,直到所有区块都遍历为止
	*/
	t.current_slot = t.rp.NextAfter(t.current_slot)
	for t.current_slot < 0 
		if t.AtLastBlock() 
			//直到最后一个区块都找不到给定插槽
			return false
		

		t.MoveToBlock(int(t.rp.Block().Number() + 1))
		t.current_slot = t.rp.NextAfter(t.current_slot)
	

	return true


func (t *TableScan) GetInt(field_name string) int 
	return t.rp.GetInt(t.current_slot, field_name)


func (t *TableScan) GetString(field_name string) string 
	return t.rp.GetString(t.current_slot, field_name)


func (t *TableScan) GetVal(field_name string) *Constant 
	if t.layout.Schema().Type(field_name) == INTEGER 
		return NewConstantInt(t.GetInt(field_name))
	

	return NewConstantString(t.GetString(field_name))


func (t *TableScan) HasField(field_name string) bool 
	return t.layout.Schema().HasFields(field_name)


func (t *TableScan) SetInt(field_name string, val int) 
	t.rp.SetInt(t.current_slot, field_name, val)


func (t *TableScan) SetString(field_name string, val string) 
	t.rp.SetString(t.current_slot, field_name, val)


func (t *TableScan) SetVal(field_name string, val *Constant) 
	if t.layout.Schema().Type(field_name) == INTEGER 
		t.SetInt(field_name, val.IVal)
	 else 
		t.SetString(field_name, val.SVal)
	


func (t *TableScan) Insert() 
	/*
		将当前插槽号指向下一个可用插槽
	*/
	t.current_slot = t.rp.InsertAfter(t.current_slot)
	for t.current_slot < 0  //当前区块找不到可用插槽
		if t.AtLastBlock() 
			//如果当前处于最后一个区块,那么新增一个区块
			t.MoveToNewBlock()
		 else 
			t.MoveToBlock(int(t.rp.Block().Number() + 1))
		

		t.current_slot = t.rp.InsertAfter(t.current_slot)
	


func (t *TableScan) Delete() 
	t.rp.Delete(t.current_slot)


func (t *TableScan) MoveToRid(r RIDInterface) 
	t.Close()
	blk := fm.NewBlockId(t.file_name, uint64(r.BlockNumber()))
	t.rp = NewRecordPage(t.tx, blk, t.layout)
	t.current_slot = r.Slot()


func (t *TableScan) GetRid() RIDInterface 
	return NewRID(int(t.rp.Block().Number()), t.current_slot)


func (t *TableScan) MoveToBlock(blk_num int) 
	t.Close()
	blk := fm.NewBlockId(t.file_name, uint64(blk_num))
	t.rp = NewRecordPage(t.tx, blk, t.layout)
	t.current_slot = -1


func (t *TableScan) MoveToNewBlock() 
	t.Close()
	blk, err := t.tx.Append(t.file_name)
	if err != nil 
		panic(err)
	
	t.rp = NewRecordPage(t.tx, blk, t.layout)
	t.rp.Format()
	t.current_slot = -1


func (t *TableScan) AtLastBlock() bool 
	size, err := t.tx.Size(t.file_name)
	if err != nil 
		panic(err)
	
	return t.rp.Block().Number() == size-1


TableScan 主要是在RecordPage基础上对记录的读取进行了封装,用于依次读取给定表的记录。在使用它时,首先调用其BeforeFirst接口将记录的指针挪到首条记录,然后调用Next挪到记录指针指向下一条要读取的记录,最后使用GetInt,GetString来读取对应字段的内容。它的实现中还用到两个类,分别是Constant和RID,这两个类的实现都很简单,先创建Constant.go,实现内容如下:

package record_manager

import (
	"strconv"
)

type Constant struct 
	IVal  int
	SVal  string
	isInt bool


func NewConstantInt(val int) *Constant 
	return &Constant
		IVal:  val,
		isInt: true,
	


func NewConstantString(val string) *Constant 
	return &Constant
		SVal:  val,
		isInt: false,
	


func (c *Constant) AsInt() int 
	return c.IVal


func (c *Constant) AsString() string 
	return c.SVal


func (c *Constant) Equals(other *Constant) bool 
	if c.isInt 
		return c.IVal == other.IVal
	

	return c.SVal == other.SVal


func (c *Constant) CompareTo(other *Constant) int 
	if c.isInt 
		return c.IVal - other.IVal
	

	if c.SVal > other.SVal 
		return 1
	 else if c.SVal == other.SVal 
		return 0
	 else 
		return -1
	


func (c *Constant) ToString() string 
	if c.isInt 
		return strconv.Itoa(c.IVal)
	

	return c.SVal


创建rid.go,实现内容如下:

package record_manager

import (
	"fmt"
)

type RID struct 
	blk_num int
	slot    int


func NewRID(blk_num int, slot int) *RID 
	return &RID
		blk_num: blk_num,
		slot:    slot,
	


func (r *RID) BlockNumber() int 
	return r.blk_num


func (r *RID) Slot() int 
	return r.slot


func (r *RID) Equals(other RIDInterface) bool 
	return r.blk_num == other.BlockNumber() && r.slot == other.Slot()


func (r *RID) ToString() string 
	return fmt.Sprintf("[ %d , %d ]", r.blk_num, r.slot)


最后添加table_scan_test.go,实现TableScan对象的测试用例:

package record_manager

import (
	bmg "buffer_manager"
	fm "file_manager"
	"fmt"
	"github.com/stretchr/testify/require"
	lm "log_manager"
	"math/rand"
	"testing"
	"tx"
)

func TestTableScanInsertAndDelete(t *testing.T) 
	file_manager, _ := fm.NewFileManager("recordtest", 400)
	log_manager, _ := lm.NewLogManager(file_manager, "logfile.log")
	buffer_manager := bmg.NewBufferManager(file_manager, log_manager, 3)

	tx := tx.NewTransation(file_manager, log_manager, buffer_manager)
	sch := NewSchema()

	sch.AddIntField("A")
	sch.AddStringField("B", 9)
	layout := NewLayoutWithSchema(sch)
	for _, field_name := range layout.Schema().Fields() 
		offset := layout.Offset(field_name)
		fmt.Printf("%s has offset %d\\n", field_name, offset)
	

	ts := NewTableScan(tx, "T", layout)
	fmt.Println("Filling the table with 50 random records")
	ts.BeforeFirst()
	val_for_field_A := make([]int, 0)
	for i := 0; i < 50; i++ 
		ts.Insert() //指向一个可用插槽
		n := rand.Intn(50)
		ts.SetInt("A", n)
		val_for_field_A = append(val_for_field_A, n)
		s := fmt.Sprintf("rec%d", n)
		ts.SetString("B", s)
		fmt.Printf("inserting into slot %s: %d , %s\\n", ts.GetRid().ToString(), n, s)
	

	ts.BeforeFirst()
	//测试插入的内容是否正确
	slot := 0
	for ts.Next() 
		a := ts.GetInt("A")
		b := ts.GetString("B")
		require.Equal(t, a, val_for_field_A[slot])
		require.Equal(t, b, fmt.Sprintf("rec%d", a))
		slot += 1
	

	fmt.Println("Deleting records with A-values < 25")
	count := 0
	ts.BeforeFirst()
	for ts.Next() 
		a := ts.GetInt("A")
		b := ts.GetString("B")
		if a < 25 
			count += 1
			fmt.Printf("slot %s :  %d , %s\\n", ts.GetRid().ToString(), a, b)
			ts.Delete()
		
	

	fmt.Println("Here are the remaining records:")
	ts.BeforeFirst()
	for ts.Next() 
		a := ts.GetInt("A")
		b := ts.GetString("B")
		require.Equal(t, a >= 25, true)
		fmt.Printf("slot %s :  %d , %s\\n", ts.GetRid(), a, b)
	

	ts.Close()
	tx.Commit()


其内容和RecordPage的测试用例一模一样,只不过调用的接口稍微有所变化而已。更详细的调试演示在B站搜索Coding迪斯尼。代码下载:链接: https://pan.baidu.com/s/1gaFleDPkR1FcPt6wwHCEdA 提取码: 2tji

以上是关于自己动手写数据库:实现表扫描的主要内容,如果未能解决你的问题,请参考以下文章

java单链表的实现自己动手写一个单链表

自己动手写数据库:实现数据库表的元数据管理

自己动手写数据库:实现数据库表的元数据管理

自己动手写数据库:数据库系统的日志模块实现

自己动手写编译器:符号表及其实现

自己动手写一个单链表