Redis源碼分析之set?和?sorted?set?使用
set 和 sorted set
前言
前面在幾個(gè)文章聊到了 list,string,hash 等結(jié)構(gòu)的實(shí)現(xiàn),這次來(lái)聊一下 set 和 sorted set 的細(xì)節(jié)。
set
Redis 的 Set 是 String 類型的無(wú)序集合,集合成員是唯一的。
底層實(shí)現(xiàn)主要用到了兩種數(shù)據(jù)結(jié)構(gòu) hashtable 和 inset(整數(shù)集合)。
集合中最大的成員數(shù)為2的32次方-1 (4294967295, 每個(gè)集合可存儲(chǔ)40多億個(gè)成員)。
常見命令
來(lái)看下幾個(gè)常用的命令
# 向集合添加一個(gè)或多個(gè)成員 SADD key member1 [member2] # 獲取集合的成員數(shù) SCARD key # 返回第一個(gè)集合與其他集合之間的差異。 SDIFF key1 [key2] # 返回給定所有集合的差集并存儲(chǔ)在 destination 中 SDIFFSTORE destination key1 [key2] # 返回給定所有集合的交集 SINTER key1 [key2] # 返回給定所有集合的交集并存儲(chǔ)在 destination 中 SINTERSTORE destination key1 [key2] # 判斷 member 元素是否是集合 key 的成員 SISMEMBER key member # 返回集合中的所有成員 SMEMBERS key # 將 member 元素從 source 集合移動(dòng)到 destination 集合 SMOVE source destination member # 移除并返回集合中的一個(gè)隨機(jī)元素 SPOP key # 返回集合中一個(gè)或多個(gè)隨機(jī)數(shù) SRANDMEMBER key [count] # 移除集合中一個(gè)或多個(gè)成員 SREM key member1 [member2] # 返回所有給定集合的并集 SUNION key1 [key2] # 所有給定集合的并集存儲(chǔ)在 destination 集合中 SUNIONSTORE destination key1 [key2] # 迭代集合中的元素 SSCAN key cursor [MATCH pattern] [COUNT count]
來(lái)個(gè)栗子
127.0.0.1:6379> SADD set-test xiaoming (integer) 1 (integer) 0 127.0.0.1:6379> SADD set-test xiaoming1 127.0.0.1:6379> SADD set-test xiaoming2 127.0.0.1:6379> SMEMBERS set-test 1) "xiaoming2" 2) "xiaoming" 3) "xiaoming1"
上面重復(fù)值的插入,只有第一次可以插入成功
set 的使用場(chǎng)景
比較適用于聚合分類
1、標(biāo)簽:比如我們博客網(wǎng)站常常使用到的興趣標(biāo)簽,把一個(gè)個(gè)有著相同愛好,關(guān)注類似內(nèi)容的用戶利用一個(gè)標(biāo)簽把他們進(jìn)行歸并。
2、共同好友功能,共同喜好,或者可以引申到二度好友之類的擴(kuò)展應(yīng)用。
3、統(tǒng)計(jì)網(wǎng)站的獨(dú)立IP。利用set集合當(dāng)中元素不唯一性,可以快速實(shí)時(shí)統(tǒng)計(jì)訪問(wèn)網(wǎng)站的獨(dú)立IP。
不過(guò)對(duì)于 set 中的命令要合理的應(yīng)用,不然很容易造成慢查詢
1、使用高效的命令,比如說(shuō),如果你需要返回一個(gè) SET 中的所有成員時(shí),不要使用 SMEMBERS 命令,而是要使用 SSCAN 多次迭代返回,避免一次返回大量數(shù)據(jù),造成線程阻塞。
2、當(dāng)你需要執(zhí)行排序、交集、并集操作時(shí),可以在客戶端完成,而不要用SORT、SUNION、SINTER這些命令,以免拖慢 Redis 實(shí)例。
看下源碼實(shí)現(xiàn)
這里來(lái)看下 set 中主要用到的數(shù)據(jù)類型
代碼路徑https://github.com/redis/redis/blob/6.2/src/t_set.c
void saddCommand(client *c) {
robj *set;
int j, added = 0;
set = lookupKeyWrite(c->db,c->argv[1]);
if (checkType(c,set,OBJ_SET)) return;
if (set == NULL) {
set = setTypeCreate(c->argv[2]->ptr);
dbAdd(c->db,c->argv[1],set);
}
for (j = 2; j < c->argc; j++) {
if (setTypeAdd(set,c->argv[j]->ptr)) added++;
if (added) {
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_SET,"sadd",c->argv[1],c->db->id);
server.dirty += added;
addReplyLongLong(c,added);
}
// 當(dāng) value 是整數(shù)類型使用 intset
// 否則使用哈希表
robj *setTypeCreate(sds value) {
if (isSdsRepresentableAsLongLong(value,NULL) == C_OK)
return createIntsetObject();
return createSetObject();
/* Add the specified value into a set.
*
* If the value was already member of the set, nothing is done and 0 is
* returned, otherwise the new element is added and 1 is returned. */
int setTypeAdd(robj *subject, sds value) {
long long llval;
// 如果是 OBJ_ENCODING_HT 說(shuō)明是哈希類型
if (subject->encoding == OBJ_ENCODING_HT) {
dict *ht = subject->ptr;
dictEntry *de = dictAddRaw(ht,value,NULL);
if (de) {
// 設(shè)置key ,value 設(shè)置成null
dictSetKey(ht,de,sdsdup(value));
dictSetVal(ht,de,NULL);
return 1;
}
// OBJ_ENCODING_INTSET 代表是 inset
} else if (subject->encoding == OBJ_ENCODING_INTSET) {
if (isSdsRepresentableAsLongLong(value,&llval) == C_OK) {
uint8_t success = 0;
subject->ptr = intsetAdd(subject->ptr,llval,&success);
if (success) {
/* Convert to regular set when the intset contains
* too many entries. */
// 如果條目過(guò)多將會(huì)轉(zhuǎn)換成集合
size_t max_entries = server.set_max_intset_entries;
/* limit to 1G entries due to intset internals. */
if (max_entries >= 1<<30) max_entries = 1<<30;
if (intsetLen(subject->ptr) > max_entries)
setTypeConvert(subject,OBJ_ENCODING_HT);
return 1;
}
} else {
/* Failed to get integer from object, convert to regular set. */
setTypeConvert(subject,OBJ_ENCODING_HT);
/* The set *was* an intset and this value is not integer
* encodable, so dictAdd should always work. */
serverAssert(dictAdd(subject->ptr,sdsdup(value),NULL) == DICT_OK);
} else {
serverPanic("Unknown set encoding");
return 0;通過(guò)上面的源碼分析,可以看到
1、set 中主要用到了 hashtable 和 inset;
2、如果存儲(chǔ)的類型是整數(shù)類型就會(huì)使用 inset,否則使用 hashtable;
3、使用 inset 有一個(gè)最大的限制,達(dá)到了最大的限制,也是會(huì)使用 hashtable;
再來(lái)看下 inset 數(shù)據(jù)結(jié)構(gòu)
代碼地址https://github.com/redis/redis/blob/6.2/src/intset.h
typedef struct intset {
// 編碼方法,指定當(dāng)前存儲(chǔ)的是 16 位,32 位,還是 64 位的整數(shù)
uint32_t encoding;
uint32_t length;
// 實(shí)際保存元素的數(shù)組
int8_t contents[];
} intset;insert
來(lái)看下 intset 的數(shù)據(jù)插入
/* Insert an integer in the intset */
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
// 計(jì)算value的編碼長(zhǎng)度
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
/* Upgrade encoding if necessary. If we need to upgrade, we know that
* this value should be either appended (if > 0) or prepended (if < 0),
* because it lies outside the range of existing values. */
// 如果value的編碼長(zhǎng)度大于當(dāng)前的編碼位數(shù),進(jìn)行升級(jí)
if (valenc > intrev32ifbe(is->encoding)) {
/* This always succeeds, so we don't need to curry *success. */
return intsetUpgradeAndAdd(is,value);
} else {
/* Abort if the value is already present in the set.
* This call will populate "pos" with the right position to insert
* the value when it cannot be found. */
// 當(dāng)前值不存在的時(shí)候才進(jìn)行插入
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
is = intsetResize(is,intrev32ifbe(is->length)+1);
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
// 數(shù)據(jù)插入
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}intset 的數(shù)據(jù)插入有一個(gè)數(shù)據(jù)升級(jí)的過(guò)程,當(dāng)一個(gè)整數(shù)被添加到整數(shù)集合時(shí),首先需要判斷下 新元素的類型和集合中現(xiàn)有元素類型的長(zhǎng)短,如果新元素是一個(gè)32位的數(shù)字,現(xiàn)有集合類型是16位的,那么就需要將整數(shù)集合進(jìn)行升級(jí),然后才能將新的元素加入進(jìn)來(lái)。
這樣做的優(yōu)點(diǎn):
1、提升整數(shù)集合的靈活性,可以隨意的添加整數(shù),而不用關(guān)心整數(shù)的類型;
2、可以盡可能的節(jié)約內(nèi)存。
了解完數(shù)據(jù)的插入再來(lái)看下 intset 中是如何來(lái)快速的搜索里面的數(shù)據(jù)
/* Search for the position of "value". Return 1 when the value was found and
* sets "pos" to the position of the value within the intset. Return 0 when
* the value is not present in the intset and sets "pos" to the position
* where "value" can be inserted. */
// 如果找到了對(duì)應(yīng)的數(shù)據(jù),返回 1 將 pos 設(shè)置為對(duì)應(yīng)的位置
// 如果找不到,返回0,設(shè)置 pos 為可以為數(shù)據(jù)可以插入的位置
// intset 中的數(shù)據(jù)是排好序的,所以使用二分查找來(lái)尋找對(duì)應(yīng)的元素
static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
int min = 0, max = intrev32ifbe(is->length)-1, mid = -1;
int64_t cur = -1;
/* The value can never be found when the set is empty */
if (intrev32ifbe(is->length) == 0) {
if (pos) *pos = 0;
return 0;
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {
if (pos) *pos = 0;
}
}
// 使用二分查找
while(max >= min) {
mid = ((unsigned int)min + (unsigned int)max) >> 1;
cur = _intsetGet(is,mid);
if (value > cur) {
min = mid+1;
} else if (value < cur) {
max = mid-1;
} else {
break;
if (value == cur) {
if (pos) *pos = mid;
return 1;
if (pos) *pos = min;
}可以看到這里用到的是二分查找,intset 中的數(shù)據(jù)本身也就是排好序的
dict
來(lái)看下 dict 的數(shù)據(jù)結(jié)構(gòu)
typedef struct dict {
dictType *type;
void *privdata;
// 哈希表數(shù)組,長(zhǎng)度為2,一個(gè)正常存儲(chǔ)數(shù)據(jù),一個(gè)用來(lái)擴(kuò)容
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
int16_t pauserehash; /* If >0 rehashing is paused (<0 indicates coding error) */
} dict;
// 哈希表結(jié)構(gòu),通過(guò)兩個(gè)哈希表使用,實(shí)現(xiàn)增量的 rehash
typedef struct dictht {
dictEntry **table;
// hash 容量大小
unsigned long size;
// 總是等于 size - 1,用于計(jì)算索引值
unsigned long sizemask;
// 實(shí)際存儲(chǔ)的 dictEntry 數(shù)量
unsigned long used;
} dictht;
// k/v 鍵值對(duì)節(jié)點(diǎn),是實(shí)際存儲(chǔ)數(shù)據(jù)的節(jié)點(diǎn)
typedef struct dictEntry {
// 鍵對(duì)象,總是一個(gè)字符串類型的對(duì)象
void *key;
union {
// void指針,這意味著它可以指向任何類型
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
// 指向下一個(gè)節(jié)點(diǎn)
struct dictEntry *next;
} dictEntry;可以看到 dict 中,是預(yù)留了兩個(gè)哈希表,來(lái)處理漸進(jìn)式的 rehash
rehash 細(xì)節(jié)參考 redis 中的字典
再來(lái)看下 dict 數(shù)據(jù)的插入
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
// 這里來(lái)判斷是否正在 Rehash 中
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* Set the hash entry fields. */
// 插入具體的數(shù)據(jù)
dictSetKey(d, entry, key);
return entry;
}這里重點(diǎn)來(lái)分析下 Rehash 的過(guò)程
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
// 這里來(lái)判斷是否正在 Rehash 中
if (!dictIsRehashing(d)) return 0;
// used 實(shí)際存儲(chǔ)的 dictEntry 數(shù)量
// 如果有數(shù)據(jù)進(jìn)行下面的遷移
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
// 獲取老數(shù)據(jù)中的數(shù)據(jù)
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
// 獲取新哈希表的索引
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
// 將數(shù)據(jù)放入到新的 dity 中
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
// 這里來(lái)檢測(cè)是否完成了整個(gè)的 rehash 操作
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
// 將 ht[1] 的內(nèi)容放入到 ht[0] 中,ht[1] 作為備用,下次 rehash 使用
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
/* More to rehash... */
return 1;
}
// 使用增量的方式 rehash
// 當(dāng)然數(shù)據(jù)很大的話,一次遷移所有的數(shù)據(jù),顯然是不合理的,會(huì)造成Redis線程阻塞,無(wú)法服務(wù)其他請(qǐng)求。這里 Redis 使用的是漸進(jìn)式 rehash。
// 在 rehash 期間,每次執(zhí)行添加,刪除,查找或者更新操作時(shí),除了對(duì)命令本身的處理,還會(huì)順帶將哈希表1中的數(shù)據(jù)拷貝到哈希表2中。從索引0開始,每執(zhí)行一次操作命令,拷貝一個(gè)索引位置的數(shù)據(jù)。
// 如果沒(méi)有讀入和寫入操作,這時(shí)候就不能進(jìn)行 rehash
// 所以會(huì)定時(shí)執(zhí)行一定數(shù)量的 rehash 操作
int incrementallyRehash(int dbid) {
/* Keys dictionary */
if (dictIsRehashing(server.db[dbid].dict)) {
dictRehashMilliseconds(server.db[dbid].dict,1);
return 1; /* already used our millisecond for this loop... */
/* Expires */
if (dictIsRehashing(server.db[dbid].expires)) {
dictRehashMilliseconds(server.db[dbid].expires,1);
return 0;1、rehash 的過(guò)程 Redis 默認(rèn)使用了兩個(gè)全局哈希表;
2、rehash 的過(guò)程是漸進(jìn)式的,因?yàn)槿绻麛?shù)據(jù)量很大的話,一次遷移所有的數(shù)據(jù),會(huì)造成Redis線程阻塞,無(wú)法服務(wù)其他請(qǐng)求;
3、在進(jìn)行 rehash 期間,刪除,查找或者更新操作都會(huì)在兩個(gè)哈希表中執(zhí)行,添加操作就直接添加到哈希表2中了。查找會(huì)把兩個(gè)哈希表都找一遍,直到找到或者兩個(gè)都找不到;
4、如果在 reash 期間,如果沒(méi)有讀寫操作,這時(shí)候,就不能遷移工作了,所以后臺(tái)定時(shí)執(zhí)行一定數(shù)量的數(shù)據(jù)遷移。
sorted set
sorted set有序集合和集合一樣也是 string 類型元素的集合,同時(shí)也不允許有重復(fù)的成員。
不同的是sorted set中的每個(gè)元素都會(huì)關(guān)聯(lián)一個(gè) double 類型的分?jǐn)?shù),sorted set通過(guò)這個(gè)分?jǐn)?shù)給集合中的成員進(jìn)行從小到大的排序。有序集合中的成員是唯一的,關(guān)聯(lián)的 score 可以重復(fù)。
常見的命令
下面看下有序集合中常見的命令
# 向有序集合添加一個(gè)或多個(gè)成員,或者更新已存在成員的分?jǐn)?shù) ZADD key score1 member1 [score2 member2] # 獲取有序集合的成員數(shù) ZCARD key # 計(jì)算在有序集合中指定區(qū)間分?jǐn)?shù)的成員數(shù) ZCOUNT key min max # 有序集合中對(duì)指定成員的分?jǐn)?shù)加上增量 increment ZINCRBY key increment member # 計(jì)算給定的一個(gè)或多個(gè)有序集的交集并將結(jié)果集存儲(chǔ)在新的有序集合 destination 中 ZINTERSTORE destination numkeys key [key ...] # 在有序集合中計(jì)算指定字典區(qū)間內(nèi)成員數(shù)量 ZLEXCOUNT key min max # 通過(guò)索引區(qū)間返回有序集合指定區(qū)間內(nèi)的成員 ZRANGE key start stop [WITHSCORES] # 通過(guò)字典區(qū)間返回有序集合的成員 ZRANGEBYLEX key min max [LIMIT offset count] # 通過(guò)分?jǐn)?shù)返回有序集合指定區(qū)間內(nèi)的成員 ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT] # 返回有序集合中指定成員的索引 ZRANK key member # 移除有序集合中的一個(gè)或多個(gè)成員 ZREM key member [member ...] # 移除有序集合中給定的字典區(qū)間的所有成員 ZREMRANGEBYLEX key min max # 移除有序集合中給定的排名區(qū)間的所有成員 ZREMRANGEBYRANK key start stop # 移除有序集合中給定的分?jǐn)?shù)區(qū)間的所有成員 ZREMRANGEBYSCORE key min max # 返回有序集中指定區(qū)間內(nèi)的成員,通過(guò)索引,分?jǐn)?shù)從高到低 ZREVRANGE key start stop [WITHSCORES] # 返回有序集中指定分?jǐn)?shù)區(qū)間內(nèi)的成員,分?jǐn)?shù)從高到低排序 ZREVRANGEBYSCORE key max min [WITHSCORES] # 返回有序集合中指定成員的排名,有序集成員按分?jǐn)?shù)值遞減(從大到小)排序 ZREVRANK key member # 返回有序集中,成員的分?jǐn)?shù)值 ZSCORE key member # 計(jì)算給定的一個(gè)或多個(gè)有序集的并集,并存儲(chǔ)在新的 key 中 ZUNIONSTORE destination numkeys key [key ...] # 迭代有序集合中的元素(包括元素成員和元素分值) ZSCAN key cursor [MATCH pattern] [COUNT count]
來(lái)個(gè)栗子
127.0.0.1:6379> ZADD test-sset 1 member1 (integer) 1 127.0.0.1:6379> ZADD test-sset 2 member2 (integer) 1 127.0.0.1:6379> ZADD test-sset 3 member3 (integer) 1 127.0.0.1:6379> ZADD test-sset 3 member3 (integer) 0 127.0.0.1:6379> ZADD test-sset 4 member3 (integer) 0 127.0.0.1:6379> ZADD test-sset 5 member5 (integer) 1 127.0.0.1:6379> ZRANGE test-sset 0 10 WITHSCORES 1) "member1" 2) "1" 3) "member2" 4) "2" 5) "member3" 6) "4" 7) "member5" 8) "5"
使用場(chǎng)景
來(lái)看下sorted set的使用場(chǎng)景
1、通過(guò) score 的排序功能,可以實(shí)現(xiàn)類似排行榜,學(xué)習(xí)成績(jī)的排序功能;
2、也可以實(shí)現(xiàn)帶權(quán)重隊(duì)列,比如普通消息的 score 為1,重要消息的 score 為2,然后工作線程可以選擇按 score 的倒序來(lái)獲取工作任務(wù)。讓重要的任務(wù)優(yōu)先執(zhí)行;
3、也可以實(shí)現(xiàn)一個(gè)延遲隊(duì)列,將 score 存儲(chǔ)過(guò)期時(shí)間,從小到大排序,最靠前的就是最先過(guò)期的。
分析下源碼實(shí)現(xiàn)
sorted set 中的代碼主要在下面的兩個(gè)文件中
結(jié)構(gòu)定義:server.h
實(shí)現(xiàn):t_zset.c
先來(lái)看下sorted set的數(shù)據(jù)結(jié)構(gòu)
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
typedef struct zskiplist {
// 頭,尾節(jié)點(diǎn)
struct zskiplistNode *header, *tail;
// 節(jié)點(diǎn)數(shù)量
unsigned long length;
// 目前表內(nèi)節(jié)點(diǎn)的最大層數(shù)
int level;
} zskiplist;
/* ZSETs use a specialized version of Skiplists */
// ZSETs 使用的是特定版的 Skiplists
typedef struct zskiplistNode {
// 這里使用 sds 存儲(chǔ)具體的元素
sds ele;
// 元素的權(quán)重
double score;
// 后向指針(為了便于從跳表的尾節(jié)點(diǎn)倒序查找)
struct zskiplistNode *backward;
// 層
// 保存著指向其他元素的指針。高層的指針越過(guò)的元素?cái)?shù)量大于等于低層的指針,為了提高查找的效率,程序總是從高層先開始訪問(wèn),然后隨著元素值范圍的縮小,慢慢降低層次。
struct zskiplistLevel {
// 節(jié)點(diǎn)上的前向指針
struct zskiplistNode *forward;
// 跨度
// 用于記錄兩個(gè)節(jié)點(diǎn)之間的距離
unsigned long span;
} level[];
} zskiplistNode;看上面的數(shù)據(jù)結(jié)構(gòu)可以發(fā)現(xiàn)sorted set的實(shí)現(xiàn)主要使用了 dict 和 zskiplist 兩種數(shù)據(jù)結(jié)構(gòu)。不過(guò)sorted set在元素較少的情況下使用的壓縮列表,具體細(xì)節(jié)參見下文的 zsetAdd 函數(shù)。

