Redis:set/sadd/sismember/sinter/sdiffstore 命令源码解析

Posted 等你归去来

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis:set/sadd/sismember/sinter/sdiffstore 命令源码解析相关的知识,希望对你有一定的参考价值。

  上两篇我们讲了hash和list数据类型相关的主要实现方法,同时加上前面对框架服务和string相关的功能介绍,已揭开了大部分redis的实用面纱。

  现在还剩下两种数据类型: set, zset.

  本篇咱们继续来看redis中的数据类型的实现: set 相关操作实现。

 

  研究过jdk的hashmap和hashset实现的同学,肯定都是知道,set其实就是一个简化版的map,只要将map的 k->v 的形式变为 k->1 的形式就可以了。所以set只是map的一个简单包装类。

  同理,对于 redis的 hash 和 set 数据类型,我们是否可以得出这么个结论呢?(如果是那样的话,我们就只需看几个set提供的特殊功能即可)

  

  同样,我们从功能列表开始,到数据结构,再到具体实现的这么个思路,来探索redis set的实现吧。

 

零、redis set相关操作方法


  Redis 的 Set 是 String 类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。可根据应用场景需要选用该数据类型。(比如:好友/关注/粉丝/感兴趣的人/黑白名单)

  从官方的手册中可以查到相关的使用方法。


1> SADD key member1 [member2]
功能: 向集合添加一个或多个成员
返回值: 本次添加到redis的member数量(不包含已存在的member)

2> SCARD key
功能: 获取集合的成员数
返回值: set的元素数量或者0

3> SDIFF key1 [key2]
功能: 返回给定所有集合的差集
返回值: 差集的数组列表

4> SDIFFSTORE destination key1 [key2]
功能: 返回给定所有集合的差集并存储在 destination 中
返回值: 差集元素个数

5> SINTER key1 [key2]
功能: 返回给定所有集合的交集
返回值: 交集的数组列表

6> SINTERSTORE destination key1 [key2]
功能: 返回给定所有集合的交集并存储在 destination 中
返回值: 交集的元素个数

7> SISMEMBER key member
功能: 判断 member 元素是否是集合 key 的成员
返回值: 1:如果member是key的成员, 0:如果member不是key的成员或者key不存在

8> SMEMBERS key
功能: 返回集合中的所有成员
返回值: 所有成员列表

9> SMOVE source destination member
功能: 将 member 元素从 source 集合移动到 destination 集合
返回值: 1:移动操作成功, 0:移动不成功(member不是source的成员)

10> SPOP key [count]
功能: 移除并返回集合中的一个随机元素(因为set是无序的)
返回值: 被移除的元素列表或者nil

11> SRANDMEMBER key [count]
功能: 返回集合中一个或多个随机数
返回值: 1个元素或者count个元素数组列表或者nil

12> SREM key member1 [member2]
功能: 移除集合中一个或多个成员
返回值: 实际移除的元素个数

13> SUNION key1 [key2]
功能: 返回所有给定集合的并集
返回值: 并集元素数组列表

14> SUNIONSTORE destination key1 [key2]
功能: 所有给定集合的并集存储在 destination 集合中
返回值: 并集元素个数

15> SSCAN key cursor [MATCH pattern] [COUNT count]
功能: 迭代集合中的元素
返回值: 元素数组列表

 

一、set 相关数据结构


  redis使用dict和intset 两种数据结构保存set数据。

// 1. inset 数据结构,在set数据量小且都是整型数据时使用
typedef struct intset {
    // 编码范围,由具体存储值决定
    uint32_t encoding;
    // 数组长度
    uint32_t length;
    // 具体存储元素的容器
    int8_t contents[];
} intset;

// 2. dict 相关数据结构,即是 hash 的实现相关的数据结构
/* This is our hash table structure. Every dictionary has two of this as we
 * implement incremental rehashing, for the old to the new table. */
typedef struct dictht {
    dictEntry **table;
    unsigned long size;
    unsigned long sizemask;
    unsigned long used;
} dictht;

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    unsigned long iterators; /* number of iterators currently running */
} dict;

/* If safe is set to 1 this is a safe iterator, that means, you can call
 * dictAdd, dictFind, and other functions against the dictionary even while
 * iterating. Otherwise it is a non safe iterator, and only dictNext()
 * should be called while iterating. */
