Redis:list/lpush/lrange/lpop 命令源码解析

Posted 等你归去来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis:list/lpush/lrange/lpop 命令源码解析相关的知识,希望对你有一定的参考价值。

  上一篇讲了hash数据类型的相关实现方法,没有茅塞顿开也至少知道redis如何搞事情的了吧。

  本篇咱们继续来看redis中的数据类型的实现: list 相关操作实现。

  

  同样,我们以使用者的角度,开始理解list提供的功能,相应的数据结构承载,再到具体实现,以这样一个思路来理解redis之list。

 

零、redis list相关操作方法


  从官方的手册中可以查到相关的使用方法。

1> BLPOP key1 [key2] timeout
功能: 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。(LPOP的阻塞版本)
返回值: 获取到元素的key和被弹出的元素值

2> BRPOP key1 [key2 ] timeout
功能: 移出并获取列表的最后一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。(RPOP 的阻塞版本)
返回值: 获取到元素的key和被弹出的元素值

3> BRPOPLPUSH source destination timeout
功能: 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。(RPOPLPUSH 的阻塞版本)
返回值: 被转移的元素值或者为nil

4> LINDEX key index
功能: 通过索引获取列表中的元素
返回值: 查找到的元素值,超出范围时返回nil

5> LINSERT key BEFORE|AFTER pivot value
功能: 在列表的元素前或者后插入元素
返回值: 插入后的list长度

6> LLEN key
功能: 获取列表长度
返回值: 列表长度

7> LPOP key
功能: 移出并获取列表的第一个元素
返回值: 第一个元素或者nil

8> LPUSH key value1 [value2]
功能: 将一个或多个值插入到列表头部
返回值: 插入后的list长度

9> LPUSHX key value
将一个值插入到已存在的列表头部,如果key不存在则不做任何操作
返回值: 插入后的list长度

10> LRANGE key start stop
功能: 获取列表指定范围内的元素 (包含起止边界)
返回值: 值列表

11> LREM key count value
功能: 移除列表元素, count>0:移除正向匹配的count个元素,count<0:移除逆向匹配的count个元素, count=0,只移除匹配的元素
返回值: 移除的元素个数

12> LSET key index value
功能: 通过索引设置列表元素的值
返回值: OK or err

13> LTRIM key start stop
功能: 对一个列表进行修剪(trim),就是说,让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。
返回值: OK

14> RPOP key
功能: 移除列表的最后一个元素,返回值为移除的元素。
返回值: 最后一个元素值或者nil

15> RPOPLPUSH source destination
功能: 移除列表的最后一个元素,并将该元素添加到另一个列表并返回
返回值: 被转移的元素

16> RPUSH key value1 [value2]
功能: 在列表中添加一个或多个值
返回值: 插入后的list长度

