Redis?中使用?list,streams,pub/sub?幾種方式實現(xiàn)消息隊列的問題
使用 Redis 實現(xiàn)消息隊列
Redis 中也是可以實現(xiàn)消息隊列
不過談到消息隊列,我們會經(jīng)常遇到下面的幾個問題
1、消息如何防止丟失;
2、消息的重復(fù)發(fā)送如何處理;
3、消息的順序性問題;
關(guān)于 mq 中如何處理這幾個問題,可參看RabbitMQ,RocketMQ,Kafka 事務(wù)性,消息丟失,消息順序性和消息重復(fù)發(fā)送的處理策略
基于List的消息隊列
對于 List
使用 LPUSH 寫入數(shù)據(jù),使用 RPOP 讀出數(shù)據(jù)
127.0.0.1:6379> LPUSH test "ceshi-1" (integer) 1 127.0.0.1:6379> RPOP test "ceshi-1"
使用 RPOP 客戶端就需要一直輪詢,來監(jiān)測是否有值可以讀出,可以使用 BRPOP 可以進行阻塞式讀取,客戶端在沒有讀到隊列數(shù)據(jù)時,自動阻塞,直到有新的數(shù)據(jù)寫入隊列,再開始讀取新數(shù)據(jù)。
127.0.0.1:6379> BRPOP test 10
后面的 10 是監(jiān)聽的時間,單位是秒,10秒沒數(shù)據(jù),就退出。
如果客戶端從隊列中拿到一條消息時,但是還沒消費,客戶端宕機了,這條消息就對應(yīng)丟失了, Redis 中為了避免這種情況的出現(xiàn),提供了 BRPOPLPUSH 命令,BRPOPLPUSH 會在消費一條消息的時候,同時把消息插入到另一個 List,這樣如果消費者程序讀了消息但沒能正常處理,等它重啟后,就可以從備份 List 中重新讀取消息并進行處理了。
127.0.0.1:6379> LPUSH test "ceshi-1" (integer) 1 127.0.0.1:6379> LPUSH test "ceshi-2" (integer) 2 127.0.0.1:6379> BRPOPLPUSH test a-test 100 "ceshi-1" 127.0.0.1:6379> BRPOPLPUSH test a-test 100 "ceshi-2" 127.0.0.1:6379> BRPOPLPUSH test a-test 100 127.0.0.1:6379> RPOP a-test "ceshi-1" 127.0.0.1:6379> RPOP a-test "ceshi-2"
不過 List 類型并不支持消費組的實現(xiàn),Redis 從 5.0 版本開始提供的 Streams 數(shù)據(jù)類型,來支持消息隊列的場景。
分析下源碼實現(xiàn)
在版本3.2之前,Redis中的列表是 ziplist 和 linkedlist 實現(xiàn)的,針對 ziplist 存在的問題, 在3.2之后,引入了 quicklist 來對 ziplist 進行優(yōu)化。
對于 ziplist 來講:
1、保存過大的元素,否則容易導(dǎo)致內(nèi)存重新分配,甚至可能引發(fā)連鎖更新的問題。
2、保存過多的元素,否則訪問性能會降低。
quicklist 使多個數(shù)據(jù)項,不再用一個 ziplist 來存,而是分拆到多個 ziplist 中,每個 ziplist 用指針串起來,這樣修改其中一個數(shù)據(jù)項,即便發(fā)生級聯(lián)更新,也只會影響這一個 ziplist,其它 ziplist 不受影響。
下面看下 list 的實現(xiàn)
代碼鏈接https://github.com/redis/redis/blob/6.2/src/t_list.c
void listTypePush(robj *subject, robj *value, int where) { if (subject->encoding == OBJ_ENCODING_QUICKLIST) { int pos = (where == LIST_HEAD) ? QUICKLIST_HEAD : QUICKLIST_TAIL; if (value->encoding == OBJ_ENCODING_INT) { char buf[32]; ll2string(buf, 32, (long)value->ptr); quicklistPush(subject->ptr, buf, strlen(buf), pos); } else { quicklistPush(subject->ptr, value->ptr, sdslen(value->ptr), pos); } } else { serverPanic("Unknown list encoding"); } } /* Wrapper to allow argument-based switching between HEAD/TAIL pop */ void quicklistPush(quicklist *quicklist, void *value, const size_t sz, int where) { if (where == QUICKLIST_HEAD) { quicklistPushHead(quicklist, value, sz); } else if (where == QUICKLIST_TAIL) { quicklistPushTail(quicklist, value, sz); } }
可以看下上面主要用到的是 quicklist
這里再來分析下 quicklist 的數(shù)據(jù)結(jié)構(gòu)
typedef struct quicklist { // quicklist的鏈表頭 quicklistNode *head; // quicklist的鏈表尾 quicklistNode *tail; // 所有ziplist中的總元素個數(shù) unsigned long count; /* total count of all entries in all ziplists */ // quicklistNodes的個數(shù) unsigned long len; /* number of quicklistNodes */ int fill : QL_FILL_BITS; /* fill factor for individual nodes */ unsigned int compress : QL_COMP_BITS; /* depth of end nodes not to compress;0=off */ unsigned int bookmark_count: QL_BM_BITS; quicklistBookmark bookmarks[]; } quicklist; typedef struct quicklistNode { // 前一個quicklistNode struct quicklistNode *prev; // 后一個quicklistNode struct quicklistNode *next; // quicklistNode指向的ziplist unsigned char *zl; // ziplist的字節(jié)大小 unsigned int sz; /* ziplist size in bytes */ // ziplist中的元素個數(shù) unsigned int count : 16; /* count of items in ziplist */ // 編碼格式,原生字節(jié)數(shù)組或壓縮存儲 unsigned int encoding : 2; /* RAW==1 or LZF==2 */ // 存儲方式 unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */ // 數(shù)據(jù)是否被壓縮 unsigned int recompress : 1; /* was this node previous compressed? */ // 數(shù)據(jù)能否被壓縮 unsigned int attempted_compress : 1; /* node can't compress; too small */ // 預(yù)留的bit位 unsigned int extra : 10; /* more bits to steal for future usage */ } quicklistNode;
quicklist 作為一個鏈表結(jié)構(gòu),在它的數(shù)據(jù)結(jié)構(gòu)中,是定義了整個 quicklist 的頭、尾指針,這樣一來,我們就可以通過 quicklist 的數(shù)據(jù)結(jié)構(gòu),來快速定位到 quicklist 的鏈表頭和鏈表尾。
來看下 quicklist 是如何插入的
/* Add new entry to head node of quicklist. * * Returns 0 if used existing head. * Returns 1 if new head created. */ int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_head = quicklist->head; assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */ if (likely( // 檢測插入位置的 ziplist 是否能容納該元素 _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) { quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(quicklist->head); } else { // 容納不了,就重新創(chuàng)建一個 quicklistNode quicklistNode *node = quicklistCreateNode(); node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(node); _quicklistInsertNodeBefore(quicklist, quicklist->head, node); } quicklist->count++; quicklist->head->count++; return (orig_head != quicklist->head); }
quicklist 采用的是鏈表結(jié)構(gòu),所以當插入一個新元素的時候,首先判斷下 quicklist 插入位置的 ziplist 是否能容納該元素,即單個 ziplist 是否不超過 8KB,或是單個 ziplist 里的元素個數(shù)是否滿足要求。
如果可以插入就當前的節(jié)點進行插入,否則就新建一個 quicklistNode 來保存先插入的節(jié)點。
quicklist 通過控制每個 quicklistNode 中,ziplist 的大小或是元素個數(shù),就有效減少了在 ziplist 中新增或修改元素后,發(fā)生連鎖更新的情況,從而提供了更好的訪問性能。
基于 Streams 的消息隊列
Streams 是 Redis 專門為消息隊列設(shè)計的數(shù)據(jù)類型。
是可持久化的,可以保證數(shù)據(jù)不丟失。
支持消息的多播、分組消費。
支持消息的有序性。
來看下幾個主要的命令
XADD:插入消息,保證有序,可以自動生成全局唯一ID; XREAD:用于讀取消息,可以按ID讀取數(shù)據(jù); XREADGROUP:按消費組形式讀取消息; XPENDING和XACK:XPENDING命令可以用來查詢每個消費組內(nèi)所有消費者已讀取但尚未確認的消息,而XACK命令用于向消息隊列確認消息處理已完成。
下面看幾個常用的命令
XADD
使用 XADD 向隊列添加消息,如果指定的隊列不存在,則創(chuàng)建一個隊列,XADD 語法格式:
$ XADD key ID field value [field value ...]
key:隊列名稱,如果不存在就創(chuàng)建
ID:消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性
field value:記錄
$ XADD teststream * name xiaohong surname xiaobai "1646650328883-0"
可以看到 1646650328883-0
就是自動生成的全局唯一消息ID
XREAD
使用 XREAD 以阻塞或非阻塞方式獲取消息列表
$ XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
count:數(shù)量
milliseconds:可選,阻塞毫秒數(shù),沒有設(shè)置就是非阻塞模式
key:隊列名
id:消息 ID
$ XREAD BLOCK 100 STREAMS teststream 0 1) 1) "teststream" 2) 1) 1) "1646650328883-0" 2) 1) "name" 2) "xiaohong" 3) "surname" 4) "xiaobai"
BLOCK 就是阻塞的毫秒數(shù)
XGROUP
使用 XGROUP CREATE 創(chuàng)建消費者組
$ XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
key:隊列名稱,如果不存在就創(chuàng)建
groupname:組名
$:表示從尾部開始消費,只接受新消息,當前 Stream 消息會全部忽略
從頭開始消費
$ XGROUP CREATE teststream test-consumer-group-name 0-0
從尾部開始消費
$ XGROUP CREATE teststream test-consumer-group-name $
XREADGROUP GROUP
使用 XREADGROUP GROUP
讀取消費組中的消息
$ XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
group:消費組名
consumer:消費者名
count:讀取數(shù)量
milliseconds:阻塞毫秒數(shù)
key:隊列名
ID:消息 ID
$ XADD teststream * name xiaohong surname xiaobai "1646653392799-0" $ XREADGROUP GROUP test-consumer-group-name test-consumer-name COUNT 1 STREAMS teststream > 1) 1) "teststream" 2) 1) 1) "1646653392799-0" 2) 1) "name" 2) "xiaohong" 3) "surname" 4) "xiaobai"
消息隊列中的消息一旦被消費組里的一個消費者讀取了,就不能再被該消費組內(nèi)的其他消費者讀取了。
如果沒有通過 XACK 命令告知消息已經(jīng)成功消費了,該消息會一直存在,可以通過 XPENDING 命令查看已讀取、但尚未確認處理完成的消息。
$ XPENDING teststream test-consumer-group-name 1) (integer) 3 2) "1646653325535-0" 3) "1646653392799-0" 4) 1) 1) "test-consumer-name" 2) "3"
分析下源碼實現(xiàn)
stream 的結(jié)構(gòu)
typedef struct stream { // 這是使用前綴樹存儲數(shù)據(jù) rax *rax; /* The radix tree holding the stream. */ uint64_t length; /* Number of elements inside this stream. */ // 當前stream的最后一個id streamID last_id; /* Zero if there are yet no items. */ // 存儲當前的消費者組信息 rax *cgroups; /* Consumer groups dictionary: name -> streamCG */ } stream; typedef struct streamID { // 消息創(chuàng)建時的時間 uint64_t ms; /* Unix time in milliseconds. */ // 消息的序號 uint64_t seq; /* Sequence number. */ } streamID;
可以看到 stream 的實現(xiàn)用到了 rax 樹
再來看 rax 的實現(xiàn)
typedef struct rax { // radix tree 的頭節(jié)點 raxNode *head; // radix tree 所存儲的元素總數(shù),每插入一個 ID,計數(shù)加 1 uint64_t numele; // radix tree 的節(jié)點總數(shù) uint64_t numnodes; } rax; typedef struct raxNode { // 表示從 Radix Tree 的根節(jié)點到當前節(jié)點路徑上的字符組成的字符串,是否表示了一個完整的 key uint32_t iskey:1; /* Does this node contain a key? */ // 表明當前key對應(yīng)的value是否為空 uint32_t isnull:1; /* Associated value is NULL (don't store it). */ // 表示當前節(jié)點是非壓縮節(jié)點,還是壓縮節(jié)點。 uint32_t iscompr:1; /* Node is compressed. */ // 壓縮節(jié)點壓縮的字符串長度或者非壓縮節(jié)點的子節(jié)點個數(shù) uint32_t size:29; /* Number of children, or compressed string len. */ // 包含填充字段,同時存儲了當前節(jié)點包含的字符串以及子節(jié)點的指針,key對應(yīng)的value指針。 unsigned char data[]; } raxNode;
下面的前綴樹就是存儲了(radix、race、read、real 和 redis)這幾個 key 的布局
Radix Tree 非葉子節(jié)點,要不然是壓縮節(jié)點,只指向單個子節(jié)點,要不然是非壓縮節(jié)點,指向多個子節(jié)點,但每個子節(jié)點只表示一個字符。所以,非葉子節(jié)點無法同時指向表示單個字符的子節(jié)點和表示合并字符串的子節(jié)點。
data 是用來保存實際數(shù)據(jù)的。不過,這里保存的數(shù)據(jù)會根據(jù)當前節(jié)點的類型而有所不同:
對于非壓縮節(jié)點來說,data 數(shù)組包括子節(jié)點對應(yīng)的字符、指向子節(jié)點的指針,以及節(jié)點表示 key 時對應(yīng)的 value 指針;
對于壓縮節(jié)點來說,data 數(shù)組包括子節(jié)點對應(yīng)的合并字符串、指向子節(jié)點的指針,以及節(jié)點為 key 時的 value 指針。
Stream 保存的消息數(shù)據(jù),按照 key-value 形式來看的話,消息 ID 就相當于 key,而消息內(nèi)容相當于是 value。也就是說,Stream 會使用 Radix Tree 來保存消息 ID,然后將消息內(nèi)容保存在 listpack 中,并作為消息 ID 的 value,用 raxNode 的 value 指針指向?qū)?yīng)的 listpack。
這個 listpack 有點臉盲,listpack 是在 redis5.0 引入了一種新的數(shù)據(jù)結(jié)構(gòu),listpack 相比于 ziplist 有哪些優(yōu)點呢
壓縮列表的細節(jié)可參見壓縮列表
對于壓縮列表來講:保存過大的元素,否則容易導(dǎo)致內(nèi)存重新分配,甚至可能引發(fā)連鎖更新的問題。
在 listpack 中,因為每個列表項只記錄自己的長度,而不會像 ziplist 中的列表項那樣,會記錄前一項的長度。所以,當我們在 listpack 中新增或修改元素時,實際上只會涉及每個列表項自己的操作,而不會影響后續(xù)列表項的長度變化,這就避免了連鎖更新。
streamCG 消費者組
typedef struct streamCG { // 當前我這個stream的最大id streamID last_id; // 還沒有收到ACK的消息列表 rax *pel; // 消費組中的所有消費者,消費者名稱為鍵,streamConsumer 為值 rax *consumers; } streamCG;
last_id: 每個組的消費者共享一個last_id代表這個組消費到了什么位置,每次投遞后會更新這個group;
pel: 已經(jīng)發(fā)送給客戶端,但是還沒有收到XACK的消息都存儲在pel樹里面;
consumers: 存儲當前這個消費者組中的消費者。
streamConsumer 消費者結(jié)構(gòu)
typedef struct streamConsumer { // 為該消費者最后一次活躍的時間 mstime_t seen_time; // 消費者名稱,為sds結(jié)構(gòu) sds name; // 待ACK的消息列表,和 streamCG 中指向的是同一個 rax *pel; } streamConsumer;
消息隊列中的消息一旦被消費組里的一個消費者讀取了,就不能再被該消費組內(nèi)的其他消費者讀取了。
消費者組中會維護 last_id,代表消費者組消費的位置,同時未經(jīng) ACK 的消息會存在于 pel 中。
發(fā)布訂閱
Redis 發(fā)布訂閱(pub/sub)是一種消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。
來看下幾個主要的命令
PSUBSCRIBE pattern [pattern ...] 訂閱一個或多個符合給定模式的頻道。 PUBSUB subcommand [argument [argument ...]] 查看訂閱與發(fā)布系統(tǒng)狀態(tài)。 PUBLISH channel message 將信息發(fā)送到指定的頻道。 PUNSUBSCRIBE [pattern [pattern ...]] 退訂所有給定模式的頻道。 SUBSCRIBE channel [channel ...] 訂閱給定的一個或多個頻道的信息。 UNSUBSCRIBE [channel [channel ...]] 指退訂給定的頻道。
普通的訂閱
訂閱 test
$ SUBSCRIBE test
向 test 發(fā)布信息
$ PUBLISH test 1
基于模式(pattern)的發(fā)布/訂閱
相當于是模糊匹配,訂閱的時候通過加入通配符中來實現(xiàn),?表示1個占位符,表示任意個占位符(包括0),?表示1個以上占位符。
訂閱
$ psubscribe p-test*
發(fā)送信息
$ PUBLISH p-testa ceshi-1
看下源碼實現(xiàn)
Redis 將所有頻道和模式的訂閱關(guān)系分別保存在 pubsub_channels 和 pubsub_patterns 中。
代碼路徑https://github.com/redis/redis/blob/6.0/src/server.h
struct redisServer { // 保存訂閱頻道的信息 dict *pubsub_channels; /* Map channels to list of subscribed clients */ // 保存著所有和模式相關(guān)的信息 dict *pubsub_patterns; /* A dict of pubsub_patterns */ // ... }
pubsub_channels 屬性是一個字典,字典的鍵為正在被訂閱的頻道,而字典的值則是一個鏈表, 鏈表中保存了所有訂閱這個頻道的客戶端。
使用 PSUBSCRIBE 命令訂閱頻道時,就會將訂閱的頻道和客戶端在 pubsub_channels 中進行關(guān)聯(lián)
代碼路徑 https://github.com/redis/redis/blob/6.2/src/pubsub.c
// 訂閱一個頻道,成功返回1,已經(jīng)訂閱返回0 int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; /* Add the channel to the client -> channels hash table */ // 將頻道添加到客戶端本地的哈希表中 // 客戶端自己也有一個訂閱頻道的列表,記錄了此客戶端所訂閱的頻道 if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) { retval = 1; incrRefCount(channel); // 添加到服務(wù)器中的pubsub_channels中 // 判斷下這個 channel 是否已經(jīng)創(chuàng)建了 de = dictFind(server.pubsub_channels,channel); if (de == NULL) { // 沒有創(chuàng)建,先創(chuàng)建 channel,后添加 clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else { // 已經(jīng)創(chuàng)建過了 clients = dictGetVal(de); } // 在尾部添加客戶端 listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubSubscribed(c,channel); } typedef struct client { dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ } client;
1、客戶端進行訂閱的時候,自己本身也會維護一個訂閱的 channel 列表;
2、服務(wù)端會將訂閱的客戶端添加到自己的 pubsub_channels 中。
再來看下取消訂閱 pubsubUnsubscribeChannel
// 取消 client 訂閱 int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { dictEntry *de; list *clients; listNode *ln; int retval = 0; // 客戶端在本地的哈希表中刪除channel incrRefCount(channel); /* channel may be just a pointer to the same object we have in the hash tables. Protect it... */ if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; /* Remove the client from the channel -> clients list hash table */ de = dictFind(server.pubsub_channels,channel); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); serverAssertWithInfo(c,NULL,ln != NULL); listDelNode(clients,ln); if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client, so that it will be possible to abuse * Redis PUBSUB creating millions of channels. */ dictDelete(server.pubsub_channels,channel); } } /* Notify the client */ if (notify) addReplyPubsubUnsubscribed(c,channel); decrRefCount(channel); /* it is finally safe to release it */ return retval; }
取消訂閱的邏輯也比較簡單,先在客戶端本地維護的 channel 列表移除對應(yīng)的 channel 信息,然后在服務(wù)端中的 pubsub_channels 移除對應(yīng)的客戶端信息。
再來看下信息是如何進行發(fā)布的呢
/* Publish a message */ int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; dictIterator *di; listNode *ln; listIter li; /* Send to clients listening for that channel */ // 找到Channel所對應(yīng)的dictEntry de = dictFind(server.pubsub_channels,channel); if (de) { // 獲取此 channel 對應(yīng)的所有客戶端 list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); // 一個個發(fā)送信息 while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReplyPubsubMessage(c,channel,message); receivers++; } } /* Send to clients listening to matching channels */ // 拿到所有的客戶端信息 di = dictGetIterator(server.pubsub_patterns); if (di) { channel = getDecodedObject(channel); while((de = dictNext(di)) != NULL) { robj *pattern = dictGetKey(de); list *clients = dictGetVal(de); // 這里進行匹配 // 擁有相同的 pattern 的客戶端會被放入到同一個鏈表中 if (!stringmatchlen((char*)pattern->ptr, sdslen(pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0)) continue; listRewind(clients,&li); while ((ln = listNext(&li)) != NULL) { client *c = listNodeValue(ln); addReplyPubsubPatMessage(c,pattern,channel,message); receivers++; } } decrRefCount(channel); dictReleaseIterator(di); } return receivers; }
消息的發(fā)布,除了會向 pubsub_channels 中的客戶端發(fā)送信息,也會通過 pubsub_patterns 給匹配的客戶端發(fā)送信息。
通過 channel 訂閱,通過 channel 找到匹配的客戶端鏈表,然后逐一發(fā)送
通過 pattern 訂閱,拿出所有的 patterns ,然后根據(jù)規(guī)則,對 發(fā)送的 channel ,進行一一匹配,找到滿足條件的客戶端然后發(fā)送信息。
再來看下 pubsub_patterns 中的客戶端數(shù)據(jù)是如何保存的
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */ int pubsubSubscribePattern(client *c, robj *pattern) { dictEntry *de; list *clients; int retval = 0; // 如果客戶端沒有訂閱過 if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; // 客戶端端本地進行記錄 listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); /* Add the client to the pattern -> list of clients hash table */ de = dictFind(server.pubsub_patterns,pattern); if (de == NULL) { // 沒有創(chuàng)建,先創(chuàng)建 clients = listCreate(); dictAdd(server.pubsub_patterns,pattern,clients); incrRefCount(pattern); } else { clients = dictGetVal(de); } listAddNodeTail(clients,c); } /* Notify the client */ addReplyPubsubPatSubscribed(c,pattern); return retval; }
這里訂閱 pattern 的流程和訂閱 channel 的流程有點類似,只是這里存儲的是 pattern。pubsub_patterns 的類型也是 dict。
擁有相同的 pattern 的客戶端會被放入到同一個鏈表中???redis 的提交記錄可以發(fā)現(xiàn),原本 pubsub_patterns 的類型是 list,后面調(diào)整成了 dict。issues
This commit introduced a dictionary on the server side to efficiently handle the pub sub pattern matching. However, there is another list maintaining the same information which is redundant as well as expensive to operate on. Hence removing it.
如果是一個鏈表,就需要遍歷所有的鏈表,使用 dict ,將有相同 pattern 的客戶端放入同一個鏈表中,這樣匹配前面的 pattern 就好了,不用遍歷所有的客戶端節(jié)點。
總結(jié)
redis 中消息隊列的實現(xiàn),可以使用 list,Streams,pub/sub。
1、list 不支持消費者組;
2、發(fā)布訂閱 (pub/sub) 消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機等,消息就會被丟棄,分發(fā)消息,無法記住歷史消息;
3、5.0 引入了 Streams,專門為消息隊列設(shè)計的數(shù)據(jù)結(jié)構(gòu),其中支持了消費者組,支持消息的有序性,支持消息的持久化;
參考
【Redis核心技術(shù)與實戰(zhàn)】https://time.geekbang.org/column/intro/100056701
【Redis設(shè)計與實現(xiàn)】https://book.douban.com/subject/25900156/
【Redis Streams 介紹】http://www.redis.cn/topics/streams-intro.html
【Centos7.6安裝redis-6.0.8版本】https://blog.csdn.net/roc_wl/article/details/108662719
【Stream 數(shù)據(jù)類型源碼分析】https://blog.csdn.net/weixin_45505313/article/details/109060761
【訂閱與發(fā)布】https://redisbook.readthedocs.io/en/latest/feature/pubsub.html
【Redis學習筆記】https://github.com/boilingfrog/Go-POINT/tree/master/redis
【使用 Redis 實現(xiàn)消息隊列】https://boilingfrog.github.io/2022/03/14/redis中實現(xiàn)消息隊列的幾種方式/
到此這篇關(guān)于Redis 中使用 list,streams,pub/sub 幾種方式實現(xiàn)消息隊列的文章就介紹到這了,更多相關(guān)Redis 消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解析Redis未授權(quán)訪問漏洞復(fù)現(xiàn)與利用危害
這篇文章主要介紹了Redis未授權(quán)訪問漏洞復(fù)現(xiàn)與利用,介紹了redis未授權(quán)訪問漏洞的基本概念及漏洞的危害,本文給大家介紹的非常詳細,需要的朋友可以參考下2022-01-01分布式鎖為什么要選擇Zookeeper而不是Redis?看完這篇你就明白了
Zookeeper的機制可以保證分布式鎖實現(xiàn)業(yè)務(wù)代碼簡單,成本低,Redis如果要解決分布式鎖的問題,對于一些復(fù)雜的情況,很難解決,成本較高,這篇文章重點給大家介紹分布式鎖選擇Zookeeper 而不是Redis的理由,一起看看吧2021-05-05