Redis:list/lpush/lrange/lpop 命令源码解析
Posted 等你归去来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis:list/lpush/lrange/lpop 命令源码解析相关的知识,希望对你有一定的参考价值。
上一篇讲了hash数据类型的相关实现方法,没有茅塞顿开也至少知道redis如何搞事情的了吧。
本篇咱们继续来看redis中的数据类型的实现: list 相关操作实现。
同样,我们以使用者的角度,开始理解list提供的功能,相应的数据结构承载,再到具体实现,以这样一个思路来理解redis之list。
零、redis list相关操作方法
从官方的手册中可以查到相关的使用方法。
1> BLPOP key1 [key2] timeout
功能: 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。(LPOP的阻塞版本)
返回值: 获取到元素的key和被弹出的元素值2> BRPOP key1 [key2 ] timeout
功能: 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。(RPOP 的阻塞版本)
返回值: 获取到元素的key和被弹出的元素值3> BRPOPLPUSH source destination timeout
功能: 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。(RPOPLPUSH 的阻塞版本)
返回值: 被转移的元素值或者为nil4> LINDEX key index
功能: 通过索引获取列表中的元素
返回值: 查找到的元素值,超出范围时返回nil5> LINSERT key BEFORE|AFTER pivot value
功能: 在列表的元素前或者后插入元素
返回值: 插入后的list长度6> LLEN key
功能: 获取列表长度
返回值: 列表长度7> LPOP key
功能: 移出并获取列表的第一个元素
返回值: 第一个元素或者nil8> LPUSH key value1 [value2]
功能: 将一个或多个值插入到列表头部
返回值: 插入后的list长度9> LPUSHX key value
将一个值插入到已存在的列表头部,如果key不存在则不做任何操作
返回值: 插入后的list长度10> LRANGE key start stop
功能: 获取列表指定范围内的元素 (包含起止边界)
返回值: 值列表11> LREM key count value
功能: 移除列表元素, count>0:移除正向匹配的count个元素,count<0:移除逆向匹配的count个元素, count=0,只移除匹配的元素
返回值: 移除的元素个数12> LSET key index value
功能: 通过索引设置列表元素的值
返回值: OK or err13> LTRIM key start stop
功能: 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
返回值: OK14> RPOP key
功能: 移除列表的最后一个元素,返回值为移除的元素。
返回值: 最后一个元素值或者nil15> RPOPLPUSH source destination
功能: 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
返回值: 被转移的元素16> RPUSH key value1 [value2]
功能: 在列表中添加一个或多个值
返回值: 插入后的list长度17> RPUSHX key value
功能: 为已存在的列表添加值
返回值: 插入后的list长度
redis中的实现方法定义如下:
{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0}, {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0}, {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0}, {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0}, {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0}, {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0}, {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0}, {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0}, {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0}, {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0}, {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0}, {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0}, {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0}, {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0}, {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},
一、list相关数据结构
说到list或者说链表,我们能想到什么数据结构呢?单向链表、双向链表、循环链表... 好像都挺简单的,还有啥?? 我们来看下redis 的实现:
// quicklist 是其实数据容器,由head,tail 进行迭代,所以算是一个双向链表 /* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist. * \'count\' is the number of total entries. * \'len\' is the number of quicklist nodes. * \'compress\' is: -1 if compression disabled, otherwise it\'s the number * of quicklistNodes to leave uncompressed at ends of quicklist. * \'fill\' is the user-requested (or default) fill factor. */ typedef struct quicklist { // 头节点 quicklistNode *head; // 尾节点 quicklistNode *tail; // 现有元素个数 unsigned long count; /* total count of all entries in all ziplists */ // 现有的 quicklistNode 个数,一个 node 可能包含n个元素 unsigned int len; /* number of quicklistNodes */ // 填充因子 int fill : 16; /* fill factor for individual nodes */ // 多深的链表无需压缩 unsigned int compress : 16; /* depth of end nodes not to compress;0=off */ } quicklist; // 链表中的每个节点 typedef struct quicklistEntry { const quicklist *quicklist; quicklistNode *node; // 当前迭代元素的ziplist的偏移位置指针 unsigned char *zi; // 纯粹的 value, 值来源 zi unsigned char *value; // 占用空间大小 unsigned int sz; long long longval; // 当前节点偏移 int offset; } quicklistEntry; // 链表元素节点使用 quicklistNode /* quicklistNode is a 32 byte struct describing a ziplist for a quicklist. * We use bit fields keep the quicklistNode at 32 bytes. * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k). * encoding: 2 bits, RAW=1, LZF=2. * container: 2 bits, NONE=1, ZIPLIST=2. * recompress: 1 bit, bool, true if node is temporarry decompressed for usage. * attempted_compress: 1 bit, boolean, used for verifying during testing. * extra: 12 bits, free for future use; pads out the remainder of 32 bits */ typedef struct quicklistNode { struct quicklistNode *prev; struct quicklistNode *next; // zl 为ziplist链表,保存count个元素值 unsigned char *zl; unsigned int sz; /* ziplist size in bytes */ unsigned int count : 16; /* count of items in ziplist */ unsigned int encoding : 2; /* RAW==1 or LZF==2 */ unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ unsigned int recompress : 1; /* was this node previous compressed? */ unsigned int attempted_compress : 1; /* node can\'t compress; too small */ unsigned int extra : 10; /* more bits to steal for future usage */ } quicklistNode; // list迭代器 typedef struct quicklistIter { const quicklist *quicklist; quicklistNode *current; unsigned char *zi; long offset; /* offset in current ziplist */ int direction; } quicklistIter; // ziplist 数据结构 typedef struct zlentry { unsigned int prevrawlensize, prevrawlen; unsigned int lensize, len; unsigned int headersize; unsigned char encoding; unsigned char *p; } zlentry;
二、rpush/lpush 新增元素操作实现
rpush是所尾部添加元素,lpush是从头部添加元素,本质上都是一样的,redis实际上也是完全复用一套代码。
// t_list.c, lpush void lpushCommand(client *c) { // 使用 LIST_HEAD|LIST_TAIL 作为插入位置标识 pushGenericCommand(c,LIST_HEAD); } void rpushCommand(client *c) { pushGenericCommand(c,LIST_TAIL); } // t_list.c, 实际的push操作 void pushGenericCommand(client *c, int where) { int j, waiting = 0, pushed = 0; // 在db中查找对应的key实例,查到或者查不到 robj *lobj = lookupKeyWrite(c->db,c->argv[1]); // 查到的情况下,需要验证数据类型 if (lobj && lobj->type != OBJ_LIST) { addReply(c,shared.wrongtypeerr); return; } for (j = 2; j < c->argc; j++) { c->argv[j] = tryObjectEncoding(c->argv[j]); if (!lobj) { // 1. 在没有key实例的情况下,先创建key实例到db中 lobj = createQuicklistObject(); // 2. 设置 fill和depth 参数 // fill 默认: -2 // depth 默认: 0 quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size, server.list_compress_depth); dbAdd(c->db,c->argv[1],lobj); } // 3. 一个个元素添加进去 listTypePush(lobj,c->argv[j],where); pushed++; } // 返回list长度 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0)); if (pushed) { // 命令传播 char *event = (where == LIST_HEAD) ? "lpush" : "rpush"; signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id); } server.dirty += pushed; } // 1. 创建初始list // object.c, 创建初始list robj *createQuicklistObject(void) { quicklist *l = quicklistCreate(); robj *o = createObject(OBJ_LIST,l); o->encoding = OBJ_ENCODING_QUICKLIST; return o; } // quicklist.c, 创建一个新的list容器,初始化默认值 /* Create a new quicklist. * Free with quicklistRelease(). */ quicklist *quicklistCreate(void) { struct quicklist *quicklist; quicklist = zmalloc(sizeof(*quicklist)); quicklist->head = quicklist->tail = NULL; quicklist->len = 0; quicklist->count = 0; quicklist->compress = 0; quicklist->fill = -2; return quicklist; } // 2. 设置quicklist 的fill和depth 值 // quicklist.c void quicklistSetOptions(quicklist *quicklist, int fill, int depth) { quicklistSetFill(quicklist, fill); quicklistSetCompressDepth(quicklist, depth); } // quicklist.c, 设置 fill 参数 void quicklistSetFill(quicklist *quicklist, int fill) { if (fill > FILL_MAX) { fill = FILL_MAX; } else if (fill < -5) { fill = -5; } quicklist->fill = fill; } // quicklist.c, 设置 depth 参数 void quicklistSetCompressDepth(quicklist *quicklist, int compress) { if (compress > COMPRESS_MAX) { compress = COMPRESS_MAX; } else if (compress < 0) { compress = 0; } quicklist->compress = compress; } // 3. 将元素添加进list中 // t_list.c, /* The function pushes an element to the specified list object \'subject\', * at head or tail position as specified by \'where\'. * * There is no need for the caller to increment the refcount of \'value\' as * the function takes care of it if needed. */ void listTypePush(robj *subject, robj *value, int where) { if (subject->encoding == OBJ_ENCODING_QUICKLIST) { int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL; // 解码value value = getDecodedObject(value); size_t len = sdslen(value->ptr); // 将value添加到链表中 quicklistPush(subject->ptr, value->ptr, len, pos); // 减小value的引用,如果是被解编码后的对象,此时会将内存释放 decrRefCount(value); } else { serverPanic("Unknown list encoding"); } } // object.c /* Get a decoded version of an encoded object (returned as a new object). * If the object is already raw-encoded just increment the ref count. */ robj *getDecodedObject(robj *o) { robj *dec; // OBJ_ENCODING_RAW,OBJ_ENCODING_EMBSTR 编码直接返回,引用计数+1(原因是: 原始robj一个引用,转换后的robj一个引用) if (sdsEncodedObject(o)) { incrRefCount(o); return o; } if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) { char buf[32]; // 整型转换为字符型,返回string型的robj ll2string(buf,32,(long)o->ptr); dec = createStringObject(buf,strlen(buf)); return dec; } else { serverPanic("Unknown encoding type"); } } // quicklist.c, 添加value到链表中 /* Wrapper to allow argument-based switching between HEAD/TAIL pop */ void quicklistPush(quicklist *quicklist, void *value, const size_t sz, int where) { // 根据where决定添加到表头还表尾 if (where == QUICKLIST_HEAD) { quicklistPushHead(quicklist, value, sz); } else if (where == QUICKLIST_TAIL) { quicklistPushTail(quicklist, value, sz); } } // quicklist.c, 添加表头数据 /* Add new entry to head node of quicklist. * * Returns 0 if used existing head. * Returns 1 if new head created. */ int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_head = quicklist->head; // likely 对不同平台处理 __builtin_expect(!!(x), 1), // 判断是否允许插入元素,实际上是判断 head 的ziplist空间是否已占满, 没有则直接往里面插入即可 // fill 默认: -2 // depth 默认: 0 if (likely( _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) { // 3.1. 添加head节点的zl链表中, zl 为ziplist 链表节点 quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); // 3.2. 更新头节点size大小 quicklistNodeUpdateSz(quicklist->head); } else { // 如果head已占满,则创建一个新的 quicklistNode 节点进行插入 quicklistNode *node = quicklistCreateNode(); node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(node); // 3.3. 插入新节点到head之前 _quicklistInsertNodeBefore(quicklist, quicklist->head, node); } // 将链表计数+1, 避免获取总数时迭代计算 quicklist->count++; quicklist->head->count++; return (orig_head != quicklist->head); } // quicklist.c, 判断是否允许插入元素 REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node, const int fill, const size_t sz) { if (unlikely(!node)) return 0; int ziplist_overhead; /* size of previous offset */ if (sz < 254) ziplist_overhead = 1; else ziplist_overhead = 5; /* size of forward offset */ if (sz < 64) ziplist_overhead += 1; else if (likely(sz < 16384)) ziplist_overhead += 2; else ziplist_overhead += 5; /* new_sz overestimates if \'sz\' encodes to an integer type */ // 加上需要添加的新元素的长度后,进行阀值判定,如果在阀值内,则返回1,否则返回0 unsigned int new_sz = node->sz + sz + ziplist_overhead; // 使用fill参数判定 if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill))) return 1; else if (!sizeMeetsSafetyLimit(new_sz)) return 0; else if ((int)node->count < fill) return 1; else return 0; } // quicklist.c REDIS_STATIC int _quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz, const int fill) { if (fill >= 0) return 0; size_t offset = (-fill) - 1; // /* Optimization levels for size-based filling */ // static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536}; // offset < 5, offset 默认将等于 1, sz <= 8292 if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) { if (sz <= optimization_level[offset]) { return 1; } else { return 0; } } else { return 0; } } // SIZE_SAFETY_LIMIT 8192 #define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT) // 3.1. 向每个链表节点中添加value, 实际是向 ziplist push 数据 // ziplist.c, push *s 数据到 zl 中 unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) { unsigned char *p; p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl); // 具体添加元素方法,有点复杂。简单点说就是 判断容量、扩容、按照ziplist协议添加元素 return __ziplistInsert(zl,p,s,slen); } // ziplist.c, 在hash的数据介绍时已详细介绍 /* Insert item at "p". */ static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) { size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen; unsigned int prevlensize, prevlen = 0; size_t offset; int nextdiff = 0; unsigned char encoding = 0; long long value = 123456789; /* initialized to avoid warning. Using a value that is easy to see if for some reason we use it uninitialized. */ zlentry tail; /* Find out prevlen for the entry that is inserted. */ if (p[0] != ZIP_END) { ZIP_DECODE_PREVLEN(p, prevlensize, prevlen); } else { unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl); if (ptail[0] != ZIP_END) { prevlen = zipRawEntryLength(ptail); } } /* See if the entry can be encoded */ if (zipTryEncoding(s,slen,&value,&encoding)) { /* \'encoding\' is set to the appropriate integer encoding */ reqlen = zipIntSize(encoding); } else { /* \'encoding\' is untouched, however zipEncodeLength will use the * string length to figure out how to encode it. */ reqlen = slen; } /* We need space for both the length of the previous entry and * the length of the payload. */ reqlen += zipPrevEncodeLength(NULL,prevlen); reqlen += zipEncodeLength(NULL,encoding,slen); /* When the insert position is not equal to the tail, we need to * make sure that the next entry can hold this entry\'s length in * its prevlen field. */ nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0; /* Store offset because a realloc may change the address of zl. */ offset = p-zl; zl = ziplistResize(zl,curlen+reqlen+nextdiff); p = zl+offset; /* Apply memory move when necessary and update tail offset. */ if (p[0] != ZIP_END) { /* Subtract one because of the ZIP_END bytes */ memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff); /* Encode this entry\'s raw length in the next entry. */ zipPrevEncodeLength(p+reqlen,reqlen); /* Update offset for tail */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen); /* When the tail contains more than one entry, we need to take * "nextdiff" in account as well. Otherwise, a change in the * size of prevlen doesn\'t have an effect on the *tail* offset. */ zipEntry(p+reqlen, &tail); if (p[reqlen+tail.headersize+tail.len] != ZIP_END) { ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff); } } else { /* This element will be the new tail. */ ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl); } /* When nextdiff != 0, the raw length of the next entry has changed, so * we need to cascade the update throughout the ziplist */ if (nextdiff != 0) { offset = p-zl; zl = __ziplistCascadeUpdate(zl,p+reqlen); p = zl+offset; } /* Write the entry */ p += zipPrevEncodeLength(p,prevlen); p += zipEncodeLength(p,encoding,slen); if (ZIP_IS_STR(encoding)) { memcpy(p,s,slen); } else { zipSaveInteger(p,value,encoding); } ZIPLIST_INCR_LENGTH(zl,1); return zl; } // 3.2. 更新node的size (实际占用内存空间大小) // quicklist.c, 更新node的size, 其实就是重新统计node的ziplist长度 #define quicklistNodeUpdateSz(nod以上是关于Redis:list/lpush/lrange/lpop 命令源码解析的主要内容,如果未能解决你的问题,请参考以下文章
Redis 基础 -- Redis简介CentOS 7 单机安装Redis启动Redis(后台启动Redis 指定配置文件启动Redis 开机自启Redis )Redis客户端(含图形化界面)