redis源码阅读-持久化之RDB

Posted 5ycode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码阅读-持久化之RDB相关的知识,希望对你有一定的参考价值。

持久化介绍:

redis的持久化有两种方式:

  • rdb :可以在指定的时间间隔内生成数据集的时间点快照(point-in-time snapshot)
  • aof : 记录redis执行的所有写操作命令

根据这两种方式,redis可以开启三种模式的持久化

  • rdb
  • aof
  • rdb+aof

rdb

  • rdb 是一个非常紧凑的文件
  • rdb适合灾难恢复,主从复制
  • rdb可以最大化redis的性能,rdb操作是会从主进程fork一个子进程;

本章节主要讲解rdb,aof保留到下一章节讲解。

在redis的配置文件 redis.conf 中这么一段这个配置

save 900 1  # 表示900秒内有一个键改动,就会执行rdb
save 300 10 # 表示300秒内有10个键改动,就会执行rdb
save 60 10000 # 表示60秒内有1万个键改动,就会执行rdb

我先把rdb流程放这,咱们再继续看代码。

从流程上看rdb的发起主要有以下几个口子

  • bgsaveCommand bgsave命令调用
  • saveCommand save调用
  • syncCommand 主从同步,直接执行命令
  • serverCron 中定期检测
    • replicationCron 主从定时
    • UpdateSlavesWaitingBgsave 这块可以理解为新加了从节点,或者把从节点数据清空了,重新拉取

前两个都是为备份服务的,后面三个是为主从复制服务的。

从上面的图片可以看到,在进行主从同步的时候,有两种模式,一种是落盘后,主从同步,一种是不落盘直接网络传输。

rdb核心代码

落入磁盘的RDB

整个rdb磁盘持久化核心在rdbSaverdbSaveBackground这里。

我们看下rdbSaveBackground 这个方法

/**
 * 后台保存rdb
 * 调用 serverCron、bgsaveCommand、startBgsaveForReplication
 *
 * 时间主要耗费在了fork() 产生虚拟空间表的过程
 * @param filename
 * @param rsi
 * @return
 */
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) 
    pid_t childpid;
    long long start;

    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    //开始执行 rdb 备份前的dirty 值,保存在dirty_before_bgsave中
    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    //创建一个pip管道,用于父子进程进行通信
    openChildInfoPipe();

    start = ustime();
    //fork 一个子线程 给childpid
    /**
     * fork调用的一个奇妙之处就是它仅仅被调用一次,却能够返回两次,它可能有三种不同的返回值
     *   - 在父进程中,fork返回新创建子进程的进程ID;
     *   - 在子进程中,fork返回0;
     *   - 如果出现错误,fork返回一个负值;
     *
     * 所以fork()成功,以后会执行两次
     *    == 0 的时候,是子进程执行
     *    == 1 的时候,是父进程执行
     *
     *  引用一位网友的话来解释fpid的值为什么在父子进程中不同。“其实就相当于链表,进程形成了链表,
     *    父进程的fpid(p 意味point)指向子进程的进程id, 因为子进程没有子进程,所以其fpid为0.
     *
     *  fork出错可能有两种原因:
     *    1)当前的进程数已经达到了系统规定的上限,这时errno的值被设置为EAGAIN。
     *    2)系统内存不足,这时errno的值被设置为ENOMEM。
     *  创建新进程成功后,系统中出现两个基本完全相同的进程,这两个进程执行没有固定的先后顺序,哪个进程先执行要看系统的进程调度策略。
     *  每个进程都有一个独特(互不相同)的进程标识符(process ID),可以通过getpid()函数获得,
     *   还有一个记录父进程pid的变量,可以通过getppid()函数获得变量的值。
     *
     *  https://www.cnblogs.com/jeakon/archive/2012/05/26/2816828.html
     *
     *  fork 为子进程创建了虚拟地址空间,仍与父进程共享同样的物理空间,当父子进程某一方发生写操作时,系统才会为其分配物理空间,
     *  并复制一份副本以供其修改。
     * proc文件系统为每个进程都提供了一个smaps文件
     *  - Shared_Clean:和其他进程共享的未被改写的page的大小
     *  - Shared_Dirty: 和其他进程共享的被改写的page的大小
     *  - Private_Clean:未被改写的私有页面的大小。
     *  - Private_Dirty: 已被改写的私有页面的大小
     *
     * 当子进程被fork出来时,空间是Private_Clean的,然后子进程对继承来的内存进行了修改,修改的部分就不能共享了。
     * 修改的部分就是Private_Dirty
     *
     *
     */
    if ((childpid = fork()) == 0) 
        int retval;

        /* Child */
        //关闭自己不使用的父进程的资源
        closeClildUnusedResourceAfterFork();
        redisSetProcTitle("redis-rdb-bgsave");
        //执行备份
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) 
            //获取子进程修改的部分大小,相当于rdb耗费的内存
            size_t private_dirty = zmalloc_get_private_dirty(-1);

            if (private_dirty) 
                serverLog(LL_NOTICE,
                    "RDB: %zu MB of memory used by copy-on-write",
                    private_dirty/(1024*1024));
            
            //记录消耗内存的大小
            server.child_info_data.cow_size = private_dirty;
            //通过pipe和主进程通信
            sendChildInfo(CHILD_INFO_TYPE_RDB);
        
        //退出子进程,执行完成,为0 ,其他为1
        exitFromChild((retval == C_OK) ? 0 : 1);
     else 
        /* Parent */
        //计算fork子进程花费的时间
        server.stat_fork_time = ustime()-start;
        //计算fork的速度
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        //周期性采样
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) 
            //如果fork 失败,关闭管道
            closeChildInfoPipe();
            //记录备份状态为失败
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        

        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        //记录rdb保存的开始时间
        server.rdb_save_time_start = time(NULL);
        //设置子进程id
        server.rdb_child_pid = childpid;
        //设置rdb类型 是到磁盘
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        updateDictResizePolicy();
        return C_OK;
    
    return C_OK; /* unreached */

