欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文弄懂Redis Stream消息隊列

 更新時間:2023年06月12日 10:16:04   作者:蓬萊道人  
本文主要介紹了一文弄懂Redis Stream消息隊列,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

1. Stream簡介

Stream是redis最復(fù)雜的一個數(shù)據(jù)結(jié)構(gòu), 也是redis 5.0的一個重要更新。Redis Stream 主要用于消息隊列(MQ,Message Queue),這樣的數(shù)據(jù)結(jié)構(gòu)其實很常見, 比如騰訊云的CMQ、阿里的RocketMQ、ActiveMQ、RabbitMQ以及炙手可熱的Kafka等。

Redis 本身是有一個 Redis 發(fā)布訂閱 (pub/sub) 來實現(xiàn)消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機(jī)等,消息就會被丟棄。簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。而 Redis Stream 提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問任何時刻的數(shù)據(jù),并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。

Stream主要由消息、生產(chǎn)者、消費者、消費組4部分組成. 這里消費組可能讓人有些困惑, 其實就是消費組里面有多個消費者, 他們互相競爭, 當(dāng)一個消費了某條消息, 消息會被放入待確認(rèn)隊列, 消息隊列的迭代器就會前移, 下一個同組消費者不管是誰, 都不會再次消費這個消息, 而是下一個消息。這種概念和kafka很雷同,在某些特定場景可以使用redis的stream代替kafka等消息隊列,減少系統(tǒng)復(fù)雜性,增強(qiáng)系統(tǒng)的穩(wěn)定性。

(1)創(chuàng)建消息隊列

/*
 * xadd用來創(chuàng)建, 每個stream有一個唯一key, *意味著讓系統(tǒng)給你返回id, id是由unix時間和從0開始下標(biāo) 
 * 組成, 也就是這一毫秒的第幾個條目. 你可以自己設(shè)定, 但是要確保嚴(yán)格單調(diào)遞增. 后面就是鍵值對, 也就 
 * 是消息本身.
 */
xadd mystream * str1 hello str2 world

每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。

(2)刪除消息隊列

del mystream

(3)刪除消息隊列中某條消息

通過xdel可以刪除消息, 但是注意, 其實沒有刪除, 只是設(shè)置了標(biāo)志位.

xdel mystream streamID

(4)消費一條消息

xread count 1 streams mystream streanID

(5)消費者組消費

創(chuàng)建消費者組:

xgroup create mystream myg 0 
# 這里最后是id, 0就代表從最前面開始獲取消息, 可以寫成$, 意味著獲取新消息.

消費者組內(nèi)第一個消費者讀取消息:

xreadgroup group myg alice count 2 streams mystream
# 消費者alice讀取2條消息

消費者組內(nèi)第二個消費者讀取消息:

xreadgroup group myg bob count 1 streams mystream 
# 消費者bob讀取1條消息

接下來我們介紹Stream的數(shù)據(jù)結(jié)構(gòu),在介紹Stream的數(shù)據(jù)結(jié)構(gòu)之前,我們先來看看字典數(shù)(Trie Tree)和基數(shù)樹(Radix Tree),Redis的消息隊列Stream主要是基于基數(shù)樹來實現(xiàn)的。

2. 字典樹(Trie Tree)

在計算機(jī)科學(xué)中,字典樹(Trie Tree),又稱前綴樹,是一種有序樹,用于保存關(guān)聯(lián)數(shù)組,可以保存一些字符串->值的對應(yīng)關(guān)系?;旧?,它哈希表功能相同,都是 key-value 映射,只不過其中的鍵通常是字符串。與二叉查找樹不同,鍵不是直接保存在節(jié)點中,而是由節(jié)點在樹中的位置決定。一個節(jié)點的所有子孫都有相同的前綴,也就是這個節(jié)點對應(yīng)的字符串,而根節(jié)點對應(yīng)空字符串。一般情況下,不是所有的節(jié)點都有對應(yīng)的值,只有葉子節(jié)點和部分內(nèi)部節(jié)點所對應(yīng)的鍵才有相關(guān)的值。trie tree中的鍵通常是字符串,但也可以是其它的結(jié)構(gòu)。trie的算法可以很容易地修改為處理其它結(jié)構(gòu)的有序序列,比如一串?dāng)?shù)字或者形狀的排列。比如,bitwise trie中的鍵是一串位元,可以用于表示整數(shù)或者內(nèi)存地址。

