欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis源碼分析之set?和?sorted?set?使用

 更新時間:2022年03月22日 11:05:21   作者:Zhan-LiZ  
本文介紹了Redis?中的?set?和?sorted?set?使用源碼實現(xiàn)分析,Redis?的?Set?是?String?類型的無序集合,集合成員是唯一的,sorted?set有序集合和集合一樣也是?string?類型元素的集合,對Redis?set?和?sorted?set使用相關(guān)知識感興趣的朋友一起看看吧

set 和 sorted set

前言

前面在幾個文章聊到了 list,string,hash 等結(jié)構(gòu)的實現(xiàn),這次來聊一下 set 和 sorted set 的細(xì)節(jié)。

set

Redis 的 Set 是 String 類型的無序集合,集合成員是唯一的。

底層實現(xiàn)主要用到了兩種數(shù)據(jù)結(jié)構(gòu) hashtable 和 inset(整數(shù)集合)。

集合中最大的成員數(shù)為2的32次方-1 (4294967295, 每個集合可存儲40多億個成員)。

常見命令

來看下幾個常用的命令

# 向集合添加一個或多個成員
SADD key member1 [member2]

# 獲取集合的成員數(shù)
SCARD key
# 返回第一個集合與其他集合之間的差異。
SDIFF key1 [key2]
# 返回給定所有集合的差集并存儲在 destination 中
SDIFFSTORE destination key1 [key2]
# 返回給定所有集合的交集
SINTER key1 [key2]
# 返回給定所有集合的交集并存儲在 destination 中
SINTERSTORE destination key1 [key2]
# 判斷 member 元素是否是集合 key 的成員
SISMEMBER key member
# 返回集合中的所有成員
SMEMBERS key
# 將 member 元素從 source 集合移動到 destination 集合
SMOVE source destination member
# 移除并返回集合中的一個隨機(jī)元素
SPOP key
# 返回集合中一個或多個隨機(jī)數(shù)
SRANDMEMBER key [count]
# 移除集合中一個或多個成員
SREM key member1 [member2]
# 返回所有給定集合的并集
SUNION key1 [key2]
# 所有給定集合的并集存儲在 destination 集合中
SUNIONSTORE destination key1 [key2]
# 迭代集合中的元素
SSCAN key cursor [MATCH pattern] [COUNT count]

來個栗子

127.0.0.1:6379>  SADD set-test xiaoming
(integer) 1
(integer) 0
127.0.0.1:6379>  SADD set-test xiaoming1
127.0.0.1:6379>  SADD set-test xiaoming2

127.0.0.1:6379> SMEMBERS set-test
1) "xiaoming2"
2) "xiaoming"
3) "xiaoming1"

上面重復(fù)值的插入,只有第一次可以插入成功

set 的使用場景

比較適用于聚合分類

1、標(biāo)簽:比如我們博客網(wǎng)站常常使用到的興趣標(biāo)簽,把一個個有著相同愛好,關(guān)注類似內(nèi)容的用戶利用一個標(biāo)簽把他們進(jìn)行歸并。

2、共同好友功能,共同喜好,或者可以引申到二度好友之類的擴(kuò)展應(yīng)用。

3、統(tǒng)計網(wǎng)站的獨立IP。利用set集合當(dāng)中元素不唯一性,可以快速實時統(tǒng)計訪問網(wǎng)站的獨立IP。

不過對于 set 中的命令要合理的應(yīng)用,不然很容易造成慢查詢

1、使用高效的命令,比如說,如果你需要返回一個 SET 中的所有成員時,不要使用 SMEMBERS 命令,而是要使用 SSCAN 多次迭代返回,避免一次返回大量數(shù)據(jù),造成線程阻塞。

2、當(dāng)你需要執(zhí)行排序、交集、并集操作時,可以在客戶端完成,而不要用SORT、SUNION、SINTER這些命令,以免拖慢 Redis 實例。

看下源碼實現(xiàn)

這里來看下 set 中主要用到的數(shù)據(jù)類型

