基于跳表实现的轻量级KV存储引擎 项目总结

Posted Ray Song

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了基于跳表实现的轻量级KV存储引擎 项目总结相关的知识,希望对你有一定的参考价值。

参考:https://github.com/youngyangyang04/Skiplist-CPP

项目介绍

KV存储引擎
众所周知,非关系型数据库redis,以及levedb,rockdb其核心存储引擎的数据结构就是跳表。

本项目就是基于跳表实现的轻量级键值型存储引擎,使用C++实现。插入数据、删除数据、查询数据、数据展示、数据落盘、文件加载数据,以及数据库大小显示。

在随机写读情况下,该项目每秒可处理啊请求数(QPS): 24.39w,每秒可处理读请求数(QPS): 18.41w

项目存储文件

  • main.cpp 包含skiplist.h使用跳表进行数据操作
  • skiplist.h 跳表核心实现
  • README.md 中文介绍
  • README-en.md 英文介绍
  • bin 生成可执行文件目录
  • makefile 编译脚本
  • store 数据落盘的文件存放在这个文件夹
  • stress_test_start.sh 压力测试脚本
  • LICENSE 使用协议

提供接口

  • insertElement(插入数据)
  • deleteElement(删除数据)
  • searchElement(查询数据)
  • displayList(展示已存数据)
  • dumpFile(数据落盘)
  • loadFile(加载数据)
  • size(返回数据规模)

存储引擎数据表现

  1. 插入操作

跳表树高:18

采用随机插入数据测试:

插入数据规模(万条)耗时(秒)
100.316763
501.86778
1004.10648

每秒可处理写请求数(QPS): 24.39w

  1. 取数据操作
取数据规模(万条)耗时(秒)
100.47148
502.56373
1005.43204

每秒可处理读请求数(QPS): 18.41w

  1. 项目运行方式
make            // complie demo main.cpp
./bin/main      // run 

如果想自己写程序使用这个kv存储引擎,只需要在你的CPP文件中include skiplist.h 就可以了。

可以运行如下脚本测试kv存储引擎的性能(当然你可以根据自己的需求进行修改)

sh stress_test_start.sh 

待优化

  • delete的时候没有释放内存
  • 压力测试并不是全自动的
  • 跳表的key用int型,如果使用其他类型需要自定义比较函数,当然把这块抽象出来更好
  • 如果再加上一致性协议,例如raft就构成了分布式存储,再启动一个http server就可以对外提供分布式存储服务了

部分代码解析

项目涉及知识

  • 函数模板、类模板
  • 跳表结构的增、删、查 以及 mutex的使用
  • 压力测试、线程(pthread 和 chrono)

代码阅读
从main函数开始阅读,然后跳到头文件,最后压力测试
SkipList<int, std::string> skipList(6); 定义了一个6层高度的跳表

头文件全局变量

node节点类

跳表类

该项目的跳表对Redis的跳表结构进行了一定的简化
Redis跳表
跳表数据结构:
代码实现:

本项目的跳表

  1. 节点
  2. 跳表

插入节点

template<typename K, typename V>
int SkipList<K, V>::insert_element(const K key, const V value) 
    
    mtx.lock();
    Node<K, V> *current = this->_header;

    // update 是个节点数组,用于后续插入位置的前向链接 和 后向链接
    Node<K, V> *update[_max_level+1];
    memset(update, 0, sizeof(Node<K, V>*)*(_max_level+1));

    // 寻找插入位置,并保存插入位置前面的节点
    // for进行高度遍历,while 行方向遍历
    for(int i = _skip_list_level; i >= 0; i--) 
        while(current->forward[i] != NULL && current->forward[i]->get_key() < key) 
            current = current->forward[i]; 
        
        update[i] = current;
    

    //  定位到 == key 或者 > key 的 节点
    current = current->forward[0];

    // 如果current节点存在且和key相等,提示,并解锁
    if (current != NULL && current->get_key() == key) 
        std::cout << "key: " << key << ", exists" << std::endl;
        mtx.unlock();
        return 1;
    

    //否则,需要在update[0]和current node节点之间插入 [节点]
    if (current == NULL || current->get_key() != key ) 
        
        // 随机生成节点的高度
        int random_level = get_random_level();

        // 如果随机高度比当前的跳表的高度大,update数组在多余高出来的部分保存头节点指针
        if (random_level > _skip_list_level) 
            for (int i = _skip_list_level+1; i < random_level+1; i++) 
                update[i] = _header;
            
            _skip_list_level = random_level;
        

        // 用随机生成的高度 创建 insert node
        Node<K, V>* inserted_node = create_node(key, value, random_level);
        
        // node节点 forword 指向 所在位置后面的节点指针
        // node节点 所在位置前面的节点 指向node
        for (int i = 0; i <= random_level; i++) 
            inserted_node->forward[i] = update[i]->forward[i];
            update[i]->forward[i] = inserted_node;
        
        std::cout << "Successfully inserted key:" << key << ", value:" << value << std::endl;
        _element_count ++;
    
    mtx.unlock();
    return 0;

