redis源码学习从源码角度看主从复制:全量同步 && 部分同步

Posted 看,未来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码学习从源码角度看主从复制:全量同步 && 部分同步相关的知识,希望对你有一定的参考价值。

全量同步 && 部分同步

上一篇,我们的主从机以及搭上线了,那么从机连上主机,自然要更新一下缺失的数据,以期达到节点之同步状态。

从节点发起同步流程

对,没看错,同步流程是由从节点发起的。
主节点那么忙,是吧。

int slaveTryPartialResynchronization(connection *conn, int read_reply) 
    char *psync_replid;
    char psync_offset[32];
    sds reply;

    /* Writing half */
    if (!read_reply) 	//read_reply 参数为0,发送 PSYNC 命令
        /* Initially set master_initial_offset to -1 to mark the current
         * master run_id and offset as not valid. Later if we'll be able to do
         * a FULL resync using the PSYNC command we'll set the offset at the
         * right value, so that this information will be propagated to the
         * client structure representing the master into server.master. */
        server.master_initial_offset = -1;

        if (server.cached_master) 	//若有缓存,则尝试发起部分同步流程
            psync_replid = server.cached_master->replid;
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
         else 
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
            psync_replid = "?";
            memcpy(psync_offset,"-1",3);
        

        /* Issue the PSYNC command */
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
        if (reply != NULL) 
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
            sdsfree(reply);
            connSetReadHandler(conn, NULL);
            return PSYNC_WRITE_ERROR;
        
        return PSYNC_WAIT_REPLY;
    

    /* Reading half */
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
    if (sdslen(reply) == 0) 
        /* The master may send empty newlines after it receives PSYNC
         * and before to reply, just to keep the connection alive. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    

    connSetReadHandler(conn, NULL);

	......
	
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;


部分同步

还是那个函数。

int slaveTryPartialResynchronization(connection *conn, int read_reply) 
    char *psync_replid;
    char psync_offset[32];
    sds reply;

	......
	
    // 读取主节点对PSYNC 命令的响应数据
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
    
    if (sdslen(reply) == 0) 
        /* The master may send empty newlines after it receives PSYNC
         * and before to reply, just to keep the connection alive. */
        sdsfree(reply);
        return PSYNC_WAIT_REPLY;
    

    connSetReadHandler(conn, NULL);

    if (!strncmp(reply,"+FULLRESYNC",11)) 
        char *replid = NULL, *offset = NULL;

        /* FULL RESYNC, parse the reply in order to extract the run id
         * and the replication offset. */
        replid = strchr(reply,' ');
        if (replid) 
            replid++;
            offset = strchr(replid,' ');
            if (offset) offset++;
        
       
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) 
            serverLog(LL_WARNING,
                "Master replied with wrong +FULLRESYNC syntax.");
            /* This is an unexpected condition, actually the +FULLRESYNC
             * reply means that the master supports PSYNC, but the reply
             * format seems wrong. To stay safe we blank the master
             * replid to make sure next PSYNCs will fail. */
            memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
         else 	// 主节点要求全量同步
            memcpy(server.master_replid, replid, offset-replid-1);
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\\0';
            server.master_initial_offset = strtoll(offset,NULL,10);
            serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
                server.master_replid,
                server.master_initial_offset);
        
        /* We are going to full resync, discard the cached master structure. */
        replicationDiscardCachedMaster();
        sdsfree(reply);
        return PSYNC_FULLRESYNC;
    

	//可以进行部分同步
    if (!strncmp(reply,"+CONTINUE",9)) 
        /* Partial resync was accepted. */
        serverLog(LL_NOTICE,
            "Successful partial resynchronization with master.");

        /* Check the new replication ID advertised by the master. If it
         * changed, we need to set the new ID as primary ID, and set or
         * secondary ID as the old master ID up to the current offset, so
         * that our sub-slaves will be able to PSYNC with us after a
         * disconnection. */
        char *start = reply+10;
        char *end = reply+9;
        while(end[0] != '\\r' && end[0] != '\\n' && end[0] != '\\0') end++;
        if (end-start == CONFIG_RUN_ID_SIZE) 
            char new[CONFIG_RUN_ID_SIZE+1];
            memcpy(new,start,CONFIG_RUN_ID_SIZE);
            new[CONFIG_RUN_ID_SIZE] = '\\0';

            if (strcmp(new,server.cached_master->replid)) 
                /* Master ID changed. */
                serverLog(LL_WARNING,"Master replication ID changed to %s",new);

                /* Set the old ID as our ID2, up to the current offset+1. */
                memcpy(server.replid2,server.cached_master->replid,
                    sizeof(server.replid2));
                server.second_replid_offset = server.master_repl_offset+1;

                /* Update the cached master ID and our own primary ID to the
                 * new one. */
                memcpy(server.replid,new,sizeof(server.replid));
                memcpy(server.cached_master->replid,new,sizeof(server.replid));

                /* Disconnect all the sub-slaves: they need to be notified. */
                disconnectSlaves();
            
        

        /* Setup the replication to continue. */
        sdsfree(reply);
        replicationResurrectCachedMaster(conn);	
        //调用 replicationResurrectCachedMaster 函数,使用当前主从连接,
        //将 server.cached_master 转化为 server.master,为主从连接注册
        //READ事件回调函数,负责接收并处理主节点传播的命令。
        //server.repl_state 进入 REPL_STATE_CONNECTED 状态,
        //初始化从节点复制积压区,返回PSYNC_NOT_SUPPORTED,部分同步完成,进入复制阶段。

        /* If this instance was restarted and we read the metadata to
         * PSYNC from the persistence file, our replication backlog could
         * be still not initialized. Create it. */
        if (server.repl_backlog == NULL) createReplicationBacklog();
        return PSYNC_CONTINUE;
    

    /* If we reach this point we received either an error (since the master does
     * not understand PSYNC or because it is in a special state and cannot
     * serve our request), or an unexpected reply from the master.
     *
     * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
     * return PSYNC_TRY_LATER if we believe this is a transient error. */

	......
	
    sdsfree(reply);
    replicationDiscardCachedMaster();
    return PSYNC_NOT_SUPPORTED;


