LevelDB 源码剖析版本管理:ManifestVersionVersionEditVersionSet

Posted 凌桓丶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了LevelDB 源码剖析版本管理:ManifestVersionVersionEditVersionSet相关的知识,希望对你有一定的参考价值。

文章目录

版本管理

为什么 LevelDB 需要版本的概念呢?

针对共享的资源,有三种方式:

  • 悲观锁:这是最简单的处理方式。加锁保护,读写互斥。效率低。
  • 乐观锁:它假设多用户并发的事物在处理时不会彼此互相影响,各事务能够在不产生锁的的情况下处理各自影响的那部分数据。在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。 果其他事务有更新的话,正在提交的事务会进行回滚;这样做不会有锁竞争更不会产生死锁, 但如果数据竞争的概率较高,效率也会受影响 。
  • MVCC:MVCC 是一个数据库常用的概念。Multi Version Concurrency Control 多版本并发控制。每一个执行操作的用户,看到的都是数据库特定时刻的的快照 (Snapshot), Writer 的任何未完成的修改都不会被其他的用户所看到;当对数据进行更新的时候并是不直接覆盖,而是先进行标记,然后在其他地方添加新的数据(这些变更存储在 VersionEdit),从而形成一个新版本,此时再来读取的 Reader 看到的就是最新的版本了。所以这种处理策略是维护了多个版本的数据的,但只有一个是最新的(VersionSet 中维护着全局最新的 Seqnum)。

LevelDB 通过 Version 以及 VersionSet 来管理元信息,用 Manifest 来保存元信息。


Manifest

Manifest 文件专用于记录版本信息。LevelDB 采用了增量式的存储方式,记录每一个版本相较于一个版本的变化情况。

变化情况大致包括:

(1)新增了哪些 SSTable 文件;

(2)删除了哪些 SSTable 文件(由于Compaction导致);

(3)最新的 Journal 日志文件标号等;

一个 Manifest 文件中,包含了多条 Session Record。其中第一条 Session Record 记载了当时 LevelDB 的全量版本信息,其余若干条 Session Record 仅记录每次更迭的变化情况。具体结构如下图:

Manifest文件结构



LevelDB 启动时会先到数据目录寻找一个名为 CURRENT 的文件,该文件中会保存 Manifest 的文件名称,通过读取 Manifest 记录的 Session Record,从初始状态开始不断地应用这些版本改动,即可使得系统的版本信息恢复到最近一次使用的状态。


Version

Version 表示当前的一个版本,该结构中会保存每个层级拥有的文件信息以及指向前一个和后一个版本的指针等。

结构

class Version 
    public:
    struct GetStats 
        FileMetaData* seek_file;
        int seek_file_level;
    ;

    void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);

    Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
               GetStats* stats);

    bool UpdateStats(const GetStats& stats);

    bool RecordReadSample(Slice key);

    void Ref();
    void Unref();

    void GetOverlappingInputs(
        int level,
        const InternalKey* begin,  // nullptr means before all keys
        const InternalKey* end,    // nullptr means after all keys
        std::vector<FileMetaData*>* inputs);

    bool OverlapInLevel(int level, const Slice* smallest_user_key,
                        const Slice* largest_user_key);

    int PickLevelForMemTableOutput(const Slice& smallest_user_key,
                                   const Slice& largest_user_key);

    int NumFiles(int level) const  return files_[level].size(); 

    std::string DebugString() const;

    private:
    friend class Compaction;
    friend class VersionSet;

    class LevelFileNumIterator;

    explicit Version(VersionSet* vset)
        : vset_(vset),
    next_(this),
    prev_(this),
    refs_(0),
    file_to_compact_(nullptr),
    file_to_compact_level_(-1),
    compaction_score_(-1),
    compaction_level_(-1) 

    Version(const Version&) = delete;
    Version& operator=(const Version&) = delete;

    ~Version();

    Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;

    void ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
                            bool (*func)(void*, int, FileMetaData*));

    VersionSet* vset_;  // VersionSet to which this Version belongs
    Version* next_;     // Next version in linked list
    Version* prev_;     // Previous version in linked list
    int refs_;          // Number of live refs to this version

    std::vector<FileMetaData*> files_[config::kNumLevels];

    FileMetaData* file_to_compact_;
    int file_to_compact_level_;

    double compaction_score_;
    int compaction_level_;