在这里主要是fork一个子进程,然后让子进程去执行rdb。具体子进程的创建以及备份分析不再讲解,请看上一篇。

在这里唯一会阻塞主进程的地方就是fork,虽然是操作系统的操作,只是创建一个页面映射表,如果数据量很大,也会有一定的阻塞(虽然时间极短),根据fork的原理,就有快照备份的说法。

在rdbSave中

int rdbSave(char *filename, rdbSaveInfo *rsi) 
    char tmpfile[256];
    char cwd[MAXPATHLEN]; /* Current working dir path for error messages. */
    FILE *fp;
    rio rdb;
    int error = 0;
    //格式化生成一个临时文件名
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    //以写模式,创建一个临时文件,准备写
    fp = fopen(tmpfile,"w");
    if (!fp) 
        //创建失败的处理
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        return C_ERR;
    
    //初始化rdb的文件rio对象,因为要写到文件里,所以都是文件操作
    rioInitWithFile(&rdb,fp);

    if (server.rdb_save_incremental_fsync)
        //设置缓冲区32mb
        riosetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES);
    //将所有的db写入文件
    if (rdbSaveRio(&rdb,&error,RDB_SAVE_NONE,rsi) == C_ERR) 
        errno = error;
        goto werr;
    

    /**
     * 1, 刷新缓冲区
     * 2,刷盘
     * 3,释放文件资源
     */
    if (fflush(fp) == EOF) goto werr;
    if (fsync(fileno(fp)) == -1) goto werr;
    if (fclose(fp) == EOF) goto werr;

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    //将临时文件重命名为rd的名称
    if (rename(tmpfile,filename) == -1) 
        char *cwdp = getcwd(cwd,MAXPATHLEN);
        //释放临时文件
        unlink(tmpfile);
        return C_ERR;
    
    //记录log
    serverLog(LL_NOTICE,"DB saved on disk");
    //结束状态
    server.dirty = 0;
    //记录执行完成的时间
    server.lastsave = time(NULL);
    //记录状态为成功
    server.lastbgsave_status = C_OK;
    return C_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    return C_ERR;

在这里还有一个核心方法

