redis源码分析redis cluster 集群实现

Posted 看,未来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis源码分析redis cluster 集群实现相关的知识,希望对你有一定的参考价值。

redis集群概述

哨兵、主从、集群,一串下来。

redis cluster 主要作用如下(虽然是显而易见了):

数据分片,流量分发。
cluster 将不同的数据分发给不同的节点,不过没有使用一致性hash算法,而是引入了Hash槽位的概念。cluster有16384个槽位,每个槽位只能指派给一个节点。

所以本文的重心也就很明确了:redis集群是如何实现通信及数据分片、流量分发的

关于实操:【redis】闲得无聊,来聊聊当下爆火的 redis集群,顺便搭一个玩玩呗


客户端重定向

如果cluster中的某个节点收到客户端请求,但请求中查询的键不是当前节点负责的,则它将通知客户端进行重定向,客户端重新发送请求给真正的数据存储节点。

那是怎么实现?包发过来,不在我这儿,我告诉她在他那儿,让她重发给他?
还是包发过来,不在我这儿,我告诉她不在我这儿,因为我也不知道在谁那儿,让她一个一个自己去试试?
还是什么其他的方式?
我想是第一种,但是现实是什么样的,再看看。

int processCommand(client *c) 
    ......

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    
        int hashslot;
        int error_code;
	
		//查找真正的存储节点
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) 
            if (c->cmd->proc == execCommand) 
                discardTransaction(c);
             else 
                flagTransaction(c);
            
		
			//返回 ASK 或 MOBED 转向标志及重定向目标节点,通知客户端重定向
			/*
				如果对应槽位数据正在迁出,则返回 ASK,提示客户端仅在下一条命令中重定向目标节点。
				否则返回MOVED,提示客户端可以长期重定向。
			*/
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        
    
	......


getNodeByQuery函数负责查找数据存储节点:


/* Return the pointer to the cluster node that is able to serve the command.
 * For the function to succeed the command should only target either:
 *
 * 1) A single key (even multiple times like LPOPRPUSH mylist mylist).
 * 2) Multiple keys in the same hash slot, while the slot is stable (no
 *    resharding in progress).
 *
 * On success the function returns the node that is able to serve the request.
 * If the node is not 'myself' a redirection must be perfomed. The kind of
 * redirection is specified setting the integer passed by reference
 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
 * CLUSTER_REDIR_MOVED.
 *
 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
 *
 * If the command fails NULL is returned, and the reason of the failure is
 * provided via 'error_code', which will be set to:
 *
 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
 * don't belong to the same hash slot.
 *
 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
 * belonging to the same slot, but the slot is not stable (in migration or
 * importing state, likely because a resharding is in progress).
 *
 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
 * not bound to any node. In this case the cluster global state should be
 * already "down" but it is fragile to rely on the update of the global state,
 * so we also handle it here.
 *
 * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
 * down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) 
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    multiCmd mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0;

    /* Allow any key to be set if a module disabled cluster redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* Modules can turn off Redis Cluster redirection: this is useful
     * when writing a module that implements a completely different
     * distributed system. */

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    if (cmd->proc == execCommand) 
        /* If CLIENT_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
     else 
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mc;
        _ms.count = 1;
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
    

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) 
        struct redisCommand *mcmd;
        robj **margv;
        int margc, *keyindex, numkeys, j;

        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;

        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) 
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));

            if (firstkey == NULL) 
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];

                /* Error: If a slot is not served, we are in "cluster down"
                 * state. However the state is yet to be updated, so this was
                 * not trapped earlier in processCommand(). Report the same
                 * error to the client. */
                if (n == NULL) 
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                

                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                
                    migrating_slot = 1;
                 else if (server.cluster->importing_slots_from[slot] != NULL) 
                    importing_slot = 1;
                
             else 
                /* If it is not the first key, make sure it is exactly
                 * the same key as the first we saw. */
                if (!equalStringObjects(firstkey,thiskey)) 
                    if (slot != thisslot) 
                        /* Error: multiple keys from different slots. */
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                     else 
                        /* Flag this request as one with multiple different
                         * keys. */
                        multiple_keys = 1;
                    
                
            

            /* Migarting / Improrting slot? Count keys we don't have. */
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            
                missing_keys++;
            
        
        getKeysFreeResult(keyindex);
    

    /* No key at all in command? then we can serve the request
     * without redirections or errors in all the cases. */
    if (n == NULL) return myself;

    /* Cluster is globally down but we got keys? We only serve the request
     * if it is a read command and when allow_reads_when_down is enabled. */
    if (server.cluster->state != CLUSTER_OK) 
        if (!server.cluster_allow_reads_when_down) 
            /* The cluster is configured to block commands when the
             * cluster is down. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
         else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand)
                && !(cmd->proc == evalShaCommand))
        
            /* The cluster is configured to allow read only commands
             * but this command is neither readonly, nor EVAL or
             * EVALSHA. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
         else 
            /* Fall through and allow the command to be executed:
             * this happens when server.cluster_allow_reads_when_down is
             * true and the command is a readonly command or EVAL / EVALSHA. */
        
    

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE always works in the context of the local node if the slot
     * is open (migrating or importing state). We need to be able to freely
     * move keys among instances in this case. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection. */
    if (migrating_slot && missing_keys) 
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    
        if (multiple_keys && missing_keys) 
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
         else 
            return myself;
        
    

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about an hash slot our master
     * is serving, we can reply without redirection. */
    if (c->flags & CLIENT_READONLY &&
        (cmd->flags & CMD_READONLY || cmd->proc == evalCommand ||
         cmd->proc == evalShaCommand) &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    
        return myself;
    

    /* Base case: just return the right node. However if this node is not
     * myself, set error_code to MOVED since we need to issue a rediretion. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;


槽位迁移

什么是槽位迁移?rehash,就这样理解嘛。

什么时候发生?比方我某个节点挂了、比方我某个节点又复活可以分摊压力了;


/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
 *         AUTH2 username password]
 *
 * On in the multiple keys form:
 *
 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
 *         AUTH2 username password] KEYS key1 key2 ... keyN */
void migrateCommand(client *c) 
    migrateCachedSocket *cs;
    int copy = 0, replace = 0, j;
    char *username = NULL;
    char *password = NULL;
    long timeout;
    long dbid;
    robj **ov = NULL; /* Objects to migrate. */
    robj **kv = NULL; /* Key names. */
    robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
    rio cmd, payload;
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    /* To support the KEYS option we need the following additional state. */
    int first_key = 3; /* Argument index of the first key. */
    int num_keys = 1;  /* By default only migrate the 'key' argument. */

    /* Parse additional options */
    for (j = 6; j < c->argc; j++) 
        int moreargs = (c->argc-1) - j;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) 
            copy = 1;
         else if (!strcasecmp(c->argv[j]->ptr,"replace")) 
            replace = 1;
         else if (!strcasecmp(c->argv[j]->ptr,"auth")) 
            if (!moreargs) 
                addReply(c,shared.syntaxerr);
                return;
            
            j++;
            password = c->argv[j]->ptr;
         else if (!strcasecmp(c->argv[j]->ptr,"auth2")) 
            if (moreargs < 2) 
                addReply(c,shared.syntaxerr);
                return;
            
            username = c->argv[++j]->ptr;
            password = c->argv[++j]->ptr;
         else if (!strcasecmp(c->argv[j]->ptr,"keys")) 
            if (sdslen(c->argv[3]->ptr) != 0) 
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break; /* All the remaining args are keys. */
         else 
            addReply(c,shared.syntaxerr);
            return;
        
    

    /* Sanity check */
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    
        return;
    
    if (timeout <= 0) timeout = 1000;

    /* Check if the keys are here. If at least one key is to migrate, do it
     * otherwise if all the keys are missing reply with "NOKEY" to signal
     * the caller there was nothing to migrate. We don't return an error in
     * this case, since often this is due to a normal condition like the key
     * expiring in the meantime. */
    ov = zrealloc(ov,sizeof(robj*)*num_keys);
    kv = zrealloc(kv,sizeof(robj*)*num_keys);
    int oi = 0;

    for (j = 0; j < num_keys; j++) 
        if ((ov[oi] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) 
            kv[oi] = c->argv[first_key+j];
            oi++;
        
    
    num_keys = oi;
    if (num_keys == 0) 
        zfree(ov); zfree(kv);
        addReplySds(c,sdsnew("+NOKEY\\r\\n"));
        return;
    

try_again:
    write_error = 0;

    /* Connect */
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) 
        zfree(ov); zfree(kv);
        return; /* error sent to the client by migrateGetSocket() */
    

    rioInitWithBuffer(&cmd,sdsempty());

    /* Authentication */
    if (password) 
        int arity = username ? 3 : 2;
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
        if (username) 
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
                                 sdslen(username)));
        
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
            sdslen(password)));
    

    /* Send the SELECT command if the current DB is not already selected. */
    int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
    if (select) 
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        serverAssertWithInfo(c,以上是关于redis源码分析redis cluster 集群实现的主要内容,如果未能解决你的问题,请参考以下文章

redis源码分析--cluster消息

Redis集群cluster实操

Jedis cluster集群初始化源码剖析

Redis-Cluster集群

Redis-Cluster集群

redis-快速搭建集群