它的原理是將每個key拆分成每個單位長度字符,然后對應(yīng)到每個分支上,分支所在的節(jié)點對應(yīng)為從根節(jié)點到當(dāng)前節(jié)點的拼接出的key的值。Trie樹有3個基本性質(zhì):

  • 根節(jié)點不包含字符,除根節(jié)點外每一個節(jié)點都只包含一個字符;
  • 從根節(jié)點到某一節(jié)點,路徑上經(jīng)過的字符連接起來,為該節(jié)點對應(yīng)的字符串;
  • 每個節(jié)點的所有子節(jié)點包含的字符都不相同;

字典樹的結(jié)構(gòu)圖如下所示:

Trie 的強(qiáng)大之處就在于它的時間復(fù)雜度。它的插入和查詢時間復(fù)雜度都為 O(k) ,其中 k 為 key 的長度,與 Trie 中保存了多少個元素?zé)o關(guān)。Hash 表號稱是 O(1) 的,但在計算 hash 的時候就肯定會是 O(k) ,而且還有碰撞之類的問題;Trie 的缺點是空間消耗很高。

字典樹的應(yīng)用場景包括:

(1)字符串檢索:事先將已知的一些字符串(字典)的有關(guān)信息保存到trie樹里,查找另外一些未知字符串是否出現(xiàn)過或者出現(xiàn)頻率。

(2)詞頻統(tǒng)計:一個文本文件,大約有一萬行,每行一個詞,要求統(tǒng)計出其中最頻繁出現(xiàn)的前10個詞,請給出思想,給出時間復(fù)雜度分析。

(3)排序:Trie樹是一棵多叉樹,只要先序遍歷整棵樹,輸出相應(yīng)的字符串便是按字典序排序的結(jié)果。

(4)字符串最長公共前綴:Trie樹利用多個字符串的公共前綴來節(jié)省存儲空間,當(dāng)我們把大量字符串存儲到一棵trie樹上時,我們可以快速得到某些字符串的公共前綴。

(5)字符串搜索的前綴匹配:trie樹常用于搜索提示。如當(dāng)輸入一個網(wǎng)址,可以自動搜索出可能的選擇。當(dāng)沒有完全匹配的搜索結(jié)果,可以返回前綴最相似的可能。

3. 基數(shù)樹(Radix Tree)

3.1 Radix Tree

Trie樹其實依然比較浪費空間,有人曾經(jīng)反饋他們在實際的項目發(fā)現(xiàn),隨著key的數(shù)量的增加,發(fā)現(xiàn)Trie樹會占用大量的內(nèi)存和空間?,F(xiàn)在我們就演繹下Trie樹是如何浪費內(nèi)存和空間的。比如下面的一組數(shù)據(jù):

{
  "deck": someValue,
  "did": someValue,
  "doe": someValue,
  "dog": someValue,
  "doge": someValue,
  "dogs": someValue
}

用Trie樹的畫法把上面的key value畫出來如下:

也許你已經(jīng)發(fā)現(xiàn)了一些問題。比如"deck"這一個分支,有沒有必要一直往下來拆分嗎?還是"did",有必要d,然后i,然后d嗎?像這樣的不可分叉的單支分支,其實完全可以合并,也就是壓縮。

這樣看起來是不是要更節(jié)省一點空間呢?這只是6個單詞的樣子,數(shù)據(jù)越多,空間節(jié)省的效果越明顯。而且這樣壓縮后,不可分叉的分支高度也變矮了。我們叫這樣的Trie樹為壓縮Trie樹(Compressed Trie Tree)。壓縮Trie樹也就是Radix樹,只不過他有多個名字,有人叫壓縮Trie樹,有人叫Radix樹,它和字典樹的不同之處在于,所有只有一個子節(jié)點的中間節(jié)點都被刪除。Redis中就用到了Radix樹。

3.2 計算機(jī)對Radix Tree的處理

因為計算機(jī)可不會像人類一樣可以通過英文像上面的圖一樣來構(gòu)建樹,計算機(jī)只認(rèn)識0和1。所以為了真正的了解Radix樹,我們需要知道機(jī)器是怎么讀取Radix樹的。計算機(jī)對于Radix樹的處理是以bit(或二進(jìn)制數(shù)字)來讀取的。一次被對比r個bit,2的r次方是radix樹的基數(shù)。這也是基數(shù)樹的這個名字的由來?,F(xiàn)在我們把上面的三個單詞變成二進(jìn)制的樣子,然后一位一位的看:

dog:  01100100 01101111 01100111
doge: 01100100 01101111 01100111 01100101
dogs: 01100100 01101111 01100111 01110011