代碼路徑https://github.com/redis/redis/blob/6.2/src/t_set.c

void saddCommand(client *c) {
    robj *set;
    int j, added = 0;

    set = lookupKeyWrite(c->db,c->argv[1]);
    if (checkType(c,set,OBJ_SET)) return;
    
    if (set == NULL) {
        set = setTypeCreate(c->argv[2]->ptr);
        dbAdd(c->db,c->argv[1],set);
    }
    for (j = 2; j < c->argc; j++) {
        if (setTypeAdd(set,c->argv[j]->ptr)) added++;
    if (added) {
        signalModifiedKey(c,c->db,c->argv[1]);
        notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
    server.dirty += added;
    addReplyLongLong(c,added);
}
// 當(dāng) value 是整數(shù)類型使用 intset
// 否則使用哈希表
robj *setTypeCreate(sds value) {
    if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
        return createIntsetObject();
    return createSetObject();
/* Add the specified value into a set.
 *
 * If the value was already member of the set, nothing is done and 0 is
 * returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
    long long llval;
    // 如果是 OBJ_ENCODING_HT 說明是哈希類型
    if (subject->encoding == OBJ_ENCODING_HT) {
        dict *ht = subject->ptr;
        dictEntry *de = dictAddRaw(ht,value,NULL);
        if (de) {
            // 設(shè)置key ,value 設(shè)置成null
            dictSetKey(ht,de,sdsdup(value));
            dictSetVal(ht,de,NULL);
            return 1;
        }
    // OBJ_ENCODING_INTSET 代表是 inset
    } else if (subject->encoding == OBJ_ENCODING_INTSET) {
        if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
            uint8_t success = 0;
            subject->ptr = intsetAdd(subject->ptr,llval,&success);
            if (success) {
                /* Convert to regular set when the intset contains
                 * too many entries. */
                // 如果條目過多將會轉(zhuǎn)換成集合
                size_t max_entries = server.set_max_intset_entries;
                /* limit to 1G entries due to intset internals. */
                if (max_entries >= 1<<30) max_entries = 1<<30;
                if (intsetLen(subject->ptr) > max_entries)
                    setTypeConvert(subject,OBJ_ENCODING_HT);
                return 1;
            }
        } else {
            /* Failed to get integer from object, convert to regular set. */
            setTypeConvert(subject,OBJ_ENCODING_HT);
            /* The set *was* an intset and this value is not integer
             * encodable, so dictAdd should always work. */
            serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
    } else {
        serverPanic("Unknown set encoding");
    return 0;

通過上面的源碼分析,可以看到

1、set 中主要用到了 hashtable 和 inset;

2、如果存儲的類型是整數(shù)類型就會使用 inset,否則使用 hashtable;

3、使用 inset 有一個最大的限制,達(dá)到了最大的限制,也是會使用 hashtable;

再來看下 inset 數(shù)據(jù)結(jié)構(gòu)

代碼地址https://github.com/redis/redis/blob/6.2/src/intset.h

typedef struct intset {
    // 編碼方法,指定當(dāng)前存儲的是 16 位,32 位,還是 64 位的整數(shù)
    uint32_t encoding;
    uint32_t length;
    // 實際保存元素的數(shù)組
    int8_t contents[];
} intset;
insert

來看下 intset 的數(shù)據(jù)插入

/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
    // 計算value的編碼長度
    uint8_t valenc = _intsetValueEncoding(value);
    uint32_t pos;
    if (success) *success = 1;

    /* Upgrade encoding if necessary. If we need to upgrade, we know that
     * this value should be either appended (if > 0) or prepended (if < 0),
     * because it lies outside the range of existing values. */
    // 如果value的編碼長度大于當(dāng)前的編碼位數(shù),進(jìn)行升級
    if (valenc > intrev32ifbe(is->encoding)) {
        /* This always succeeds, so we don't need to curry *success. */
        return intsetUpgradeAndAdd(is,value);
    } else {
        /* Abort if the value is already present in the set.
         * This call will populate "pos" with the right position to insert
         * the value when it cannot be found. */
        // 當(dāng)前值不存在的時候才進(jìn)行插入  
        if (intsetSearch(is,value,&pos)) {
            if (success) *success = 0;
            return is;
        }
        is = intsetResize(is,intrev32ifbe(is->length)+1);
        if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
    }
    // 數(shù)據(jù)插入
    _intsetSet(is,pos,value);
    is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
    return is;
}

intset 的數(shù)據(jù)插入有一個數(shù)據(jù)升級的過程,當(dāng)一個整數(shù)被添加到整數(shù)集合時,首先需要判斷下 新元素的類型和集合中現(xiàn)有元素類型的長短,如果新元素是一個32位的數(shù)字,現(xiàn)有集合類型是16位的,那么就需要將整數(shù)集合進(jìn)行升級,然后才能將新的元素加入進(jìn)來。

這樣做的優(yōu)點:

1、提升整數(shù)集合的靈活性,可以隨意的添加整數(shù),而不用關(guān)心整數(shù)的類型;

2、可以盡可能的節(jié)約內(nèi)存。

了解完數(shù)據(jù)的插入再來看下 intset 中是如何來快速的搜索里面的數(shù)據(jù)

/* Search for the position of "value". Return 1 when the value was found and
 * sets "pos" to the position of the value within the intset. Return 0 when
 * the value is not present in the intset and sets "pos" to the position
 * where "value" can be inserted. */
// 如果找到了對應(yīng)的數(shù)據(jù),返回 1 將 pos 設(shè)置為對應(yīng)的位置
// 如果找不到,返回0,設(shè)置 pos 為可以為數(shù)據(jù)可以插入的位置
// intset 中的數(shù)據(jù)是排好序的,所以使用二分查找來尋找對應(yīng)的元素  
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
    int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
    int64_t cur = -1;

    /* The value can never be found when the set is empty */
    if (intrev32ifbe(is->length) == 0) {
        if (pos) *pos = 0;
        return 0;
    } else {
        /* Check for the case where we know we cannot find the value,
         * but do know the insert position. */
        if (value > _intsetGet(is,max)) {
            if (pos) *pos = intrev32ifbe(is->length);
            return 0;
        } else if (value < _intsetGet(is,0)) {
            if (pos) *pos = 0;
        }
    }
    // 使用二分查找
    while(max >= min) {
        mid = ((unsigned int)min + (unsigned int)max) >> 1;
        cur = _intsetGet(is,mid);
        if (value > cur) {
            min = mid+1;
        } else if (value < cur) {
            max = mid-1;
        } else {
            break;
    if (value == cur) {
        if (pos) *pos = mid;
        return 1;
        if (pos) *pos = min;
}

可以看到這里用到的是二分查找,intset 中的數(shù)據(jù)本身也就是排好序的

dict

來看下 dict 的數(shù)據(jù)結(jié)構(gòu)

typedef struct dict {
    dictType *type;
    void *privdata;
    // 哈希表數(shù)組,長度為2,一個正常存儲數(shù)據(jù),一個用來擴(kuò)容
    dictht ht[2];
    long rehashidx; /* rehashing not in progress if rehashidx == -1 */
    int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
} dict;

// 哈希表結(jié)構(gòu),通過兩個哈希表使用,實現(xiàn)增量的 rehash  
typedef struct dictht {
    dictEntry **table;
    // hash 容量大小
    unsigned long size;
    // 總是等于 size - 1,用于計算索引值
    unsigned long sizemask;
    // 實際存儲的 dictEntry 數(shù)量
    unsigned long used;
} dictht;
//  k/v 鍵值對節(jié)點,是實際存儲數(shù)據(jù)的節(jié)點  
typedef struct dictEntry {
    // 鍵對象,總是一個字符串類型的對象
    void *key;
    union {
        // void指針,這意味著它可以指向任何類型
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    // 指向下一個節(jié)點
    struct dictEntry *next;
} dictEntry;

可以看到 dict 中,是預(yù)留了兩個哈希表,來處理漸進(jìn)式的 rehash

rehash 細(xì)節(jié)參考  redis 中的字典

再來看下 dict 數(shù)據(jù)的插入

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;

    if (dictIsRehashing(d)) _dictRehashStep(d);
    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;
    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    // 這里來判斷是否正在 Rehash 中
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;
    /* Set the hash entry fields. */
    // 插入具體的數(shù)據(jù)  
    dictSetKey(d, entry, key);
    return entry;
}

這里重點來分析下 Rehash 的過程

int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    // 這里來判斷是否正在 Rehash 中
    if (!dictIsRehashing(d)) return 0;

    // used 實際存儲的 dictEntry 數(shù)量
    // 如果有數(shù)據(jù)進(jìn)行下面的遷移
    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;
        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        // 獲取老數(shù)據(jù)中的數(shù)據(jù)
        de = d->ht[0].table[d->rehashidx];
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;
            nextde = de->next;
            /* Get the index in the new hash table */
            // 獲取新哈希表的索引
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            // 將數(shù)據(jù)放入到新的 dity 中
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    /* Check if we already rehashed the whole table... */
    // 這里來檢測是否完成了整個的 rehash 操作  
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        // 將 ht[1] 的內(nèi)容放入到 ht[0] 中,ht[1] 作為備用,下次 rehash 使用
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    /* More to rehash... */
    return 1;
}
// 使用增量的方式 rehash 
// 當(dāng)然數(shù)據(jù)很大的話,一次遷移所有的數(shù)據(jù),顯然是不合理的,會造成Redis線程阻塞,無法服務(wù)其他請求。這里 Redis 使用的是漸進(jìn)式 rehash。
// 在 rehash 期間,每次執(zhí)行添加,刪除,查找或者更新操作時,除了對命令本身的處理,還會順帶將哈希表1中的數(shù)據(jù)拷貝到哈希表2中。從索引0開始,每執(zhí)行一次操作命令,拷貝一個索引位置的數(shù)據(jù)。  
// 如果沒有讀入和寫入操作,這時候就不能進(jìn)行 rehash
// 所以會定時執(zhí)行一定數(shù)量的 rehash 操作  
int incrementallyRehash(int dbid) {
    /* Keys dictionary */
    if (dictIsRehashing(server.db[dbid].dict)) {
        dictRehashMilliseconds(server.db[dbid].dict,1);
        return 1; /* already used our millisecond for this loop... */
    /* Expires */
    if (dictIsRehashing(server.db[dbid].expires)) {
        dictRehashMilliseconds(server.db[dbid].expires,1);
    return 0;

1、rehash 的過程 Redis 默認(rèn)使用了兩個全局哈希表;

2、rehash 的過程是漸進(jìn)式的,因為如果數(shù)據(jù)量很大的話,一次遷移所有的數(shù)據(jù),會造成Redis線程阻塞,無法服務(wù)其他請求;

3、在進(jìn)行 rehash 期間,刪除,查找或者更新操作都會在兩個哈希表中執(zhí)行,添加操作就直接添加到哈希表2中了。查找會把兩個哈希表都找一遍,直到找到或者兩個都找不到;

4、如果在 reash 期間,如果沒有讀寫操作,這時候,就不能遷移工作了,所以后臺定時執(zhí)行一定數(shù)量的數(shù)據(jù)遷移。

sorted set

sorted set有序集合和集合一樣也是 string 類型元素的集合,同時也不允許有重復(fù)的成員。

不同的是sorted set中的每個元素都會關(guān)聯(lián)一個 double 類型的分?jǐn)?shù),sorted set通過這個分?jǐn)?shù)給集合中的成員進(jìn)行從小到大的排序。有序集合中的成員是唯一的,關(guān)聯(lián)的 score 可以重復(fù)。

常見的命令

下面看下有序集合中常見的命令

# 向有序集合添加一個或多個成員,或者更新已存在成員的分?jǐn)?shù)
ZADD key score1 member1 [score2 member2]

# 獲取有序集合的成員數(shù)
ZCARD key
# 計算在有序集合中指定區(qū)間分?jǐn)?shù)的成員數(shù)
ZCOUNT key min max
# 有序集合中對指定成員的分?jǐn)?shù)加上增量 increment
ZINCRBY key increment member
# 計算給定的一個或多個有序集的交集并將結(jié)果集存儲在新的有序集合 destination 中
ZINTERSTORE destination numkeys key [key ...]
# 在有序集合中計算指定字典區(qū)間內(nèi)成員數(shù)量
ZLEXCOUNT key min max
# 通過索引區(qū)間返回有序集合指定區(qū)間內(nèi)的成員
ZRANGE key start stop [WITHSCORES]
# 通過字典區(qū)間返回有序集合的成員
ZRANGEBYLEX key min max [LIMIT offset count]
# 通過分?jǐn)?shù)返回有序集合指定區(qū)間內(nèi)的成員
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT]
# 返回有序集合中指定成員的索引
ZRANK key member
# 移除有序集合中的一個或多個成員
ZREM key member [member ...]
# 移除有序集合中給定的字典區(qū)間的所有成員
ZREMRANGEBYLEX key min max
# 移除有序集合中給定的排名區(qū)間的所有成員
ZREMRANGEBYRANK key start stop
# 移除有序集合中給定的分?jǐn)?shù)區(qū)間的所有成員
ZREMRANGEBYSCORE key min max
# 返回有序集中指定區(qū)間內(nèi)的成員,通過索引,分?jǐn)?shù)從高到低
ZREVRANGE key start stop [WITHSCORES]
# 返回有序集中指定分?jǐn)?shù)區(qū)間內(nèi)的成員,分?jǐn)?shù)從高到低排序
ZREVRANGEBYSCORE key max min [WITHSCORES]
# 返回有序集合中指定成員的排名,有序集成員按分?jǐn)?shù)值遞減(從大到小)排序
ZREVRANK key member
# 返回有序集中,成員的分?jǐn)?shù)值
ZSCORE key member
# 計算給定的一個或多個有序集的并集,并存儲在新的 key 中
ZUNIONSTORE destination numkeys key [key ...]
# 迭代有序集合中的元素(包括元素成員和元素分值)
ZSCAN key cursor [MATCH pattern] [COUNT count]