删除节点

template<typename K, typename V> 
void SkipList<K, V>::delete_element(K key) 

    mtx.lock();
    Node<K, V> *current = this->_header; 
    // update数组 用于保存 删除节点前面的节点 ,用于删除节点后的指向链接,步骤和插入节点类似
    Node<K, V> *update[_max_level+1];
    memset(update, 0, sizeof(Node<K, V>*)*(_max_level+1));

    // 寻找要删除的节点,并将该节点前面的节点保存到update数组中
    for (int i = _skip_list_level; i >= 0; i--) 
        while (current->forward[i] !=NULL && current->forward[i]->get_key() < key) 
            current = current->forward[i];
        
        update[i] = current;
    

    //  ==key  或者 > key 的元素节点
    current = current->forward[0];

    // 如果相等进行下面的操作,不相等直接解锁 return
    if (current != NULL && current->get_key() == key) 
       
        // 从低级别开始删除每层的节点,
        for (int i = 0; i <= _skip_list_level; i++) 

            // 如果在第i曾,其下一个节点不是所要删除的节点直接break
            if (update[i]->forward[i] != current) 
                break;
            // 如果是要删除的节点 重新设置指针的指向,将其指向下一个节点的位置
            update[i]->forward[i] = current->forward[i];
        

        // 减少没有元素的层,更新跳表的level
        while (_skip_list_level > 0 && _header->forward[_skip_list_level] == 0) 
            _skip_list_level --; 
        

        std::cout << "Successfully deleted key "<< key << std::endl;
        _element_count --;
    
    mtx.unlock();
    return;

搜索节点

// Search for element in skip list 
/*
                           +------------+
                           |  select 60 |
                           +------------+
level 4     +-->1+                                                      100
                 |
                 |
level 3         1+-------->10+------------------>50+           70       100
                                                   |
                                                   |
level 2         1          10         30         50|           70       100
                                                   |
                                                   |
level 1         1    4     10         30         50|           70       100
                                                   |
                                                   |
level 0         1    4   9 10         30   40    50+-->60      70       100
*/
template<typename K, typename V> 
bool SkipList<K, V>::search_element(K key) 

    std::cout << "search_element-----------------" << std::endl;
    Node<K, V> *current = _header;

    // for循环高度遍历,while循环水平遍历每一层。
    for (int i = _skip_list_level; i >= 0; i--) 
        while (current->forward[i] && current->forward[i]->get_key() < key) 
            current = current->forward[i];
        
    

    // 找到 > 或者 =  key的 节点
    current = current->forward[0];

    // 如果存在且相等,成功找到
    if (current and current->get_key() == key) 
        std::cout << "Found key: " << key << ", value: " << current->get_value() << std::endl;
        return true;
    
    // 否则 没有找到 进行提示
    std::cout << "Not Found Key:" << key << std::endl;
    return false;

进一步优化

  • 原始代码使用key进行排序,不具备通用型,增加score变量,通过score权重进行排序;
  • 增加反向back指针
  • 删除节点的时候没有释放内存,增加智能指针

github网址:

性能测试

  1. 参数设置
    #define NUM_THREADS 1
    #define TEST_COUNT 100000
    SkipList<int, std::string> skipList(18);

  2. 测试的代码


pthread_t threads[NUM_THREADS];
pthread_create(&threads[i], NULL, insertElement, (void *)i);
 
void *insertElement(void* threadid) 
    long tid; 
    tid = (long)threadid;
    std::cout << tid << std::endl;  
    int tmp = TEST_COUNT/NUM_THREADS; 
	for (int i=tid*tmp, count=0; count<tmp; i++) 
        count++;
		skipList.insert_element(rand() % TEST_COUNT, "a"); 
	
    pthread_exit(NULL);


  1. 计时
    auto finish = std::chrono::high_resolution_clock::now();
    std::chrono::duration elapsed = finish - start;
    std::cout << “insert elapsed:” << elapsed.count() << std::endl;