typedef struct dictIterator {
    dict *d;
    long index;
    int table, safe;
    dictEntry *entry, *nextEntry;
    /* unsafe iterator fingerprint for misuse detection. */
    long long fingerprint;
} dictIterator;

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

typedef struct dictType {
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

  对于set相关的命令的接口定义:

    {"sadd",saddCommand,-3,"wmF",0,NULL,1,1,1,0,0},
    {"srem",sremCommand,-3,"wF",0,NULL,1,1,1,0,0},
    {"smove",smoveCommand,4,"wF",0,NULL,1,2,1,0,0},
    {"sismember",sismemberCommand,3,"rF",0,NULL,1,1,1,0,0},
    {"scard",scardCommand,2,"rF",0,NULL,1,1,1,0,0},
    {"spop",spopCommand,-2,"wRsF",0,NULL,1,1,1,0,0},
    {"srandmember",srandmemberCommand,-2,"rR",0,NULL,1,1,1,0,0},
    {"sinter",sinterCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sinterstore",sinterstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sunion",sunionCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sunionstore",sunionstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"sdiff",sdiffCommand,-2,"rS",0,NULL,1,-1,1,0,0},
    {"sdiffstore",sdiffstoreCommand,-3,"wm",0,NULL,1,-1,1,0,0},
    {"smembers",sinterCommand,2,"rS",0,NULL,1,1,1,0,0},
    {"sscan",sscanCommand,-3,"rR",0,NULL,1,1,1,0,0},

 

二、sadd 添加成员操作


  一般我们都会以添加数据开始。从而理解数据结构的应用。

// 用法: SADD key member1 [member2]
// t_set.c, 添加member
void saddCommand(client *c) {
    robj *set;
    int j, added = 0;
    // 先从当前db中查找set实例
    set = lookupKeyWrite(c->db,c->argv[1]);
    if (set == NULL) {
        // 1. 新建set实例并添加到当前db中
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db,c->argv[1],set);
    } else {
        if (set->type != OBJ_SET) {
            addReply(c,shared.wrongtypeerr);
            return;
        }
    }
    // 对于n个member,一个个地添加即可
    for (j = 2; j < c->argc; j++) {
        // 2. 只有添加成功, added 才会加1
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    }
    // 命令传播
    if (added) {
        signalModifiedKey(c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    }
    server.dirty += added;
    // 响应添加成功的数量
    addReplyLongLong(c,added);
}

// 1. 创建新的set集合实例(需根据首次的参数类型判定)
// t_set.c, 创建set实例
/* Factory method to return a set that *can* hold "value". When the object has
 * an integer-encodable value, an intset will be returned. Otherwise a regular
 * hash table. */
robj *setTypeCreate(sds value) {
    // 如果传入的value是整型,则创建 intset 类型的set
    // 否则使用dict类型的set
    // 一般地,第一个数据为整型,后续数据也应该为整型,所以这个数据结构相对稳定
    // 而hash的容器创建时,只使用了一 ziplist 创建,这是不一样的实现
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        return createIntsetObject();
    return createSetObject();
}

// 1.1. 创建 intset 型的set
// object.c 
robj *createIntsetObject(void) {
    intset *is = intsetNew();
    robj *o = createObject(OBJ_SET,is);
    o->encoding = OBJ_ENCODING_INTSET;
    return o;
}
// intset.c, new一个空的intset对象
/* Create an empty intset. */
intset *intsetNew(void) {
    intset *is = zmalloc(sizeof(intset));
    is->encoding = intrev32ifbe(INTSET_ENC_INT16);
    is->length = 0;
    return is;
}

// 1.2. 创建dict 型的set
robj *createSetObject(void) {
    dict *d = dictCreate(&setDictType,NULL);
    robj *o = createObject(OBJ_SET,d);
    o->encoding = OBJ_ENCODING_HT;
    return o;
}
// dict.c
/* Create a new hash table */
dict *dictCreate(dictType *type,
        void *privDataPtr)
{
    dict *d = zmalloc(sizeof(*d));

    _dictInit(d,type,privDataPtr);
    return d;
}
/* Initialize the hash table */
int _dictInit(dict *d, dictType *type,
        void *privDataPtr)
{
    _dictReset(&d->ht[0]);
    _dictReset(&d->ht[1]);
    d->type = type;
    d->privdata = privDataPtr;
    d->rehashidx = -1;
    d->iterators = 0;
    return DICT_OK;
}

// 2. 添加member到set集合中
// t_set.c, 添加元素
/* Add the specified value into a set.
 *
 * If the value was already member of the set, nothing is done and 0 is
 * returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
    long long llval;
    // 2.1. HT编码和INTSET编码分别处理就好
    if (subject->encoding == OBJ_ENCODING_HT) {
        dict *ht = subject->ptr;
        // 以 value 为 key, 添加实例到ht中
        // 实现过程也很简单,大概就是如果存在则返回NULL(即无需添加),辅助rehash,分配内存创建dictEntry实例,稍后简单看看
        dictEntry *de = dictAddRaw(ht,value);
        if (de) {
            // 重新设置key为 sdsdup(value), value为NULL
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,de,NULL);
            return 1;
        }
    } 
    // 2.2. intset 编码的member添加
    else if (subject->encoding == OBJ_ENCODING_INTSET) {
        // 尝试解析value为 long 型,值写入 llval 中
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            uint8_t success = 0;
            // 情况1. 可添加到intset中
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                /* Convert to regular set when the intset contains
                 * too many entries. */
                // 默认: 512, intset大于之后,则转换为ht hash表模式存储 
                if (intsetLen(subject->ptr) > server.set_max_intset_entries)
                    // 2.3. 转换intset编码为 ht 编码
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            // 情况2. member 是字符串型,先将set容器转换为 ht 编码,再重新执行dict的添加模式
            /* Failed to get integer from object, convert to regular set. */
            setTypeConvert(subject,OBJ_ENCODING_HT);

            /* The set *was* an intset and this value is not integer
             * encodable, so dictAdd should always work. */
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
            return 1;
        }
    } else {
        serverPanic("Unknown set encoding");
    }
    return 0;
}
// 2.1. 添加member到dict中(略解, 在hash数据结构解析中已介绍)
// dict.c, 添加某key到 d 字典中
/* Low level add. This function adds the entry but instead of setting
 * a value returns the dictEntry structure to the user, that will make
 * sure to fill the value field as he wishes.
 *
 * This function is also directly exposed to the user API to be called
 * mainly in order to store non-pointers inside the hash value, example:
 *
 * entry = dictAddRaw(dict,mykey);
 * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
 *
 * Return values:
 *
 * If key already exists NULL is returned.
 * If key was added, the hash entry is returned to be manipulated by the caller.
 */