按照字符串的比對,你會發(fā)現(xiàn)dog是dogs和doge的子串。但我們現(xiàn)在比對二進(jìn)制,一位一位的比對,你會發(fā)現(xiàn)dog和doge是在第二十五位的時候不一樣的。dogs和doge是在第二十八位不一樣的。按照位的比對的結(jié)果,你會發(fā)現(xiàn)doge居然是dogs二進(jìn)制子串。這就是計算機(jī)的方式。

4. 基數(shù)樹(Radix Tree)的實現(xiàn)

4.1 raxNode結(jié)構(gòu)定義

raxNode是radix tree的核心數(shù)據(jù)結(jié)構(gòu),其結(jié)構(gòu)體如下所示:

typedef struct raxNode {    
   uint32_t iskey:1;  
   uint32_t isnull:1;  
   uint32_t iscompr:1; 
   uint32_t size:29;  
   unsigned char data[]; 
} raxNode;
typedef struct rax {
    raxNode *head;
    uint64_t numele;
    uint64_t numnodes;
} rax;
  • iskey:表示這個節(jié)點是否包含key
    • 0:沒有key
    • 1:表示從頭部到其父節(jié)點的路徑完整的存儲了key,查找的時候按子節(jié)點iskey=1來判斷key是否存在
  • isnull:是否有存儲value值,比如存儲元數(shù)據(jù)就只有key,沒有value值。value值也是存儲在data中
  • iscompr:是否有前綴壓縮,決定了data存儲的數(shù)據(jù)結(jié)構(gòu)
  • size:該節(jié)點存儲的字符個數(shù)
  • data:存儲子節(jié)點的信息
    • iscompr=0:非壓縮模式下,數(shù)據(jù)格式是:[header strlen=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?),有size個字符,緊跟著是size個指針,指向每個字符對應(yīng)的下一個節(jié)點。size個字符之間互相沒有路徑聯(lián)系。
    • iscompr=1:壓縮模式下,數(shù)據(jù)格式是:[header strlen=3][xyz][z-ptr](value-ptr?),只有一個指針,指向下一個節(jié)點。size個字符是壓縮字符片段

4.2 Rax Insert

以下用幾個示例來詳解rax tree插入的流程。假設(shè) j 是遍歷已有節(jié)點的游標(biāo),i 是遍歷新增節(jié)點的游標(biāo)。

(1)場景一:只插入abcd

該場景下iscompr = 1,表示使用了壓縮前綴, 因此data域只有一個z-ptr指針,z-ptr指向的葉子節(jié)點iskey = 1。節(jié)點圖為:

(2)場景二:在abcd之后插入abcdef

從abcd父節(jié)點的每個壓縮前綴字符比較,遍歷完所有abcd節(jié)點后指向了其空子節(jié)點,j = 0, i < len(abcded)。查找到abcd的空子節(jié)點,直接將ef賦值到子節(jié)點上,成為abcd的子節(jié)點。ef節(jié)點被標(biāo)記為iskey=1,用來標(biāo)識abcd這個key。ef節(jié)點下再創(chuàng)建一個空子節(jié)點,iskey=1來表示abcdef這個key。節(jié)點圖為:

(3)場景三:在abcd之后插入ab

ab在abcd能找到前兩位的前綴,也就是i=len(ab),j < len(abcd)。將abcd分割成ab和cd兩個子節(jié)點,cd也是一個壓縮前綴節(jié)點,cd同時被標(biāo)記為iskey=1,來表示ab這個key。cd下掛著一個空子節(jié)點,來標(biāo)記abcd這個key。節(jié)點圖為:

(4)場景四:在abcd之后插入abABC

abcABC在abcd中只找到了ab這個前綴,即i < len(abcABC),j < len(abcd)。這個步驟有點復(fù)雜,分解一下:

  • step 1:將abcd從ab之后拆分,拆分成ab、c、d 三個節(jié)點。
  • step 2:c節(jié)點是一個非壓縮的節(jié)點,c掛在ab子節(jié)點上。
  • step 3:d節(jié)點只有一個字符,所以也是一個非壓縮節(jié)點,掛在c子節(jié)點上。
  • step 4:將ABC 拆分成了A和BC, A掛在ab子節(jié)點上,和c節(jié)點屬于同一個節(jié)點,這樣A就和c同屬于父節(jié)點ab。
  • step 5:將BC作為一個壓縮前綴的節(jié)點,掛在A子節(jié)點下。
  • step 6:d節(jié)點和BC節(jié)點都掛一個空子節(jié)點分別標(biāo)識abcd和abcABC這兩個key。

節(jié)點圖為:

(5)場景五:在abcd之后插入Aabc