來個栗子

127.0.0.1:6379> ZADD test-sset 1 member1
(integer) 1
127.0.0.1:6379> ZADD test-sset 2 member2
(integer) 1
127.0.0.1:6379> ZADD test-sset 3 member3
(integer) 1
127.0.0.1:6379> ZADD test-sset 3 member3
(integer) 0
127.0.0.1:6379> ZADD test-sset 4 member3
(integer) 0
127.0.0.1:6379> ZADD test-sset 5 member5
(integer) 1
127.0.0.1:6379> ZRANGE test-sset 0 10 WITHSCORES
1) "member1"
2) "1"
3) "member2"
4) "2"
5) "member3"
6) "4"
7) "member5"
8) "5"

使用場景

來看下sorted set的使用場景

1、通過 score 的排序功能,可以實現(xiàn)類似排行榜,學(xué)習(xí)成績的排序功能;

2、也可以實現(xiàn)帶權(quán)重隊列,比如普通消息的 score 為1,重要消息的 score 為2,然后工作線程可以選擇按 score 的倒序來獲取工作任務(wù)。讓重要的任務(wù)優(yōu)先執(zhí)行;

3、也可以實現(xiàn)一個延遲隊列,將 score 存儲過期時間,從小到大排序,最靠前的就是最先過期的。

