redis源码阅读-持久化之aof详解

Posted 5ycode

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码阅读-持久化之aof详解相关的知识,希望对你有一定的参考价值。

aof相关配置


aof-rewrite-incremental-fsync yes
# aof 开关,默认是关闭的,改为yes表示开启
appendonly no
# aof的文件名,默认
appendfilename "appendonly.aof"
# aof刷数据的策略,有no/everysec/aways
appendfsync everysec
no-appendfsync-on-rewrite no
# aof超出配置大小的比例,模式是100%,可以理解为阈值 
auto-aof-rewrite-percentage 100
# aof 配置的文件的大小,默认64mb
auto-aof-rewrite-min-size 64mb
aof-load-truncated yes
# rewrite 进行时候,rewrite 文件分两种格式,1. 先 用 rdb 序列化,序列化结果写入aof文件,然后期间积累的差异用追加aof命令格式 ,2 整个文件都是aof的命令追加格式
aof-use-rdb-preamble yes 

appendfsync 一共有3种策略

  • alays 主要有数据改动就把数据刷入磁盘,性能相对最差,但最安全
  • everysec 每隔1秒刷一次数据,redis默认的,也是redis推荐的
  • no 不主动刷,什么时候刷数据,取决于操作系统,大多数linux 30秒提交一次

aof写入:


在之前分析redis的流程的时候《redis源码阅读三-终于把主线任务执行搞明白了》里读取客户端信息的时候

在processCommand 函数里,解析出来执行命令,放入了client中

int processCommand(client *c) 
    //解析出来命令
	c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    //CMD_CALL_FULL 包含着CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_PROPAGATE
    //CMD_CALL_PROPAGATE 包含着CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL
    call(c,CMD_CALL_FULL);

//命令执行
void call(client *c, int flags) 
    /**
     * dirty 记录修改次数
     * start记录命令开始执行时间us
     * duration记录命令执行花费时间
     */
    long long dirty;
    ustime_t start, duration;
    int client_old_flags = c->flags;
    struct redisCommand *real_cmd = c->cmd;
    //从server.dirty中获取
    dirty = server.dirty;
    //命令执行,会执行对应的redisCommand
    c->cmd->proc(c);
    //计算执行时间
    duration = ustime()-start;
    //在对应的命令里执行一次,dirty+1
    //这里表示本次命令有多少次修改
    dirty = server.dirty-dirty;
    if (flags & CMD_CALL_PROPAGATE && (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
        

/**
 * 传播执行的命令到aof或从节点
 * @param cmd
 * @param dbid
 * @param argv
 * @param argc
 * @param flags
 */
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
               int flags)

    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
        //aof追加(主要是生成aof内容,并追加到.aof_buf)
        feedAppendOnlyFile(cmd,dbid,argv,argc);
    if (flags & PROPAGATE_REPL)
        //主从复制处理
        replicationFeedSlaves(server.slaves,dbid,argv,argc);

在feedAppendOnlyFile方法里主要是aof内容生成,方法就不具体列了,主要做了三件事

  • 组装刚执行命令的aof内容buf,将过期时间由相对转成绝对(重点)
  • 如果AOF开启的情况,将刚组装的buf放入到server.aof_buf 后
  • 如果正在重写aof,将buf写到 server.aof_rewrite_buf_blocks中(在aofRewriteBufferAppend里)
/**
 * aof重写的情况下调用,将buf写到一个链表里
 * @param s  aof信息
 * @param len
 */
void aofRewriteBufferAppend(unsigned char *s, unsigned long len) 
    listNode *ln = listLast(server.aof_rewrite_buf_blocks);
    aofrwblock *block = ln ? ln->value : NULL;
    while(len) 
        //将传递过来的s写入到aof_rewrite_buf_blocks
    
    //现在是主进程,如果有管道,通过管道server.aof_pipe_write_data_to_child 给子进程
    if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) 
        aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child,AE_WRITABLE, aofChildWriteDiffData, NULL);