abcd和Aabc沒有前綴匹配,i = 0,j = 0。

  • 將abcd拆分成a、bcd兩個節(jié)點,a節(jié)點是一個非壓縮前綴節(jié)點。
  • 將Aabc拆分成A、abc兩個節(jié)點,A節(jié)點也是一個非壓縮前綴節(jié)點。
  • 將A節(jié)點掛在和a相同的父節(jié)點上。
  • 同上,在bcd和abc這兩個節(jié)點下掛空子節(jié)點來分別表示兩個key。

節(jié)點圖為:

4.3 Rax Remove

(1)刪除

    刪除一個key的流程比較簡單,找到iskey的節(jié)點后,向上遍歷父節(jié)點刪除非iskey的節(jié)點。如果是非壓縮的父節(jié)點并且size > 1,表示還有其他非相關(guān)的路徑存在,則需要按刪除子節(jié)點的模式去處理這個父節(jié)點,主要是做memove和realloc。

(2)合并

    刪除一個key之后需要嘗試做一些合并,以收斂樹的高度。合并的條件是:

  • iskey = 1的節(jié)點不能合并
  • 子節(jié)點只有一個字符
  • 父節(jié)點只有一個子節(jié)點(如果父節(jié)點是壓縮前綴的節(jié)點,那么只有一個子節(jié)點,滿足條件。如果父節(jié)點是非壓縮前綴的節(jié)點,那么只能有一個字符路徑才能滿足條件)

5. 消息隊列Stream的實現(xiàn)

5.1 Stream的數(shù)據(jù)結(jié)構(gòu)

/* 消息ID */
typedef struct streamID {
    uint64_t ms;        /* Unix 時間(ms) */
    uint64_t seq;       /* 序列號,該毫秒下產(chǎn)生的第幾個消息隊列 */
} streamID;
/* 消息隊列 */
typedef struct stream {
    rax *rax;               /* 該消息隊列指向的基數(shù)樹,保存鍵值對 */
    uint64_t length;        /* 此消息隊列里面的元素個數(shù) */
    streamID last_id;       /* 上一次訪問的消息ID,如果沒有元素則為0 */
    rax *cgroups;           /* 消費者組字典: name -> streamCG */
} stream;
/* 消費者組 */
typedef struct streamCG {
    streamID last_id;       /* 上一次分發(fā)還未確認(rèn)的消息 */
    rax *pel;               /* 這個消費者組中未確認(rèn)的消息列表 */
    rax *consumers;         /* 該消息者組中的所有消費者 */
} streamCG;
/* 消費者組中的一個消費者 */
typedef struct streamConsumer {
    mstime_t seen_time;         /* 該消費者上次激活的時間 */
    sds name;                   /* 消費者的名字 */
    rax *pel;                   /* 該消費者待處理的未確認(rèn)的消息列表 */
} streamConsumer;

 (1)消息ID(streamID)

消息ID的形式是timestampInMillis-sequence,例如1527846880572-5,它表示當(dāng)前的消息在毫米時間戳1527846880572時產(chǎn)生,并且是該毫秒內(nèi)產(chǎn)生的第5條消息。消息ID可以由服務(wù)器自動生成,也可以由客戶端自己指定,但是形式必須是整數(shù)-整數(shù),而且必須是后面加入的消息的ID要大于前面的消息ID。

(2)消息內(nèi)容

 一條消息內(nèi)容就是一系列的key/value組成,使用基數(shù)樹進(jìn)行保存。

(3)last_id 

游標(biāo),每個消費組會有個游標(biāo) last_id,任意一個消費者讀取了消息都會使游標(biāo) last_id 往前移動。

(4)pel

 消費者(Consumer)的狀態(tài)變量,作用是維護(hù)消費者的未確認(rèn)的 id。 pel記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認(rèn)字符)。

5.2 Stream的API函數(shù)

stream *streamNew(void);     // 新建一個消息隊列stream
void freeStream(stream *s);  // 釋放一個消息隊列stream
unsigned long streamLength(const robj *subject);  // 返回消息隊列的消息個數(shù)
/* 把stream中從start到end的消息發(fā)送給客戶端c,如果conut不為0,則發(fā)送conut個消息,返回發(fā)出的消息個數(shù) */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, 
                            streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
/* 根據(jù)迭代器獲取消息的key和value */
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, 
                            int64_t *fieldlen, int64_t *valuelen);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);// 根據(jù)迭代器獲取消息ID
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); // 創(chuàng)建一個消費者組
streamCG *streamLookupCG(stream *s, sds groupname);    // 根據(jù)消費者組名查詢消費者組
/* 根據(jù)消費者名在消費者組中進(jìn)行查詢 */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created); 
void streamIteratorRemoveEntry(streamIterator *si, streamID *current); // 刪除消息隊列中的一個消息
/* 添加一條消息到消息隊列中 */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id);
/* 從消息隊列中刪除一條消息 */
int streamDeleteItem(stream *s, streamID *id);
/* 根據(jù)長度刪除消息隊列中消息的條數(shù) */
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
/* 根據(jù)最小的消息ID刪除消息隊列中消息的條數(shù) */
int64_t streamTrimByID(stream *s, streamID minid, int approx);