虚拟机测试:

项目官方测试:


知识补充:
4. c语言中__attribute__的意义 取消内存对齐
5. C++多线程之使用Mutex和Critical_Section
mutex和临界区的区别

异步日志系统

异步日志系统主要涉及了两个模块,一个是日志模块,一个是阻塞队列模块,其中加入阻塞队列模块主要是解决异步写入日志做准备.

  • 自定义阻塞队列
  • 单例模式创建日志
  • 异步日志

0. 生产者消费者模型

1. 阻塞队列

/*************************************************************
*循环数组实现的阻塞队列,m_back = (m_back + 1) % m_max_size;  
*线程安全,每个操作前都要先加互斥锁,操作完后,再解锁
**************************************************************/
private:
    locker m_mutex;   // 循环数组加锁
    cond m_cond;      // 条件变量通知
    
    T *m_array;     // 循环数组
    int m_size;     // 已经有的size
    int m_max_size; // 最大的size
    int m_front;    // 头部元素
    int m_back;     // 尾部元素
  • 构造函数
    block_queue(int max_size = 1000)
    
        if (max_size <= 0)
        
            exit(-1);
        

        m_max_size = max_size;
        m_array = new T[max_size];
        m_size = 0;
        m_front = -1;
        m_back = -1;
    
  • 析构函数
    ~block_queue()
    
        m_mutex.lock();
        if (m_array != NULL)
            delete [] m_array;

        m_mutex.unlock();
    
  • 队列中添加元素push():
    - 0. 先加锁
    - 1. 判断阻塞队列是否满了,满了的话就条件变量广播让pop来取,解锁,同时返回false
    - 2. 如果阻塞队列没有满的话,就队列尾部添加string, size++,条件变量广播让pop来取,解锁
bool push(const T &item)
    

        m_mutex.lock();
        if (m_size >= m_max_size)
        

            m_cond.broadcast();
            m_mutex.unlock();
            return false;
        

        m_back = (m_back + 1) % m_max_size;
        m_array[m_back] = item;

        m_size++;

        m_cond.broadcast();
        m_mutex.unlock();
        return true;
    
  • pop消费操作
    - 1. 先对阻塞队列加锁
    - 2. 如果阻塞队列为空,条件变量阻塞在
    -
//pop时,如果当前队列没有元素,将会等待条件变量
bool pop(T &item)
    
        m_mutex.lock();
        //多个消费者的时候,这里要是用while而不是if
        while (m_size <= 0)
        
            //当重新抢到互斥锁
            if (!m_cond.wait(m_mutex.get()))
            
                m_mutex.unlock();
                return false;
            
        
        
        m_front = (m_front + 1) % m_max_size;
        item = m_array[m_front];
        m_size--;
        m_mutex.unlock();
        return true;
    
  • 清空
    void clear()
    
        m_mutex.lock();
        m_size = 0;
        m_front = -1;
        m_back = -1;
        m_mutex.unlock();
    
  • 判断队列满还是空 返回队首元素 队尾元素
    bool full() 
    bool empty() 
    bool front(T &value) 
    bool back(T &value) 
  • 在size() 和max_size()操作的时候需要对size和max_size变量加锁

2. Log类头文件

单例模式

  • 成员变量
private:
    char dir_name[128]; //路径名
    char log_name[128]; //log文件名
    int m_log_buf_size; //日志缓冲区大小
    FILE *m_fp;         //打开log的文件指针
    char *m_buf;
    block_queue<string> *m_log_queue; //阻塞队列
    locker m_mutex;
  • 成员函数
private:
    Log();
    virtual ~Log();
  • public成员函数
    //C++11以后,使用局部变量懒汉不用加锁
    static Log *get_instance()
    
        static Log instance;
        return &instance;
    

    static void *flush_log_thread(void *args)
    
        string single_log;
        //从阻塞队列中取出一个日志string,写入文件
        while (m_log_queue->pop(single_log))
        
            m_mutex.lock();
            fputs(single_log.c_str(), m_fp);
            m_mutex.unlock();
         
    
    //可选择的参数有日志文件、日志缓冲区大小、最长日志条队列
    bool init(const char *file_name, int log_buf_size = 8192, int max_queue_size = 1000);

    void write_log(int level, const char *format, ...);

    void flush(void);