全量复制

全量复制比较简单些,在握手过程中处理掉:

/* This handler fires when the non blocking connect was able to
 * establish a connection with the master. */
void syncWithMaster(connection *conn) 
    char tmpfile[256], *err = NULL;
    int dfd = -1, maxtries = 5;
    int psync_result;

 	......

    psync_result = slaveTryPartialResynchronization(conn,1);
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */

    /* If the master is in an transient error, we should try to PSYNC
     * from scratch later, so go to the error path. This happens when
     * the server is loading the dataset or is not connected with its
     * master and so forth. */
    if (psync_result == PSYNC_TRY_LATER) goto error;

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
     * uninstalling the read handler from the file descriptor. */

    if (psync_result == PSYNC_CONTINUE) 
        ......
        return;
    

    ......

	//需要进行全量同步。
	//为主从连接设置 READ 事件函数,负责接收主节点发送的 RDB 数据。
	//server.repl_state 进入 REPL_STATE_TRANSFER 状态。
    /* Setup the non blocking download of the bulk file. */
    if (connSetReadHandler(conn, readSyncBulkPayload)
            == C_ERR)
    
        char conninfo[CONN_INFO_LEN];
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (%s)",
            strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
        goto error;
    

    server.repl_state = REPL_STATE_TRANSFER;
    server.repl_transfer_size = -1;
    server.repl_transfer_read = 0;
    server.repl_transfer_last_fsync_off = 0;
    server.repl_transfer_lastio = server.unixtime;
    return;

	......

以上是关于redis源码学习从源码角度看主从复制:全量同步 && 部分同步的主要内容,如果未能解决你的问题,请参考以下文章

redis源码学习从源码角度看主从复制:主从之间的“三次握手”

深入剖析redis主从同步机制

redis 源码解析:主从同步

Redis主从之全量复制和增量复制

redis主从同步机制

RocketMQ源码-主从同步复制和异步复制