Redis事务相关源码探究

Posted 杨 戬

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis事务相关源码探究相关的知识,希望对你有一定的参考价值。

文章目录

Redis事务源码解读

源码地址:https://github.com/redis/redis/tree/7.0/src

从源码来简单分析下 Redis 中事务的实现过程

1、MULTI 声明事务

Redis 中使用 MULTI 命令来声明和开启一个事务

// https://github.com/redis/redis/blob/7.0/src/multi.c#L104
void multiCommand(client *c) 
	// 判断是否已经开启了事务
	// 不持之事务的嵌套
    if (c->flags & CLIENT_MULTI) 
        addReplyError(c,"MULTI calls can not be nested");
        return;
    
	// 设置事务标识
    c->flags |= CLIENT_MULTI;

    addReply(c,shared.ok);

1、首先会判断当前客户端是是否已经开启了事务,Redis 中的事务不支持嵌套;

2、给 flags 设置事务标识 CLIENT_MULTI

2、命令入队

开始事务之后,后面所有的命令都会被添加到事务队列中

// https://github.com/redis/redis/blob/7.0/src/multi.c#L59
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c) 
    multiCmd *mc;

    // 这里有两种情况的判断  
    // 1、如果命令在入队是有问题就不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误
    // 2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动
    if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
        return;
        
    // 在原commands后面配置空间以存放新命令
    c->mstate.commands = zrealloc(c->mstate.commands,
            sizeof(multiCmd)*(c->mstate.count+1));
    // 微信新配置的空间设置执行的命令和参数
    mc = c->mstate.commands+c->mstate.count;
    mc->cmd = c->cmd;
    mc->argc = c->argc;
    mc->argv = c->argv;
    mc->argv_len = c->argv_len;
    ...

入队的时候会做个判断:

1、如果命令在入队时有语法错误不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误;

2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动;

3、client watch 的 key 有更新,当前客户端的 flags 就会被标记成 CLIENT_DIRTY_CASCLIENT_DIRTY_CAS 是在何时被标记,可继续看下文。

3、EXEC 执行事务

命令入队之后,再来看下事务的提交

// https://github.com/redis/redis/blob/7.0/src/multi.c#L140
void execCommand(client *c) 
    ...
    // 判断下是否开启了事务
    if (!(c->flags & CLIENT_MULTI)) 
        addReplyError(c,"EXEC without MULTI");
        return;
    

    // 事务中不能 watch 有过期时间的键值
    if (isWatchedKeyExpired(c)) 
        c->flags |= (CLIENT_DIRTY_CAS);
    

     // 检查是否需要中退出事务,有下面两种情况  
     // 1、 watch 的 key 有变化了
     // 2、命令入队的时候,有语法错误  
    if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC)) 
        if (c->flags & CLIENT_DIRTY_EXEC) 
            addReplyErrorObject(c, shared.execaborterr);
         else 
            addReply(c, shared.nullarray[c->resp]);
        
        // 取消事务
        discardTransaction(c);
        return;
    

    uint64_t old_flags = c->flags;

    /* we do not want to allow blocking commands inside multi */
    // 事务中不允许出现阻塞命令
    c->flags |= CLIENT_DENY_BLOCKING;

    /* Exec all the queued commands */
    unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */

    server.in_exec = 1;

    orig_argv = c->argv;
    orig_argv_len = c->argv_len;
    orig_argc = c->argc;
    orig_cmd = c->cmd;
    addReplyArrayLen(c,c->mstate.count);
    // 循环处理执行事务队列中的命令
    for (j = 0; j < c->mstate.count; j++) 
        c->argc = c->mstate.commands[j].argc;
        c->argv = c->mstate.commands[j].argv;
        c->argv_len = c->mstate.commands[j].argv_len;
        c->cmd = c->realcmd = c->mstate.commands[j].cmd;

        
        // 权限检查
        int acl_errpos;
        int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
        if (acl_retval != ACL_OK) 
          ...
         else 
            // 执行命令
            if (c->id == CLIENT_ID_AOF)
                call(c,CMD_CALL_NONE);
            else
                call(c,CMD_CALL_FULL);

            serverAssert((c->flags & CLIENT_BLOCKED) == 0);
        

        // 命令执行后可能会被修改,需要更新操作
        c->mstate.commands[j].argc = c->argc;
        c->mstate.commands[j].argv = c->argv;
        c->mstate.commands[j].cmd = c->cmd;
    

    // restore old DENY_BLOCKING value
    if (!(old_flags & CLIENT_DENY_BLOCKING))
        c->flags &= ~CLIENT_DENY_BLOCKING;
        
    // 恢复原命令
    c->argv = orig_argv;
    c->argv_len = orig_argv_len;
    c->argc = orig_argc;
    c->cmd = c->realcmd = orig_cmd;
    // 清除事务
    discardTransaction(c);

    server.in_exec = 0;