3. Log实现文件

  1. init函数: 初始化
    m_log_queue = new block_queue<string>(max_queue_size);
    pthread_t tid;
    //flush_log_thread为回调函数,这里表示创建线程异步写日志
    pthread_create(&tid, NULL, flush_log_thread, NULL);
    
    m_fp = fopen(log_full_name, "a");
  1. write_log函数 : 如果阻塞队列满了,就直接将内容追加到日志文件中
    if (!m_log_queue->full())
    
        m_log_queue->push(log_str);
    
    else
    
        m_mutex.lock();
        fputs(log_str.c_str(), m_fp);
        m_mutex.unlock();
    
  1. flush强制刷新缓冲区
void Log::flush(void)

    m_mutex.lock();
    //强制刷新写入流缓冲区
    fflush(m_fp);
    m_mutex.unlock();

编写你的第一个 Java 版 Raft 分布式 KV 存储

前言

本文旨在讲述如何使用 Java 语言实现基于 Raft 算法的,分布式的,KV 结构的存储项目。该项目的背景是为了深入理解 Raft 算法,从而深刻理解分布式环境下数据强一致性该如何实现;该项目的目标是:在复杂的分布式环境中,多个存储节点能够保证数据强一致性。

项目地址:https://github.com/stateIs0/lu-raft-kv

欢迎 star :)

什么是 Java 版 Raft 分布式 KV 存储

Raft 算法大部分人都已经了解,也有很多实现,从 GitHub 上来看,似乎 Golang 语言实现的较多,比较有名的,例如 etcd。而 Java 版本的,在生产环境大规模使用的实现则较少;

同时,他们的设计目标大部分都是命名服务,即服务注册发现,也就是说,他们通常都是基于 AP 实现,就像 DNS,DNS 是一个命名服务,同时也不是一个强一致性的服务。

比较不同的是 Zookeeper,ZK 常被大家用来做命名服务,但他更多的是一个分布式服务协调者。

而上面的这些都不是存储服务,虽然也都可以做一些存储工作。甚至像 kafka,可以利用 ZK 实现分布式存储。

回到我们这边。

此次我们语言部分使用 Java,RPC 网络通信框架使用的是蚂蚁金服 SOFA-Bolt,底层 KV 存储使用的是 RocksDB,其中核心的 Raft 则由我们自己实现(如果不自己实现,那这个项目没有意义)。 注意,该项目将舍弃一部分性能和可用性,以追求尽可能的强一致性。

为什么要费尽心力重复造轮子

小时候,我们阅读关于高可用的文章时,最后都会提到一个问题:服务挂了怎么办?

通常有 2 种回答:

  1. 如果是无状态服务,那么毫不影响使用。
  2. 如果是有状态服务,可以将状态保存到一个别的地方,例如 Redis。如果 Redis 挂了怎么办?那就放到 ZK。

很多中间件,都会使用 ZK 来保证状态一致,例如 codis,kafka。因为使用 ZK 能够帮我们节省大量的时间。但有的时候,中间件的用户觉得引入第三方中间件很麻烦,那么中间件开发者会尝试自己实现一致性,例如 Redis Cluster, TiDB 等。

而通常自己实现,都会使用 Raft 算法,那有人问,为什么不使用"更牛逼的" paxos 算法?对不起,这个有点难,至少目前开源的、生产环境大规模使用的 paxos 算法实现还没有出现,只听过 Google 或者 alibaba 在其内部实现过,具体是什么样子的,这里我们就不讨论了。

回到我们的话题,为什么重复造轮子?从 3 个方面来回答:

  1. 有的时候 ZK 和 etcd 并不能解决我们的问题,或者像上面说的,引入其他的中间件部署起来太麻烦也太重。
  2. 完全处于好奇,好奇为什么 Raft 可以保证一致性(这通常可以通过汗牛充栋的文章来得到解答)?但是到底该怎么实现?
  3. 分布式开发的要求,作为开发分布式系统的程序员,如果能够更深刻的理解分布式系统的核心算法,那么对如何合理设计一个分布式系统将大有益处。

好,有了以上 3 个原因,我们就有足够的动力来造轮子了,接下来就是如何造的问题了。

编写前的 Raft 理论基础

任何实践都是理论先行。如果你对 Raft 理论已经非常熟悉,那么可以跳过此节,直接看实现的步骤。

Raft 为了算法的可理解性,将算法分成了 4 个部分。

  1. leader 选举
  2. 日志复制
  3. 成员变更
  4. 日志压缩