17> RPUSHX key value
功能: 为已存在的列表添加值
返回值: 插入后的list长度

 

  redis中的实现方法定义如下:

    {"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"rpushx",rpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"lpushx",lpushxCommand,3,"wmF",0,NULL,1,1,1,0,0},
    {"linsert",linsertCommand,5,"wm",0,NULL,1,1,1,0,0},
    {"rpop",rpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"lpop",lpopCommand,2,"wF",0,NULL,1,1,1,0,0},
    {"brpop",brpopCommand,-3,"ws",0,NULL,1,1,1,0,0},
    {"brpoplpush",brpoplpushCommand,4,"wms",0,NULL,1,2,1,0,0},
    {"blpop",blpopCommand,-3,"ws",0,NULL,1,-2,1,0,0},
    {"llen",llenCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"lindex",lindexCommand,3,"r",0,NULL,1,1,1,0,0},
    {"lset",lsetCommand,4,"wm",0,NULL,1,1,1,0,0},
    {"lrange",lrangeCommand,4,"r",0,NULL,1,1,1,0,0},
    {"ltrim",ltrimCommand,4,"w",0,NULL,1,1,1,0,0},
    {"lrem",lremCommand,4,"w",0,NULL,1,1,1,0,0},
    {"rpoplpush",rpoplpushCommand,3,"wm",0,NULL,1,2,1,0,0},

 

一、list相关数据结构


  说到list或者说链表,我们能想到什么数据结构呢?单向链表、双向链表、循环链表... 好像都挺简单的,还有啥?? 我们来看下redis 的实现:

// quicklist 是其实数据容器,由head,tail 进行迭代,所以算是一个双向链表
/* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist.
 * \'count\' is the number of total entries.
 * \'len\' is the number of quicklist nodes.
 * \'compress\' is: -1 if compression disabled, otherwise it\'s the number
 *                of quicklistNodes to leave uncompressed at ends of quicklist.
 * \'fill\' is the user-requested (or default) fill factor. */
typedef struct quicklist {
    // 头节点
    quicklistNode *head;
    // 尾节点
    quicklistNode *tail;
    // 现有元素个数
    unsigned long count;        /* total count of all entries in all ziplists */
    // 现有的 quicklistNode 个数,一个 node 可能包含n个元素
    unsigned int len;           /* number of quicklistNodes */
    // 填充因子
    int fill : 16;              /* fill factor for individual nodes */
    // 多深的链表无需压缩
    unsigned int compress : 16; /* depth of end nodes not to compress;0=off */
} quicklist;
// 链表中的每个节点
typedef struct quicklistEntry {
    const quicklist *quicklist;
    quicklistNode *node;
    // 当前迭代元素的ziplist的偏移位置指针
    unsigned char *zi;
    // 纯粹的 value, 值来源 zi
    unsigned char *value;
    // 占用空间大小
    unsigned int sz;
    long long longval;
    // 当前节点偏移
    int offset;
} quicklistEntry;
// 链表元素节点使用 quicklistNode 
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
 * We use bit fields keep the quicklistNode at 32 bytes.
 * count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
 * encoding: 2 bits, RAW=1, LZF=2.
 * container: 2 bits, NONE=1, ZIPLIST=2.
 * recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
 * attempted_compress: 1 bit, boolean, used for verifying during testing.
 * extra: 12 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
    struct quicklistNode *prev;
    struct quicklistNode *next;
    // zl 为ziplist链表,保存count个元素值
    unsigned char *zl;
    unsigned int sz;             /* ziplist size in bytes */
    unsigned int count : 16;     /* count of items in ziplist */
    unsigned int encoding : 2;   /* RAW==1 or LZF==2 */
    unsigned int container : 2;  /* NONE==1 or ZIPLIST==2 */
    unsigned int recompress : 1; /* was this node previous compressed? */
    unsigned int attempted_compress : 1; /* node can\'t compress; too small */
    unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
// list迭代器
typedef struct quicklistIter {
    const quicklist *quicklist;
    quicklistNode *current;
    unsigned char *zi;
    long offset; /* offset in current ziplist */
    int direction;
} quicklistIter;
// ziplist 数据结构 
typedef struct zlentry {
    unsigned int prevrawlensize, prevrawlen;
    unsigned int lensize, len;
    unsigned int headersize;
    unsigned char encoding;
    unsigned char *p;
} zlentry;

 

二、rpush/lpush 新增元素操作实现


  rpush是所尾部添加元素,lpush是从头部添加元素,本质上都是一样的,redis实际上也是完全复用一套代码。

// t_list.c, lpush
void lpushCommand(client *c) {
    // 使用 LIST_HEAD|LIST_TAIL 作为插入位置标识
    pushGenericCommand(c,LIST_HEAD);
}
void rpushCommand(client *c) {
    pushGenericCommand(c,LIST_TAIL);
}
// t_list.c, 实际的push操作
void pushGenericCommand(client *c, int where) {
    int j, waiting = 0, pushed = 0;
    // 在db中查找对应的key实例,查到或者查不到
    robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
    // 查到的情况下,需要验证数据类型
    if (lobj && lobj->type != OBJ_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    for (j = 2; j < c->argc; j++) {
        c->argv[j] = tryObjectEncoding(c->argv[j]);
        if (!lobj) {
            // 1. 在没有key实例的情况下,先创建key实例到db中
            lobj = createQuicklistObject();
            // 2. 设置 fill和depth 参数
            // fill 默认: -2
            // depth 默认: 0
            quicklistSetOptions(lobj->ptr, server.list_max_ziplist_size,
                                server.list_compress_depth);
            dbAdd(c->db,c->argv[1],lobj);
        }
        // 3. 一个个元素添加进去
        listTypePush(lobj,c->argv[j],where);
        pushed++;
    }
    // 返回list长度
    addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
    if (pushed) {
        // 命令传播
        char *event = (where == LIST_HEAD) ? "lpush" : "rpush";

        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_LIST,event,c->argv[1],c->db->id);
    }
    server.dirty += pushed;
}
// 1. 创建初始list
// object.c, 创建初始list
robj *createQuicklistObject(void) {
    quicklist *l = quicklistCreate();
    robj *o = createObject(OBJ_LIST,l);
    o->encoding = OBJ_ENCODING_QUICKLIST;
    return o;
}
// quicklist.c, 创建一个新的list容器,初始化默认值
/* Create a new quicklist.
 * Free with quicklistRelease(). */
quicklist *quicklistCreate(void) {
    struct quicklist *quicklist;

    quicklist = zmalloc(sizeof(*quicklist));
    quicklist->head = quicklist->tail = NULL;
    quicklist->len = 0;
    quicklist->count = 0;
    quicklist->compress = 0;
    quicklist->fill = -2;
    return quicklist;
}

// 2. 设置quicklist 的fill和depth 值
// quicklist.c
void quicklistSetOptions(quicklist *quicklist, int fill, int depth) {
    quicklistSetFill(quicklist, fill);
    quicklistSetCompressDepth(quicklist, depth);
}
// quicklist.c, 设置 fill 参数
void quicklistSetFill(quicklist *quicklist, int fill) {
    if (fill > FILL_MAX) {
        fill = FILL_MAX;
    } else if (fill < -5) {
        fill = -5;
    }
    quicklist->fill = fill;
}
// quicklist.c, 设置 depth 参数
void quicklistSetCompressDepth(quicklist *quicklist, int compress) {
    if (compress > COMPRESS_MAX) {
        compress = COMPRESS_MAX;
    } else if (compress < 0) {
        compress = 0;
    }
    quicklist->compress = compress;
}

// 3. 将元素添加进list中
// t_list.c, 
/* The function pushes an element to the specified list object \'subject\',
 * at head or tail position as specified by \'where\'.
 *
 * There is no need for the caller to increment the refcount of \'value\' as
 * the function takes care of it if needed. */
void listTypePush(robj *subject, robj *value, int where) {
    if (subject->encoding == OBJ_ENCODING_QUICKLIST) {
        int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
        // 解码value
        value = getDecodedObject(value);
        size_t len = sdslen(value->ptr);
        // 将value添加到链表中
        quicklistPush(subject->ptr, value->ptr, len, pos);
        // 减小value的引用,如果是被解编码后的对象,此时会将内存释放
        decrRefCount(value);
    } else {
        serverPanic("Unknown list encoding");
    }
}
// object.c
/* Get a decoded version of an encoded object (returned as a new object).
 * If the object is already raw-encoded just increment the ref count. */
robj *getDecodedObject(robj *o) {
    robj *dec;
    // OBJ_ENCODING_RAW,OBJ_ENCODING_EMBSTR 编码直接返回,引用计数+1(原因是: 原始robj一个引用,转换后的robj一个引用)
    if (sdsEncodedObject(o)) {
        incrRefCount(o);
        return o;
    }
    if (o->type == OBJ_STRING && o->encoding == OBJ_ENCODING_INT) {
        char buf[32];
        // 整型转换为字符型,返回string型的robj
        ll2string(buf,32,(long)o->ptr);
        dec = createStringObject(buf,strlen(buf));
        return dec;
    } else {
        serverPanic("Unknown encoding type");
    }
}

// quicklist.c, 添加value到链表中
/* Wrapper to allow argument-based switching between HEAD/TAIL pop */
void quicklistPush(quicklist *quicklist, void *value, const size_t sz,
                   int where) {
    // 根据where决定添加到表头还表尾
    if (where == QUICKLIST_HEAD) {
        quicklistPushHead(quicklist, value, sz);
    } else if (where == QUICKLIST_TAIL) {
        quicklistPushTail(quicklist, value, sz);
    }
}
// quicklist.c, 添加表头数据
/* Add new entry to head node of quicklist.
 *
 * Returns 0 if used existing head.
 * Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    // likely 对不同平台处理 __builtin_expect(!!(x), 1), 
    // 判断是否允许插入元素,实际上是判断 head 的ziplist空间是否已占满, 没有则直接往里面插入即可
    // fill 默认: -2
    // depth 默认: 0
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        // 3.1. 添加head节点的zl链表中, zl 为ziplist 链表节点
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        // 3.2. 更新头节点size大小
        quicklistNodeUpdateSz(quicklist->head);
    } else {
        // 如果head已占满,则创建一个新的 quicklistNode 节点进行插入
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        quicklistNodeUpdateSz(node);
        // 3.3. 插入新节点到head之前
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    // 将链表计数+1, 避免获取总数时迭代计算
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}
// quicklist.c, 判断是否允许插入元素
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
                                           const int fill, const size_t sz) {
    if (unlikely(!node))
        return 0;

    int ziplist_overhead;
    /* size of previous offset */
    if (sz < 254)
        ziplist_overhead = 1;
    else
        ziplist_overhead = 5;

    /* size of forward offset */
    if (sz < 64)
        ziplist_overhead += 1;
    else if (likely(sz < 16384))
        ziplist_overhead += 2;
    else
        ziplist_overhead += 5;

    /* new_sz overestimates if \'sz\' encodes to an integer type */
    // 加上需要添加的新元素的长度后,进行阀值判定,如果在阀值内,则返回1,否则返回0
    unsigned int new_sz = node->sz + sz + ziplist_overhead;
    // 使用fill参数判定
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
        return 1;
    else if (!sizeMeetsSafetyLimit(new_sz))
        return 0;
    else if ((int)node->count < fill)
        return 1;
    else
        return 0;
}
// quicklist.c 
REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
                                               const int fill) {
    if (fill >= 0)
        return 0;

    size_t offset = (-fill) - 1;
    // /* Optimization levels for size-based filling */
    // static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
    // offset < 5, offset 默认将等于 1, sz <= 8292
    if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
        if (sz <= optimization_level[offset]) {
            return 1;
        } else {
            return 0;
        }
    } else {
        return 0;
    }
}
// SIZE_SAFETY_LIMIT 8192
#define sizeMeetsSafetyLimit(sz) ((sz) <= SIZE_SAFETY_LIMIT)