分析下源碼實現(xiàn)

sorted set 中的代碼主要在下面的兩個文件中

結(jié)構(gòu)定義:server.h

實現(xiàn):t_zset.c

先來看下sorted set的數(shù)據(jù)結(jié)構(gòu)

typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;

typedef struct zskiplist {
    // 頭,尾節(jié)點
    struct zskiplistNode *header, *tail;
    // 節(jié)點數(shù)量
    unsigned long length;
    // 目前表內(nèi)節(jié)點的最大層數(shù)
    int level;
} zskiplist;
/* ZSETs use a specialized version of Skiplists */
// ZSETs 使用的是特定版的 Skiplists
typedef struct zskiplistNode {
    // 這里使用 sds 存儲具體的元素
    sds ele;
    // 元素的權(quán)重
    double score;
    // 后向指針(為了便于從跳表的尾節(jié)點倒序查找)
    struct zskiplistNode *backward;
    // 層
    // 保存著指向其他元素的指針。高層的指針越過的元素數(shù)量大于等于低層的指針,為了提高查找的效率,程序總是從高層先開始訪問,然后隨著元素值范圍的縮小,慢慢降低層次。
    struct zskiplistLevel {
        // 節(jié)點上的前向指針
        struct zskiplistNode *forward;
        // 跨度
        // 用于記錄兩個節(jié)點之間的距離
        unsigned long span;
    } level[];
} zskiplistNode;