同 zk 一样,leader 都是必须的,所有的写操作都是由 leader 发起,从而保证数据流向足够简单。而 leader 的选举则通过比较每个节点的逻辑时间(term)大小,以及日志下标(index)的大小。

刚刚说 leader 选举涉及日志下标,那么就要讲日志复制。日志复制可以说是 Raft 核心的核心,说简单点,Raft 就是为了保证多节点之间日志的一致。当日志一致,我们可以认为整个系统的状态是一致的。这个日志你可以理解成 mysql 的 binlog。

Raft 通过各种补丁,保证了日志复制的正确性。

Raft leader 节点会将客户端的请求都封装成日志,发送到各个 follower 中,如果集群中超过一半的 follower 回复成功,那么这个日志就可以被提交(commit),这个 commit 可以理解为 ACID 的 D ,即持久化。当日志被持久化到磁盘,后面的事情就好办了。

而第三点则是为了节点的扩展性。第四点是为了性能。相比较 leader 选举和 日志复制,不是那么的重要,可以说,如果没有成员变更和日志压缩,也可以搞出一个可用的 Raft 分布式系统,但没有 leader 选举和日志复制,是万万不能的。

因此,本文和本项目将重点放在 leader 选举和日志复制。

以上,就简单说明了 Raft 的算法,关于 Raft 算法更多的文章,请参考本人博客中的其他文章(包含官方各个版本论文和 PPT & 动画 & 其他博客文章),博客地址:thinkinjava.cn

实现的步骤

实现目标:基于 Raft 论文实现 Raft 核心功能,即 Leader 选举 & 日志复制。

Raft 核心组件包括:一致性模块,RPC 通信,日志模块,状态机。

技术选型:

  • 一致性模块,是 Raft 算法的核心实现,通过一致性模块,保证 Raft 集群节点数据的一致性。这里我们需要自己根据论文描述去实现
  • RPC 通信,可以使用 HTTP 短连接,也可以直接使用 TCP 长连接,考虑到集群各个节点频繁通信,同时节点通常都在一个局域网内,因此我们选用 TCP 长连接。而 Java 社区长连接框架首选 Netty,这里我们选用蚂蚁金服网络通信框架 SOFA-Bolt(基于 Netty),便于快速开发。
  • 日志模块,Raft 算法中,日志实现是基础,考虑到时间因素,我们选用 RocksDB 作为日志存储。
  • 状态机,可以是任何实现,其实质就是将日志中的内容进行处理。可以理解为 Mysql binlog 中的具体数据。由于我们是要实现一个 KV 存储,那么可以直接使用日志模块的 RocksDB 组件。

以上。我们可以看到,得益于开源世界,我们开发一个 Raft 存储,只需要编写一个“一致性模块”就行了,其他模块都有现成的轮子可以使用,真是美滋滋。

接口设计:

上面我们说了 Raft 的几个核心功能,事实上,就可以理解为接口。所以我们定义以下几个接口:

  1. Consensus, 一致性模块接口
  2. LogModule,日志模块接口
  3. StateMachine, 状态机接口
  4. RpcServer & RpcClient, RPC 接口
  5. Node,同时,为了聚合上面的几个接口,我们需要定义一个 Node 接口,即节点,Raft 抽象的机器节点。
  6. LifeCycle, 最后,我们需要管理以上组件的生命周期,因此需要一个 LifeCycle 接口。

接下来,我们需要详细定义核心接口 Consensus。我们根据论文定义了 2 个核心接口:

   /**
     * 请求投票 RPC
     *
     * 接收者实现:
     *
     *      如果term < currentTerm返回 false (5.2 节)
     *      如果 votedFor 为空或者就是 candidateId,并且候选人的日志至少和自己一样新,那么就投票给他(5.2 节,5.4 节)
     */
    RvoteResult requestVote(RvoteParam param);

    /**
     * 附加日志(多个日志,为了提高效率) RPC
     *
     * 接收者实现:
     *
     *    如果 term < currentTerm 就返回 false (5.1 节)
     *    如果日志在 prevLogIndex 位置处的日志条目的任期号和 prevLogTerm 不匹配,则返回 false (5.3 节)
     *    如果已经存在的日志条目和新的产生冲突(索引值相同但是任期号不同),删除这一条和之后所有的 (5.3 节)
     *    附加任何在已有的日志中不存在的条目
     *    如果 leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和 新日志条目索引值中较小的一个
     */
    AentryResult appendEntries(AentryParam param);