/**
 * 将server.aof_rewrite_buf_blocks 中的数据写入server.aof_pipe_write_data_to_child子进程的管道里
 * 此处增量更新的数据,会由子进程在将虚拟空间里的数据落地后再次读取
 * @param el
 * @param fd
 * @param privdata
 * @param mask
 */
void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) 
     while(1) 
          ln = listFirst(server.aof_rewrite_buf_blocks);
          block = ln ? ln->value : NULL;
          if (block->used > 0) 
              //通过管道把block->buf里的数据写给子进程
              nwritten = write(server.aof_pipe_write_data_to_child, block->buf,block->used);
              if (nwritten <= 0) return;
              memmove(block->buf,block->buf+nwritten,block->used-nwritten);
              block->used -= nwritten;
              block->free += nwritten;
          
          if (block->used == 0) listDelNode(server.aof_rewrite_buf_blocks,ln);
     
    

这里有几个变量

  • server.aof_buf aof缓冲区,用于存放每次执行命令后的aof信息
  • server.aof_rewrite_buf_blocks 只要是有aof的子进程,就把新产生的命令添加上去
  • server.aof_pipe_write_data_to_child aof 子进程的管道,用于将server.aof_rewrite_buf_blocks信息给子进程

在主进程执行命令后,并没有写aof文件,只是将命令拼装成了字符串,放入到了aof缓冲区server.aof_buf 中,如果有aof的子进程,将aof信息放入到server.aof_rewrite_buf_blocks 然后通过管道将该信息给到子进程。

那aof是什么时候写入到文件里呢?,别急,看下图

aof有几个场景的写入:

  • 在主流程的循环体里
    • 在循环执行前的beforesleep里
    • 在serverCron里
  • 准备停止之前调用一次
  • 通过configSetCommand设置
  • 主从复制

最终都是调用的flushAppendOnlyFile

//在beforeSleep里,没有任何的逻辑判断,直接调用
void beforeSleep(struct aeEventLoop *eventLoop) 
    //将aof缓冲区写入磁盘
    flushAppendOnlyFile(0);    

//在serverCron中
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 
    //设置了刷入延期开始时间,直接调用
    if (server.aof_flush_postponed_start) flushAppendOnlyFile(0);
    //每秒再判断一次,如果上一次执行异常,再刷入一次
    run_with_period(1000) 
        if (server.aof_last_write_status == C_ERR)
            flushAppendOnlyFile(0);
    


/**
  * 刷入文件
  * @param force 1 强制,0不强制
  */
void flushAppendOnlyFile(int force) 
    //每秒刷入的时候,判断一下aof的fsync是否在bio线程里执行了,是sync_in_progress 为true
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
        sync_in_progress = aofFsyncInProgress();
    //每秒并且非强制刷入
    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) 
        if (sync_in_progress) 
            //刷入延期开始时间为0,表示没有任何的执行
            if (server.aof_flush_postponed_start == 0) 
                //先将刷入延期开始时间置为当前时间
                server.aof_flush_postponed_start = server.unixtime;
                return;
                //以后再来,只要当前时间-开始时间<2直接返回(所以对redis的aof来说,并不是每秒执行,而是至少2秒以上,如果阻塞了,时间更久)
             else if (server.unixtime - server.aof_flush_postponed_start < 2) 
                return;
            
            server.aof_delayed_fsync++;
        
    
    //这里用了卫语句的方式,将不能触发的在前面拦截
    //写入到aof文件里
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
    //写完aof后,将刷入延期开始时间置为0
    server.aof_flush_postponed_start = 0;
    //省略异常的处理
    //记录aof当前的大小
    server.aof_current_size += nwritten;
     /**
     * aof的可用buf比较小了,就清空buf
     */
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) 
        sdsclear(server.aof_buf);
     else 
        sdsfree(server.aof_buf);
        server.aof_buf = sdsempty();
    

    
    
