Redis事务相关源码探究
Posted 杨 戬
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis事务相关源码探究相关的知识,希望对你有一定的参考价值。
文章目录
Redis事务源码解读
源码地址:https://github.com/redis/redis/tree/7.0/src
从源码来简单分析下 Redis 中事务的实现过程
1、MULTI 声明事务
Redis 中使用 MULTI 命令来声明和开启一个事务
// https://github.com/redis/redis/blob/7.0/src/multi.c#L104
void multiCommand(client *c)
// 判断是否已经开启了事务
// 不持之事务的嵌套
if (c->flags & CLIENT_MULTI)
addReplyError(c,"MULTI calls can not be nested");
return;
// 设置事务标识
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
1、首先会判断当前客户端是是否已经开启了事务,Redis 中的事务不支持嵌套;
2、给 flags 设置事务标识 CLIENT_MULTI
。
2、命令入队
开始事务之后,后面所有的命令都会被添加到事务队列中
// https://github.com/redis/redis/blob/7.0/src/multi.c#L59
/* Add a new command into the MULTI commands queue */
void queueMultiCommand(client *c)
multiCmd *mc;
// 这里有两种情况的判断
// 1、如果命令在入队是有问题就不入队了,CLIENT_DIRTY_EXEC 表示入队的时候,命令有语法的错误
// 2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS 表示该客户端监听的键值有变动
if (c->flags & (CLIENT_DIRTY_CAS|CLIENT_DIRTY_EXEC))
return;
// 在原commands后面配置空间以存放新命令
c->mstate.commands = zrealloc(c->mstate.commands,
sizeof(multiCmd)*(c->mstate.count+1));
// 微信新配置的空间设置执行的命令和参数
mc = c->mstate.commands+c->mstate.count;
mc->cmd = c->cmd;
mc->argc = c->argc;
mc->argv = c->argv;
mc->argv_len = c->argv_len;
...
入队的时候会做个判断:
1、如果命令在入队时有语法错误不入队了,CLIENT_DIRTY_EXEC
表示入队的时候,命令有语法的错误;
2、如果 watch 的键值有更改也不用入队了, CLIENT_DIRTY_CAS
表示该客户端监听的键值有变动;
3、client watch 的 key 有更新,当前客户端的 flags 就会被标记成 CLIENT_DIRTY_CAS
,CLIENT_DIRTY_CAS
是在何时被标记,可继续看下文。
3、EXEC 执行事务
命令入队之后,再来看下事务的提交
// https://github.com/redis/redis/blob/7.0/src/multi.c#L140
void execCommand(client *c)
...
// 判断下是否开启了事务
if (!(c->flags & CLIENT_MULTI))
addReplyError(c,"EXEC without MULTI");
return;
// 事务中不能 watch 有过期时间的键值
if (isWatchedKeyExpired(c))
c->flags |= (CLIENT_DIRTY_CAS);
// 检查是否需要中退出事务,有下面两种情况
// 1、 watch 的 key 有变化了
// 2、命令入队的时候,有语法错误
if (c->flags & (CLIENT_DIRTY_CAS | CLIENT_DIRTY_EXEC))
if (c->flags & CLIENT_DIRTY_EXEC)
addReplyErrorObject(c, shared.execaborterr);
else
addReply(c, shared.nullarray[c->resp]);
// 取消事务
discardTransaction(c);
return;
uint64_t old_flags = c->flags;
/* we do not want to allow blocking commands inside multi */
// 事务中不允许出现阻塞命令
c->flags |= CLIENT_DENY_BLOCKING;
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
server.in_exec = 1;
orig_argv = c->argv;
orig_argv_len = c->argv_len;
orig_argc = c->argc;
orig_cmd = c->cmd;
addReplyArrayLen(c,c->mstate.count);
// 循环处理执行事务队列中的命令
for (j = 0; j < c->mstate.count; j++)
c->argc = c->mstate.commands[j].argc;
c->argv = c->mstate.commands[j].argv;
c->argv_len = c->mstate.commands[j].argv_len;
c->cmd = c->realcmd = c->mstate.commands[j].cmd;
// 权限检查
int acl_errpos;
int acl_retval = ACLCheckAllPerm(c,&acl_errpos);
if (acl_retval != ACL_OK)
...
else
// 执行命令
if (c->id == CLIENT_ID_AOF)
call(c,CMD_CALL_NONE);
else
call(c,CMD_CALL_FULL);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
// 命令执行后可能会被修改,需要更新操作
c->mstate.commands[j].argc = c->argc;
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
// restore old DENY_BLOCKING value
if (!(old_flags & CLIENT_DENY_BLOCKING))
c->flags &= ~CLIENT_DENY_BLOCKING;
// 恢复原命令
c->argv = orig_argv;
c->argv_len = orig_argv_len;
c->argc = orig_argc;
c->cmd = c->realcmd = orig_cmd;
// 清除事务
discardTransaction(c);
server.in_exec = 0;
事务提交的时候,命令的执行逻辑还是比较简单的
1、首先会进行一些检查;
- 检查事务有没有嵌套;
- watch 监听的键值是否有变动;
- 事务中命令入队列的时候,是否有语法错误;
2、循环执行,事务队列中的命令。
通过源码可以看到语法错误的时候事务才会结束执行,如果命令操作的类型不对,事务是不会停止的,还是会把正确的命令执行。
4、WATCH 监听变量
WATCH 命令用于在事务开始之前监视任意数量的键: 当调用 EXEC 命令执行事务时, 如果任意一个被监视的键已经被其他客户端修改了, 那么整个事务不再执行, 直接返回失败。
看下 watch 的键值对是如何和客户端进行映射的
// https://github.com/redis/redis/blob/7.0/src/server.h#L918
typedef struct redisDb
...
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
...
redisDb;
// https://github.com/redis/redis/blob/7.0/src/server.h#L1083
typedef struct client
...
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
...
client;
// https://github.com/redis/redis/blob/7.0/src/multi.c#L262
// 服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端。
//
// 每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key .
typedef struct watchedKey
// 键值
robj *key;
// 键值所在的db
redisDb *db;
// 客户端
client *client;
// 正在监听过期key 的标识
unsigned expired:1; /* Flag that we're watching an already expired key. */
watchedKey;
变量映射关系吐下所示
分析完数据结构,看下 watch 的代码实现
// https://github.com/redis/redis/blob/7.0/src/multi.c#L441
void watchCommand(client *c)
int j;
if (c->flags & CLIENT_MULTI)
addReplyError(c,"WATCH inside MULTI is not allowed");
return;
/* No point in watching if the client is already dirty. */
if (c->flags & CLIENT_DIRTY_CAS)
addReply(c,shared.ok);
return;
for (j = 1; j < c->argc; j++)
watchForKey(c,c->argv[j]);
addReply(c,shared.ok);
// https://github.com/redis/redis/blob/7.0/src/multi.c#L270
/* Watch for the specified key */
void watchForKey(client *c, robj *key)
list *clients = NULL;
listIter li;
listNode *ln;
watchedKey *wk;
// 检查是否正在 watch 传入的 key
listRewind(c->watched_keys,&li);
while((ln = listNext(&li)))
wk = listNodeValue(ln);
if (wk->db == c->db && equalStringObjects(key,wk->key))
return; /* Key already watched */
// 没有监听,添加监听的 key 到 db 中的 watched_keys 中
clients = dictFetchValue(c->db->**watched_keys**,key);
if (!clients)
clients = listCreate();
dictAdd(c->db->watched_keys,key,clients);
incrRefCount(key);
// 添加 key 到 client 中的 watched_keys 中
wk = zmalloc(sizeof(*wk));
wk->key = key;
wk->client = c;
wk->db = c->db;
wk->expired = keyIsExpired(c->db, key);
incrRefCount(key);
listAddNodeTail(c->watched_keys,wk);
listAddNodeTail(clients,wk);
1、服务端中每一个db 中都有一个 hash table 来记录客户端和 watching key 的映射,当这些 key 修改,可以标识监听这些 key 的客户端;
2、每个客户端中也有一个被监听的键值对的列表,当客户端被释放或者 un-watch 被调用,可以取消监听这些 key ;
3、当用 watch 命令的时候,过期键会被分别添加到 redisDb 中的 watched_keys 中,和 client 中的 watched_keys 中。
上面事务的执行的时候,客户端有一个 flags, CLIENT_DIRTY_CAS
标识当前客户端 watch 的键值对有更新,那么 CLIENT_DIRTY_CAS
是在何时被标记的呢?
// https://github.com/redis/redis/blob/7.0/src/db.c#L535
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
* Every time a key in the database is modified the function
* signalModifiedKey() is called.
*
* Every time a DB is flushed the function signalFlushDb() is called.
*----------------------------------------------------------------------------*/
// 每次修改数据库中的一个键时,都会调用函数signalModifiedKey()。
// 每次DB被刷新时,函数signalFlushDb()被调用。
/* Note that the 'c' argument may be NULL if the key was modified out of
* a context of a client. */
// 当 键值对有变动的时候,会调用 touchWatchedKey 标识对应的客户端状态为 CLIENT_DIRTY_CAS
void signalModifiedKey(client *c, redisDb *db, robj *key)
touchWatchedKey(db,key);
trackingInvalidateKey(c,key,1);
// https://github.com/redis/redis/blob/7.0/src/multi.c#L348
/* "Touch" a key, so that if this key is being WATCHed by some client the
* next EXEC will fail. */
// 修改 key 对应的客户端状态为 CLIENT_DIRTY_CAS,当前客户端 watch 的 key 已经发生了更新
void touchWatchedKey(redisDb *db, robj *key)
list *clients;
listIter li;
listNode *ln;
// 如果 redisDb 中的 watched_keys 为空,直接返回
if (dictSize(db->watched_keys) == 0) return;
// 通过传入的 key 在 redisDb 的 watched_keys 中找到监听该 key 的客户端信息
clients = dictFetchValue(db->watched_keys, key);
if (!clients) return;
/* Mark all the clients watching this key as CLIENT_DIRTY_CAS */
/* Check if we are already watching for this key */
// 将监听该 key 的所有客户端信息标识成 CLIENT_DIRTY_CAS 状态
listRewind(clients,&li);
while((ln = listNext(&li)))
watchedKey *wk = listNodeValue(ln);
client *c = wk->client;
if (wk->expired)
/* The key was already expired when WATCH was called. */
if (db == wk->db &&
equalStringObjects(key, wk->key) &&
dictFind(db->dict, key->ptr) == NULL)
/* Already expired key is deleted, so logically no change. Clear
* the flag. Deleted keys are not flagged as expired. */
wk->expired = 0;
goto skip_client;
break;
c->flags |= CLIENT_DIRTY_CAS;
/* As the client is marked as dirty, there is no point in getting here
* again in case that key (or others) are modified again (or keep the
* memory overhead till EXEC). */
// 这个客户端应该被表示成 dirty,这个客户端就不需要在判断监听了,取消这个客户端监听的 key
unwatchAllKeys(c);
skip_client:
continue;
Redis 中 redisClient 的 flags 设置被设置成 REDIS_DIRTY_CAS
位,有下面两种情况:
1、每次修改数据库中的一个键值时;
2、每次DB被 flush 时,整个 Redis 的键值被清空;
上面的这两种情况发生,redis 就会修改 watch 对应的 key 的客户端 flags 为 CLIENT_DIRTY_CAS 表示该客户端 watch 有更新,事务处理就能通过这个状态来进行判断。
几乎所有对 key 进行操作的函数都会调用 signalModifiedKey 函数,比如 setKey、delCommand、hsetCommand
等。也就所有修改 key 的值的函数,都会去调用 signalModifiedKey 来检查是否修改了被 watch 的 key,只要是修改了被 watch 的 key,就会对 redisClient 的 flags 设置 REDIS_DIRTY_CAS 位。
以上是关于Redis事务相关源码探究的主要内容,如果未能解决你的问题,请参考以下文章