dictEntry *dictAddRaw(dict *d, void *key)
{
    int index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. */
    // 获取需要添加的key的存放位置下标(slot), 如果该key已存在, 则返回-1(无可用slot)
    if ((index = _dictKeyIndex(d, key)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

// 2.2. 添加整型数据到 intset中
// intset.c, 添加value
/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    // 获取value的所属范围
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */
    // 默认 is->encoding 为 INTSET_ENC_INT16 (16位长)
    // 2.2.1. 即超过当前预设的位长,则需要增大预设,然后添加
    // 此时的value可以确定: 要么是最大,要么是最小 (所以我们可以推断,此intset应该是有序的)
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don\'t need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */
        // 2.2.2. 在当前环境下添加value
        // 找到value则说明元素已存在,不可再添加
        // pos 保存比value小的第1个元素的位置
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }

        is = intsetResize(is,intrev32ifbe(is->length)+1);
        // 在pos不是末尾位置时,需要留出空位,依次移动后面的元素
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }
    // 针对编码位不变更的情况下设置pos位置的值
    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
// 判断 value 的位长
// INTSET_ENC_INT16 < INTSET_ENC_INT32 < INTSET_ENC_INT64
// 2 < 4 < 8
/* Return the required encoding for the provided value. */
static uint8_t _intsetValueEncoding(int64_t v) {
    if (v < INT32_MIN || v > INT32_MAX)
        return INTSET_ENC_INT64;
    else if (v < INT16_MIN || v > INT16_MAX)
        return INTSET_ENC_INT32;
    else
        return INTSET_ENC_INT16;
}

// 2.2.1. 升级预设位长,并添加value
// intset.c
/* Upgrades the intset to a larger encoding and inserts the given integer. */
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
    uint8_t curenc = intrev32ifbe(is->encoding);
    uint8_t newenc = _intsetValueEncoding(value);
    int length = intrev32ifbe(is->length);
    int prepend = value < 0 ? 1 : 0;

    /* First set new encoding and resize */
    is->encoding = intrev32ifbe(newenc);
    // 每次必进行扩容
    is = intsetResize(is,intrev32ifbe(is->length)+1);

    /* Upgrade back-to-front so we don\'t overwrite values.
     * Note that the "prepend" variable is used to make sure we have an empty
     * space at either the beginning or the end of the intset. */
    // 因编码发生变化,元素的位置已经不能一一对应,需要按照原来的编码依次转移过来
    // 从后往前依次赋值,所以,内存位置上不存在覆盖问题(后面内存位置一定是空的),直接依次赋值即可(高效复制)
    while(length--)
        _intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

    /* Set the value at the beginning or the end. */
    // 对新增加的元素,负数添加到第0位,否则添加到最后一个元素后一位
    if (prepend)
        _intsetSet(is,0,value);
    else
        _intsetSet(is,intrev32ifbe(is->length),value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}
/* Resize the intset */
static intset *intsetResize(intset *is, uint32_t len) {
    uint32_t size = len*intrev32ifbe(is->encoding);
    // malloc
    is = zrealloc(is,sizeof(intset)+size);
    return is;
}
// intset.c, 获取pos位置的值
/* Return the value at pos, given an encoding. */
static int64_t _intsetGetEncoded(intset *is, int pos, uint8_t enc) {
    int64_t v64;
    int32_t v32;
    int16_t v16;

    if (enc == INTSET_ENC_INT64) {
        memcpy(&v64,((int64_t*)is->contents)+pos,sizeof(v64));
        memrev64ifbe(&v64);
        return v64;
    } else if (enc == INTSET_ENC_INT32) {
        memcpy(&v32,((int32_t*)is->contents)+pos,sizeof(v32));
        memrev32ifbe(&v32);
        return v32;
    } else {
        memcpy(&v16,((int16_t*)is->contents)+pos,sizeof(v16));
        memrev16ifbe(&v16);
        return v16;
    }
}
// intset.c, 设置pos位置的值,和数组赋值的实际意义差不多
// 只是这里数据类型是不确定的,所以使用指针进行赋值
/* Set the value at pos, using the configured encoding. */
static void _intsetSet(intset *is, int pos, int64_t value) {
    uint32_t encoding = intrev32ifbe(is->encoding);
    if (encoding == INTSET_ENC_INT64) {
        ((int64_t*)is->contents)[pos] = value;
        memrev64ifbe(((int64_t*)is->contents)+pos);
    } else if (encoding == INTSET_ENC_INT32) {
        ((int32_t*)is->contents)[pos] = value;
        memrev32ifbe(((int32_t*)is->contents)+pos);
    } else {
        ((int16_t*)is->contents)[pos] = value;
        memrev16ifbe(((int16_t*)is->contents)+pos);
    }
}

// 2.2.2. 在编码类型未变更的情况,需要查找可以存放value的位置(为了确认该value是否已存在,以及小于value的第一个位置赋值)
/* Search for the position of "value". Return 1 when the value was found and
 * sets "pos" to the position of the value within the intset. Return 0 when
 * the value is not present in the intset and sets "pos" to the position
 * where "value" can be inserted. */
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */
        // 因 intset 是有序数组,即可以判定是否超出范围,如果超出则元素必定不存在
        if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
            return 0;
        }
    }
    // 使用二分查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            // 找到了
            break;
        }
    }

    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
    } else {
        // 在没有找到的情况下,min就是第一个比 value 小的元素
        if (pos) *pos = min;
        return 0;
    }
}
// intset移动(内存移动)
static void intsetMoveTail(intset *is以上是关于Redis:set/sadd/sismember/sinter/sdiffstore 命令源码解析的主要内容,如果未能解决你的问题,请参考以下文章

redis 如何重启?

redis启动,停止

redis演练聚集

windows 怎么开启redis

redis如何执行redis命令

Redis学习——详解Redis配置文件