事务提交的时候,命令的执行逻辑还是比较简单的

1、首先会进行一些检查;

  • 检查事务有没有嵌套;
  • watch 监听的键值是否有变动;
  • 事务中命令入队列的时候,是否有语法错误;

2、循环执行,事务队列中的命令。

通过源码可以看到语法错误的时候事务才会结束执行,如果命令操作的类型不对,事务是不会停止的,还是会把正确的命令执行。

4、WATCH 监听变量

WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。

看下 watch 的键值对是如何和客户端进行映射的

// https://github.com/redis/redis/blob/7.0/src/server.h#L918
typedef struct redisDb 
    ...
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    ...
 redisDb;

// https://github.com/redis/redis/blob/7.0/src/server.h#L1083
typedef struct client 
    ...
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    ...
 client;

// https://github.com/redis/redis/blob/7.0/src/multi.c#L262
// 服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端。   
//
// 每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key .
typedef struct watchedKey 
    // 键值
    robj *key;
    // 键值所在的db
    redisDb *db;
    // 客户端
    client *client;
    // 正在监听过期key 的标识
    unsigned expired:1; /* Flag that we're watching an already expired key. */
 watchedKey;

变量映射关系吐下所示

分析完数据结构,看下 watch 的代码实现

// https://github.com/redis/redis/blob/7.0/src/multi.c#L441
void watchCommand(client *c) 
    int j;

    if (c->flags & CLIENT_MULTI) 
        addReplyError(c,"WATCH inside MULTI is not allowed");
        return;
    
    /* No point in watching if the client is already dirty. */
    if (c->flags & CLIENT_DIRTY_CAS) 
        addReply(c,shared.ok);
        return;
    
    for (j = 1; j < c->argc; j++)
        watchForKey(c,c->argv[j]);
    addReply(c,shared.ok);


// https://github.com/redis/redis/blob/7.0/src/multi.c#L270
/* Watch for the specified key */
void watchForKey(client *c, robj *key) 
    list *clients = NULL;
    listIter li;
    listNode *ln;
    watchedKey *wk;

    // 检查是否正在 watch 传入的 key 
    listRewind(c->watched_keys,&li);
    while((ln = listNext(&li))) 
        wk = listNodeValue(ln);
        if (wk->db == c->db && equalStringObjects(key,wk->key))
            return; /* Key already watched */
    
    // 没有监听,添加监听的 key 到 db 中的 watched_keys 中
    clients = dictFetchValue(c->db->**watched_keys**,key);
    if (!clients) 
        clients = listCreate();
        dictAdd(c->db->watched_keys,key,clients);
        incrRefCount(key);
    
    // 添加 key 到 client 中的  watched_keys 中
    wk = zmalloc(sizeof(*wk));
    wk->key = key;
    wk->client = c;
    wk->db = c->db;
    wk->expired = keyIsExpired(c->db, key);
    incrRefCount(key);
    listAddNodeTail(c->watched_keys,wk);
    listAddNodeTail(clients,wk);

1、服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端;

2、每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key ;

3、当用 watch 命令的时候,过期键会被分别添加到 redisDb 中的 watched_keys 中,和 client 中的 watched_keys 中。