ZADD
來(lái)看下 ZADD 的插入
// 代碼地址 https://github.com/redis/redis/blob/6.2/src/t_zset.c
// ZADD 命令
void zaddCommand(client *c) {
zaddGenericCommand(c,ZADD_IN_NONE);
}
/* This generic command implements both ZADD and ZINCRBY. */
void zaddGenericCommand(client *c, int flags) {
...
/* Lookup the key and create the sorted set if does not exist. */
zobj = lookupKeyWrite(c->db,key);
if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
if (zobj == NULL) {
if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
// 超過(guò)閾值(zset-max-ziplist-entries、zset-max-ziplist-value)后,使用 hashtable + skiplist 存儲(chǔ)
if (server.zset_max_ziplist_entries == 0 ||
server.zset_max_ziplist_value < sdslen(c->argv[scoreidx+1]->ptr))
{
zobj = createZsetObject();
} else {
zobj = createZsetZiplistObject();
}
dbAdd(c->db,key,zobj);
}
// 看下具體的插入過(guò)程
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
/* Turn options into simple to check vars. */
int incr = (in_flags & ZADD_IN_INCR) != 0;
int nx = (in_flags & ZADD_IN_NX) != 0;
int xx = (in_flags & ZADD_IN_XX) != 0;
int gt = (in_flags & ZADD_IN_GT) != 0;
int lt = (in_flags & ZADD_IN_LT) != 0;
/* Update the sorted set according to its encoding. */
// 如果類型是 ZIPLIST
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr;
if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
...
/* Remove and re-insert when score changed. */
// 當(dāng)分?jǐn)?shù)改變時(shí)移除并重新插入
if (score != curscore) {
zobj->ptr = zzlDelete(zobj->ptr,eptr);
zobj->ptr = zzlInsert(zobj->ptr,ele,score);
*out_flags |= ZADD_OUT_UPDATED;
}
return 1;
// 新元素
} else if (!xx) {
/* check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
// 如果元素過(guò)大就使用跳表
if (zzlLength(zobj->ptr)+1 > server.zset_max_ziplist_entries ||
sdslen(ele) > server.zset_max_ziplist_value ||
!ziplistSafeToAdd(zobj->ptr, sdslen(ele)))
{
zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
} else {
if (newscore) *newscore = score;
*out_flags |= ZADD_OUT_ADDED;
return 1;
*out_flags |= ZADD_OUT_NOP;
/* Note that the above block handling ziplist would have either returned or
* converted the key to skiplist. */
// 表示使用的類型是跳表
if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplistNode *znode;
dictEntry *de;
// 在哈希表中查找元素
de = dictFind(zs->dict,ele);
if (de != NULL) {
/* NX? Return, same element already exists. */
if (nx) {
*out_flags |= ZADD_OUT_NOP;
// 哈希表中獲取元素的權(quán)重
curscore = *(double*)dictGetVal(de);
/* Prepare the score for the increment if needed. */
// 更新權(quán)重值
if (incr) {
score += curscore;
if (isnan(score)) {
*out_flags |= ZADD_OUT_NAN;
return 0;
}
/* GT/LT? Only update if score is greater/less than current. */
if ((lt && score >= curscore) || (gt && score <= curscore)) {
if (newscore) *newscore = score;
/* Remove and re-insert when score changes. */
// 如果權(quán)重發(fā)生變化了
znode = zslUpdateScore(zs->zsl,curscore,ele,score);
/* Note that we did not removed the original element from
* the hash table representing the sorted set, so we just
* update the score. */
dictGetVal(de) = &znode->score; /* Update score ptr. */
// 如果新元素不存在
ele = sdsdup(ele);
// 插入到跳表節(jié)點(diǎn)
znode = zslInsert(zs->zsl,score,ele);
// 在哈希表中插入
serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
*out_flags |= ZADD_OUT_ADDED;
} else {
serverPanic("Unknown sorted set encoding");
return 0; /* Never reached. */sorted set 的插入使用了兩種策略
1、如果插入的數(shù)據(jù)量和長(zhǎng)度沒(méi)有達(dá)到閥值,就使用壓縮列表進(jìn)行保存,反之就使用跳表加哈希表的組合方式進(jìn)行保存;
2、壓縮列表本身是就不適合保存過(guò)多的元素,所以達(dá)到閥值使用跳表加哈希表的組合方式進(jìn)行保存;
3、這里跳表加哈希表的組合方式也是很巧妙的,跳表用來(lái)進(jìn)行范圍的查詢,通過(guò)哈希表來(lái)實(shí)現(xiàn)單個(gè)元素權(quán)重值的查詢,組合的方式提高了查詢的效率;

ZRANGE
看完了插入函數(shù),這里再來(lái)分析下 ZRANGE
// 獲取有序集合中, 指定數(shù)據(jù)的排名.
// 若reverse==0, 排名以得分升序排列. 否則排名以得分降序排列.
// 第一個(gè)數(shù)據(jù)的排名為0, 而不是1
// 使用壓縮列表或者跳表,里面的數(shù)據(jù)都是排好序的
long zsetRank(robj *zobj, sds ele, int reverse) {
unsigned long llen;
unsigned long rank;
llen = zsetLength(zobj);
// 壓縮列表
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = zobj->ptr;
unsigned char *eptr, *sptr;
eptr = ziplistIndex(zl,0);
serverAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);
rank = 1;
while(eptr != NULL) {
if (ziplistCompare(eptr,(unsigned char*)ele,sdslen(ele)))
break;
rank++;
zzlNext(zl,&eptr,&sptr);
}
if (eptr != NULL) {
// 逆向取rank
// 返回后面的數(shù)據(jù)
if (reverse)
return llen-rank;
else
return rank-1;
} else {
return -1;
// 跳表
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
dictEntry *de;
double score;
de = dictFind(zs->dict,ele);
if (de != NULL) {
score = *(double*)dictGetVal(de);
// 查找包含給定分值和成員對(duì)象的節(jié)點(diǎn)在跳躍表中的排位
rank = zslGetRank(zsl,score,ele);
/* Existing elements always have a rank. */
serverAssert(rank != 0);
// 逆向取rank
} else {
serverPanic("Unknown sorted set encoding");
}
}
/*
* 查找包含給定分值和成員對(duì)象的節(jié)點(diǎn)在跳躍表中的排位。
*
* 如果沒(méi)有包含給定分值和成員對(duì)象的節(jié)點(diǎn),返回 0 ,否則返回排位。
* 注意,因?yàn)樘S表的表頭也被計(jì)算在內(nèi),所以返回的排位以 1 為起始值。
* T_wrost = O(N), T_avg = O(log N)
*/
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
// 遍歷所有對(duì)比的節(jié)點(diǎn)
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
// 沿著前進(jìn)指針遍歷跳躍表
x = x->level[i].forward;
/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
return 0;通過(guò)索引區(qū)間返回有序集合指定區(qū)間內(nèi)的成員,因?yàn)閿?shù)據(jù)在插入的時(shí)候,已經(jīng)按照從小到大進(jìn)行了,排序,所以返回指定區(qū)間的成員,遍歷對(duì)應(yīng)的數(shù)據(jù)即可。
總結(jié)
1、Redis 的 Set 是 String 類型的無(wú)序集合,集合成員是唯一的;
2、sorted set有序集合和集合一樣也是 string 類型元素的集合,同時(shí)也不允許有重復(fù)的成員。不同的是sorted set中的每個(gè)元素都會(huì)關(guān)聯(lián)一個(gè) double 類型的分?jǐn)?shù);
3、set 底層實(shí)現(xiàn)主要用到了兩種數(shù)據(jù)結(jié)構(gòu) hashtable 和 inset(整數(shù)集合);
4、sorted set在元素較少的情況下使用的壓縮列表存儲(chǔ)數(shù)據(jù),數(shù)據(jù)量超過(guò)閥值的時(shí)候 使用 dict 加 zskiplist 來(lái)存儲(chǔ)數(shù)據(jù);
4、跳表加哈希表的組合方式也是很巧妙的,跳表用來(lái)進(jìn)行范圍的查詢,通過(guò)哈希表來(lái)實(shí)現(xiàn)單個(gè)元素權(quán)重值的查詢,組合的方式提高了查詢的效率。
參考
【Redis核心技術(shù)與實(shí)戰(zhàn)】https://time.geekbang.org/column/intro/100056701
【Redis設(shè)計(jì)與實(shí)現(xiàn)】https://book.douban.com/subject/25900156/
【redis 集合(set)類型的使用和應(yīng)用場(chǎng)景】https://www.oraclejsq.com/redisjc/040101720.html
【跳躍表】https://redisbook.readthedocs.io/en/latest/internal-datastruct/skiplist.html
【Redis學(xué)習(xí)筆記】https://github.com/boilingfrog/Go-POINT/tree/master/redis
【Redis 中的 set 和 sorted set 如何使用】https://boilingfrog.github.io/2022/03/21/Redis中的set和sortedset/
到此這篇關(guān)于Redis源碼分析之set 和 sorted set 使用的文章就介紹到這了,更多相關(guān)Redis set 和 sorted set使用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于redis Key淘汰策略的實(shí)現(xiàn)方法
下面小編就為大家?guī)?lái)一篇關(guān)于redis Key淘汰策略的實(shí)現(xiàn)方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-03-03
Redis集群模式和常用數(shù)據(jù)結(jié)構(gòu)詳解
Redis集群模式下的運(yùn)維指令主要用于集群的搭建、管理、監(jiān)控和維護(hù),講解了一些常用的Redis集群運(yùn)維指令,本文重點(diǎn)介紹了Redis集群模式和常用數(shù)據(jù)結(jié)構(gòu),需要的朋友可以參考下2024-03-03
阿里云服務(wù)器安裝配置redis的方法并且加入到開機(jī)啟動(dòng)(推薦)
這篇文章主要介紹了阿里云服務(wù)器安裝配置redis并且加入到開機(jī)啟動(dòng),需要的朋友可以參考下2017-12-12

