一文弄懂Redis Stream消息隊列
1. Stream簡介
Stream是redis最復(fù)雜的一個數(shù)據(jù)結(jié)構(gòu), 也是redis 5.0的一個重要更新。Redis Stream 主要用于消息隊列(MQ,Message Queue),這樣的數(shù)據(jù)結(jié)構(gòu)其實很常見, 比如騰訊云的CMQ、阿里的RocketMQ、ActiveMQ、RabbitMQ以及炙手可熱的Kafka等。
Redis 本身是有一個 Redis 發(fā)布訂閱 (pub/sub) 來實現(xiàn)消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機(jī)等,消息就會被丟棄。簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。而 Redis Stream 提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問任何時刻的數(shù)據(jù),并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。
Stream主要由消息、生產(chǎn)者、消費者、消費組4部分組成. 這里消費組可能讓人有些困惑, 其實就是消費組里面有多個消費者, 他們互相競爭, 當(dāng)一個消費了某條消息, 消息會被放入待確認(rèn)隊列, 消息隊列的迭代器就會前移, 下一個同組消費者不管是誰, 都不會再次消費這個消息, 而是下一個消息。這種概念和kafka很雷同,在某些特定場景可以使用redis的stream代替kafka等消息隊列,減少系統(tǒng)復(fù)雜性,增強(qiáng)系統(tǒng)的穩(wěn)定性。
(1)創(chuàng)建消息隊列
/* * xadd用來創(chuàng)建, 每個stream有一個唯一key, *意味著讓系統(tǒng)給你返回id, id是由unix時間和從0開始下標(biāo) * 組成, 也就是這一毫秒的第幾個條目. 你可以自己設(shè)定, 但是要確保嚴(yán)格單調(diào)遞增. 后面就是鍵值對, 也就 * 是消息本身. */ xadd mystream * str1 hello str2 world
每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。
(2)刪除消息隊列
del mystream
(3)刪除消息隊列中某條消息
通過xdel可以刪除消息, 但是注意, 其實沒有刪除, 只是設(shè)置了標(biāo)志位.
xdel mystream streamID
(4)消費一條消息
xread count 1 streams mystream streanID
(5)消費者組消費
創(chuàng)建消費者組:
xgroup create mystream myg 0 # 這里最后是id, 0就代表從最前面開始獲取消息, 可以寫成$, 意味著獲取新消息.
消費者組內(nèi)第一個消費者讀取消息:
xreadgroup group myg alice count 2 streams mystream # 消費者alice讀取2條消息
消費者組內(nèi)第二個消費者讀取消息:
xreadgroup group myg bob count 1 streams mystream # 消費者bob讀取1條消息
接下來我們介紹Stream的數(shù)據(jù)結(jié)構(gòu),在介紹Stream的數(shù)據(jù)結(jié)構(gòu)之前,我們先來看看字典數(shù)(Trie Tree)和基數(shù)樹(Radix Tree),Redis的消息隊列Stream主要是基于基數(shù)樹來實現(xiàn)的。
2. 字典樹(Trie Tree)
在計算機(jī)科學(xué)中,字典樹(Trie Tree),又稱前綴樹,是一種有序樹,用于保存關(guān)聯(lián)數(shù)組,可以保存一些字符串->值的對應(yīng)關(guān)系?;旧?,它哈希表功能相同,都是 key-value 映射,只不過其中的鍵通常是字符串。與二叉查找樹不同,鍵不是直接保存在節(jié)點中,而是由節(jié)點在樹中的位置決定。一個節(jié)點的所有子孫都有相同的前綴,也就是這個節(jié)點對應(yīng)的字符串,而根節(jié)點對應(yīng)空字符串。一般情況下,不是所有的節(jié)點都有對應(yīng)的值,只有葉子節(jié)點和部分內(nèi)部節(jié)點所對應(yīng)的鍵才有相關(guān)的值。trie tree中的鍵通常是字符串,但也可以是其它的結(jié)構(gòu)。trie的算法可以很容易地修改為處理其它結(jié)構(gòu)的有序序列,比如一串?dāng)?shù)字或者形狀的排列。比如,bitwise trie中的鍵是一串位元,可以用于表示整數(shù)或者內(nèi)存地址。
它的原理是將每個key拆分成每個單位長度字符,然后對應(yīng)到每個分支上,分支所在的節(jié)點對應(yīng)為從根節(jié)點到當(dāng)前節(jié)點的拼接出的key的值。Trie樹有3個基本性質(zhì):
- 根節(jié)點不包含字符,除根節(jié)點外每一個節(jié)點都只包含一個字符;
- 從根節(jié)點到某一節(jié)點,路徑上經(jīng)過的字符連接起來,為該節(jié)點對應(yīng)的字符串;
- 每個節(jié)點的所有子節(jié)點包含的字符都不相同;
字典樹的結(jié)構(gòu)圖如下所示:
Trie 的強(qiáng)大之處就在于它的時間復(fù)雜度。它的插入和查詢時間復(fù)雜度都為 O(k) ,其中 k 為 key 的長度,與 Trie 中保存了多少個元素?zé)o關(guān)。Hash 表號稱是 O(1) 的,但在計算 hash 的時候就肯定會是 O(k) ,而且還有碰撞之類的問題;Trie 的缺點是空間消耗很高。
字典樹的應(yīng)用場景包括:
(1)字符串檢索:事先將已知的一些字符串(字典)的有關(guān)信息保存到trie樹里,查找另外一些未知字符串是否出現(xiàn)過或者出現(xiàn)頻率。
(2)詞頻統(tǒng)計:一個文本文件,大約有一萬行,每行一個詞,要求統(tǒng)計出其中最頻繁出現(xiàn)的前10個詞,請給出思想,給出時間復(fù)雜度分析。
(3)排序:Trie樹是一棵多叉樹,只要先序遍歷整棵樹,輸出相應(yīng)的字符串便是按字典序排序的結(jié)果。
(4)字符串最長公共前綴:Trie樹利用多個字符串的公共前綴來節(jié)省存儲空間,當(dāng)我們把大量字符串存儲到一棵trie樹上時,我們可以快速得到某些字符串的公共前綴。
(5)字符串搜索的前綴匹配:trie樹常用于搜索提示。如當(dāng)輸入一個網(wǎng)址,可以自動搜索出可能的選擇。當(dāng)沒有完全匹配的搜索結(jié)果,可以返回前綴最相似的可能。
3. 基數(shù)樹(Radix Tree)
3.1 Radix Tree
Trie樹其實依然比較浪費空間,有人曾經(jīng)反饋他們在實際的項目發(fā)現(xiàn),隨著key的數(shù)量的增加,發(fā)現(xiàn)Trie樹會占用大量的內(nèi)存和空間?,F(xiàn)在我們就演繹下Trie樹是如何浪費內(nèi)存和空間的。比如下面的一組數(shù)據(jù):
{ "deck": someValue, "did": someValue, "doe": someValue, "dog": someValue, "doge": someValue, "dogs": someValue }
用Trie樹的畫法把上面的key value畫出來如下:
也許你已經(jīng)發(fā)現(xiàn)了一些問題。比如"deck"這一個分支,有沒有必要一直往下來拆分嗎?還是"did",有必要d,然后i,然后d嗎?像這樣的不可分叉的單支分支,其實完全可以合并,也就是壓縮。
這樣看起來是不是要更節(jié)省一點空間呢?這只是6個單詞的樣子,數(shù)據(jù)越多,空間節(jié)省的效果越明顯。而且這樣壓縮后,不可分叉的分支高度也變矮了。我們叫這樣的Trie樹為壓縮Trie樹(Compressed Trie Tree)。壓縮Trie樹也就是Radix樹,只不過他有多個名字,有人叫壓縮Trie樹,有人叫Radix樹,它和字典樹的不同之處在于,所有只有一個子節(jié)點的中間節(jié)點都被刪除。Redis中就用到了Radix樹。
3.2 計算機(jī)對Radix Tree的處理
因為計算機(jī)可不會像人類一樣可以通過英文像上面的圖一樣來構(gòu)建樹,計算機(jī)只認(rèn)識0和1。所以為了真正的了解Radix樹,我們需要知道機(jī)器是怎么讀取Radix樹的。計算機(jī)對于Radix樹的處理是以bit(或二進(jìn)制數(shù)字)來讀取的。一次被對比r個bit,2的r次方是radix樹的基數(shù)。這也是基數(shù)樹的這個名字的由來?,F(xiàn)在我們把上面的三個單詞變成二進(jìn)制的樣子,然后一位一位的看:
dog: 01100100 01101111 01100111 doge: 01100100 01101111 01100111 01100101 dogs: 01100100 01101111 01100111 01110011
按照字符串的比對,你會發(fā)現(xiàn)dog是dogs和doge的子串。但我們現(xiàn)在比對二進(jìn)制,一位一位的比對,你會發(fā)現(xiàn)dog和doge是在第二十五位的時候不一樣的。dogs和doge是在第二十八位不一樣的。按照位的比對的結(jié)果,你會發(fā)現(xiàn)doge居然是dogs二進(jìn)制子串。這就是計算機(jī)的方式。
4. 基數(shù)樹(Radix Tree)的實現(xiàn)
4.1 raxNode結(jié)構(gòu)定義
raxNode是radix tree的核心數(shù)據(jù)結(jié)構(gòu),其結(jié)構(gòu)體如下所示:
typedef struct raxNode { uint32_t iskey:1; uint32_t isnull:1; uint32_t iscompr:1; uint32_t size:29; unsigned char data[]; } raxNode; typedef struct rax { raxNode *head; uint64_t numele; uint64_t numnodes; } rax;
- iskey:表示這個節(jié)點是否包含key
- 0:沒有key
- 1:表示從頭部到其父節(jié)點的路徑完整的存儲了key,查找的時候按子節(jié)點iskey=1來判斷key是否存在
- isnull:是否有存儲value值,比如存儲元數(shù)據(jù)就只有key,沒有value值。value值也是存儲在data中
- iscompr:是否有前綴壓縮,決定了data存儲的數(shù)據(jù)結(jié)構(gòu)
- size:該節(jié)點存儲的字符個數(shù)
- data:存儲子節(jié)點的信息
- iscompr=0:非壓縮模式下,數(shù)據(jù)格式是:[header strlen=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?),有size個字符,緊跟著是size個指針,指向每個字符對應(yīng)的下一個節(jié)點。size個字符之間互相沒有路徑聯(lián)系。
- iscompr=1:壓縮模式下,數(shù)據(jù)格式是:[header strlen=3][xyz][z-ptr](value-ptr?),只有一個指針,指向下一個節(jié)點。size個字符是壓縮字符片段
4.2 Rax Insert
以下用幾個示例來詳解rax tree插入的流程。假設(shè) j 是遍歷已有節(jié)點的游標(biāo),i 是遍歷新增節(jié)點的游標(biāo)。
(1)場景一:只插入abcd
該場景下iscompr = 1,表示使用了壓縮前綴, 因此data域只有一個z-ptr指針,z-ptr指向的葉子節(jié)點iskey = 1。節(jié)點圖為:
(2)場景二:在abcd之后插入abcdef
從abcd父節(jié)點的每個壓縮前綴字符比較,遍歷完所有abcd節(jié)點后指向了其空子節(jié)點,j = 0, i < len(abcded)。查找到abcd的空子節(jié)點,直接將ef賦值到子節(jié)點上,成為abcd的子節(jié)點。ef節(jié)點被標(biāo)記為iskey=1,用來標(biāo)識abcd這個key。ef節(jié)點下再創(chuàng)建一個空子節(jié)點,iskey=1來表示abcdef這個key。節(jié)點圖為:
(3)場景三:在abcd之后插入ab
ab在abcd能找到前兩位的前綴,也就是i=len(ab),j < len(abcd)。將abcd分割成ab和cd兩個子節(jié)點,cd也是一個壓縮前綴節(jié)點,cd同時被標(biāo)記為iskey=1,來表示ab這個key。cd下掛著一個空子節(jié)點,來標(biāo)記abcd這個key。節(jié)點圖為:
(4)場景四:在abcd之后插入abABC
abcABC在abcd中只找到了ab這個前綴,即i < len(abcABC),j < len(abcd)。這個步驟有點復(fù)雜,分解一下:
- step 1:將abcd從ab之后拆分,拆分成ab、c、d 三個節(jié)點。
- step 2:c節(jié)點是一個非壓縮的節(jié)點,c掛在ab子節(jié)點上。
- step 3:d節(jié)點只有一個字符,所以也是一個非壓縮節(jié)點,掛在c子節(jié)點上。
- step 4:將ABC 拆分成了A和BC, A掛在ab子節(jié)點上,和c節(jié)點屬于同一個節(jié)點,這樣A就和c同屬于父節(jié)點ab。
- step 5:將BC作為一個壓縮前綴的節(jié)點,掛在A子節(jié)點下。
- step 6:d節(jié)點和BC節(jié)點都掛一個空子節(jié)點分別標(biāo)識abcd和abcABC這兩個key。
節(jié)點圖為:
(5)場景五:在abcd之后插入Aabc
abcd和Aabc沒有前綴匹配,i = 0,j = 0。
- 將abcd拆分成a、bcd兩個節(jié)點,a節(jié)點是一個非壓縮前綴節(jié)點。
- 將Aabc拆分成A、abc兩個節(jié)點,A節(jié)點也是一個非壓縮前綴節(jié)點。
- 將A節(jié)點掛在和a相同的父節(jié)點上。
- 同上,在bcd和abc這兩個節(jié)點下掛空子節(jié)點來分別表示兩個key。
節(jié)點圖為:
4.3 Rax Remove
(1)刪除
刪除一個key的流程比較簡單,找到iskey的節(jié)點后,向上遍歷父節(jié)點刪除非iskey的節(jié)點。如果是非壓縮的父節(jié)點并且size > 1,表示還有其他非相關(guān)的路徑存在,則需要按刪除子節(jié)點的模式去處理這個父節(jié)點,主要是做memove和realloc。
(2)合并
刪除一個key之后需要嘗試做一些合并,以收斂樹的高度。合并的條件是:
- iskey = 1的節(jié)點不能合并
- 子節(jié)點只有一個字符
- 父節(jié)點只有一個子節(jié)點(如果父節(jié)點是壓縮前綴的節(jié)點,那么只有一個子節(jié)點,滿足條件。如果父節(jié)點是非壓縮前綴的節(jié)點,那么只能有一個字符路徑才能滿足條件)
5. 消息隊列Stream的實現(xiàn)
5.1 Stream的數(shù)據(jù)結(jié)構(gòu)
/* 消息ID */ typedef struct streamID { uint64_t ms; /* Unix 時間(ms) */ uint64_t seq; /* 序列號,該毫秒下產(chǎn)生的第幾個消息隊列 */ } streamID; /* 消息隊列 */ typedef struct stream { rax *rax; /* 該消息隊列指向的基數(shù)樹,保存鍵值對 */ uint64_t length; /* 此消息隊列里面的元素個數(shù) */ streamID last_id; /* 上一次訪問的消息ID,如果沒有元素則為0 */ rax *cgroups; /* 消費者組字典: name -> streamCG */ } stream; /* 消費者組 */ typedef struct streamCG { streamID last_id; /* 上一次分發(fā)還未確認(rèn)的消息 */ rax *pel; /* 這個消費者組中未確認(rèn)的消息列表 */ rax *consumers; /* 該消息者組中的所有消費者 */ } streamCG; /* 消費者組中的一個消費者 */ typedef struct streamConsumer { mstime_t seen_time; /* 該消費者上次激活的時間 */ sds name; /* 消費者的名字 */ rax *pel; /* 該消費者待處理的未確認(rèn)的消息列表 */ } streamConsumer;
(1)消息ID(streamID)
消息ID的形式是timestampInMillis-sequence
,例如1527846880572-5
,它表示當(dāng)前的消息在毫米時間戳1527846880572
時產(chǎn)生,并且是該毫秒內(nèi)產(chǎn)生的第5條消息。消息ID可以由服務(wù)器自動生成,也可以由客戶端自己指定,但是形式必須是整數(shù)-整數(shù)
,而且必須是后面加入的消息的ID要大于前面的消息ID。
(2)消息內(nèi)容
一條消息內(nèi)容就是一系列的key/value組成,使用基數(shù)樹進(jìn)行保存。
(3)last_id
游標(biāo),每個消費組會有個游標(biāo) last_id,任意一個消費者讀取了消息都會使游標(biāo) last_id 往前移動。
(4)pel
消費者(Consumer)的狀態(tài)變量,作用是維護(hù)消費者的未確認(rèn)的 id。 pel記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認(rèn)字符)。
5.2 Stream的API函數(shù)
stream *streamNew(void); // 新建一個消息隊列stream void freeStream(stream *s); // 釋放一個消息隊列stream unsigned long streamLength(const robj *subject); // 返回消息隊列的消息個數(shù) /* 把stream中從start到end的消息發(fā)送給客戶端c,如果conut不為0,則發(fā)送conut個消息,返回發(fā)出的消息個數(shù) */ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); /* 根據(jù)迭代器獲取消息的key和value */ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);// 根據(jù)迭代器獲取消息ID streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); // 創(chuàng)建一個消費者組 streamCG *streamLookupCG(stream *s, sds groupname); // 根據(jù)消費者組名查詢消費者組 /* 根據(jù)消費者名在消費者組中進(jìn)行查詢 */ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created); void streamIteratorRemoveEntry(streamIterator *si, streamID *current); // 刪除消息隊列中的一個消息 /* 添加一條消息到消息隊列中 */ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id); /* 從消息隊列中刪除一條消息 */ int streamDeleteItem(stream *s, streamID *id); /* 根據(jù)長度刪除消息隊列中消息的條數(shù) */ int64_t streamTrimByLength(stream *s, long long maxlen, int approx); /* 根據(jù)最小的消息ID刪除消息隊列中消息的條數(shù) */ int64_t streamTrimByID(stream *s, streamID minid, int approx);
參考:1)https://cloud.tencent.com/developer/article/1597128
2)http://mysql.taobao.org/monthly/2019/04/03/
到此這篇關(guān)于一文弄懂Redis Stream消息隊列的文章就介紹到這了,更多相關(guān)Redis Stream消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis+Caffeine實現(xiàn)分布式二級緩存組件實戰(zhàn)教程
這篇文章主要介紹了Redis+Caffeine實現(xiàn)分布式二級緩存組件實戰(zhàn)教程,介紹了分布式二級緩存的優(yōu)勢,使用組件的方法,通過示例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08