//aof写入方式的枚举
configEnum aof_fsync_enum[] = 
    "everysec", AOF_FSYNC_EVERYSEC,
    "always", AOF_FSYNC_ALWAYS,
    "no", AOF_FSYNC_NO,
    NULL, 0
;    
  • aof的写入是在beforeSleep 里,在serverCron 主要是处理延期写入或者处理写入异常
  • aof通过server.aof_flush_postponed_start来延期写入,第一次将此值赋值为当前时间,写完以后置为0
  • aof通过write写入文件(获取的是文件fd对应的偏移指针,顺序写)

aof重写


在上一篇《redis源码阅读-持久化之RDB》中

serverCron在执行的时候,有一个backgroundRewriteDoneHandler方法,当时就有疑问,这个重写是干啥的?

然后搜索了下,发现了这个函数rewriteAppendOnlyFileBackground

我们看下这个函数的触发的时机

先说下触发时机:

  • 在周期性循环里进行
  • 通过命令调用执行aof重写
    • 一个是bgrewriteaofCommand
    • 一个是configSetCommand
  • 主从复制时触发

在serverCron里有两次调用

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) 
    /**调用①,这次调用是之前有rdb进程的时候,设置为了aof重写等待任务(本该执行的逻辑之前因为rdb没有执行,所以直接触发)*/
    if (server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_scheduled)
        rewriteAppendOnlyFileBackground();
    
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1 ||ldbPendingChildren())
        
    else 
       
        /** 调用②
         *  在没有rdb和aof子进程的时候,
         *  先判断rdb是否达到了执行条件
         *     save 900 1  配置的判断
         *  再判断aof是否达到了执行条件
         *    1,先判断aof是否打开
         *    2,如果有rdb或aof子进程了也不再执行(互斥)
         *    3,有触发阈值,
         *    4,aof当前的文件的大小大于设置的最小大小
         */
        if (server.aof_state == AOF_ON && server.rdb_child_pid == -1 && server.aof_child_pid == -1 && server.aof_rewrite_perc &&
            server.aof_current_size > server.aof_rewrite_min_size)
        
            /**
             * aof_rewrite_base_size 启动默认为0,第一次达到min_size以后设置为1,这时候触发
             * 等待aof重写完以后,会把aof_rewrite_base_size设置为重写后的大小,这个值一致随着aof文件大小自增
             */
            long long base = server.aof_rewrite_base_size ?server.aof_rewrite_base_size : 1;
            /**
              * 这个公式也很有意思,aof的重写会随着文件的变大而逐步变大,而不是64mb
              */
            long long growth = (server.aof_current_size*100/base) - 100;
            if (growth >= server.aof_rewrite_perc) 
                //触发aof重写
                rewriteAppendOnlyFileBackground();
            
        
    

  • ① 位置标注的调用是为了延迟补偿
  • ②位置标注的每次在没有rdb和aof子进程的情况下都会进来
  • redis通过server.aof_rewrite_scheduled来控制aof的延迟执行
/**
 * 后台的方式重写aof,通过fork进程处理
 *
 * @return
 */
int rewriteAppendOnlyFileBackground(void) 
    pid_t childpid;
    long long start;
    //双重校验,只要有aof或rdb的子进程就不再执行,
    if (server.aof_child_pid != -1 || server.rdb_child_pid != -1) return C_ERR;
    //创建6个管道,分别用来数据传输,父子进程之间的ack响应
    if (aofCreatePipes() != C_OK) return C_ERR;
    //创建执行aof完以后回写的管道
    openChildInfoPipe();
    start = ustime();
    //fork子进程进行重写
    if ((childpid = fork()) == 0) 
        char tmpfile[256];

        /* Child */
        //关闭自己不需要关注的
        closeClildUnusedResourceAfterFork();
        //设置进程名称
        redisSetProcTitle("redis-aof-rewrite");
        //格式化重写aof的文件名
        snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
        //重写aof文件
        if (rewriteAppendOnlyFile(tmpfile) == C_OK) 
            size_t private_dirty = zmalloc_get_private_dirty(-1);
            server.child_info_data.cow_size = private_dirty;
            sendChildInfo(CHILD_INFO_TYPE_AOF);
            exitFromChild(0);
         else 
            exitFromChild(1);
        
     else 
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) 
            closeChildInfoPipe();
            aofClosePipes();
            return C_ERR;
        
        //已经开始处理了,就把等待任务表示写回0
        server.aof_rewrite_scheduled = 0;
        //标记aof重写开始的时间
        server.aof_rewrite_time_start = time(NULL);
        //标记aof的的子进程
        server.aof_child_pid = childpid;
        //禁用hash resize
        updateDictResizePolicy();
        server.aof_selected_db = -1;
        replicationScriptCacheFlush();
        return C_OK;
    
    return C_OK; /* unreached */