// 3.1. 向每个链表节点中添加value, 实际是向 ziplist push 数据
// ziplist.c, push *s 数据到 zl 中
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s, unsigned int slen, int where) {
    unsigned char *p;
    p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
    // 具体添加元素方法,有点复杂。简单点说就是 判断容量、扩容、按照ziplist协议添加元素
    return __ziplistInsert(zl,p,s,slen);
}
// ziplist.c, 在hash的数据介绍时已详细介绍
/* Insert item at "p". */
static unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
    size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
    unsigned int prevlensize, prevlen = 0;
    size_t offset;
    int nextdiff = 0;
    unsigned char encoding = 0;
    long long value = 123456789; /* initialized to avoid warning. Using a value
                                    that is easy to see if for some reason
                                    we use it uninitialized. */
    zlentry tail;

    /* Find out prevlen for the entry that is inserted. */
    if (p[0] != ZIP_END) {
        ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
    } else {
        unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
        if (ptail[0] != ZIP_END) {
            prevlen = zipRawEntryLength(ptail);
        }
    }

    /* See if the entry can be encoded */
    if (zipTryEncoding(s,slen,&value,&encoding)) {
        /* \'encoding\' is set to the appropriate integer encoding */
        reqlen = zipIntSize(encoding);
    } else {
        /* \'encoding\' is untouched, however zipEncodeLength will use the
         * string length to figure out how to encode it. */
        reqlen = slen;
    }
    /* We need space for both the length of the previous entry and
     * the length of the payload. */
    reqlen += zipPrevEncodeLength(NULL,prevlen);
    reqlen += zipEncodeLength(NULL,encoding,slen);

    /* When the insert position is not equal to the tail, we need to
     * make sure that the next entry can hold this entry\'s length in
     * its prevlen field. */
    nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;

    /* Store offset because a realloc may change the address of zl. */
    offset = p-zl;
    zl = ziplistResize(zl,curlen+reqlen+nextdiff);
    p = zl+offset;

    /* Apply memory move when necessary and update tail offset. */
    if (p[0] != ZIP_END) {
        /* Subtract one because of the ZIP_END bytes */
        memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);

        /* Encode this entry\'s raw length in the next entry. */
        zipPrevEncodeLength(p+reqlen,reqlen);

        /* Update offset for tail */
        ZIPLIST_TAIL_OFFSET(zl) =
            intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);

        /* When the tail contains more than one entry, we need to take
         * "nextdiff" in account as well. Otherwise, a change in the
         * size of prevlen doesn\'t have an effect on the *tail* offset. */
        zipEntry(p+reqlen, &tail);
        if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
            ZIPLIST_TAIL_OFFSET(zl) =
                intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
        }
    } else {
        /* This element will be the new tail. */
        ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
    }

    /* When nextdiff != 0, the raw length of the next entry has changed, so
     * we need to cascade the update throughout the ziplist */
    if (nextdiff != 0) {
        offset = p-zl;
        zl = __ziplistCascadeUpdate(zl,p+reqlen);
        p = zl+offset;
    }

    /* Write the entry */
    p += zipPrevEncodeLength(p,prevlen);
    p += zipEncodeLength(p,encoding,slen);
    if (ZIP_IS_STR(encoding)) {
        memcpy(p,s,slen);
    } else {
        zipSaveInteger(p,value,encoding);
    }
    ZIPLIST_INCR_LENGTH(zl,1);
    return zl;
}
// 3.2. 更新node的size (实际占用内存空间大小)
// quicklist.c, 更新node的size, 其实就是重新统计node的ziplist长度
#define quicklistNodeUpdateSz(nod

以上是关于Redis:list/lpush/lrange/lpop 命令源码解析的主要内容,如果未能解决你的问题,请参考以下文章

redis启动,停止

redis演练聚集

windows 怎么开启redis

redis如何执行redis命令

Redis学习——详解Redis配置文件

Redis 基础 -- Redis简介CentOS 7 单机安装Redis启动Redis(后台启动Redis 指定配置文件启动Redis 开机自启Redis )Redis客户端(含图形化界面)