请求投票 & 附加日志。也就是我们的 Raft 节点的核心功能,leader 选举和 日志复制。实现这两个接口是 Raft 的关键所在。

然后再看 LogModule 接口,这个自由发挥,考虑日志的特点,我定义了以下几个接口:

void write(LogEntry logEntry);

LogEntry read(Long index);

void removeOnStartIndex(Long startIndex);

LogEntry getLast();

Long getLastIndex();

分别是写,读,删,最后是两个关于 Last 的接口,在 Raft 中,Last 是一个非常关键的东西,因此我这里单独定义了 2个方法,虽然看起来不是很好看 :)

状态机接口,在 Raft 论文中,将数据保存到状态机,作者称之为应用,那么我们也这么命名,说白了,就是将已成功提交的日志应用到状态机中:

    /**
     * 将数据应用到状态机.
     *
     * 原则上,只需这一个方法(apply). 其他的方法是为了更方便的使用状态机.
     * @param logEntry 日志中的数据.
     */
    void apply(LogEntry logEntry);

    LogEntry get(String key);

    String getString(String key);

    void setString(String key, String value);

    void delString(String... key);
    

第一个 apply 方法,就是 Raft 论文常常提及的方法,即将日志应用到状态机中,后面的几个方法,都是我为了方便获取数据设计的,可以不用在意,甚至于,这几个方法不存在也不影响 Raft 的实现,但影响 KV 存储的实现,试想:一个系统只有保存功能,没有获取功能,要你何用?。

RpcClient 和 RPCServer 没什么好讲的,其实就是 send 和 receive。

然后是 Node 接口,Node 接口也是 Raft 没有定义的,我们依靠自己的理解定义了几个接口:


    /**
     * 设置配置文件.
     *
     * @param config
     */
    void setConfig(NodeConfig config);

    /**
     * 处理请求投票 RPC.
     *
     * @param param
     * @return
     */
    RvoteResult handlerRequestVote(RvoteParam param);

    /**
     * 处理附加日志请求.
     *
     * @param param
     * @return
     */
    AentryResult handlerAppendEntries(AentryParam param);

    /**
     * 处理客户端请求.
     *
     * @param request
     * @return
     */
    ClientKVAck handlerClientRequest(ClientKVReq request);

    /**
     * 转发给 leader 节点.
     * @param request
     * @return
     */
    ClientKVAck redirect(ClientKVReq request);

首先,一个 Node 肯定需要配置文件,所以有一个 setConfig 接口,
然后,肯定需要处理“请求投票”和“附加日志”,同时,还需要接收用户,也就是客户端的请求(不然数据从哪来?),所以有 handlerClientRequest 接口,最后,考虑到灵活性,我们让每个节点都可以接收客户端的请求,但 follower 节点并不能处理请求,所以需要重定向到 leader 节点,因此,我们需要一个重定向接口。

最后是生命周期接口,这里我们简单定义了 2 个,有需要的话,再另外加上组合接口:

    void init() throws Throwable;

    void destroy() throws Throwable;

好,基本的接口定义完了,后面就是实现了。实现才是关键。

Leader 选举的实现

选举,其实就是一个定时器,根据 Raft 论文描述,如果超时了就需要重新选举,我们使用 Java 的定时任务线程池进行实现,实现之前,需要确定几个点:

  1. 选举者必须不是 leader。
  2. 必须超时了才能选举,具体超时时间根据你的设计而定,注意,每个节点的超时时间不能相同,应当使用随机算法错开(Raft 关键实现),避免无谓的死锁。
  3. 选举者优先选举自己,将自己变成 candidate。
  4. 选举的第一步就是把自己的 term 加一。
  5. 然后像其他节点发送请求投票 RPC,请求参数参照论文,包括自身的 term,自身的 lastIndex,以及日志的 lastTerm。同时,请求投票 RPC 应该是并行请求的。
  6. 等待投票结果应该有超时控制,如果超时了,就不等待了。
  7. 最后,如果有超过半数的响应为 success,那么就需要立即变成 leader ,并发送心跳阻止其他选举。
  8. 如果失败了,就需要重新选举。注意,这个期间,如果有其他节点发送心跳,也需要立刻变成 follower,否则,将死循环。

具体代码,可参见 https://github.com/stateIs0/lu-raft-kv/blob/master/lu-raft-kv/src/main/java/cn/think/in/java/impl/DefaultNode.java#L546

