Redis source code analysis: the Redis Cluster implementation
Posted by 看,未来
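Redis Cluster overview
Sentinel, master-replica replication, cluster: this post is the next step in that series.
The main purposes of Redis Cluster are as follows (obvious as they are):
Data sharding and traffic distribution.
Cluster distributes different data to different nodes. It does not use consistent hashing; instead it introduces the concept of hash slots. A cluster has 16384 slots, and each slot is assigned to exactly one node at a time.
So the focus of this post is clear: how Redis Cluster implements inter-node communication, data sharding, and traffic distribution.
For hands-on setup, see: [redis] Bored, so let's talk about the currently red-hot Redis Cluster and set one up along the way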
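The key-to-slot mapping can be sketched as follows. This is a minimal sketch, not the Redis source: it assumes the documented rule that a slot is CRC16 (the XMODEM variant) of the key modulo 16384, and that a non-empty `{...}` hash tag restricts hashing to the text inside the braces. The names `crc16_xmodem` and `key_hash_slot` are illustrative; Redis itself uses a table-driven CRC and a function called `keyHashSlot`.

```c
#include <stddef.h>
#include <stdint.h>

#define CLUSTER_SLOTS 16384

/* CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection.
 * Bit-by-bit version, chosen for readability over speed. */
static uint16_t crc16_xmodem(const char *buf, size_t len) {
    uint16_t crc = 0;
    for (size_t i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i] << 8);
        for (int bit = 0; bit < 8; bit++) {
            if (crc & 0x8000)
                crc = (uint16_t)((crc << 1) ^ 0x1021);
            else
                crc = (uint16_t)(crc << 1);
        }
    }
    return crc;
}

/* Map a key to a slot. If the key contains a non-empty "{...}" hash tag,
 * only the bytes between the first '{' and the next '}' are hashed, so
 * related keys can be forced into the same slot. */
static unsigned int key_hash_slot(const char *key, size_t keylen) {
    size_t s, e;

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;
    /* No '{' at all: hash the whole key. */
    if (s == keylen) return crc16_xmodem(key, keylen) % CLUSTER_SLOTS;

    for (e = s + 1; e < keylen; e++)
        if (key[e] == '}') break;
    /* No closing '}' or an empty "{}": hash the whole key. */
    if (e == keylen || e == s + 1)
        return crc16_xmodem(key, keylen) % CLUSTER_SLOTS;

    /* Hash only what is between the braces. */
    return crc16_xmodem(key + s + 1, e - s - 1) % CLUSTER_SLOTS;
}
```

With this sketch, `key_hash_slot("{user1000}.following", 20)` and `key_hash_slot("{user1000}.followers", 20)` return the same slot, which is what allows multi-key commands on related keys.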
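Client redirection
If a node in the cluster receives a client request whose key it is not responsible for, it tells the client to redirect, and the client resends the request to the node that actually stores the data.
How is that implemented? The packet arrives, the key is not here, so I tell the client which node has it and let the client resend it there?
Or the packet arrives, the key is not here, and I only tell the client that it is not here, because I do not know who has it either, leaving the client to try the nodes one by one?
Or some other way?
My guess is the first, but let's see what actually happens.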
int processCommand(client *c) {
    ......
    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        /* Find the node that actually stores the data. */
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            /* Reply with an ASK or MOVED redirection plus the target node,
             * telling the client to redirect.
             * If the slot is being migrated out, ASK is returned: the client
             * redirects only its next command to the target node.
             * Otherwise MOVED is returned: the client may redirect
             * permanently.
             * When n is NULL, error_code selects an error reply instead. */
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }
    ......
}
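So the first guess holds: the node names the target in its reply. Seen from the client, handling that reply could look like the sketch below. It assumes the error payload has the form `MOVED <slot> <host>:<port>` or `ASK <slot> <host>:<port>` with the leading `-` of the error reply already stripped, and it does not handle IPv6 hosts, which contain colons. The type `redirect` and the functions `parse_redirect` and `should_update_slot_cache` are hypothetical names, not part of any Redis client library.

```c
#include <stdio.h>
#include <string.h>

#define CLUSTER_SLOTS 16384

typedef enum { REDIR_NONE, REDIR_MOVED, REDIR_ASK } redir_kind;

typedef struct {
    redir_kind kind;
    int slot;
    char host[64];
    int port;
} redirect;

/* Parse a payload such as "MOVED 3999 127.0.0.1:6381".
 * Returns 1 on success, 0 if the text is not a well-formed redirection. */
static int parse_redirect(const char *err, redirect *out) {
    char word[8];

    out->kind = REDIR_NONE;
    if (sscanf(err, "%7s %d %63[^:]:%d",
               word, &out->slot, out->host, &out->port) != 4)
        return 0;
    if (out->slot < 0 || out->slot >= CLUSTER_SLOTS) return 0;

    if (strcmp(word, "MOVED") == 0) out->kind = REDIR_MOVED;
    else if (strcmp(word, "ASK") == 0) out->kind = REDIR_ASK;
    else return 0;
    return 1;
}

/* MOVED is permanent, so the client should remember the new owner of the
 * slot. ASK applies to the next command only: the client leaves its slot
 * cache alone, sends ASKING to the target, then resends the command. */
static int should_update_slot_cache(const redirect *r) {
    return r->kind == REDIR_MOVED;
}
```

Any other error text, for example a CROSSSLOT error, makes `parse_redirect` return 0, and the caller would surface it as an ordinary error.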
The getNodeByQuery function is responsible for finding the node that stores the data:
/* Return the pointer to the cluster node that is able to serve the command.
* For the function to succeed the command should only target either:
*
* 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
* 2) Multiple keys in the same hash slot, while the slot is stable (no
* resharding in progress).
*
* On success the function returns the node that is able to serve the request.
* If the node is not 'myself' a redirection must be performed. The kind of
* redirection is specified setting the integer passed by reference
* 'error_code', which will be set to CLUSTER_REDIR_ASK or
* CLUSTER_REDIR_MOVED.
*
* When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
*
* If the command fails NULL is returned, and the reason of the failure is
* provided via 'error_code', which will be set to:
*
* CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
* don't belong to the same hash slot.
*
* CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
* belonging to the same slot, but the slot is not stable (in migration or
* importing state, likely because a resharding is in progress).
*
* CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
* not bound to any node. In this case the cluster global state should be
* already "down" but it is fragile to rely on the update of the global state,
* so we also handle it here.
*
* CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
* down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Allow any key to be set if a module disabled cluster redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* Modules can turn off Redis Cluster redirection: this is useful
     * when writing a module that implements a completely different
     * distributed system. */

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    if (cmd->proc == execCommand) {
        /* If CLIENT_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];

                /* Error: If a slot is not served, we are in "cluster down"
                 * state. However the state is yet to be updated, so this was
                 * not trapped earlier in processCommand(). Report the same
                 * error to the client. */
                if (n == NULL) {
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key, make sure it is exactly
                 * the same key as the first we saw. */
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        /* Error: multiple keys from different slots. */
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        /* Flag this request as one with multiple different
                         * keys. */
                        multiple_keys = 1;
                    }
                }
            }

            /* Migrating / Importing slot? Count keys we don't have. */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors in all the cases. */
    if (n == NULL) return myself;

    /* Cluster is globally down but we got keys? We only serve the request
     * if it is a read command and when allow_reads_when_down is enabled. */
    if (server.cluster->state != CLUSTER_OK) {
        if (!server.cluster_allow_reads_when_down) {
            /* The cluster is configured to block commands when the
             * cluster is down. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
        } else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand)
                && !(cmd->proc == evalShaCommand))
        {
            /* The cluster is configured to allow read only commands
             * but this command is neither readonly, nor EVAL or
             * EVALSHA. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
        } else {
            /* Fall through and allow the command to be executed:
             * this happens when server.cluster_allow_reads_when_down is
             * true and the command is a readonly command or EVAL / EVALSHA. */
        }
    }

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE always works in the context of the local node if the slot
     * is open (migrating or importing state). We need to be able to freely
     * move keys among instances in this case. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection. */
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about an hash slot our master
     * is serving, we can reply without redirection. */
    if (c->flags & CLIENT_READONLY &&
        (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
         cmd->proc == evalShaCommand) &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However if this node is not
     * myself, set error_code to MOVED since we need to issue a redirection. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
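The last part of getNodeByQuery, after the keys have been scanned, boils down to a small decision table. The sketch below restates it as a pure function so that the order of the checks is easier to see. It is a simplification under stated assumptions: the cluster-down checks, the MIGRATE exception, and the read-only replica case are left out, and `route_input`, `route_decision`, and `route` are hypothetical names that do not exist in Redis.

```c
typedef enum {
    SERVE_LOCALLY,   /* getNodeByQuery returns myself */
    REPLY_ASK,       /* CLUSTER_REDIR_ASK */
    REPLY_TRYAGAIN,  /* CLUSTER_REDIR_UNSTABLE */
    REPLY_MOVED      /* CLUSTER_REDIR_MOVED */
} route_decision;

typedef struct {
    int owner_is_myself; /* slots[slot] == myself */
    int migrating;       /* we own the slot and migrating_slots_to[slot] is set */
    int importing;       /* importing_slots_from[slot] is set */
    int client_asking;   /* the client sent ASKING before this command */
    int multiple_keys;   /* the request names more than one distinct key */
    int missing_keys;    /* how many requested keys are absent locally */
} route_input;

/* Same order of checks as the tail of getNodeByQuery. */
static route_decision route(const route_input *in) {
    /* Slot is moving out and a key is already gone: one-shot redirect. */
    if (in->migrating && in->missing_keys > 0) return REPLY_ASK;

    /* Slot is moving in and the client followed an ASK redirect. */
    if (in->importing && in->client_asking) {
        if (in->multiple_keys && in->missing_keys > 0) return REPLY_TRYAGAIN;
        return SERVE_LOCALLY;
    }

    /* Stable case: serve if we own the slot, otherwise point at the owner. */
    return in->owner_is_myself ? SERVE_LOCALLY : REPLY_MOVED;
}
```

Note the asymmetry: a migrating node keeps serving every key it still holds, and an importing node serves only clients that sent ASKING, so for any single key exactly one of the two nodes answers at any moment.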
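Slot migration
What is slot migration? Think of it as a rehash: the keys of a slot are moved from one node to another.
When does it happen? When the cluster is resharded, for example when a node is added to share the load or when a node is emptied before being removed. A failed master is a different case: one of its replicas takes over its slots through failover, and no keys are moved.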
/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
 *         AUTH2 username password]
 *
 * Or in the multiple keys form:
 *
 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
 *         AUTH2 username password] KEYS key1 key2 ... keyN */
void migrateCommand(client *c) {
    migrateCachedSocket *cs;
    int copy = 0, replace = 0, j;
    char *username = NULL;
    char *password = NULL;
    long timeout;
    long dbid;
    robj **ov = NULL; /* Objects to migrate. */
    robj **kv = NULL; /* Key names. */
    robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
    rio cmd, payload;
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    /* To support the KEYS option we need the following additional state. */
    int first_key = 3; /* Argument index of the first key. */
    int num_keys = 1;  /* By default only migrate the 'key' argument. */

    /* Parse additional options */
    for (j = 6; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
            if (!moreargs) {
                addReply(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
            if (moreargs < 2) {
                addReply(c,shared.syntaxerr);
                return;
            }
            username = c->argv[++j]->ptr;
            password = c->argv[++j]->ptr;
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
            if (sdslen(c->argv[3]->ptr) != 0) {
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            }
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break; /* All the remaining args are keys. */
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }

    /* Sanity check */
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    {
        return;
    }
    if (timeout <= 0) timeout = 1000;

    /* Check if the keys are here. If at least one key is to migrate, do it
     * otherwise if all the keys are missing reply with "NOKEY" to signal
     * the caller there was nothing to migrate. We don't return an error in
     * this case, since often this is due to a normal condition like the key
     * expiring in the meantime. */
    ov = zrealloc(ov,sizeof(robj*)*num_keys);
    kv = zrealloc(kv,sizeof(robj*)*num_keys);
    int oi = 0;

    for (j = 0; j < num_keys; j++) {
        if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            kv[oi] = c->argv[first_key+j];
            oi++;
        }
    }
    num_keys = oi;
    if (num_keys == 0) {
        zfree(ov); zfree(kv);
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

try_again:
    write_error = 0;

    /* Connect */
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {
        zfree(ov); zfree(kv);
        return; /* error sent to the client by migrateGetSocket() */
    }

    rioInitWithBuffer(&cmd,sdsempty());

    /* Authentication */
    if (password) {
        int arity = username ? 3 : 2;
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
        if (username) {
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
                                 sdslen(username)));
        }
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
            sdslen(password)));
    }

    /* Send the SELECT command if the current DB is not already selected. */
    int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
    if (select) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        ......
    }
    ......
}
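The rioWriteBulkCount and rioWriteBulkString calls above assemble the AUTH and SELECT commands in the Redis wire protocol (RESP) before they are sent to the target node. The sketch below shows roughly what ends up in the buffer. It is an illustration written against a plain char array, not the rio API: `write_bulk_count`, `write_bulk_string`, and `encode_command` are hypothetical helpers whose signatures differ from the real functions.

```c
#include <stdio.h>
#include <string.h>

/* Write "<prefix><count>\r\n", e.g. "*2\r\n" or "$6\r\n".
 * Returns the number of bytes written, or 0 if it does not fit. */
static size_t write_bulk_count(char *buf, size_t cap, char prefix, long count) {
    int n = snprintf(buf, cap, "%c%ld\r\n", prefix, count);
    return (n < 0 || (size_t)n >= cap) ? 0 : (size_t)n;
}

/* Write "$<len>\r\n<bytes>\r\n" and keep the buffer NUL-terminated.
 * Returns the number of bytes written, or 0 if it does not fit. */
static size_t write_bulk_string(char *buf, size_t cap, const char *s, size_t len) {
    size_t n = write_bulk_count(buf, cap, '$', (long)len);
    if (n == 0 || cap - n < len + 3) return 0; /* payload + "\r\n" + '\0' */
    memcpy(buf + n, s, len);
    memcpy(buf + n + len, "\r\n", 3);          /* also copies the '\0' */
    return n + len + 2;
}

/* Encode a whole command: an array header followed by one bulk string
 * per argument. */
static size_t encode_command(char *buf, size_t cap, int argc, const char **args) {
    size_t off = write_bulk_count(buf, cap, '*', argc);
    if (off == 0) return 0;
    for (int i = 0; i < argc; i++) {
        size_t n = write_bulk_string(buf + off, cap - off,
                                     args[i], strlen(args[i]));
        if (n == 0) return 0;
        off += n;
    }
    return off;
}
```

Encoding the two arguments `SELECT` and `0` with this sketch yields `*2\r\n$6\r\nSELECT\r\n$1\r\n0\r\n`, which is the shape of the SELECT command that migrateCommand starts to write when the cached connection last used a different database.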