;

struct FileMetaData 
    FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) 

    int refs;				 // 引用计数
    int allowed_seeks;     // 用于seek compaction
    uint64_t number;		 // 唯一标识一个sstable
    uint64_t file_size;    // 文件大小
    InternalKey smallest;  // 最小key
    InternalKey largest;   // 最大key
;
  • 成员变量

    • GetStats:键查找时用来保存中间状态的一个结构。
    • vset_:该版本属于的版本集合。
    • next_:指向后一个版本的指针。
    • prev_:指向前一个版本的指针。
    • refs_:该版本的引用计数。
    • files_:每个层级所包含的 SSTable 文件,每一个文件以一个 FileMetaData 结构表示。
    • file_to_compact_:下次需要进行 Compaction 操作的文件。
    • file_to_compact_level_:下次需要进行 Compaction 操作的文件所属的层级。
    • compaction_score_:如果 compaction_score_ 大于 1,说明需要进行一次 Compaction 操作。
    • compaction_level_:表明需要进行 Compaction 操作的层级。


VersionEdit

VersionEdit 是一个版本的中间状态,会保存一次 Compaction 操作后增加的删除文件信息以及其他一些元数据。 当数据库正常/非正常关闭,重新打开时,只需要按顺序把 Manifest 文件中的 VersionEdit 执行一遍,就可以把数据恢复到宕机前的最新版本。

结构

// https://github.com/google/leveldb/blob/master/db/version_edit.h

class VersionEdit 
    public:
    VersionEdit()  Clear(); 
    ~VersionEdit() = default;

    void Clear();

    void SetComparatorName(const Slice& name) 
        has_comparator_ = true;
        comparator_ = name.ToString();
    
    void SetLogNumber(uint64_t num) 
        has_log_number_ = true;
        log_number_ = num;
    
    void SetPrevLogNumber(uint64_t num) 
        has_prev_log_number_ = true;
        prev_log_number_ = num;
    
    void SetNextFile(uint64_t num) 
        has_next_file_number_ = true;
        next_file_number_ = num;
    
    void SetLastSequence(SequenceNumber seq) 
        has_last_sequence_ = true;
        last_sequence_ = seq;
    
    void SetCompactPointer(int level, const InternalKey& key) 
        compact_pointers_.push_back(std::make_pair(level, key));
    

    void AddFile(int level, uint64_t file, uint64_t file_size,
                 const InternalKey& smallest, const InternalKey& largest) 
        FileMetaData f;
        f.number = file;
        f.file_size = file_size;
        f.smallest = smallest;
        f.largest = largest;
        new_files_.push_back(std::make_pair(level, f));
    

    void RemoveFile(int level, uint64_t file) 
        deleted_files_.insert(std::make_pair(level, file));
    

    void EncodeTo(std::string* dst) const;
    Status DecodeFrom(const Slice& src);

    std::string DebugString() const;

    private:
    friend class VersionSet;

    typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;

    std::string comparator_;
    uint64_t log_number_;		//已经弃用
    uint64_t prev_log_number_;
    uint64_t next_file_number_;
    SequenceNumber last_sequence_;
    bool has_comparator_;
    bool has_log_number_;
    bool has_prev_log_number_;
    bool has_next_file_number_;
    bool has_last_sequence_;

    std::vector<std::pair<int, InternalKey>> compact_pointers_;
    DeletedFileSet deleted_files_;
    std::vector<std::pair<int, FileMetaData>> new_files_;