上面说的,其实是 Leader 选举中,请求者的实现,那么接收者如何实现呢?接收者在收到“请求投票” RPC 后,需要做以下事情:

  1. 注意,选举操作应该是串行的,因为涉及到状态修改,并发操作将导致数据错乱。也就是说,如果抢锁失败,应当立即返回错误。
  2. 首先判断对方的 term 是否小于自己,如果小于自己,直接返回失败。
  3. 如果当前节点没有投票给任何人,或者投的正好是对方,那么就可以比较日志的大小,反之,返回失败。
  4. 如果对方日志没有自己大,返回失败。反之,投票给对方,并变成 follower。变成 follower 的同时,异步的选举任务在最后从 condidate 变成 leader 之前,会判断是否是 follower,如果是 follower,就放弃成为 leader。这是一个兜底的措施。

具体代码参见 https://github.com/stateIs0/lu-raft-kv/blob/master/lu-raft-kv/src/main/java/cn/think/in/java/impl/DefaultConsensus.java#L51

到这里,基本就能够实现 Raft Leader 选举的逻辑。

注意,我们上面涉及到的 LastIndex 等参数,还没有实现,但不影响我们编写伪代码,毕竟日志复制比 leader 选举要复杂的多,我们的原则是从易到难。:)

日志复制的实现

日志复制是 Raft 实现一致性的核心。

日志复制有 2 种形式,1种是心跳,一种是真正的日志,心跳的日志内容是空的,其他部分基本相同,也就是说,接收方在收到日志时,如果发现是空的,那么他就是心跳。

心跳

既然是心跳,肯定就是个定时任务,和选举一样。在我们的实现中,我们每 5 秒发送一次心跳。注意点:

  1. 首先自己必须是 leader 才能发送心跳。
  2. 必须满足 5 秒的时间间隔。
  3. 并发的向其他 follower 节点发送心跳。
  4. 心跳参数包括自身的 ID,自身的 term,以便让对方检查 term,防止网络分区导致的脑裂。
  5. 如果任意 follower 的返回值的 term 大于自身,说明自己分区了,那么需要变成 follower,并更新自己的 term。然后重新发起选举。

具体代码查看:https://github.com/stateIs0/lu-raft-kv/blob/master/lu-raft-kv/src/main/java/cn/think/in/java/impl/DefaultNode.java#L695

然后是心跳接收者的实现,这个就比较简单了,接收者需要做几件事情:

  1. 无论成功失败首先设置返回值,也就是将自己的 term 返回给 leader。
  2. 判断对方的 term 是否大于自身,如果大于自身,变成 follower,防止异步的选举任务误操作。同时更新选举时间和心跳时间。
  3. 如果对方 term 小于自身,返回失败。不更新选举时间和心跳时间。以便触发选举。

具体代码参见:https://github.com/stateIs0/lu-raft-kv/blob/master/lu-raft-kv/src/main/java/cn/think/in/java/impl/DefaultConsensus.java#L109

说完了心跳,再说说真正的日志附加。

简单来说,当用户向 Leader 发送一个 KV 数据,那么 Leader 需要将 KV数据封装成日志,并行的发送到其他的 follower 节点,只要在指定的超时时间内,有过半几点返回成功,那么久提交(持久化)这条日志,返回客户端成功,否者返回失败。

因此,Leader 节点会有一个 ClientKVAck handlerClientRequest(ClientKVReq request) 接口,用于接收用户的 KV 数据,同时,会并行向其他节点复制数据,具体步骤如下:

  1. 每个节点都可能会接收到客户端的请求,但只有 leader 能处理,所以如果自身不是 leader,则需要转发给 leader。
  2. 然后将用户的 KV 数据封装成日志结构,包括 term,index,command,预提交到本地。
  3. 并行的向其他节点发送数据,也就是日志复制。
  4. 如果在指定的时间内,过半节点返回成功,那么就提交这条日志。
  5. 最后,更新自己的 commitIndex,lastApplied 等信息。

注意,复制不仅仅是简单的将这条日志发送到其他节点,这可能比我们想象的复杂,为了保证复杂网络环境下的一致性,Raft 保存了每个节点的成功复制过的日志的 index,即 nextIndex ,因此,如果对方之前一段时间宕机了,那么,从宕机那一刻开始,到当前这段时间的所有日志,都要发送给对方。

甚至于,如果对方觉得你发送的日志还是太大,那么就要递减的减小 nextIndex,复制更多的日志给对方。注意:这里是 Raft 实现分布式一致性的关键所在