/**
 * 将db生成的rdb写入到指定的 I/O通道中。这个通道可以是磁盘IO,也可以是网络,也可以是内存
 * @param rdb  指定的rdb格式和 io通道
 * @param error
 * @param flags
 * @param rsi
 * @return
 */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) 
    dictIterator *di = NULL;
    dictEntry *de;
    char magic[10];
    int j;
    uint64_t cksum;
    size_t processed = 0;
    //校验和
    if (server.rdb_checksum)
        rdb->update_cksum = rioGenericUpdateChecksum;
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
    //前9个字节为rdb的魔数,用于标识rdb的情况,恢复的时候,能不能用,可以根据这个判断,java是0xCAFEBABE
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
    // 写入一些别的信息
    if (rdbSaveInfoAuxFields(rdb,flags,rsi) == -1) goto werr;
    //用了哪些模块也写入进来了
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
    //遍历所有的db,写入
    for (j = 0; j < server.dbnum; j++) 
        redisDb *db = server.db+j;
        //当前db的全局hash表
        dict *d = db->dict;
        if (dictSize(d) == 0) continue;
        //获取hash表的迭代器
        di = dictGetSafeIterator(d);

        /* Write the SELECT DB opcode */
        //写入db的操作码 254 ,一个字节
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
        //保存数据库的序号
        if (rdbSaveLen(rdb,j) == -1) goto werr;
        /**
         * 写入db和expires的大小
         */
        uint64_t db_size, expires_size;
        db_size = dictSize(db->dict);
        expires_size = dictSize(db->expires);
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
        if (rdbSaveLen(rdb,db_size) == -1) goto werr;
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr;

        //迭代全局hash表,一个个的获取数据,写入
        while((de = dictNext(di)) != NULL) 
            sds keystr = dictGetKey(de);
            robj key, *o = dictGetVal(de);
            long long expire;

            initStaticStringObject(key,keystr);
            expire = getExpire(db,&key);
            //将key,val 和过期时间一起写入,这里会根据数据类型,解析数据,将这些标识 key val都写入到rdb中
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
            if (flags & RDB_SAVE_AOF_PREAMBLE &&
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
            
                processed = rdb->processed_bytes;
                aofReadDiffFromParent();
            
        
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    

    if (rsi && dictSize(server.lua_scripts)) 
        di = dictGetIterator(server.lua_scripts);
        while((de = dictNext(di)) != NULL) 
            robj *body = dictGetVal(de);
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
                goto werr;
        
        dictReleaseIterator(di);
        di = NULL; /* So that we don't release it again on error. */
    
    //写完db后,写入一个结束标识
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;

    /* EOF opcode */
    //写入文件结束标识
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;

    //CRC64 校验,不支持CRC64直接写0
    cksum = rdb->cksum;
    memrev64ifbe(&cksum);
    if (rioWrite(rdb,&cksum,8) == 0) goto werr;
    return C_OK;

werr:
    if (error) *error = errno;
    if (di) dictReleaseIterator(di);
    return C_ERR;

在这里会把文件头(魔数)一些基本信息先写入文件,然后才会将数据一个个的获取到写入。

整个的文件格式如下:

看下写入rdbSaveKeyValuePair,具体的拆解就不说了

/**
 * 保存key val 键值对 到磁盘
 *  先获取过期策略,根据不同的过期策略计算最后的到期时间
 *  - 写到期时间
 *  - 写val的类型
 *  - 写key
 *  - 写val
 * @param rdb rdb文件
 * @param key
 * @param val
 * @param expiretime
 * @return
 */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) 
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;

    //写入到期时间
    if (expiretime != -1) 
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    

    //写入LRU的过期时间,通过RDB_OPCODE_IDLE标识识别是LRU
    if (savelru) 
        //计算一次空闲时间
        uint64_t idletime = estimateObjectIdleTime(val);
        idletime /= 1000; /* Using seconds is enough and requires less space.*/
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
        //将空闲时间写入
        if (rdbSaveLen(rdb,idletime) == -1) return -1;
    

    //写入LFU的信息,通过RDB_OPCODE_FREQ标识识别
    if (savelfu) 
        uint8_t buf[1];
        //写入之前还得再衰减下
        buf[0] = LFUDecrAndReturn(val);
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
    

    /* Save type, key, value */
    //写入数据类型标识(通过val的redisObject获取)
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    //写入key的值(最终转换为字符串)
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    //根据val类型组装不同写入值(list,hash,set这些都会一条条的解析出来)
    if (rdbSaveObject(rdb,val,key) == -1) return -1;
    return 1;

不落盘的rdb

在以下的代码里

int startBgsaveForReplication(int mincapa) 
    if (rsiptr) 
        if (socket_target)
            //不落盘进行传输(直接写到网络流里)
            retval = rdbSaveToSlavesSockets(rsiptr);
        else
            //落入磁盘进行rdb
            retval = rdbSaveBackground(server.rdb_filename,rsiptr);
     

这里的逻辑主要是主从复制使用。等到后面再讲。

serverCron中的调用

我们看下周期性任务serverCron是如何调用的。

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 
    //这段代码下篇再讲
	if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 &&server.aof_rewrite_scheduled)
        //aof重写
        rewriteAppendOnlyFileBackground();
    

    /* Check if a background saving or AOF rewrite in progress terminated. */
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||ldbPendingChildren())
        int statloc;
        pid_t pid;

        /**
         * 获取终止的进程id
         * statloc: 保存着子进程退出时的一些状态,它是一个指向int类型的指针,设置为null,直接kill掉子进程
         * options:选项
         *    WNOHANG  如果没有结束的子进程,马上返回,不等待
         *    WUNTRACED 如果子进程进入暂停执行状态,则马上返回,不理会结束状态
         *    也可以WNOHANG | WUNTRACED 没有任何已结束了的子进程或子进程进入暂停执行的状态,则马上返回不等待
         */

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) 
            //获取子进程的结束代码
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;
            //如果子进程因为信号而结束,获取信号代码
            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == -1) 
                //日志输出
             else if (pid == server.rdb_child_pid) 
                //是rdb子进程,说明rdb执行完了,执行后续的事件
                backgroundSaveDoneHandler(exitcode,bysignal);
                Redis 持久化与故障恢复之rdb

Redis源码剖析 - Redis持久化之AOF

Redis源码剖析 - Redis持久化之AOF

Redis源码剖析 - Redis持久化之RDB

阅读redis持久化RDB源码的时候一些c知识

阅读redis持久化RDB源码的时候一些c知识