Redis源碼分析之set?和?sorted?set?使用
set 和 sorted set
前言
前面在幾個文章聊到了 list,string,hash
等結(jié)構(gòu)的實現(xiàn),這次來聊一下 set 和 sorted set
的細(xì)節(jié)。
set
Redis 的 Set 是 String 類型的無序集合,集合成員是唯一的。
底層實現(xiàn)主要用到了兩種數(shù)據(jù)結(jié)構(gòu) hashtable 和 inset(整數(shù)集合)。
集合中最大的成員數(shù)為2的32次方-1 (4294967295, 每個集合可存儲40多億個成員)。
常見命令
來看下幾個常用的命令
# 向集合添加一個或多個成員 SADD key member1 [member2] # 獲取集合的成員數(shù) SCARD key # 返回第一個集合與其他集合之間的差異。 SDIFF key1 [key2] # 返回給定所有集合的差集并存儲在 destination 中 SDIFFSTORE destination key1 [key2] # 返回給定所有集合的交集 SINTER key1 [key2] # 返回給定所有集合的交集并存儲在 destination 中 SINTERSTORE destination key1 [key2] # 判斷 member 元素是否是集合 key 的成員 SISMEMBER key member # 返回集合中的所有成員 SMEMBERS key # 將 member 元素從 source 集合移動到 destination 集合 SMOVE source destination member # 移除并返回集合中的一個隨機(jī)元素 SPOP key # 返回集合中一個或多個隨機(jī)數(shù) SRANDMEMBER key [count] # 移除集合中一個或多個成員 SREM key member1 [member2] # 返回所有給定集合的并集 SUNION key1 [key2] # 所有給定集合的并集存儲在 destination 集合中 SUNIONSTORE destination key1 [key2] # 迭代集合中的元素 SSCAN key cursor [MATCH pattern] [COUNT count]
來個栗子
127.0.0.1:6379> SADD set-test xiaoming (integer) 1 (integer) 0 127.0.0.1:6379> SADD set-test xiaoming1 127.0.0.1:6379> SADD set-test xiaoming2 127.0.0.1:6379> SMEMBERS set-test 1) "xiaoming2" 2) "xiaoming" 3) "xiaoming1"
上面重復(fù)值的插入,只有第一次可以插入成功
set 的使用場景
比較適用于聚合分類
1、標(biāo)簽:比如我們博客網(wǎng)站常常使用到的興趣標(biāo)簽,把一個個有著相同愛好,關(guān)注類似內(nèi)容的用戶利用一個標(biāo)簽把他們進(jìn)行歸并。
2、共同好友功能,共同喜好,或者可以引申到二度好友之類的擴(kuò)展應(yīng)用。
3、統(tǒng)計網(wǎng)站的獨立IP。利用set集合當(dāng)中元素不唯一性,可以快速實時統(tǒng)計訪問網(wǎng)站的獨立IP。
不過對于 set 中的命令要合理的應(yīng)用,不然很容易造成慢查詢
1、使用高效的命令,比如說,如果你需要返回一個 SET 中的所有成員時,不要使用 SMEMBERS 命令,而是要使用 SSCAN 多次迭代返回,避免一次返回大量數(shù)據(jù),造成線程阻塞。
2、當(dāng)你需要執(zhí)行排序、交集、并集操作時,可以在客戶端完成,而不要用SORT、SUNION、SINTER
這些命令,以免拖慢 Redis 實例。
看下源碼實現(xiàn)
這里來看下 set 中主要用到的數(shù)據(jù)類型
代碼路徑https://github.com/redis/redis/blob/6.2/src/t_set.c
void saddCommand(client *c) { robj *set; int j, added = 0; set = lookupKeyWrite(c->db,c->argv[1]); if (checkType(c,set,OBJ_SET)) return; if (set == NULL) { set = setTypeCreate(c->argv[2]->ptr); dbAdd(c->db,c->argv[1],set); } for (j = 2; j < c->argc; j++) { if (setTypeAdd(set,c->argv[j]->ptr)) added++; if (added) { signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id); server.dirty += added; addReplyLongLong(c,added); } // 當(dāng) value 是整數(shù)類型使用 intset // 否則使用哈希表 robj *setTypeCreate(sds value) { if (isSdsRepresentableAsLongLong(value,NULL) == C_OK) return createIntsetObject(); return createSetObject(); /* Add the specified value into a set. * * If the value was already member of the set, nothing is done and 0 is * returned, otherwise the new element is added and 1 is returned. */ int setTypeAdd(robj *subject, sds value) { long long llval; // 如果是 OBJ_ENCODING_HT 說明是哈希類型 if (subject->encoding == OBJ_ENCODING_HT) { dict *ht = subject->ptr; dictEntry *de = dictAddRaw(ht,value,NULL); if (de) { // 設(shè)置key ,value 設(shè)置成null dictSetKey(ht,de,sdsdup(value)); dictSetVal(ht,de,NULL); return 1; } // OBJ_ENCODING_INTSET 代表是 inset } else if (subject->encoding == OBJ_ENCODING_INTSET) { if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) { uint8_t success = 0; subject->ptr = intsetAdd(subject->ptr,llval,&success); if (success) { /* Convert to regular set when the intset contains * too many entries. */ // 如果條目過多將會轉(zhuǎn)換成集合 size_t max_entries = server.set_max_intset_entries; /* limit to 1G entries due to intset internals. */ if (max_entries >= 1<<30) max_entries = 1<<30; if (intsetLen(subject->ptr) > max_entries) setTypeConvert(subject,OBJ_ENCODING_HT); return 1; } } else { /* Failed to get integer from object, convert to regular set. */ setTypeConvert(subject,OBJ_ENCODING_HT); /* The set *was* an intset and this value is not integer * encodable, so dictAdd should always work. */ serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK); } else { serverPanic("Unknown set encoding"); return 0;
通過上面的源碼分析,可以看到
1、set 中主要用到了 hashtable 和 inset;
2、如果存儲的類型是整數(shù)類型就會使用 inset,否則使用 hashtable;
3、使用 inset 有一個最大的限制,達(dá)到了最大的限制,也是會使用 hashtable;
再來看下 inset 數(shù)據(jù)結(jié)構(gòu)
代碼地址https://github.com/redis/redis/blob/6.2/src/intset.h
typedef struct intset { // 編碼方法,指定當(dāng)前存儲的是 16 位,32 位,還是 64 位的整數(shù) uint32_t encoding; uint32_t length; // 實際保存元素的數(shù)組 int8_t contents[]; } intset;
insert
來看下 intset 的數(shù)據(jù)插入
/* Insert an integer in the intset */ intset *intsetAdd(intset *is, int64_t value, uint8_t *success) { // 計算value的編碼長度 uint8_t valenc = _intsetValueEncoding(value); uint32_t pos; if (success) *success = 1; /* Upgrade encoding if necessary. If we need to upgrade, we know that * this value should be either appended (if > 0) or prepended (if < 0), * because it lies outside the range of existing values. */ // 如果value的編碼長度大于當(dāng)前的編碼位數(shù),進(jìn)行升級 if (valenc > intrev32ifbe(is->encoding)) { /* This always succeeds, so we don't need to curry *success. */ return intsetUpgradeAndAdd(is,value); } else { /* Abort if the value is already present in the set. * This call will populate "pos" with the right position to insert * the value when it cannot be found. */ // 當(dāng)前值不存在的時候才進(jìn)行插入 if (intsetSearch(is,value,&pos)) { if (success) *success = 0; return is; } is = intsetResize(is,intrev32ifbe(is->length)+1); if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1); } // 數(shù)據(jù)插入 _intsetSet(is,pos,value); is->length = intrev32ifbe(intrev32ifbe(is->length)+1); return is; }
intset 的數(shù)據(jù)插入有一個數(shù)據(jù)升級的過程,當(dāng)一個整數(shù)被添加到整數(shù)集合時,首先需要判斷下 新元素的類型和集合中現(xiàn)有元素類型的長短,如果新元素是一個32位的數(shù)字,現(xiàn)有集合類型是16位的,那么就需要將整數(shù)集合進(jìn)行升級,然后才能將新的元素加入進(jìn)來。
這樣做的優(yōu)點:
1、提升整數(shù)集合的靈活性,可以隨意的添加整數(shù),而不用關(guān)心整數(shù)的類型;
2、可以盡可能的節(jié)約內(nèi)存。
了解完數(shù)據(jù)的插入再來看下 intset 中是如何來快速的搜索里面的數(shù)據(jù)
/* Search for the position of "value". Return 1 when the value was found and * sets "pos" to the position of the value within the intset. Return 0 when * the value is not present in the intset and sets "pos" to the position * where "value" can be inserted. */ // 如果找到了對應(yīng)的數(shù)據(jù),返回 1 將 pos 設(shè)置為對應(yīng)的位置 // 如果找不到,返回0,設(shè)置 pos 為可以為數(shù)據(jù)可以插入的位置 // intset 中的數(shù)據(jù)是排好序的,所以使用二分查找來尋找對應(yīng)的元素 static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) { int min = 0, max = intrev32ifbe(is->length)-1, mid = -1; int64_t cur = -1; /* The value can never be found when the set is empty */ if (intrev32ifbe(is->length) == 0) { if (pos) *pos = 0; return 0; } else { /* Check for the case where we know we cannot find the value, * but do know the insert position. */ if (value > _intsetGet(is,max)) { if (pos) *pos = intrev32ifbe(is->length); return 0; } else if (value < _intsetGet(is,0)) { if (pos) *pos = 0; } } // 使用二分查找 while(max >= min) { mid = ((unsigned int)min + (unsigned int)max) >> 1; cur = _intsetGet(is,mid); if (value > cur) { min = mid+1; } else if (value < cur) { max = mid-1; } else { break; if (value == cur) { if (pos) *pos = mid; return 1; if (pos) *pos = min; }
可以看到這里用到的是二分查找,intset 中的數(shù)據(jù)本身也就是排好序的
dict
來看下 dict 的數(shù)據(jù)結(jié)構(gòu)
typedef struct dict { dictType *type; void *privdata; // 哈希表數(shù)組,長度為2,一個正常存儲數(shù)據(jù),一個用來擴(kuò)容 dictht ht[2]; long rehashidx; /* rehashing not in progress if rehashidx == -1 */ int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */ } dict; // 哈希表結(jié)構(gòu),通過兩個哈希表使用,實現(xiàn)增量的 rehash typedef struct dictht { dictEntry **table; // hash 容量大小 unsigned long size; // 總是等于 size - 1,用于計算索引值 unsigned long sizemask; // 實際存儲的 dictEntry 數(shù)量 unsigned long used; } dictht; // k/v 鍵值對節(jié)點,是實際存儲數(shù)據(jù)的節(jié)點 typedef struct dictEntry { // 鍵對象,總是一個字符串類型的對象 void *key; union { // void指針,這意味著它可以指向任何類型 void *val; uint64_t u64; int64_t s64; double d; } v; // 指向下一個節(jié)點 struct dictEntry *next; } dictEntry;
可以看到 dict 中,是預(yù)留了兩個哈希表,來處理漸進(jìn)式的 rehash
rehash 細(xì)節(jié)參考 redis 中的字典
再來看下 dict 數(shù)據(jù)的插入
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing) { long index; dictEntry *entry; dictht *ht; if (dictIsRehashing(d)) _dictRehashStep(d); /* Get the index of the new element, or -1 if * the element already exists. */ if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1) return NULL; /* Allocate the memory and store the new entry. * Insert the element in top, with the assumption that in a database * system it is more likely that recently added entries are accessed * more frequently. */ // 這里來判斷是否正在 Rehash 中 ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0]; entry = zmalloc(sizeof(*entry)); entry->next = ht->table[index]; ht->table[index] = entry; ht->used++; /* Set the hash entry fields. */ // 插入具體的數(shù)據(jù) dictSetKey(d, entry, key); return entry; }
這里重點來分析下 Rehash 的過程
int dictRehash(dict *d, int n) { int empty_visits = n*10; /* Max number of empty buckets to visit. */ // 這里來判斷是否正在 Rehash 中 if (!dictIsRehashing(d)) return 0; // used 實際存儲的 dictEntry 數(shù)量 // 如果有數(shù)據(jù)進(jìn)行下面的遷移 while(n-- && d->ht[0].used != 0) { dictEntry *de, *nextde; /* Note that rehashidx can't overflow as we are sure there are more * elements because ht[0].used != 0 */ assert(d->ht[0].size > (unsigned long)d->rehashidx); while(d->ht[0].table[d->rehashidx] == NULL) { d->rehashidx++; if (--empty_visits == 0) return 1; } // 獲取老數(shù)據(jù)中的數(shù)據(jù) de = d->ht[0].table[d->rehashidx]; /* Move all the keys in this bucket from the old to the new hash HT */ while(de) { uint64_t h; nextde = de->next; /* Get the index in the new hash table */ // 獲取新哈希表的索引 h = dictHashKey(d, de->key) & d->ht[1].sizemask; de->next = d->ht[1].table[h]; // 將數(shù)據(jù)放入到新的 dity 中 d->ht[1].table[h] = de; d->ht[0].used--; d->ht[1].used++; de = nextde; d->ht[0].table[d->rehashidx] = NULL; d->rehashidx++; } /* Check if we already rehashed the whole table... */ // 這里來檢測是否完成了整個的 rehash 操作 if (d->ht[0].used == 0) { zfree(d->ht[0].table); // 將 ht[1] 的內(nèi)容放入到 ht[0] 中,ht[1] 作為備用,下次 rehash 使用 d->ht[0] = d->ht[1]; _dictReset(&d->ht[1]); d->rehashidx = -1; return 0; /* More to rehash... */ return 1; } // 使用增量的方式 rehash // 當(dāng)然數(shù)據(jù)很大的話,一次遷移所有的數(shù)據(jù),顯然是不合理的,會造成Redis線程阻塞,無法服務(wù)其他請求。這里 Redis 使用的是漸進(jìn)式 rehash。 // 在 rehash 期間,每次執(zhí)行添加,刪除,查找或者更新操作時,除了對命令本身的處理,還會順帶將哈希表1中的數(shù)據(jù)拷貝到哈希表2中。從索引0開始,每執(zhí)行一次操作命令,拷貝一個索引位置的數(shù)據(jù)。 // 如果沒有讀入和寫入操作,這時候就不能進(jìn)行 rehash // 所以會定時執(zhí)行一定數(shù)量的 rehash 操作 int incrementallyRehash(int dbid) { /* Keys dictionary */ if (dictIsRehashing(server.db[dbid].dict)) { dictRehashMilliseconds(server.db[dbid].dict,1); return 1; /* already used our millisecond for this loop... */ /* Expires */ if (dictIsRehashing(server.db[dbid].expires)) { dictRehashMilliseconds(server.db[dbid].expires,1); return 0;
1、rehash 的過程 Redis 默認(rèn)使用了兩個全局哈希表;
2、rehash 的過程是漸進(jìn)式的,因為如果數(shù)據(jù)量很大的話,一次遷移所有的數(shù)據(jù),會造成Redis線程阻塞,無法服務(wù)其他請求;
3、在進(jìn)行 rehash 期間,刪除,查找或者更新操作都會在兩個哈希表中執(zhí)行,添加操作就直接添加到哈希表2中了。查找會把兩個哈希表都找一遍,直到找到或者兩個都找不到;
4、如果在 reash 期間,如果沒有讀寫操作,這時候,就不能遷移工作了,所以后臺定時執(zhí)行一定數(shù)量的數(shù)據(jù)遷移。
sorted set
sorted set
有序集合和集合一樣也是 string 類型元素的集合,同時也不允許有重復(fù)的成員。
不同的是sorted set
中的每個元素都會關(guān)聯(lián)一個 double 類型的分?jǐn)?shù),sorted set
通過這個分?jǐn)?shù)給集合中的成員進(jìn)行從小到大的排序。有序集合中的成員是唯一的,關(guān)聯(lián)的 score 可以重復(fù)。
常見的命令
下面看下有序集合中常見的命令
# 向有序集合添加一個或多個成員,或者更新已存在成員的分?jǐn)?shù) ZADD key score1 member1 [score2 member2] # 獲取有序集合的成員數(shù) ZCARD key # 計算在有序集合中指定區(qū)間分?jǐn)?shù)的成員數(shù) ZCOUNT key min max # 有序集合中對指定成員的分?jǐn)?shù)加上增量 increment ZINCRBY key increment member # 計算給定的一個或多個有序集的交集并將結(jié)果集存儲在新的有序集合 destination 中 ZINTERSTORE destination numkeys key [key ...] # 在有序集合中計算指定字典區(qū)間內(nèi)成員數(shù)量 ZLEXCOUNT key min max # 通過索引區(qū)間返回有序集合指定區(qū)間內(nèi)的成員 ZRANGE key start stop [WITHSCORES] # 通過字典區(qū)間返回有序集合的成員 ZRANGEBYLEX key min max [LIMIT offset count] # 通過分?jǐn)?shù)返回有序集合指定區(qū)間內(nèi)的成員 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT] # 返回有序集合中指定成員的索引 ZRANK key member # 移除有序集合中的一個或多個成員 ZREM key member [member ...] # 移除有序集合中給定的字典區(qū)間的所有成員 ZREMRANGEBYLEX key min max # 移除有序集合中給定的排名區(qū)間的所有成員 ZREMRANGEBYRANK key start stop # 移除有序集合中給定的分?jǐn)?shù)區(qū)間的所有成員 ZREMRANGEBYSCORE key min max # 返回有序集中指定區(qū)間內(nèi)的成員,通過索引,分?jǐn)?shù)從高到低 ZREVRANGE key start stop [WITHSCORES] # 返回有序集中指定分?jǐn)?shù)區(qū)間內(nèi)的成員,分?jǐn)?shù)從高到低排序 ZREVRANGEBYSCORE key max min [WITHSCORES] # 返回有序集合中指定成員的排名,有序集成員按分?jǐn)?shù)值遞減(從大到小)排序 ZREVRANK key member # 返回有序集中,成員的分?jǐn)?shù)值 ZSCORE key member # 計算給定的一個或多個有序集的并集,并存儲在新的 key 中 ZUNIONSTORE destination numkeys key [key ...] # 迭代有序集合中的元素(包括元素成員和元素分值) ZSCAN key cursor [MATCH pattern] [COUNT count]
來個栗子
127.0.0.1:6379> ZADD test-sset 1 member1 (integer) 1 127.0.0.1:6379> ZADD test-sset 2 member2 (integer) 1 127.0.0.1:6379> ZADD test-sset 3 member3 (integer) 1 127.0.0.1:6379> ZADD test-sset 3 member3 (integer) 0 127.0.0.1:6379> ZADD test-sset 4 member3 (integer) 0 127.0.0.1:6379> ZADD test-sset 5 member5 (integer) 1 127.0.0.1:6379> ZRANGE test-sset 0 10 WITHSCORES 1) "member1" 2) "1" 3) "member2" 4) "2" 5) "member3" 6) "4" 7) "member5" 8) "5"
使用場景
來看下sorted set
的使用場景
1、通過 score 的排序功能,可以實現(xiàn)類似排行榜,學(xué)習(xí)成績的排序功能;
2、也可以實現(xiàn)帶權(quán)重隊列,比如普通消息的 score 為1,重要消息的 score 為2,然后工作線程可以選擇按 score 的倒序來獲取工作任務(wù)。讓重要的任務(wù)優(yōu)先執(zhí)行;
3、也可以實現(xiàn)一個延遲隊列,將 score 存儲過期時間,從小到大排序,最靠前的就是最先過期的。
分析下源碼實現(xiàn)
sorted set
中的代碼主要在下面的兩個文件中
結(jié)構(gòu)定義:server.h
實現(xiàn):t_zset.c
先來看下sorted set
的數(shù)據(jù)結(jié)構(gòu)
typedef struct zset { dict *dict; zskiplist *zsl; } zset; typedef struct zskiplist { // 頭,尾節(jié)點 struct zskiplistNode *header, *tail; // 節(jié)點數(shù)量 unsigned long length; // 目前表內(nèi)節(jié)點的最大層數(shù) int level; } zskiplist; /* ZSETs use a specialized version of Skiplists */ // ZSETs 使用的是特定版的 Skiplists typedef struct zskiplistNode { // 這里使用 sds 存儲具體的元素 sds ele; // 元素的權(quán)重 double score; // 后向指針(為了便于從跳表的尾節(jié)點倒序查找) struct zskiplistNode *backward; // 層 // 保存著指向其他元素的指針。高層的指針越過的元素數(shù)量大于等于低層的指針,為了提高查找的效率,程序總是從高層先開始訪問,然后隨著元素值范圍的縮小,慢慢降低層次。 struct zskiplistLevel { // 節(jié)點上的前向指針 struct zskiplistNode *forward; // 跨度 // 用于記錄兩個節(jié)點之間的距離 unsigned long span; } level[]; } zskiplistNode;
看上面的數(shù)據(jù)結(jié)構(gòu)可以發(fā)現(xiàn)sorted set
的實現(xiàn)主要使用了 dict 和 zskiplist 兩種數(shù)據(jù)結(jié)構(gòu)。不過sorted set
在元素較少的情況下使用的壓縮列表,具體細(xì)節(jié)參見下文的 zsetAdd 函數(shù)。
ZADD
來看下 ZADD 的插入
// 代碼地址 https://github.com/redis/redis/blob/6.2/src/t_zset.c // ZADD 命令 void zaddCommand(client *c) { zaddGenericCommand(c,ZADD_IN_NONE); } /* This generic command implements both ZADD and ZINCRBY. */ void zaddGenericCommand(client *c, int flags) { ... /* Lookup the key and create the sorted set if does not exist. */ zobj = lookupKeyWrite(c->db,key); if (checkType(c,zobj,OBJ_ZSET)) goto cleanup; if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ // 超過閾值(zset-max-ziplist-entries、zset-max-ziplist-value)后,使用 hashtable + skiplist 存儲 if (server.zset_max_ziplist_entries == 0 || server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr)) { zobj = createZsetObject(); } else { zobj = createZsetZiplistObject(); } dbAdd(c->db,key,zobj); } // 看下具體的插入過程 int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) { /* Turn options into simple to check vars. */ int incr = (in_flags & ZADD_IN_INCR) != 0; int nx = (in_flags & ZADD_IN_NX) != 0; int xx = (in_flags & ZADD_IN_XX) != 0; int gt = (in_flags & ZADD_IN_GT) != 0; int lt = (in_flags & ZADD_IN_LT) != 0; /* Update the sorted set according to its encoding. */ // 如果類型是 ZIPLIST if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr; if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) { ... /* Remove and re-insert when score changed. */ // 當(dāng)分?jǐn)?shù)改變時移除并重新插入 if (score != curscore) { zobj->ptr = zzlDelete(zobj->ptr,eptr); zobj->ptr = zzlInsert(zobj->ptr,ele,score); *out_flags |= ZADD_OUT_UPDATED; } return 1; // 新元素 } else if (!xx) { /* check if the element is too large or the list * becomes too long *before* executing zzlInsert. */ // 如果元素過大就使用跳表 if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries || sdslen(ele) > server.zset_max_ziplist_value || !ziplistSafeToAdd(zobj->ptr, sdslen(ele))) { zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); } else { if (newscore) *newscore = score; *out_flags |= ZADD_OUT_ADDED; return 1; *out_flags |= ZADD_OUT_NOP; /* Note that the above block handling ziplist would have either returned or * converted the key to skiplist. */ // 表示使用的類型是跳表 if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplistNode *znode; dictEntry *de; // 在哈希表中查找元素 de = dictFind(zs->dict,ele); if (de != NULL) { /* NX? Return, same element already exists. */ if (nx) { *out_flags |= ZADD_OUT_NOP; // 哈希表中獲取元素的權(quán)重 curscore = *(double*)dictGetVal(de); /* Prepare the score for the increment if needed. */ // 更新權(quán)重值 if (incr) { score += curscore; if (isnan(score)) { *out_flags |= ZADD_OUT_NAN; return 0; } /* GT/LT? Only update if score is greater/less than current. */ if ((lt && score >= curscore) || (gt && score <= curscore)) { if (newscore) *newscore = score; /* Remove and re-insert when score changes. */ // 如果權(quán)重發(fā)生變化了 znode = zslUpdateScore(zs->zsl,curscore,ele,score); /* Note that we did not removed the original element from * the hash table representing the sorted set, so we just * update the score. */ dictGetVal(de) = &znode->score; /* Update score ptr. */ // 如果新元素不存在 ele = sdsdup(ele); // 插入到跳表節(jié)點 znode = zslInsert(zs->zsl,score,ele); // 在哈希表中插入 serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK); *out_flags |= ZADD_OUT_ADDED; } else { serverPanic("Unknown sorted set encoding"); return 0; /* Never reached. */
sorted set
的插入使用了兩種策略
1、如果插入的數(shù)據(jù)量和長度沒有達(dá)到閥值,就使用壓縮列表進(jìn)行保存,反之就使用跳表加哈希表的組合方式進(jìn)行保存;
2、壓縮列表本身是就不適合保存過多的元素,所以達(dá)到閥值使用跳表加哈希表的組合方式進(jìn)行保存;
3、這里跳表加哈希表的組合方式也是很巧妙的,跳表用來進(jìn)行范圍的查詢,通過哈希表來實現(xiàn)單個元素權(quán)重值的查詢,組合的方式提高了查詢的效率;
ZRANGE
看完了插入函數(shù),這里再來分析下 ZRANGE
// 獲取有序集合中, 指定數(shù)據(jù)的排名. // 若reverse==0, 排名以得分升序排列. 否則排名以得分降序排列. // 第一個數(shù)據(jù)的排名為0, 而不是1 // 使用壓縮列表或者跳表,里面的數(shù)據(jù)都是排好序的 long zsetRank(robj *zobj, sds ele, int reverse) { unsigned long llen; unsigned long rank; llen = zsetLength(zobj); // 壓縮列表 if (zobj->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = zobj->ptr; unsigned char *eptr, *sptr; eptr = ziplistIndex(zl,0); serverAssert(eptr != NULL); sptr = ziplistNext(zl,eptr); serverAssert(sptr != NULL); rank = 1; while(eptr != NULL) { if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele))) break; rank++; zzlNext(zl,&eptr,&sptr); } if (eptr != NULL) { // 逆向取rank // 返回后面的數(shù)據(jù) if (reverse) return llen-rank; else return rank-1; } else { return -1; // 跳表 } else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = zobj->ptr; zskiplist *zsl = zs->zsl; dictEntry *de; double score; de = dictFind(zs->dict,ele); if (de != NULL) { score = *(double*)dictGetVal(de); // 查找包含給定分值和成員對象的節(jié)點在跳躍表中的排位 rank = zslGetRank(zsl,score,ele); /* Existing elements always have a rank. */ serverAssert(rank != 0); // 逆向取rank } else { serverPanic("Unknown sorted set encoding"); } } /* * 查找包含給定分值和成員對象的節(jié)點在跳躍表中的排位。 * * 如果沒有包含給定分值和成員對象的節(jié)點,返回 0 ,否則返回排位。 * 注意,因為跳躍表的表頭也被計算在內(nèi),所以返回的排位以 1 為起始值。 * T_wrost = O(N), T_avg = O(log N) */ unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) { zskiplistNode *x; unsigned long rank = 0; int i; x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { // 遍歷所有對比的節(jié)點 while (x->level[i].forward && (x->level[i].forward->score < score || (x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele,ele) <= 0))) { rank += x->level[i].span; // 沿著前進(jìn)指針遍歷跳躍表 x = x->level[i].forward; /* x might be equal to zsl->header, so test if obj is non-NULL */ if (x->ele && sdscmp(x->ele,ele) == 0) { return rank; return 0;
通過索引區(qū)間返回有序集合指定區(qū)間內(nèi)的成員,因為數(shù)據(jù)在插入的時候,已經(jīng)按照從小到大進(jìn)行了,排序,所以返回指定區(qū)間的成員,遍歷對應(yīng)的數(shù)據(jù)即可。
總結(jié)
1、Redis 的 Set 是 String 類型的無序集合,集合成員是唯一的;
2、sorted set
有序集合和集合一樣也是 string 類型元素的集合,同時也不允許有重復(fù)的成員。不同的是sorted set
中的每個元素都會關(guān)聯(lián)一個 double 類型的分?jǐn)?shù);
3、set 底層實現(xiàn)主要用到了兩種數(shù)據(jù)結(jié)構(gòu) hashtable 和 inset(整數(shù)集合);
4、sorted set
在元素較少的情況下使用的壓縮列表存儲數(shù)據(jù),數(shù)據(jù)量超過閥值的時候 使用 dict 加 zskiplist 來存儲數(shù)據(jù);
4、跳表加哈希表的組合方式也是很巧妙的,跳表用來進(jìn)行范圍的查詢,通過哈希表來實現(xiàn)單個元素權(quán)重值的查詢,組合的方式提高了查詢的效率。
參考
【Redis核心技術(shù)與實戰(zhàn)】https://time.geekbang.org/column/intro/100056701
【Redis設(shè)計與實現(xiàn)】https://book.douban.com/subject/25900156/
【redis 集合(set)類型的使用和應(yīng)用場景】https://www.oraclejsq.com/redisjc/040101720.html
【跳躍表】https://redisbook.readthedocs.io/en/latest/internal-datastruct/skiplist.html
【Redis學(xué)習(xí)筆記】https://github.com/boilingfrog/Go-POINT/tree/master/redis
【Redis 中的 set 和 sorted set 如何使用】https://boilingfrog.github.io/2022/03/21/Redis中的set和sortedset/
到此這篇關(guān)于Redis源碼分析之set 和 sorted set 使用的文章就介紹到這了,更多相關(guān)Redis set 和 sorted set使用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于redis Key淘汰策略的實現(xiàn)方法
下面小編就為大家?guī)硪黄P(guān)于redis Key淘汰策略的實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-03-03Redis集群模式和常用數(shù)據(jù)結(jié)構(gòu)詳解
Redis集群模式下的運維指令主要用于集群的搭建、管理、監(jiān)控和維護(hù),講解了一些常用的Redis集群運維指令,本文重點介紹了Redis集群模式和常用數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2024-03-03阿里云服務(wù)器安裝配置redis的方法并且加入到開機(jī)啟動(推薦)
這篇文章主要介紹了阿里云服務(wù)器安裝配置redis并且加入到開機(jī)啟動,需要的朋友可以參考下2017-12-12