自己动手写数据库:实现表扫描
Posted tyler_download
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自己动手写数据库:实现表扫描相关的知识,希望对你有一定的参考价值。
在上一节我们实现了记录管理,本节我们看看记录的读取实现,也就是所谓的表扫描。我们将实现一个名为TableScan的类,它把表的记录当做数组来读取,通过挪到一个记录指针来遍历表的记录,它的作用类似于cursor。我们实现这个类的目的是增加一层抽象,前面我们实现的RecordPage暴露很多底层信息,例如记录的数据格式,区块号等,我们通过这个类将这些底层信息隐藏起来,通过它来读写记录可以避开对区块号,存储页面等底层对象。
一条记录可以由两个信息来确定,分别是区块号,其用来表明记录信息存储在哪个区块,另一个是插槽号,用来查找记录的起始偏移,我们先用一个类RecordIdentifier将这些信息封装起来,我们先看看其接口定义,在interface.go中添加代码如下:
type RIDInterface interface
BlockNumber() int //记录所在的区块号
Slot() int //记录的插槽号
TableScan类用来遍历给定表的记录,它首先定位”当前目录“,然后通过其提供的接口可以实现读取上一条或下一条目录,我们先看看其要实现的接口,在interface.go中添加代码如下:
type TableScanInterface interface
Close()
HasField(field_name string) bool
BeforeFirst() //将指针放在第一条记录前
Next() bool //读取下一条记录
MoveToRid(r RIDInterface) //跳转到给定目录
Insert()
GetInt(field_name string) int
GetString(field_name string) string
SetInt(field_name string, val int)
SetString(field_name string, val string)
CurrentRID() RIDInterface
Delete()
可以看到TableScan类对应接口跟RecordPage很像,只不过它不直接处理交易对象(Tx),区块号等底层数据。下面我们看看TableScan类的具体实现,增加名为table_scan.go文件,增加内容如下:
package record_manager
import (
fm "file_manager"
"tx"
)
type TableScan struct
tx *tx.Transation
layout LayoutInterface
rp RecordManagerInterface
file_name string
current_slot int
func NewTableScan(tx *tx.Transation, table_name string, layout LayoutInterface) *TableScan
table_scan := &TableScan
tx: tx,
layout: layout,
file_name: table_name + ".tbl",
size, err := tx.Size(table_scan.file_name)
if err != nil
panic(err)
if size == 0
//如果文件为空,那么增加一个区块
table_scan.MoveToNewBlock()
else
//先读取第一个区块
table_scan.MoveToBlock(0)
return table_scan
func (t *TableScan) Close()
if t.rp != nil
t.tx.UnPin(t.rp.Block())
func (t *TableScan) BeforeFirst()
t.MoveToBlock(0)
func (t *TableScan) Next() bool
/*
如果在当前区块找不到给定有效记录则遍历后续区块,直到所有区块都遍历为止
*/
t.current_slot = t.rp.NextAfter(t.current_slot)
for t.current_slot < 0
if t.AtLastBlock()
//直到最后一个区块都找不到给定插槽
return false
t.MoveToBlock(int(t.rp.Block().Number() + 1))
t.current_slot = t.rp.NextAfter(t.current_slot)
return true
func (t *TableScan) GetInt(field_name string) int
return t.rp.GetInt(t.current_slot, field_name)
func (t *TableScan) GetString(field_name string) string
return t.rp.GetString(t.current_slot, field_name)
func (t *TableScan) GetVal(field_name string) *Constant
if t.layout.Schema().Type(field_name) == INTEGER
return NewConstantInt(t.GetInt(field_name))
return NewConstantString(t.GetString(field_name))
func (t *TableScan) HasField(field_name string) bool
return t.layout.Schema().HasFields(field_name)
func (t *TableScan) SetInt(field_name string, val int)
t.rp.SetInt(t.current_slot, field_name, val)
func (t *TableScan) SetString(field_name string, val string)
t.rp.SetString(t.current_slot, field_name, val)
func (t *TableScan) SetVal(field_name string, val *Constant)
if t.layout.Schema().Type(field_name) == INTEGER
t.SetInt(field_name, val.IVal)
else
t.SetString(field_name, val.SVal)
func (t *TableScan) Insert()
/*
将当前插槽号指向下一个可用插槽
*/
t.current_slot = t.rp.InsertAfter(t.current_slot)
for t.current_slot < 0 //当前区块找不到可用插槽
if t.AtLastBlock()
//如果当前处于最后一个区块,那么新增一个区块
t.MoveToNewBlock()
else
t.MoveToBlock(int(t.rp.Block().Number() + 1))
t.current_slot = t.rp.InsertAfter(t.current_slot)
func (t *TableScan) Delete()
t.rp.Delete(t.current_slot)
func (t *TableScan) MoveToRid(r RIDInterface)
t.Close()
blk := fm.NewBlockId(t.file_name, uint64(r.BlockNumber()))
t.rp = NewRecordPage(t.tx, blk, t.layout)
t.current_slot = r.Slot()
func (t *TableScan) GetRid() RIDInterface
return NewRID(int(t.rp.Block().Number()), t.current_slot)
func (t *TableScan) MoveToBlock(blk_num int)
t.Close()
blk := fm.NewBlockId(t.file_name, uint64(blk_num))
t.rp = NewRecordPage(t.tx, blk, t.layout)
t.current_slot = -1
func (t *TableScan) MoveToNewBlock()
t.Close()
blk, err := t.tx.Append(t.file_name)
if err != nil
panic(err)
t.rp = NewRecordPage(t.tx, blk, t.layout)
t.rp.Format()
t.current_slot = -1
func (t *TableScan) AtLastBlock() bool
size, err := t.tx.Size(t.file_name)
if err != nil
panic(err)
return t.rp.Block().Number() == size-1
TableScan 主要是在RecordPage基础上对记录的读取进行了封装,用于依次读取给定表的记录。在使用它时,首先调用其BeforeFirst接口将记录的指针挪到首条记录,然后调用Next挪到记录指针指向下一条要读取的记录,最后使用GetInt,GetString来读取对应字段的内容。它的实现中还用到两个类,分别是Constant和RID,这两个类的实现都很简单,先创建Constant.go,实现内容如下:
package record_manager
import (
"strconv"
)
type Constant struct
IVal int
SVal string
isInt bool
func NewConstantInt(val int) *Constant
return &Constant
IVal: val,
isInt: true,
func NewConstantString(val string) *Constant
return &Constant
SVal: val,
isInt: false,
func (c *Constant) AsInt() int
return c.IVal
func (c *Constant) AsString() string
return c.SVal
func (c *Constant) Equals(other *Constant) bool
if c.isInt
return c.IVal == other.IVal
return c.SVal == other.SVal
func (c *Constant) CompareTo(other *Constant) int
if c.isInt
return c.IVal - other.IVal
if c.SVal > other.SVal
return 1
else if c.SVal == other.SVal
return 0
else
return -1
func (c *Constant) ToString() string
if c.isInt
return strconv.Itoa(c.IVal)
return c.SVal
创建rid.go,实现内容如下:
package record_manager
import (
"fmt"
)
type RID struct
blk_num int
slot int
func NewRID(blk_num int, slot int) *RID
return &RID
blk_num: blk_num,
slot: slot,
func (r *RID) BlockNumber() int
return r.blk_num
func (r *RID) Slot() int
return r.slot
func (r *RID) Equals(other RIDInterface) bool
return r.blk_num == other.BlockNumber() && r.slot == other.Slot()
func (r *RID) ToString() string
return fmt.Sprintf("[ %d , %d ]", r.blk_num, r.slot)
最后添加table_scan_test.go,实现TableScan对象的测试用例:
package record_manager
import (
bmg "buffer_manager"
fm "file_manager"
"fmt"
"github.com/stretchr/testify/require"
lm "log_manager"
"math/rand"
"testing"
"tx"
)
func TestTableScanInsertAndDelete(t *testing.T)
file_manager, _ := fm.NewFileManager("recordtest", 400)
log_manager, _ := lm.NewLogManager(file_manager, "logfile.log")
buffer_manager := bmg.NewBufferManager(file_manager, log_manager, 3)
tx := tx.NewTransation(file_manager, log_manager, buffer_manager)
sch := NewSchema()
sch.AddIntField("A")
sch.AddStringField("B", 9)
layout := NewLayoutWithSchema(sch)
for _, field_name := range layout.Schema().Fields()
offset := layout.Offset(field_name)
fmt.Printf("%s has offset %d\\n", field_name, offset)
ts := NewTableScan(tx, "T", layout)
fmt.Println("Filling the table with 50 random records")
ts.BeforeFirst()
val_for_field_A := make([]int, 0)
for i := 0; i < 50; i++
ts.Insert() //指向一个可用插槽
n := rand.Intn(50)
ts.SetInt("A", n)
val_for_field_A = append(val_for_field_A, n)
s := fmt.Sprintf("rec%d", n)
ts.SetString("B", s)
fmt.Printf("inserting into slot %s: %d , %s\\n", ts.GetRid().ToString(), n, s)
ts.BeforeFirst()
//测试插入的内容是否正确
slot := 0
for ts.Next()
a := ts.GetInt("A")
b := ts.GetString("B")
require.Equal(t, a, val_for_field_A[slot])
require.Equal(t, b, fmt.Sprintf("rec%d", a))
slot += 1
fmt.Println("Deleting records with A-values < 25")
count := 0
ts.BeforeFirst()
for ts.Next()
a := ts.GetInt("A")
b := ts.GetString("B")
if a < 25
count += 1
fmt.Printf("slot %s : %d , %s\\n", ts.GetRid().ToString(), a, b)
ts.Delete()
fmt.Println("Here are the remaining records:")
ts.BeforeFirst()
for ts.Next()
a := ts.GetInt("A")
b := ts.GetString("B")
require.Equal(t, a >= 25, true)
fmt.Printf("slot %s : %d , %s\\n", ts.GetRid(), a, b)
ts.Close()
tx.Commit()
其内容和RecordPage的测试用例一模一样,只不过调用的接口稍微有所变化而已。更详细的调试演示在B站搜索Coding迪斯尼。代码下载:链接: https://pan.baidu.com/s/1gaFleDPkR1FcPt6wwHCEdA 提取码: 2tji
以上是关于自己动手写数据库:实现表扫描的主要内容,如果未能解决你的问题,请参考以下文章