看上面的數(shù)據(jù)結(jié)構(gòu)可以發(fā)現(xiàn)sorted set的實現(xiàn)主要使用了 dict 和 zskiplist 兩種數(shù)據(jù)結(jié)構(gòu)。不過sorted set在元素較少的情況下使用的壓縮列表,具體細(xì)節(jié)參見下文的 zsetAdd 函數(shù)。

ZADD

來看下 ZADD 的插入

// 代碼地址 https://github.com/redis/redis/blob/6.2/src/t_zset.c
// ZADD 命令
void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_IN_NONE);
}

/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(client *c, int flags) {
    ...
    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        // 超過閾值(zset-max-ziplist-entries、zset-max-ziplist-value)后,使用 hashtable + skiplist 存儲
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    }
// 看下具體的插入過程
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
    /* Turn options into simple to check vars. */
    int incr = (in_flags & ZADD_IN_INCR) != 0;
    int nx = (in_flags & ZADD_IN_NX) != 0;
    int xx = (in_flags & ZADD_IN_XX) != 0;
    int gt = (in_flags & ZADD_IN_GT) != 0;
    int lt = (in_flags & ZADD_IN_LT) != 0;
    /* Update the sorted set according to its encoding. */
    // 如果類型是 ZIPLIST
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *eptr;
        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
             ...
            /* Remove and re-insert when score changed. */
            // 當(dāng)分?jǐn)?shù)改變時移除并重新插入
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        // 新元素
        } else if (!xx) {
            /* check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            // 如果元素過大就使用跳表
            if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||
                sdslen(ele) > server.zset_max_ziplist_value ||
                !ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
            {
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                if (newscore) *newscore = score;
                *out_flags |= ZADD_OUT_ADDED;
                return 1;
            *out_flags |= ZADD_OUT_NOP;
    /* Note that the above block handling ziplist would have either returned or
     * converted the key to skiplist. */
    // 表示使用的類型是跳表
    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;
        // 在哈希表中查找元素
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
            // 哈希表中獲取元素的權(quán)重
            curscore = *(double*)dictGetVal(de);
            /* Prepare the score for the increment if needed. */
            // 更新權(quán)重值
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
            if (newscore) *newscore = score;
            /* Remove and re-insert when score changes. */
            // 如果權(quán)重發(fā)生變化了
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* Note that we did not removed the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
        // 如果新元素不存在
            ele = sdsdup(ele);
            // 插入到跳表節(jié)點
            znode = zslInsert(zs->zsl,score,ele);
            // 在哈希表中插入
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *out_flags |= ZADD_OUT_ADDED;
    } else {
        serverPanic("Unknown sorted set encoding");
    return 0; /* Never reached. */

sorted set 的插入使用了兩種策略

1、如果插入的數(shù)據(jù)量和長度沒有達(dá)到閥值,就使用壓縮列表進(jìn)行保存,反之就使用跳表加哈希表的組合方式進(jìn)行保存;

2、壓縮列表本身是就不適合保存過多的元素,所以達(dá)到閥值使用跳表加哈希表的組合方式進(jìn)行保存;

3、這里跳表加哈希表的組合方式也是很巧妙的,跳表用來進(jìn)行范圍的查詢,通過哈希表來實現(xiàn)單個元素權(quán)重值的查詢,組合的方式提高了查詢的效率;

ZRANGE

看完了插入函數(shù),這里再來分析下 ZRANGE

// 獲取有序集合中, 指定數(shù)據(jù)的排名.
// 若reverse==0, 排名以得分升序排列. 否則排名以得分降序排列.
// 第一個數(shù)據(jù)的排名為0, 而不是1
// 使用壓縮列表或者跳表,里面的數(shù)據(jù)都是排好序的
long zsetRank(robj *zobj, sds ele, int reverse) {
    unsigned long llen;
    unsigned long rank;

    llen = zsetLength(zobj);
    // 壓縮列表
    if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        eptr = ziplistIndex(zl,0);
        serverAssert(eptr != NULL);
        sptr = ziplistNext(zl,eptr);
        serverAssert(sptr != NULL);
        rank = 1;
        while(eptr != NULL) {
            if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
                break;
            rank++;
            zzlNext(zl,&eptr,&sptr);
        }
        if (eptr != NULL) {
             // 逆向取rank
             // 返回后面的數(shù)據(jù)
            if (reverse)
                return llen-rank;
            else
                return rank-1;
        } else {
            return -1;
    // 跳表
    } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplist *zsl = zs->zsl;
        dictEntry *de;
        double score;
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            score = *(double*)dictGetVal(de);
            // 查找包含給定分值和成員對象的節(jié)點在跳躍表中的排位
            rank = zslGetRank(zsl,score,ele);
            /* Existing elements always have a rank. */
            serverAssert(rank != 0);
            // 逆向取rank
    } else {
        serverPanic("Unknown sorted set encoding");
    }
}
/*
 * 查找包含給定分值和成員對象的節(jié)點在跳躍表中的排位。
 *
 * 如果沒有包含給定分值和成員對象的節(jié)點,返回 0 ,否則返回排位。
 * 注意,因為跳躍表的表頭也被計算在內(nèi),所以返回的排位以 1 為起始值。
 * T_wrost = O(N), T_avg = O(log N)
 */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *x;
    unsigned long rank = 0;
    int i;
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        // 遍歷所有對比的節(jié)點
        while (x->level[i].forward &&
            (x->level[i].forward->score < score ||
                (x->level[i].forward->score == score &&
                sdscmp(x->level[i].forward->ele,ele) <= 0))) {
            rank += x->level[i].span;
            // 沿著前進(jìn)指針遍歷跳躍表
            x = x->level[i].forward;
        /* x might be equal to zsl->header, so test if obj is non-NULL */
        if (x->ele && sdscmp(x->ele,ele) == 0) {
            return rank;
    return 0;

通過索引區(qū)間返回有序集合指定區(qū)間內(nèi)的成員,因為數(shù)據(jù)在插入的時候,已經(jīng)按照從小到大進(jìn)行了,排序,所以返回指定區(qū)間的成員,遍歷對應(yīng)的數(shù)據(jù)即可。

總結(jié)

1、Redis 的 Set 是 String 類型的無序集合,集合成員是唯一的;

2、sorted set有序集合和集合一樣也是 string 類型元素的集合,同時也不允許有重復(fù)的成員。不同的是sorted set中的每個元素都會關(guān)聯(lián)一個 double 類型的分?jǐn)?shù);