參考:1)https://cloud.tencent.com/developer/article/1597128

           2)http://mysql.taobao.org/monthly/2019/04/03/

到此這篇關(guān)于一文弄懂Redis Stream消息隊列的文章就介紹到這了,更多相關(guān)Redis Stream消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis數(shù)據(jù)類型超詳細(xì)講解分析

    Redis數(shù)據(jù)類型超詳細(xì)講解分析

    Redis是一個開源的內(nèi)存數(shù)據(jù)結(jié)構(gòu)存儲系統(tǒng),可以用作數(shù)據(jù)庫、緩存和消息中間件,本文詳細(xì)介紹了Redis的各個數(shù)據(jù)類型、內(nèi)部編碼以及一些高級功能,如Geo、HyperLogLog和Stream,需要的朋友可以參考下
    2024-12-12
  • 利用Redis實現(xiàn)點贊功能的示例代碼

    利用Redis實現(xiàn)點贊功能的示例代碼

    點贊對我們來說并不陌生,我們經(jīng)常會在手機(jī)軟件或者網(wǎng)頁中看到它。今天就讓我們來了解一下如何用Redis實現(xiàn)這一功能吧,感興趣的可以嘗試一下
    2022-06-06
  • Redis+Caffeine實現(xiàn)分布式二級緩存組件實戰(zhàn)教程

    Redis+Caffeine實現(xiàn)分布式二級緩存組件實戰(zhàn)教程

    這篇文章主要介紹了Redis+Caffeine實現(xiàn)分布式二級緩存組件實戰(zhàn)教程,介紹了分布式二級緩存的優(yōu)勢,使用組件的方法,通過示例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2022-08-08
  • Redis序列化轉(zhuǎn)換類型報錯的解決

    Redis序列化轉(zhuǎn)換類型報錯的解決

    本文主要介紹了Redis序列化轉(zhuǎn)換類型報錯的解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Windows下Redis的安裝使用教程

    Windows下Redis的安裝使用教程

    這篇文章主要以圖文結(jié)合的方式為大家詳細(xì)介紹了Windows下Redis的安裝使用,Redis的出現(xiàn),很大程度補償了memcached這類key/value存儲的不足,在部分場合可以對關(guān)系數(shù)據(jù)庫起到很好的補充作用,對Redis感興趣的小伙伴們可以參考一下
    2016-05-05
  • Redis實現(xiàn)信息已讀未讀狀態(tài)提示

    Redis實現(xiàn)信息已讀未讀狀態(tài)提示

    這篇文章主要介紹了Redis實現(xiàn)信息已讀未讀狀態(tài)提示的相關(guān)資料,需要的朋友可以參考下
    2016-04-04
  • Redis 實現(xiàn)“附近的人”功能

    Redis 實現(xiàn)“附近的人”功能

    Redis基于geohash和有序集合提供了地理位置相關(guān)功能。這篇文章主要介紹了Redis 實現(xiàn)“附近的人”功能,需要的朋友可以參考下
    2019-11-11
  • RedisTemplate集成+封裝RedisUtil過程

    RedisTemplate集成+封裝RedisUtil過程

    本文介紹了如何搭建一個多模塊的Redis項目,包括項目搭建、配置和測試,通過使用父項目管理多個子模塊,可以實現(xiàn)單點構(gòu)建、統(tǒng)一版本管理和清晰的項目結(jié)構(gòu),文章還提供了在Spring Boot項目中集成RedisTemplate的示例,并解決了編碼問題
    2024-12-12
  • Redisson 主從一致性問題詳解

    Redisson 主從一致性問題詳解

    這篇文章主要為大家介紹了Redisson 主從一致性問題詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • Redis利用互斥鎖解決緩存擊穿問題

    Redis利用互斥鎖解決緩存擊穿問題

    使用互斥鎖可以有效防止緩存擊穿的情況發(fā)生,它能夠保證在緩存失效時,只有一個線程或者進(jìn)程能夠去加載數(shù)據(jù),其余的請求都會等待這個加載過程完成,雖然這種方式會犧牲一部分性能,但它大大提高了系統(tǒng)的穩(wěn)定性和可用性
    2024-08-08

最新評論