;
  • 成员变量

    • comparator_:比较器名称。
    • log_number_:日志文件序号。
    • next_file_number_: 下一个文件序列号。
    • last_sequence_:下一个写入序列号。
    • has_xxxxx_:has_comparator_ ,has_log_number_ ,has_prev_log_number_ ,has_last_sequence_ ,has_next_file_number_,布尔型变量,表明相应的成员变量是否已经设置。
    • compact_pointers_:该变量用来指示 LevelDB 中每个层级下一次进行 Compaction 操作时需要从哪个键开始。
    • deleted_files_: 记录每个层级执行 Compaction 操作之后删除掉的文件。
    • new_files_:记录每个层级执行 Compaction 操作之后新增的文件。新增文件记录为一个个FileMetaData 结构体。


EncodeTo

EncodeTo 会将 VersionEdit 各个成员变量的信息编码为一个字符串,编码时会先给每个成员变量定义一个 Tag,Tag的枚举值如下所示:

// https://github.com/google/leveldb/blob/master/db/version_edit.cc

enum Tag 
    kComparator = 1,		//比较器
    kLogNumber = 2,		//日志文件序列号
    kNextFileNumber = 3,	//下一个文件序列号
    kLastSequence = 4,	//下一个写入序列号
    kCompactPointer = 5,	//CompactPointer类型
    kDeletedFile = 6,		//删除的文件
    kNewFile = 7,			//增加的文件
    // 8 曾经用于大Value的引用,现以弃用
    kPrevLogNumber = 9	//前一个日志文件序列号
;

接下来看看 EncodeTo 的实现逻辑:

// https://github.com/google/leveldb/blob/master/db/version_edit.cc

void VersionEdit::EncodeTo(std::string* dst) const 
    //如果为true,则先将kComparator Tag编码,然后将比较器名称编码写入。
    if (has_comparator_) 
        PutVarint32(dst, kComparator);
        PutLengthPrefixedSlice(dst, comparator_);
    
    //与上面类似,就不再重复
    if (has_log_number_) 
        PutVarint32(dst, kLogNumber);
        PutVarint64(dst, log_number_);
    
    if (has_prev_log_number_) 
        PutVarint32(dst, kPrevLogNumber);
        PutVarint64(dst, prev_log_number_);
    
    if (has_next_file_number_) 
        PutVarint32(dst, kNextFileNumber);
        PutVarint64(dst, next_file_number_);
    
    if (has_last_sequence_) 
        PutVarint32(dst, kLastSequence);
        PutVarint64(dst, last_sequence_);
    

    //依次将compact_pointers_中的level和Key编码
    for (size_t i = 0; i < compact_pointers_.size(); i++) 
        PutVarint32(dst, kCompactPointer);
        PutVarint32(dst, compact_pointers_[i].first);  // level
        PutLengthPrefixedSlice(dst, compact_pointers_[i].second.Encode());
    

    //依次将deleted_file_中的level和file number编码
    for (const auto& deleted_file_kvp : deleted_files_) 
        PutVarint32(dst, kDeletedFile);
        PutVarint32(dst, deleted_file_kvp.first);   // level
        PutVarint64(dst, deleted_file_kvp.second);  // file number
    

    //依次将new_files_中的level和FileMetaData进行编码
    for (size_t i = 0; i < new_files_.size(); i++) 
        const FileMetaData& f = new_files_[i].second;
        PutVarint32(dst, kNewFile);
        PutVarint32(dst, new_files_[i].first);  // level

        //编码FileMetaData
        PutVarint64(dst, f.number);
        PutVarint64(dst, f.file_size);
        PutLengthPrefixedSlice(dst, f.smallest.Encode());
        PutLengthPrefixedSlice(dst, f.largest.Encode());
    


EncodeTo 函数会依次以 Tag 开头,将比较器名称、日志序列号、上一个日志序列号、下一个文件序列号、最后一个序列号、CompactPointers、每个层级删除的文件以及增加的文件信息保存到一个字符串中。