上面事务的执行的时候,客户端有一个 flags, CLIENT_DIRTY_CAS 标识当前客户端 watch 的键值对有更新,那么 CLIENT_DIRTY_CAS 是在何时被标记的呢?

// https://github.com/redis/redis/blob/7.0/src/db.c#L535
/*-----------------------------------------------------------------------------
 * Hooks for key space changes.
 *
 * Every time a key in the database is modified the function
 * signalModifiedKey() is called.
 *
 * Every time a DB is flushed the function signalFlushDb() is called.
 *----------------------------------------------------------------------------*/

// 每次修改数据库中的一个键时,都会调用函数signalModifiedKey()。
// 每次DB被刷新时,函数signalFlushDb()被调用。
/* Note that the 'c' argument may be NULL if the key was modified out of
 * a context of a client. */
// 当 键值对有变动的时候,会调用 touchWatchedKey 标识对应的客户端状态为 CLIENT_DIRTY_CAS
void signalModifiedKey(client *c, redisDb *db, robj *key) 
    touchWatchedKey(db,key);
    trackingInvalidateKey(c,key,1);


// https://github.com/redis/redis/blob/7.0/src/multi.c#L348
/* "Touch" a key, so that if this key is being WATCHed by some client the
 * next EXEC will fail. */
// 修改 key 对应的客户端状态为 CLIENT_DIRTY_CAS,当前客户端 watch 的 key 已经发生了更新
void touchWatchedKey(redisDb *db, robj *key) 
    list *clients;
    listIter li;
    listNode *ln;

    // 如果 redisDb 中的 watched_keys 为空,直接返回
    if (dictSize(db->watched_keys) == 0) return;
    // 通过传入的 key 在 redisDb 的 watched_keys 中找到监听该 key 的客户端信息
    clients = dictFetchValue(db->watched_keys, key);
    if (!clients) return;

    /* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
    /* Check if we are already watching for this key */
    // 将监听该 key 的所有客户端信息标识成 CLIENT_DIRTY_CAS 状态  
    listRewind(clients,&li);
    while((ln = listNext(&li))) 
        watchedKey *wk = listNodeValue(ln);
        client *c = wk->client;

        if (wk->expired) 
            /* The key was already expired when WATCH was called. */
            if (db == wk->db &&
                equalStringObjects(key, wk->key) &&
                dictFind(db->dict, key->ptr) == NULL)
            
                /* Already expired key is deleted, so logically no change. Clear
                 * the flag. Deleted keys are not flagged as expired. */
                wk->expired = 0;
                goto skip_client;
            
            break;
        

        c->flags |= CLIENT_DIRTY_CAS;
        /* As the client is marked as dirty, there is no point in getting here
         * again in case that key (or others) are modified again (or keep the
         * memory overhead till EXEC). */
         // 这个客户端应该被表示成 dirty,这个客户端就不需要在判断监听了,取消这个客户端监听的 key
        unwatchAllKeys(c);

    skip_client:
        continue;
    

Redis 中 redisClient 的 flags 设置被设置成 REDIS_DIRTY_CAS 位,有下面两种情况:

1、每次修改数据库中的一个键值时;

2、每次DB被 flush 时,整个 Redis 的键值被清空;

上面的这两种情况发生,redis 就会修改 watch 对应的 key 的客户端 flags 为 CLIENT_DIRTY_CAS 表示该客户端 watch 有更新,事务处理就能通过这个状态来进行判断。

几乎所有对 key 进行操作的函数都会调用 signalModifiedKey 函数,比如 setKey、delCommand、hsetCommand 等。也就所有修改 key 的值的函数,都会去调用 signalModifiedKey 来检查是否修改了被 watch 的 key,只要是修改了被 watch 的 key,就会对 redisClient 的 flags 设置 REDIS_DIRTY_CAS 位。

以上是关于Redis事务相关源码探究的主要内容,如果未能解决你的问题,请参考以下文章

Redis事务

redis事务

Redis学习之旅--事务

Redis的事务

redis-事务

Redis的事务