3、set 底層實現(xiàn)主要用到了兩種數(shù)據(jù)結(jié)構(gòu) hashtable 和 inset(整數(shù)集合);

4、sorted set在元素較少的情況下使用的壓縮列表存儲數(shù)據(jù),數(shù)據(jù)量超過閥值的時候 使用 dict 加 zskiplist 來存儲數(shù)據(jù);

4、跳表加哈希表的組合方式也是很巧妙的,跳表用來進(jìn)行范圍的查詢,通過哈希表來實現(xiàn)單個元素權(quán)重值的查詢,組合的方式提高了查詢的效率。

參考

【Redis核心技術(shù)與實戰(zhàn)】https://time.geekbang.org/column/intro/100056701
【Redis設(shè)計與實現(xiàn)】https://book.douban.com/subject/25900156/
【redis 集合(set)類型的使用和應(yīng)用場景】https://www.oraclejsq.com/redisjc/040101720.html
【跳躍表】https://redisbook.readthedocs.io/en/latest/internal-datastruct/skiplist.html
【Redis學(xué)習(xí)筆記】https://github.com/boilingfrog/Go-POINT/tree/master/redis
【Redis 中的 set 和 sorted set 如何使用】https://boilingfrog.github.io/2022/03/21/Redis中的set和sortedset/