DecodeFrom

解码逻辑与上面的类似,根据不同的 tag 解析对应的成员变量,代码如下:

// https://github.com/google/leveldb/blob/master/db/version_edit.cc

Status VersionEdit::DecodeFrom(const Slice& src) 
    Clear();
    Slice input = src;
    const char* msg = nullptr;
    uint32_t tag;

    // Temporary storage for parsing
    int level;
    uint64_t number;
    FileMetaData f;
    Slice str;
    InternalKey key;

    while (msg == nullptr && GetVarint32(&input, &tag)) 
        //根据不同的tag执行对应的解码逻辑
        switch (tag) 
            case kComparator:
                if (GetLengthPrefixedSlice(&input, &str)) 
                    comparator_ = str.ToString();
                    has_comparator_ = true;
                 else 
                    msg = "comparator name";
                
                break;

            case kLogNumber:
                if (GetVarint64(&input, &log_number_)) 
                    has_log_number_ = true;
                 else 
                    msg = "log number";
                
                break;

            case kPrevLogNumber:
                if (GetVarint64(&input, &prev_log_number_)) 
                    has_prev_log_number_ = true;
                 else 
                    msg = "previous log number";
                
                break;

            case kNextFileNumber:
                if (GetVarint64(&input, &next_file_number_)) 
                    has_next_file_number_ = true;
                 else 
                    msg = "next file number";
                
                break;

            case kLastSequence:
                if (GetVarint64(&input, &last_sequence_)) 
                    has_last_sequence_ = true;
                 else 
                    msg = "last sequence number";
                
                break;

            case kCompactPointer:
                if (GetLevel(&input, &level) && GetInternalKey(&input, &key)) 
                    compact_pointers_.push_back(std::make_pair(level, key));
                 else 
                    msg = "compaction pointer";
                
                break;

            case kDeletedFile:
                if (GetLevel(&input, &level) && GetVarint64(&input, &number)) 
                    deleted_files_.insert(std::make_pair(level, number));
                 else 
                    msg = "deleted file";
                
                break;

            case kNewFile:
                if (GetLevel(&input, &level) && GetVarint64(&input, &f.number) &&
                    GetVarint64(&input, &f.file_size) &&
                    GetInternalKey(&input, &f.smallest) &&
                    GetInternalKey(&input, &f.largest)) 
                    new_files_.push_back(std::make_pair(level, f));
                 else 
                    msg = "new-file entry";
                
                break;

            default:
                msg = "unknown tag";
                break;
        
    

    if (msg == nullptr && !input.empty()) 
        msg = "invalid tag";
    

    Status result;
    if (msg != nullptr) 
        result = Status::Corruption("VersionEdit", msg);
    
    return result;


VersionSet

LevelDB 为了支持 MVCC,引入了 Version 和 VersionEdit 的概念,那么如何来有效的管理这些 Version 呢?于是引出了 VersionSet,VersionSet 是一个双向链表,而且整个 DB 只会有一个 VersionSet。

结构

class VersionSet 
    public:
    VersionSet(const std::string& dbname, const Options* options,
               TableCache* table_cache, const InternalKeyComparator*);
    VersionSet(const VersionSet&) = delete;
    VersionSet& operator=(cons

以上是关于LevelDB 源码剖析版本管理:ManifestVersionVersionEditVersionSet的主要内容,如果未能解决你的问题,请参考以下文章

LevelDB 源码剖析公共基础:内存管理数值编码Env家族文件操作

LevelDB 源码剖析公共基础:内存管理数值编码Env家族文件操作

LevelDB 源码剖析公共基础:内存管理数值编码Env家族文件操作

LevelDB 源码剖析整体架构与基本组件:ComparatorSliceStatusIteratorOption

LevelDB 源码剖析整体架构与基本组件:ComparatorSliceStatusIteratorOption

LevelDB 源码剖析整体架构与基本组件:ComparatorSliceStatusIteratorOption