Redis:zset/zadd/zrange/zrembyscore 命令源码解析
Posted 等你归去来
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis:zset/zadd/zrange/zrembyscore 命令源码解析相关的知识,希望对你有一定的参考价值。
前面几篇文章,我们完全领略了redis的string,hash,list,set数据类型的实现方法,相信对redis已经不再神秘。
本篇我们将介绍redis的最后一种数据类型: zset 的相关实现。
本篇过后,我们对redis的各种基础功能,应该不会再有疑惑。有可能的话,我们后续将会对redis的高级功能的实现做解析。(如复制、哨兵模式、集群模式)
回归本篇主题,zset。zset 又称有序集合(sorted set),即是序版本的set。经过上篇的介绍,大家可以看到,redis的读取功能相当有限,许多是基于随机数的方式进行读取,其原因就是set是无序的。当set有序之后,查询能力就会得到极大的提升。1. 可以根据下标进行定位元素; 2. 可以范围查询元素; 这是有序带来的好处。
那么,我们不妨先思考一下,如何实现有序?两种方法:1. 根据添加顺序定义,1、2、3... ; 2. 自定义排序值; 第1种方法实现简单,添加时复杂度小,但是功能受限;第2种方法相对自由,对于每次插入都可能涉及重排序问题,但是查询相对稳定,可以不必完全受限于系统实现;
同样,我们以功能列表,到数据结构,再功能实现的思路,来解析redis的zset有序集合的实现方式吧。
零、redis zset相关操作方法
zset: Redis 有序集合是string类型元素的集合,且不允许重复的成员。每个元素都会关联一个double类型的分数,通过分数来为集合中的成员进行从小到大的排序。
使用场景如: 保存任务队列,该队列由后台定时扫描; 排行榜;
从官方手册上查到相关使用方法如下:
1> ZADD key score1 member1 [score2 member2]
功能: 向有序集合添加一个或多个成员,或者更新已存在成员的分数
返回值: 添加成功的元素个数(已存在的添加不成功)2> ZCARD key
功能: 获取有序集合的成员数
返回值: 元素个数或03> ZCOUNT key min max
功能: 计算在有序集合中指定区间分数的成员数
返回值: 区间内的元素个数4> ZINCRBY key increment member
功能: 有序集合中对指定成员的分数加上增量 increment
返回值: member增加后的分数5> ZINTERSTORE destination numkeys key [key ...]
功能: 计算给定的一个或多个有序集的交集并将结果集存储在新的有序集合 key 中
返回值: 交集元素个数6> ZLEXCOUNT key min max
功能: 在有序集合中计算指定字典区间内成员数量
返回值: 区间内的元素个数7> ZRANGE key start stop [WITHSCORES]
功能: 通过索引区间返回有序集合指定区间内的成员
返回值: 区间内元素列表8> ZRANGEBYLEX key min max [LIMIT offset count]
功能: 通过字典区间返回有序集合的成员
返回值: 区间内元素列表9> ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
功能: 通过分数返回有序集合指定区间内的成员
返回值: 区间内元素列表10> ZRANK key member
功能: 返回有序集合中指定成员的索引
返回值: member的排名或者 nil11> ZREM key member [member ...]
功能: 移除有序集合中的一个或多个成员
返回值: 成功移除的元素个数12> ZREMRANGEBYLEX key min max
功能: 移除有序集合中给定的字典区间的所有成员
返回值: 成功移除的元素个数13> ZREMRANGEBYRANK key start stop
功能: 移除有序集合中给定的排名区间的所有成员
返回值: 成功移除的元素个数14> ZREMRANGEBYSCORE key min max
功能: 移除有序集合中给定的分数区间的所有成员
返回值: 成功移除的元素个数15> ZREVRANGE key start stop [WITHSCORES]
功能: 返回有序集中指定区间内的成员,通过索引,分数从高到低
返回值: 区间内元素列表及分数16> ZREVRANGEBYSCORE key max min [WITHSCORES]
功能: 返回有序集中指定分数区间内的成员,分数从高到低排序
返回值: 区间内元素列表及分数17> ZREVRANK key member
功能: 返回有序集合中指定成员的排名,有序集成员按分数值递减(从大到小)排序
返回值: member排名或者 nil18> ZSCORE key member
功能: 返回有序集中,成员的分数值
返回值: member分数19> ZUNIONSTORE destination numkeys key [key ...]
功能: 计算给定的一个或多个有序集的并集,并存储在新的 key 中
返回值: 存储到新key的元素个数20> ZSCAN key cursor [MATCH pattern] [COUNT count]
功能: 迭代有序集合中的元素(包括元素成员和元素分值)
返回值: 元素列表21> ZPOPMAX/ZPOPMIN/BZPOPMAX/BZPOPMIN
一、zset 相关数据结构
zset 的实现,使用了 ziplist, zskiplist 和 dict 进行实现。
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned int span; } level[]; } zskiplistNode; // 跳跃链表 typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist; // zset 主数据结构,dict + zskiplist typedef struct zset { dict *dict; zskiplist *zsl; } zset; // zset 在合适场景下,将先使用 ziplist 存储数据 typedef struct zlentry { unsigned int prevrawlensize, prevrawlen; unsigned int lensize, len; unsigned int headersize; unsigned char encoding; unsigned char *p; } zlentry;
二、zadd 添加成员操作
从添加实现中,我们可以完整领略数据结构的运用。
// 用法: ZADD key score1 member1 [score2 member2] // t_zset.c void zaddCommand(client *c) { // zadd 的多个参数变形, 使用 flags 进行区分复用 zaddGenericCommand(c,ZADD_NONE); } void zaddGenericCommand(client *c, int flags) { static char *nanerr = "resulting score is not a number (NaN)"; robj *key = c->argv[1]; robj *zobj; sds ele; double score = 0, *scores = NULL, curscore = 0.0; int j, elements; int scoreidx = 0; /* The following vars are used in order to track what the command actually * did during the execution, to reply to the client and to trigger the * notification of keyspace change. */ int added = 0; /* Number of new elements added. */ int updated = 0; /* Number of elements with updated score. */ int processed = 0; /* Number of elements processed, may remain zero with options like XX. */ /* Parse options. At the end \'scoreidx\' is set to the argument position * of the score of the first score-element pair. */ // 从第三位置开始尝试解析特殊标识(用法规范) // 按位与到 flags 中 scoreidx = 2; while(scoreidx < c->argc) { char *opt = c->argv[scoreidx]->ptr; // NX: 不更新已存在的元素,只做添加操作 if (!strcasecmp(opt,"nx")) flags |= ZADD_NX; // XX: 只做更新操作,不做添加操作 else if (!strcasecmp(opt,"xx")) flags |= ZADD_XX; // CH: 将返回值从添加的新元素数修改为已更改元素的总数。 更改的元素是第添加的新元素以及已为其更新分数的现有元素。 因此,命令行中指定的具有与过去相同分数的元素将不计算在内。 注意:通常,ZADD的返回值仅计算添加的新元素的数量。 else if (!strcasecmp(opt,"ch")) flags |= ZADD_CH; // INCR: 使用指定元素增加指定分数, 与 ZINCRBY 类似,此场景下,只允许操作一个元素 else if (!strcasecmp(opt,"incr")) flags |= ZADD_INCR; else break; scoreidx++; } /* Turn options into simple to check vars. */ int incr = (flags & ZADD_INCR) != 0; int nx = (flags & ZADD_NX) != 0; int xx = (flags & ZADD_XX) != 0; int ch = (flags & ZADD_CH) != 0; /* After the options, we expect to have an even number of args, since * we expect any number of score-element pairs. */ // 把特殊标识去除后,剩下的参数列表应该2n数,即 score-element 一一配对的,否则语法错误 elements = c->argc-scoreidx; if (elements % 2) { addReply(c,shared.syntaxerr); return; } elements /= 2; /* Now this holds the number of score-element pairs. */ /* Check for incompatible options. */ // 互斥项 if (nx && xx) { addReplyError(c, "XX and NX options at the same time are not compatible"); return; } // 语法检查,INCR 只能针对1个元素操作 if (incr && elements > 1) { addReplyError(c, "INCR option supports a single increment-element pair"); return; } /* Start parsing all the scores, we need to emit any syntax error * before executing additions to the sorted set, as the command should * either execute fully or nothing at all. */ // 解析所有的 score 值为double类型,赋值到 scores 中 scores = zmalloc(sizeof(double)*elements); for (j = 0; j < elements; j++) { if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL) != C_OK) goto cleanup; } /* Lookup the key and create the sorted set if does not exist. */ // 语法检查 zobj = lookupKeyWrite(c->db,key); if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ // 创建原始key对象 // 默认 zset_max_ziplist_entries=OBJ_ZSET_MAX_ZIPLIST_ENTRIES: 128 // 默认 zset_max_ziplist_value=OBJ_ZSET_MAX_ZIPLIST_VALUE: 64 // 所以此处默认主要是检查 第1个member的长度是大于 64 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { // 2. 通用情况使用 dict+quicklist 型的zset zobj = createZsetObject(); } else { // 1. 元素比较小的情况下创建 ziplist 型的 zset zobj = createZsetZiplistObject(); } // 将对象添加到db中,后续所有操作针对 zobj 操作即是对db的操作 (引用传递) dbAdd(c->db,key,zobj); } else { if (zobj->type != OBJ_ZSET) { addReply(c,shared.wrongtypeerr); goto cleanup; } } // 一个个元素循环添加 for (j = 0; j < elements; j++) { score = scores[j]; ele = c->argv[scoreidx+1+j*2]->ptr; // 分当前zobj的编码不同进行添加 (ziplist, skiplist) // 3. ZIPLIST 编码下的zset添加操作 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; // 3.1. 查找是否存在要添加的元素 (确定添加或更新) if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { if (nx) continue; if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); goto cleanup; } } /* Remove and re-insert when score changed. */ if (score != curscore) { // 3.2. 元素更新操作,先删再插入 zobj->ptr = zzlDelete(zobj->ptr,eptr); zobj->ptr = zzlInsert(zobj->ptr,ele,score); server.dirty++; updated++; } processed++; } else if (!xx) { /* Optimize: check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ zobj->ptr = zzlInsert(zobj->ptr,ele,score); // 5. 超过一条件后,做 ziplist->skiplist 转换 // 默认 元素个数>128, 当前元素>64 // 这两个判断不会重复吗?? 两个原因: 1. 转换函数内部会重新判定; 2. 下一次循环时不会再走当前逻辑; if (zzlLength(zobj->ptr) > server.zset_max_ziplist_entries) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); if (sdslen(ele) > server.zset_max_ziplist_value) zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); server.dirty++; added++; processed++; } } // 4. skiplist 下的zset元素添加 else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; // 判断ele是否已存在,使用hash查找,快速 de = dictFind(zs->dict,ele); if (de != NULL) { if (nx) continue; curscore = *(double*)dictGetVal(de); if (incr) { score += curscore; if (isnan(score)) { addReplyError(c,nanerr); /* Don\'t need to check if the sorted set is empty * because we know it has at least one element. */ goto cleanup; } } /* Remove and re-insert when score changes. */ // 先删再插入 skiplist if (score != curscore) { zskiplistNode *node; serverAssert(zslDelete(zs->zsl,curscore,ele,&node)); znode = zslInsert(zs->zsl,score,node->ele); /* We reused the node->ele SDS string, free the node now * since zslInsert created a new one. */ node->ele = NULL; zslFreeNode(node); /* Note that we did not removed the original element from * the hash table representing the sorted set, so we just * update the score. */ // 更新dict中的分数引用 dictGetVal(de) = &znode->score; /* Update score ptr. */ server.dirty++; updated++; } processed++; } else if (!xx) { ele = sdsdup(ele); znode = zslInsert(zs->zsl,score,ele); // 添加skiplist的同时,也往 dict 中添加一份数据,因为hash的查找永远是最快的 serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); server.dirty++; added++; processed++; } } else { serverPanic("Unknown sorted set encoding"); } } reply_to_client: if (incr) { /* ZINCRBY or INCR option. */ if (processed) addReplyDouble(c,score); else addReply(c,shared.nullbulk); } else { /* ZADD. */ addReplyLongLong(c,ch ? added+updated : added); } cleanup: zfree(scores); if (added || updated) { signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_ZSET, incr ? "zincr" : "zadd", key, c->db->id); } } // 1. 元素比较小的情况下创建 ziplist 型的 zset // object.c, 创建ziplist 的zset robj *createZsetZiplistObject(void) { unsigned char *zl = ziplistNew(); robj *o = createObject(OBJ_ZSET,zl); o->encoding = OBJ_ENCODING_ZIPLIST; return o; } // 2. 创建通用的 zset 实例 // object.c robj *createZsetObject(void) { zset *zs = zmalloc(sizeof(*zs)); robj *o; // zsetDictType 稍有不同 zs->dict = dictCreate(&zsetDictType,NULL); // 首次遇到 skiplist, 咱去瞅瞅是如何创建的 zs->zsl = zslCreate(); o = createObject(OBJ_ZSET,zs); o->encoding = OBJ_ENCODING_SKIPLIST; return o; } // server.c, zset创建时使用的dict类型,与hash有不同 /* Sorted sets hash (note: a skiplist is used in addition to the hash table) */ dictType zsetDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ NULL, /* Note: SDS string shared & freed by skiplist */ NULL /* val destructor */ }; // 创建 skiplist 对象 /* Create a new skiplist. */ zskiplist *zslCreate(void) { int j; zskiplist *zsl; zsl = zmalloc(sizeof(*zsl)); zsl->level = 1; zsl->length = 0; // 创建header节点,ZSKIPLIST_MAXLEVEL 32 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL); // 初始化header for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) { zsl->header->level[j].forward = NULL; zsl->header->level[j].span = 0; } zsl->header->backward = NULL; zsl->tail = NULL; return zsl; } /* Create a skiplist node with the specified number of levels. * The SDS string \'ele\' is referenced by the node after the call. */ zskiplistNode *zslCreateNode(int level, double score, sds ele) { zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel)); zn->score = score; zn->ele = ele; return zn; } // 3. ZIPLIST 编码下的zset添加操作 // 3.1. 查找是否存在要添加的元素 (确定添加或更新) // t_zset.c, 查找指定ele unsigned char *zzlFind(unsigned char *zl, sds ele, double *score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; // 遍历所有ziplist // 可见,此时的ziplist并没有表现出有序啊 while (eptr != NULL) { // eptr 相当于是 key // sptr 相当于score sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL); if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) { /* Matching element, pull out score. */ // 找到相应的 key 后,解析下一值,即 score if (score != NULL) *score = zzlGetScore(sptr); return eptr; } /* Move to next element. */ // 移动两次对象,才会到下一元素(因为存储是 key-score 相邻存储) eptr = ziplistNext(zl,sptr); } return NULL; } // t_zset.c, 获取元素的score double zzlGetScore(unsigned char *sptr) { unsigned char *vstr; unsigned int vlen; long long vlong; char buf[128]; double score; serverAssert(sptr != NULL); serverAssert(ziplistGet(sptr,&vstr,&vlen,&vlong)); // 带小数点不带小数点 if (vstr) { memcpy(buf,vstr,vlen); buf[vlen] = \'\\0\'; // 做类型转换 score = strtod(buf,NULL); } else { score = vlong; } return score; } // 3.2. 元素更新操作,先删再插入 // t_zset.c /* Delete (element,score) pair from ziplist. Use local copy of eptr because we * don\'t want to modify the one given as argument. */ unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) { unsigned char *p = eptr; /* TODO: add function to ziplist API to delete N elements from offset. */ zl = ziplistDelete(zl,&p); zl = ziplistDelete(zl,&p); return zl; } // 添加 ele-score 到 ziplist 中 /* Insert (element,score) pair in ziplist. This function assumes the element is * not yet present in the list. */ unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) { unsigned char *eptr = ziplistIndex(zl,0), *sptr; double s; // 在上面查找时,我们看到ziplist也是遍历,以为是无序的ziplist // 然而实际上,插入时是维护了顺序的哟 while (eptr != NULL) { sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL); s = zzlGetScore(sptr); // 找到第一个比score大的位置,在其前面插入 ele-score if (s > score) { /* First element with score larger than score for element to be * inserted. This means we should take its spot in the list to * maintain ordering. */ zl = zzlInsertAt(zl,eptr,ele,score); break; } else if (s == score) { /* Ensure lexicographical ordering for elements. */ // 当分数相同时,按字典顺序排列 if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) { zl = zzlInsertAt(zl,eptr,ele,score); break; } } /* Move to next element. */ eptr = ziplistNext(zl,sptr); } /* Push on tail of list when it was not yet inserted. */ // 以上遍历完成都没有找到相应位置,说明当前score是最大值,将其插入尾部 if (eptr == NULL) zl = zzlInsertAt(zl,NULL,ele,score); return zl; } // 在eptr的前面插入 ele-score unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) { unsigned char *sptr; char scorebuf[128]; int scorelen; size_t offset; scorelen = d2string(scorebuf,sizeof(scorebuf),score); if (eptr == NULL) { // 直接插入到尾部 zl = ziplistPush(zl,(unsigned char*)ele,sdslen(ele),ZIPLIST_TAIL); zl = ziplistPush(zl,(unsigned char*)scorebuf,scorelen,ZIPLIST_TAIL); } else { /* Keep offset relative to zl, as it might be re-allocated. */ offset = eptr-zl; // 直接在 eptr 位置添加 ele, 其他元素后移 zl = ziplistInsert(zl,eptr,(unsigned char*)ele,sdslen(ele)); eptr = zl+offset; /* Insert score after the element. */ // 此时的 eptr 已经插入ele之后的位置,后移一位后,就可以找到 score 的存储位置 serverAssert((sptr = ziplistNext(zl,eptr)) != NULL); zl = ziplistInsert(zl,sptr,(unsigned char*)scorebuf,scorelen); } return zl; } // 4. skiplist 下的zset元素添加 // 4.1. 添加元素 // t_zset.c, 添加 ele-score 到 skiplist 中 /* Insert a new node in the skiplist. Assumes the element does not already * exist (up to the caller to enforce that). The skiplist takes ownership * of the passed SDS string \'ele\'. */ zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { // ZSKIPLIST_MAXLEVEL 32 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; serverAssert(!isnan(score)); x = zsl->header; // 初始 zsl->level = 1 // 从header的最高层开始遍历 for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ // 计算出每层可以插入的位置 rank[i] = i == (zsl->level-1) ? 0 : rank[i+1]; // 当前level的score小于需要添加的元素时,往前推进skiplist while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) < 0))) { rank[i] += x->level[i].span; x = x->level[i].forward; } update[i] = x; } /* we assume the element is not already inside, since we allow duplicated * scores, reinserting the same element should never happen since the * caller of zslInsert() should test in the hash table if the element is * already inside or not. */ // 得到一随机的level, 决定要写的节点数 // 如果当前的level过小,则变更level, 重新初始化大的level level = zslRandomLevel(); if (level > zsl->level) { for (i = zsl->level; i < level; i++) { rank[i] = 0; update[i] = zsl->header; update[i]->level[i].span = zsl->length; } zsl->level = level; } // 构建新的 skiplist 节点,为每一层节点添加同样的数据 x = zslCreateNode(level,score,ele); for (i = 0; i < level; i++) { // 让i层的节点与x关联 x->level[i].forward = update[i]->level[i].forward; update[i]->level[i].forward = x; /* update span covered by update[i] as x is inserted here */ x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]); update[i]->level[i].span = (rank[0] - rank[i]) + 1; } /* increment span for untouched levels */ // 如果当前level较小,则存在有的level未赋值情况,需要主动+1 for (i = level; i < zsl->level; i++) { update[i]->level[i].span++; } // 关联好header后,设置backward指针 x->backward = (update[0] == zsl->header) ? NULL : update[0]; if (x->level[0].forward) x->level[0].forward->backward = x; else // 同有后继节点,说明是尾节点,赋值tail zsl->tail = x; zsl->length++; return x; }
ziplist添加没啥好说的,skiplist可以稍微提提,大体步骤为四步:&nb
以上是关于Redis:zset/zadd/zrange/zrembyscore 命令源码解析的主要内容,如果未能解决你的问题,请参考以下文章
Redis 基础 -- Redis简介CentOS 7 单机安装Redis启动Redis(后台启动Redis 指定配置文件启动Redis 开机自启Redis )Redis客户端(含图形化界面)