到此這篇關(guān)于Redis源碼分析之set 和 sorted set 使用的文章就介紹到這了,更多相關(guān)Redis set 和 sorted set使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • redis中的常用5大數(shù)據(jù)類型

    redis中的常用5大數(shù)據(jù)類型

    這篇文章主要介紹了redis中的常用5大數(shù)據(jù)類型,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-04-04
  • 你了解Redis事務(wù)嗎

    你了解Redis事務(wù)嗎

    說到事務(wù),大家會立刻想到Mysql的事務(wù),所謂的事務(wù)就是對數(shù)據(jù)進(jìn)行一系列的操作,要么都執(zhí)行成功,要么都執(zhí)行失敗,下面就介紹一下Redis如何實現(xiàn)事務(wù),感興趣的可以了解一下
    2022-08-08
  • 關(guān)于redis Key淘汰策略的實現(xiàn)方法

    關(guān)于redis Key淘汰策略的實現(xiàn)方法

    下面小編就為大家?guī)硪黄P(guān)于redis Key淘汰策略的實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-03-03
  • Redis分布式限流的幾種實現(xiàn)

    Redis分布式限流的幾種實現(xiàn)

    分布式限流是指通過將限流策略嵌入到分布式系統(tǒng)中,以控制流量或保護(hù)服務(wù),本文就來介紹一下Redis分布式限流的幾種實現(xiàn),感興趣的可以了解一下
    2023-12-12
  • Redis Key過期監(jiān)聽的配置詳解

    Redis Key過期監(jiān)聽的配置詳解

    這篇文章主要介紹了Redis Key過期監(jiān)聽配置,默認(rèn)情況下在Windows系統(tǒng)中雙擊redis-server.exe用的是內(nèi)置的配置文件,文中通過代碼示例和圖文講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2024-06-06
  • redis啟動失敗問題之完美解決方案

    redis啟動失敗問題之完美解決方案

    這篇文章主要介紹了redis啟動失敗問題之完美解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Redis集群模式和常用數(shù)據(jù)結(jié)構(gòu)詳解

    Redis集群模式和常用數(shù)據(jù)結(jié)構(gòu)詳解

    Redis集群模式下的運維指令主要用于集群的搭建、管理、監(jiān)控和維護(hù),講解了一些常用的Redis集群運維指令,本文重點介紹了Redis集群模式和常用數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下
    2024-03-03
  • 使用redis生成唯一編號及原理示例詳解

    使用redis生成唯一編號及原理示例詳解

    今天介紹下如何使用redis生成唯一的序列號,其實主要思想還是利用redis單線程的特性,可以保證操作的原子性,使讀寫同一個key時不會出現(xiàn)不同的數(shù)據(jù),感興趣的朋友跟隨小編一起看看吧
    2021-09-09
  • 阿里云服務(wù)器安裝配置redis的方法并且加入到開機(jī)啟動(推薦)

    阿里云服務(wù)器安裝配置redis的方法并且加入到開機(jī)啟動(推薦)

    這篇文章主要介紹了阿里云服務(wù)器安裝配置redis并且加入到開機(jī)啟動,需要的朋友可以參考下
    2017-12-12
  • redis如何實現(xiàn)保存對象

    redis如何實現(xiàn)保存對象

    這篇文章主要介紹了redis如何實現(xiàn)保存對象,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-06-06

最新評論