redis源码学习从源码角度看主从复制:全量同步 && 部分同步
Posted 看,未来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码学习从源码角度看主从复制:全量同步 && 部分同步相关的知识,希望对你有一定的参考价值。
全量同步 && 部分同步
上一篇,我们的主从机以及搭上线了,那么从机连上主机,自然要更新一下缺失的数据,以期达到节点之同步状态。
从节点发起同步流程
对,没看错,同步流程是由从节点发起的。
主节点那么忙,是吧。
int slaveTryPartialResynchronization(connection *conn, int read_reply)
char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) //read_reply 参数为0,发送 PSYNC 命令
/* Initially set master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.master_initial_offset = -1;
if (server.cached_master) //若有缓存,则尝试发起部分同步流程
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
else
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
psync_replid = "?";
memcpy(psync_offset,"-1",3);
/* Issue the PSYNC command */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL)
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
return PSYNC_WAIT_REPLY;
/* Reading half */
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (sdslen(reply) == 0)
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
connSetReadHandler(conn, NULL);
......
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
部分同步
还是那个函数。
int slaveTryPartialResynchronization(connection *conn, int read_reply)
char *psync_replid;
char psync_offset[32];
sds reply;
......
// 读取主节点对PSYNC 命令的响应数据
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (sdslen(reply) == 0)
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
connSetReadHandler(conn, NULL);
if (!strncmp(reply,"+FULLRESYNC",11))
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
replid = strchr(reply,' ');
if (replid)
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE)
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
else // 主节点要求全量同步
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
//可以进行部分同步
if (!strncmp(reply,"+CONTINUE",9))
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");
/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set or
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\\r' && end[0] != '\\n' && end[0] != '\\0') end++;
if (end-start == CONFIG_RUN_ID_SIZE)
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\\0';
if (strcmp(new,server.cached_master->replid))
/* Master ID changed. */
serverLog(LL_WARNING,"Master replication ID changed to %s",new);
/* Set the old ID as our ID2, up to the current offset+1. */
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
server.second_replid_offset = server.master_repl_offset+1;
/* Update the cached master ID and our own primary ID to the
* new one. */
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));
/* Disconnect all the sub-slaves: they need to be notified. */
disconnectSlaves();
/* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(conn);
//调用 replicationResurrectCachedMaster 函数,使用当前主从连接,
//将 server.cached_master 转化为 server.master,为主从连接注册
//READ事件回调函数,负责接收并处理主节点传播的命令。
//server.repl_state 进入 REPL_STATE_CONNECTED 状态,
//初始化从节点复制积压区,返回PSYNC_NOT_SUPPORTED,部分同步完成,进入复制阶段。
/* If this instance was restarted and we read the metadata to
* PSYNC from the persistence file, our replication backlog could
* be still not initialized. Create it. */
if (server.repl_backlog == NULL) createReplicationBacklog();
return PSYNC_CONTINUE;
/* If we reach this point we received either an error (since the master does
* not understand PSYNC or because it is in a special state and cannot
* serve our request), or an unexpected reply from the master.
*
* Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
* return PSYNC_TRY_LATER if we believe this is a transient error. */
......
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
全量复制
全量复制比较简单些,在握手过程中处理掉:
/* This handler fires when the non blocking connect was able to
* establish a connection with the master. */
void syncWithMaster(connection *conn)
char tmpfile[256], *err = NULL;
int dfd = -1, maxtries = 5;
int psync_result;
......
psync_result = slaveTryPartialResynchronization(conn,1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
/* If the master is in an transient error, we should try to PSYNC
* from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its
* master and so forth. */
if (psync_result == PSYNC_TRY_LATER) goto error;
/* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */
if (psync_result == PSYNC_CONTINUE)
......
return;
......
//需要进行全量同步。
//为主从连接设置 READ 事件函数,负责接收主节点发送的 RDB 数据。
//server.repl_state 进入 REPL_STATE_TRANSFER 状态。
/* Setup the non blocking download of the bulk file. */
if (connSetReadHandler(conn, readSyncBulkPayload)
== C_ERR)
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (%s)",
strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_lastio = server.unixtime;
return;
......
以上是关于redis源码学习从源码角度看主从复制:全量同步 && 部分同步的主要内容,如果未能解决你的问题,请参考以下文章