具体代码参见:https://github.com/stateIs0/lu-raft-kv/blob/master/lu-raft-kv/src/main/java/cn/think/in/java/impl/DefaultNode.java#L244

再来看看日志接收者的实现步骤:

  1. 和心跳一样,要先检查对方 term,如果 term 都不对,那么就没什么好说的了。
  2. 如果日志不匹配,那么返回 leader,告诉他,减小 nextIndex 重试。
  3. 如果本地存在的日志和 leader 的日志冲突了,以 leader 的为准,删除自身的。
  4. 最后,将日志应用到状态机,更新本地的 commitIndex,返回 leader 成功。

具体代码参见:https://github.com/stateIs0/lu-raft-kv/blob/master/lu-raft-kv/src/main/java/cn/think/in/java/impl/DefaultConsensus.java#L109

到这里,日志复制的部分就讲完了。

注意,实现日志复制的前提是,必须有一个正确的日志存储系统,即我们的 RocksDB,我们在 RocksDB 的基础上,使用一种机制,维护了 每个节点 的LastIndex,无论何时何地,都能够得到正确的 LastIndex,这是实现日志复制不可获取的一部分。

验证“Leader 选举”和“日志复制”

写完了程序,如何验证是否正确呢?

当然是写验证程序。

我们首先验证 “Leader 选举”。其实这个比较好测试。
  1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
    系统配置, 表示分布式环境下的 5 个机器节点.
  2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
  3. 观察控制台, 约 6 秒后, 会发生选举事件,此时,会产生一个 leader. 而 leader 会立刻发送心跳维持自己的地位.
  4. 如果leader 的端口是 8775, 使用 idea 关闭 8775 端口,模拟节点挂掉, 大约 15 秒后, 会重新开始选举, 并且会在剩余的 4 个节点中,产生一个新的 leader. 并开始发送心跳日志。

然后验证 日志复制,分为 2 种情况:

正常状态下
  1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
  2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
  3. 使用客户端写入 kv 数据.
  4. 杀掉所有节点, 使用 junit test 读取每个 rocksDB 的值, 验证每个节点的数据是否一致.
非正常状态下
  1. 在 idea 中配置 5 个 application 启动项,配置 main 类为 RaftNodeBootStrap 类, 加入 -DserverPort=8775 -DserverPort=8776 -DserverPort=8777 -DserverPort=8778 -DserverPort=8779
  2. 依次启动 5 个 RaftNodeBootStrap 节点, 端口分别是 8775,8776, 8777, 8778, 8779.
  3. 使用客户端写入 kv 数据.
  4. 杀掉 leader (假设是 8775).
  5. 再次写入数据.
  6. 重启 8775.
  7. 关闭所有节点, 读取 RocksDB 验证数据一致性.

Summary

本文并没有贴很多代码,如果要贴代码的话,阅读体验将不会很好,并且代码也不能说明什么,如果想看具体实现,可以到 github 上看看,顺便给个 star :)

该项目 Java 代码约 2500 行,核心代码估计也就 1000 多行。你甚至可以说,这是个玩具代码,但我相信毕玄大师所说,玩具代码经过优化后,也是可以变成可在商业系统中真正健壮运行的代码(http://hellojava.info/?p=508) :)

回到我们的初衷,我们并不奢望这段代码能够运行在生产环境中,就像我的另一个项目 Lu-RPC 一样。但,经历了一次编写可正确运行的玩具代码的经历,下次再次编写工程化的代码,应该会更加容易些。这点我深有体会。

可以稍微展开讲一下,在写完 Lu-RPC 项目后,我就接到了开发生产环境运行的限流熔断框架任务,此时,开发 Lu-RPC 的经历让我在开发该框架时,更加的从容和自如:)

再回到 Raft 上面来,虽然上面的测试用例跑过了,程序也经过了我反反复复的测试,但不代表这个程序就是 100% 正确的,特别是在复杂的分布式环境下。如果你对 Raft 有兴趣,欢迎一起交流沟通 :)

项目地址:https://github.com/stateIs0/lu-raft-kv

以上是关于基于跳表实现的轻量级KV存储引擎 项目总结的主要内容,如果未能解决你的问题,请参考以下文章

flinkRocksDB介绍以及Flink对RocksDB的支持

编写你的第一个 Java 版 Raft 分布式 KV 存储

Mustache 使用总结

周末实践第四弹 | 编写你的第一个 Java 版 Raft 分布式 KV 存储

LevelDB源码分析-MemTable

BAT工程师自研存储引擎,火爆Github!!大家速度顶起来