/**
 * 创建aof管道
 * @return
 */
int aofCreatePipes(void) 
    int fds[6] = -1, -1, -1, -1, -1, -1;
    int j;

    if (pipe(fds) == -1) goto error; /* parent -> children data. */
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
    /* 设置数据管道为非阻塞 */
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
    /**
     * 注册时间处理,监听 server.aof_pipe_read_ack_from_child 传递过来的ack信息,如果有就 server.aof_stop_sending_diff = 1;
     * 同时删除server.aof_pipe_read_ack_from_child的事件处理器
     */
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
    //父进程往子进程写数据的管道
    server.aof_pipe_write_data_to_child = fds[1];
    //子进程从父进程读数据的管道
    server.aof_pipe_read_data_from_parent = fds[0];
    //子进程发送ack给父进程的管道
    server.aof_pipe_write_ack_to_parent = fds[3];
    //父进程读子进程发送的ack信息的管道
    server.aof_pipe_read_ack_from_child = fds[2];
    //父进程发送ack信息给子进程的管道
    server.aof_pipe_write_ack_to_child = fds[5];
    //子进程读取父进程发送的ack信息的管道
    server.aof_pipe_read_ack_from_parent = fds[4];
    server.aof_stop_sending_diff = 0;
    return C_OK;

error:
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
    return C_ERR;

rof重写

/**
 * 同步重写aof
 * @param filename
 * @return
 */
int rewriteAppendOnlyFile(char *filename) 
    rio aof;
    FILE *fp;
    char tmpfile[256];
    char byte;

    //又创建了一个临时文件
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) 
        return C_ERR;
    

    server.aof_child_diff = sdsempty();
    //初始化一个文件io对象
    rioInitWithFile(&aof,fp);

    if (server.aof_rewrite_incremental_fsync)
        riosetAutoSync(&aof,REDIS_AUTOSYNC_BYTES);
    /**
     * aof_use_rdb_preamble = yes 表示开启了混合模式,先将rdb信息刷入aof文件
     * rdb是比较节省空间
     */
    if (server.aof_use_rdb_preamble) 
        int error;
        if (rdbSaveRio(&aof,&error,RDB_SAVE_AOF_PREAMBLE,NULL) == C_ERR) 
            errno = error;
            goto werr;
        
     else 
        //直接以redis的协议从所有db的全局hash表中读取数据写入aof文件
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr;
    
    //清空缓冲区
    if (fflush(fp) == EOF) goto werr;
    //刷入磁盘,正常都是在操作系统的页缓存里,fsync可以刷入磁盘
    if (fsync(fileno(fp)) == -1) goto werr;

    int nodata = 0;
    mstime_t start = mstime();
    /**
     * 在1秒内,每次等待1毫秒,从管道里获取数据
     */
    while(mstime()-start < 1000 && nodata < 20) 
        //等待管道数据,没有数据,下一次轮训
        if redis源码阅读-持久化之RDB

redis源码阅读-持久化之RDB

Redis源码剖析 - Redis持久化之AOF

Redis源码剖析 - Redis持久化之AOF

Redis源码剖析之AOF

《Redis设计与实现》[第二部分]单机数据库的实现-C源码阅读