一文詳解消息隊(duì)列中為什么不用redis作為隊(duì)列
1 引言
我經(jīng)常聽到很多人討論,關(guān)于把 Redis 當(dāng)作隊(duì)列來用是否合適的問題。
有些人表示贊成,他們認(rèn)為 Redis
很輕量,用作隊(duì)列很方便。
也些人則反對(duì),認(rèn)為 Redis
會(huì)丟數(shù)據(jù),最好還是用專業(yè)的隊(duì)列中間件更穩(wěn)妥
究竟哪種方案更好呢?
這篇文章,就聊一聊把 Redis
當(dāng)作隊(duì)列,究竟是否合適這個(gè)問題。
從簡單到復(fù)雜,一步步梳理其中的細(xì)節(jié),把這個(gè)問題真正的講清楚。
看完這篇文章后對(duì)這個(gè)問題你會(huì)有全新的認(rèn)識(shí)。
在文章的最后,還會(huì)告訴你關(guān)于技術(shù)選型的思路,文章有點(diǎn)長,希望你可以耐心讀完
1.1 Redis中List隊(duì)列
1.1.1 簡單使用
從最簡單的開始:List
隊(duì)列
首先,我們先從最簡單的場景開始講起
如果你的業(yè)務(wù)需求足夠簡單,想把 Redis
當(dāng)作隊(duì)列來使用,肯定最先想到的就是使用 List
這個(gè)數(shù)據(jù)類型
因?yàn)?code>List底層的實(shí)現(xiàn)就是一個(gè)鏈表,在頭部和尾部操作元素,時(shí)間復(fù)雜度都是 O(1)
,這意味著它非常符合消息隊(duì)列的模型。
如果把 List
當(dāng)作隊(duì)列,你可以這么來用。
生產(chǎn)者使用 LPUSH
發(fā)布消息:
127.0.0.1:6379> LPUSH queue msg1 (integer) 1 127.0.0.1:6379> LPUSH queue msg2 (integer) 2
消費(fèi)者這一側(cè),使用 RPOP
拉取消息:
127.0.0.1:6379> RPOP queue "msg1" 127.0.0.1:6379> RPOP queue "msg2"
這個(gè)模型非常簡單,也很容易理解。
但這里有個(gè)小問題,當(dāng)隊(duì)列中已經(jīng)沒有消息了,消費(fèi)者在執(zhí)行 RPOP
時(shí),會(huì)返回 NULL
127.0.0.1:6379> RPOP queue (nil) // 沒消息了
而我們在編寫消費(fèi)者邏輯時(shí),一般是一個(gè)死循環(huán)
,這個(gè)邏輯需要不斷地從隊(duì)列中拉取消息進(jìn)行處理,偽代碼一般會(huì)這么寫:
while true: msg = redis.rpop("queue") // 沒有消息,繼續(xù)循環(huán) if msg == null: continue // 處理消息 handle(msg)
如果此時(shí)隊(duì)列為空,那消費(fèi)者依舊會(huì)頻繁拉取消息,這會(huì)造成CPU 空轉(zhuǎn)
,不僅浪費(fèi) CPU
資源,還會(huì)對(duì) Redis
造成壓力。
1.1.2 解決cpu空轉(zhuǎn)問題
怎么解決這個(gè)問題呢?
也很簡單,當(dāng)隊(duì)列為空時(shí),我們可以休眠一會(huì),再去嘗試?yán)∠?。代碼可以修改成這樣:
while true: msg = redis.rpop("queue") // 沒有消息,休眠2s if msg == null: sleep(2) continue // 處理消息 handle(msg)
這就解決了 CPU
空轉(zhuǎn)問題
這個(gè)問題雖然解決了,但又帶來另外一個(gè)問題:當(dāng)消費(fèi)者在休眠等待時(shí),有新消息來了,那消費(fèi)者處理新消息就會(huì)存在延遲假設(shè)設(shè)置的休眠時(shí)間是 2s
,那新消息最多存在 2s
的延遲。
1.1.3 Redis阻塞式拉取
要想縮短這個(gè)延遲,只能減小休眠的時(shí)間。但休眠時(shí)間越小,又有可能引發(fā) CPU
空轉(zhuǎn)問題。
魚和熊掌不可兼得
那如何做,既能及時(shí)處理新消息,還能避免 CPU
空轉(zhuǎn)呢?
Redis
是否存在這樣一種機(jī)制:如果隊(duì)列為空,消費(fèi)者在拉取消息時(shí)就阻塞等待,一旦有新消息過來,就通知我的消費(fèi)者立即處理新消息呢?
幸運(yùn)的是,Redis
確實(shí)提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP
,這里的 B
指的是阻塞Block
現(xiàn)在,你可以這樣來拉取消息了:
while true: // 沒消息阻塞等待,0表示不設(shè)置超時(shí)時(shí)間 msg = redis.brpop("queue", 0) if msg == null: continue // 處理消息 handle(msg)
使用 BRPOP
這種阻塞式方式拉取消息時(shí),還支持傳入一個(gè)超時(shí)時(shí)間,如果設(shè)置為 0
,則表示不設(shè)置超時(shí),直到有新消息才返回,否則會(huì)在指定的超時(shí)時(shí)間后返回 NULL
這個(gè)方案不錯(cuò),既兼顧了效率,還避免了 CPU
空轉(zhuǎn)問題,一舉兩得
注意:如果設(shè)置的超時(shí)時(shí)間太長,這個(gè)連接太久沒有活躍過,可能會(huì)被 Redis Server
判定為無效連接,之后 Redis Server
會(huì)強(qiáng)制把這個(gè)客戶端踢下線。所以,采用這種方案,客戶端要有重連機(jī)制。
解決了消息處理不及時(shí)的問題,你可以再思考一下,這種隊(duì)列模型,有什么缺點(diǎn)?
- 不支持重復(fù)消費(fèi):消費(fèi)者拉取消息后,這條消息就從
List
中刪除了,無法被其它消費(fèi)者再次消費(fèi),即不支持多個(gè)消費(fèi)者消費(fèi)同一批數(shù)據(jù) - 消息丟失:消費(fèi)者拉取到消息后,如果發(fā)生異常宕機(jī),那這條消息就丟失了
第一個(gè)問題是功能上的,使用 List
做消息隊(duì)列,它僅僅支持最簡單的,一組生產(chǎn)者對(duì)應(yīng)一組消費(fèi)者,不能滿足多組生產(chǎn)者和消費(fèi)者的業(yè)務(wù)場景
第二個(gè)問題就比較棘手了,因?yàn)閺?nbsp;List
中 POP
一條消息出來后,這條消息就會(huì)立即從鏈表中刪除了。也就是說,無論消費(fèi)者是否處理成功,這條消息都沒辦法再次消費(fèi)了。這也意味著,如果消費(fèi)者在處理消息時(shí)異常宕機(jī),那這條消息就相當(dāng)于丟失了。
1.2 Redis發(fā)布訂閱
1.2.1 簡單使用
發(fā)布/訂閱模型:Pub/Sub
從名字就能看出來,這個(gè)模塊是 Redis
專門是針對(duì)發(fā)布/訂閱這種隊(duì)列模型設(shè)計(jì)的
它正好可以解決前面提到的第一個(gè)問題:重復(fù)消費(fèi)。
即多組生產(chǎn)者、消費(fèi)者的場景,我們來看它是如何做的。
Redis
提供了 PUBLISH / SUBSCRIBE
命令,來完成發(fā)布、訂閱的操作。
假設(shè)你想開啟 2 個(gè)消費(fèi)者,同時(shí)消費(fèi)同一批數(shù)據(jù),就可以按照以下方式來實(shí)現(xiàn)。
首先,使用 SUBSCRIBE
命令,啟動(dòng) 2 個(gè)消費(fèi)者,并訂閱同一個(gè)隊(duì)列。
// 2個(gè)消費(fèi)者 都訂閱一個(gè)隊(duì)列 127.0.0.1:6379> SUBSCRIBE queue Reading messages... (press Ctrl-C to quit) 1) "subscribe" 2) "queue" 3) (integer) 1
此時(shí),2 個(gè)消費(fèi)者都會(huì)被阻塞住,等待新消息的到來。
之后,再啟動(dòng)一個(gè)生產(chǎn)者,發(fā)布一條消息。
127.0.0.1:6379> PUBLISH queue msg1 (integer) 1
這時(shí),2 個(gè)消費(fèi)者就會(huì)解除阻塞,收到生產(chǎn)者發(fā)來的新消息。
127.0.0.1:6379> SUBSCRIBE queue // 收到新消息 1) "message" 2) "queue" 3) "msg1"
看到了么,使用 Pub/Sub
這種方案,既支持阻塞式拉取消息,還很好地滿足了多組消費(fèi)者,消費(fèi)同一批數(shù)據(jù)的業(yè)務(wù)需求。
除此之外,Pub/Sub
還提供了匹配訂閱模式,允許消費(fèi)者根據(jù)一定規(guī)則,訂閱多個(gè)自己感興趣的隊(duì)列
// 訂閱符合規(guī)則的隊(duì)列 127.0.0.1:6379> PSUBSCRIBE queue.* Reading messages... (press Ctrl-C to quit) 1) "psubscribe" 2) "queue.*" 3) (integer) 1
這里的消費(fèi)者,訂閱了 queue.*
相關(guān)的隊(duì)列消息。
之后,生產(chǎn)者分別向 queue.p1
和 queue.p2
發(fā)布消息。
127.0.0.1:6379> PUBLISH queue.p1 msg1 (integer) 1 127.0.0.1:6379> PUBLISH queue.p2 msg2 (integer) 1
這時(shí)再看消費(fèi)者,它就可以接收到這 2 個(gè)生產(chǎn)者的消息了。
127.0.0.1:6379> PSUBSCRIBE queue.* Reading messages... (press Ctrl-C to quit) ... // 來自queue.p1的消息 1) "pmessage" 2) "queue.*" 3) "queue.p1" 4) "msg1" // 來自queue.p2的消息 1) "pmessage" 2) "queue.*" 3) "queue.p2" 4) "msg2"
我們可以看到,Pub/Sub
最大的優(yōu)勢就是,支持多組生產(chǎn)者、消費(fèi)者處理消息。
1.2.2 發(fā)布訂閱的缺點(diǎn)
講完了它的優(yōu)點(diǎn),那它有什么缺點(diǎn)呢?
其實(shí),Pub/Sub
最大問題是:丟數(shù)據(jù)
如果發(fā)生以下場景,就有可能導(dǎo)致數(shù)據(jù)丟失:
- 消費(fèi)者下線
- Redis 宕機(jī)
- 消息堆積
究竟是怎么回事?
這其實(shí)與 Pub/Sub
的實(shí)現(xiàn)方式有很大關(guān)系。
Pub/Sub
在實(shí)現(xiàn)時(shí)非常簡單,它沒有基于任何數(shù)據(jù)類型,也沒有做任何的數(shù)據(jù)存儲(chǔ),它只是單純地為生產(chǎn)者、消費(fèi)者建立數(shù)據(jù)轉(zhuǎn)發(fā)通道,把符合規(guī)則的數(shù)據(jù),從一端轉(zhuǎn)發(fā)到另一端。
一個(gè)完整的發(fā)布、訂閱消息處理流程是這樣的:
- 消費(fèi)者訂閱指定隊(duì)列,
Redis
就會(huì)記錄一個(gè)映射關(guān)系:隊(duì)列->消費(fèi)者 - 生產(chǎn)者向這個(gè)隊(duì)列發(fā)布消息,那
Redis
就從映射關(guān)系中找出對(duì)應(yīng)的消費(fèi)者,把消息轉(zhuǎn)發(fā)給它
看到了么,整個(gè)過程中,沒有任何的數(shù)據(jù)存儲(chǔ),一切都是實(shí)時(shí)轉(zhuǎn)發(fā)的。
這種設(shè)計(jì)方案,就導(dǎo)致了上面提到的那些問題。
例如,如果一個(gè)消費(fèi)者異常掛掉了,它再重新上線后,只能接收新的消息,在下線期間生產(chǎn)者發(fā)布的消息,因?yàn)檎也坏较M(fèi)者,都會(huì)被丟棄掉。
如果所有消費(fèi)者都下線了,那生產(chǎn)者發(fā)布的消息,因?yàn)檎也坏饺魏我粋€(gè)消費(fèi)者,也會(huì)全部丟棄所以,當(dāng)你在使用 Pub/Sub
時(shí),一定要注意:消費(fèi)者必須先訂閱隊(duì)列,生產(chǎn)者才能發(fā)布消息,否則消息會(huì)丟失這也是前面講例子時(shí),我們讓消費(fèi)者先訂閱隊(duì)列,之后才讓生產(chǎn)者發(fā)布消息的原因。
另外,因?yàn)?nbsp;Pub/Sub
沒有基于任何數(shù)據(jù)類型實(shí)現(xiàn),所以它也不具備數(shù)據(jù)持久化的能力。
也就是說,Pub/Sub
的相關(guān)操作,不會(huì)寫入到 RDB
和 AOF
中,當(dāng) Redis
宕機(jī)重啟,Pub/Sub
的數(shù)據(jù)也會(huì)全部丟失。
最后,我們來看 Pub/Sub
在處理消息積壓
時(shí),為什么也會(huì)丟數(shù)據(jù)?
當(dāng)消費(fèi)者的速度,跟不上生產(chǎn)者時(shí),就會(huì)導(dǎo)致數(shù)據(jù)積壓的情況發(fā)生。
如果采用 List
當(dāng)作隊(duì)列,消息積壓時(shí),會(huì)導(dǎo)致這個(gè)鏈表很長,最直接的影響就是,Redis
內(nèi)存會(huì)持續(xù)增長,直到消費(fèi)者把所有數(shù)據(jù)都從鏈表中取出。
但 Pub/Sub
的處理方式卻不一樣,當(dāng)消息積壓時(shí),有可能會(huì)導(dǎo)致消費(fèi)失敗和消息丟失!
這是怎么回事?
還是回到 Pub/Sub
的實(shí)現(xiàn)細(xì)節(jié)上來說。
每個(gè)消費(fèi)者訂閱一個(gè)隊(duì)列時(shí),Redis
都會(huì)在 Server
上給這個(gè)消費(fèi)者在分配一個(gè)緩沖區(qū),這個(gè)緩沖區(qū)其實(shí)就是一塊內(nèi)存。
當(dāng)生產(chǎn)者發(fā)布消息時(shí),Redis
先把消息寫到對(duì)應(yīng)消費(fèi)者的緩沖區(qū)中。
之后,消費(fèi)者不斷地從緩沖區(qū)讀取消息,處理消息。
但是,問題就出在這個(gè)緩沖區(qū)上
因?yàn)檫@個(gè)緩沖區(qū)其實(shí)是有上限的(可配置),如果消費(fèi)者拉取消息很慢,就會(huì)造成生產(chǎn)者發(fā)布到緩沖區(qū)的消息開始積壓,緩沖區(qū)內(nèi)存持續(xù)增長。
如果超過了緩沖區(qū)配置的上限,此時(shí),Redis
就會(huì)強(qiáng)制把這個(gè)消費(fèi)者踢下線
這時(shí)消費(fèi)者就會(huì)消費(fèi)失敗,也會(huì)丟失數(shù)據(jù)。
如果你有看過 Redis
的配置文件,可以看到這個(gè)緩沖區(qū)的默認(rèn)配置:client-output-buffer-limit pubsub 32mb 8mb 60
它的參數(shù)含義如下:
32mb
:緩沖區(qū)一旦超過 32MB,Redis 直接強(qiáng)制把消費(fèi)者踢下線8mb + 60
:緩沖區(qū)超過 8MB,并且持續(xù) 60 秒,Redis 也會(huì)把消費(fèi)者踢下線
Pub/Sub
的這一點(diǎn)特點(diǎn),是與 List
作隊(duì)列差異比較大的
從這里你應(yīng)該可以看出,List
其實(shí)是屬于拉模型,而 Pub/Sub
其實(shí)屬于推模型。
List
中的數(shù)據(jù)可以一直積壓在內(nèi)存中,消費(fèi)者什么時(shí)候來拉都可以。
但 Pub/Sub
是把消息先推到消費(fèi)者在 Redis Server
上的緩沖區(qū)中,然后等消費(fèi)者再來取。
當(dāng)生產(chǎn)、消費(fèi)速度不匹配時(shí),就會(huì)導(dǎo)致緩沖區(qū)的內(nèi)存開始膨脹,Redis
為了控制緩沖區(qū)的上限,所以就有了上面講到的,強(qiáng)制把消費(fèi)者踢下線的機(jī)制。
好了,現(xiàn)在我們總結(jié)一下 Pub/Sub
的優(yōu)缺點(diǎn):
- 支持發(fā)布 / 訂閱,支持多組生產(chǎn)者、消費(fèi)者處理消息
- 消費(fèi)者下線,數(shù)據(jù)會(huì)丟失
- 不支持?jǐn)?shù)據(jù)持久化,
Redis
宕機(jī),數(shù)據(jù)也會(huì)丟失 - 消息堆積,緩沖區(qū)溢出,消費(fèi)者會(huì)被強(qiáng)制踢下線,數(shù)據(jù)也會(huì)丟失
有沒有發(fā)現(xiàn),除了第一個(gè)是優(yōu)點(diǎn)之外,剩下的都是缺點(diǎn)。
所以,很多人看到 Pub/Sub
的特點(diǎn)后,覺得這個(gè)功能很雞肋
也正是以上原因,Pub/Sub
在實(shí)際的應(yīng)用場景中用得并不多
目前只有哨兵集群和Redis
實(shí)例通信時(shí),采用了 Pub/Sub
的方案,因?yàn)樯诒梅霞磿r(shí)通訊的業(yè)務(wù)場景。
我們再來看一下,Pub/Sub
有沒有解決,消息處理時(shí)異常宕機(jī),無法再次消費(fèi)的問題呢?
其實(shí)也不行,Pub/Sub
從緩沖區(qū)取走數(shù)據(jù)之后,數(shù)據(jù)就從 Redis
緩沖區(qū)刪除了,消費(fèi)者發(fā)生異常,自然也無法再次重新消費(fèi)。
好,現(xiàn)在我們重新梳理一下,我們在使用消息隊(duì)列時(shí)的需求。
當(dāng)我們在使用一個(gè)消息隊(duì)列時(shí),希望它的功能如下:
- 支持阻塞等待拉取消息
- 支持發(fā)布 / 訂閱模式
- 消費(fèi)失敗,可重新消費(fèi),消息不丟失
- 實(shí)例宕機(jī),消息不丟失,數(shù)據(jù)可持久化
- 消息可堆積
Redis
除了 List
和 Pub/Sub
之外,還有符合這些要求的數(shù)據(jù)類型嗎?
其實(shí),Redis
的作者也看到了以上這些問題,也一直在朝著這些方向努力著。Redis
作者在開發(fā) Redis
期間,還另外開發(fā)了一個(gè)開源項(xiàng)目 disque
這個(gè)項(xiàng)目的定位,就是一個(gè)基于內(nèi)存的分布式消息隊(duì)列中間件。
但由于種種原因,這個(gè)項(xiàng)目一直不溫不火。
終于,在 Redis 5.0
版本,作者把 disque
功能移植到了 Redis
中,并給它定義了一個(gè)新的數(shù)據(jù)類型:Stream
下面我們就來看看,它能符合上面提到的這些要求嗎?
1.3 Redis中的Stream
1.3.1 簡單使用
趨于成熟的隊(duì)列:Stream
我們來看 Stream
是如何解決上面這些問題的
我們依舊從簡單到復(fù)雜,依次來看 Stream
在做消息隊(duì)列時(shí),是如何處理的?
首先,Stream
通過 XADD
和 XREAD
完成最簡單的生產(chǎn)、消費(fèi)模型:
- XADD:發(fā)布消息
- XREAD:讀取消息
生產(chǎn)者發(fā)布 2 條消息:
// *表示讓Redis自動(dòng)生成消息ID 127.0.0.1:6379> XADD queue * name zhangsan "1618469123380-0" 127.0.0.1:6379> XADD queue * name lisi "1618469127777-0"
使用 XADD
命令發(fā)布消息,其中的*
表示讓 Redis
自動(dòng)生成唯一的消息 ID
這個(gè)消息 ID
的格式是時(shí)間戳-自增序號(hào)
消費(fèi)者拉取消息:
// 從開頭讀取5條消息,0-0表示從開頭讀取 127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0 1) 1) "queue" 2) 1) 1) "1618469123380-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618469127777-0" 2) 1) "name" 2) "lisi"
如果想繼續(xù)拉取消息,需要傳入上一條消息的 ID:
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0 (nil)
沒有消息,Redis
會(huì)返回 NULL
以上就是 Stream
最簡單的生產(chǎn)、消費(fèi)
這里不再重點(diǎn)介紹 Stream
命令的各種參數(shù),我在例子中演示時(shí),凡是大寫的單詞都是固定參數(shù),凡是小寫的單詞,都是可以自己定義的,例如隊(duì)列名、消息長度等等,下面的例子規(guī)則也是一樣,為了方便你理解,這里有必要提醒一下。
下面我們來看,針對(duì)前面提到的消息隊(duì)列要求,Stream
都是如何解決的?
1.3.2 stream阻塞拉取
Stream
是否支持阻塞式
拉取消息?
可以的,在讀取消息時(shí),只需要增加 BLOCK
參數(shù)即可
// BLOCK 0 表示阻塞等待,不設(shè)置超時(shí)時(shí)間 127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
這時(shí),消費(fèi)者就會(huì)阻塞等待,直到生產(chǎn)者發(fā)布新的消息才會(huì)返回。
1.3.3 Stream支持發(fā)布 / 訂閱模式
也沒問題,Stream
通過以下命令完成發(fā)布訂閱:
XGROUP
:創(chuàng)建消費(fèi)者組XREADGROUP
:在指定消費(fèi)組下,開啟消費(fèi)者拉取消息
下面我們來看具體如何做?
首先,生產(chǎn)者依舊發(fā)布 2 條消息:
127.0.0.1:6379> XADD queue * name zhangsan "1618470740565-0" 127.0.0.1:6379> XADD queue * name lisi "1618470743793-0"
之后,我們想要開啟 2 組消費(fèi)者處理同一批數(shù)據(jù),就需要?jiǎng)?chuàng)建 2 個(gè)消費(fèi)者組:
// 創(chuàng)建消費(fèi)者組1,0-0表示從頭拉取消息 127.0.0.1:6379> XGROUP CREATE queue group1 0-0 OK // 創(chuàng)建消費(fèi)者組2,0-0表示從頭拉取消息 127.0.0.1:6379> XGROUP CREATE queue group2 0-0 OK
消費(fèi)者組創(chuàng)建好之后,我們可以給每個(gè)消費(fèi)者組下面掛一個(gè)消費(fèi)者,讓它們分別處理同一批數(shù)據(jù)。
第一個(gè)消費(fèi)組開始消費(fèi):
// group1的consumer開始消費(fèi),>表示拉取最新數(shù)據(jù) 127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue > 1) 1) "queue" 2) 1) 1) "1618470740565-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618470743793-0" 2) 1) "name" 2) "lisi"
同樣地,第二個(gè)消費(fèi)組開始消費(fèi):
// group2的consumer開始消費(fèi),>表示拉取最新數(shù)據(jù) 127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue > 1) 1) "queue" 2) 1) 1) "1618470740565-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618470743793-0" 2) 1) "name" 2) "lisi"
我們可以看到,這 2 組消費(fèi)者,都可以獲取同一批數(shù)據(jù)進(jìn)行處理了。
這樣一來,就達(dá)到了多組消費(fèi)者「訂閱」消費(fèi)的目的。
1.3.4 stream不丟消息
消息處理時(shí)異常,Stream
能否保證消息不丟失,重新消費(fèi)?
除了上面拉取消息時(shí)用到了消息 ID
,這里為了保證重新消費(fèi),也要用到這個(gè)消息 ID。
當(dāng)一組消費(fèi)者處理完消息后,需要執(zhí)行 XACK
命令告知 Redis
,這時(shí) Redis
就會(huì)把這條消息標(biāo)記為處理完成
// group1下的 1618472043089-0 消息已處理完成 127.0.0.1:6379> XACK queue group1 1618472043089-0
如果消費(fèi)者異常宕機(jī),肯定不會(huì)發(fā)送 XACK
,那么 Redis
就會(huì)依舊保留這條消息。
待這組消費(fèi)者重新上線后,Redis
就會(huì)把之前沒有處理成功的數(shù)據(jù),重新發(fā)給這個(gè)消費(fèi)者。這樣一來,即使消費(fèi)者異常,也不會(huì)丟失數(shù)據(jù)了。
// 消費(fèi)者重新上線,0-0表示重新拉取未ACK的消息 127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0 // 之前沒消費(fèi)成功的數(shù)據(jù),依舊可以重新消費(fèi) 1) 1) "queue" 2) 1) 1) "1618472043089-0" 2) 1) "name" 2) "zhangsan" 2) 1) "1618472045158-0" 2) 1) "name" 2) "lisi"
1.3.5 stream持久化處理
Stream
是新增加的數(shù)據(jù)類型,它與其它數(shù)據(jù)類型一樣,每個(gè)寫操作,也都會(huì)寫入到 RDB
和 AOF
中
我們只需要配置好持久化策略,這樣的話,就算 Redis 宕機(jī)重啟,Stream
中的數(shù)據(jù)也可以從 RDB
或 AOF
中恢復(fù)回來。
1.3.6 stream消息堆積
消息堆積時(shí),Stream 是怎么處理的?
其實(shí),當(dāng)消息隊(duì)列發(fā)生消息堆積時(shí),一般只有 2 個(gè)解決方案:
生產(chǎn)者限流
:避免消費(fèi)者處理不及時(shí),導(dǎo)致持續(xù)積壓丟棄消息
:中間件丟棄舊消息,只保留固定長度的新消息
而 Redis
在實(shí)現(xiàn) Stream
時(shí),采用了第 2 個(gè)方案。
在發(fā)布消息時(shí),你可以指定隊(duì)列的最大長度,防止隊(duì)列積壓導(dǎo)致內(nèi)存爆炸。
// 隊(duì)列長度最大10000 127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan "1618473015018-0"
當(dāng)隊(duì)列長度超過上限后,舊消息會(huì)被刪除,只保留固定長度的新消息。
這么來看,Stream
在消息積壓時(shí),如果指定了最大長度,還是有可能丟失消息的。
除了以上介紹到的命令,Stream
還支持查看消息長度XLEN
、查看消費(fèi)者狀態(tài)XINFO
等命令,使用也比較簡單,你可以查詢官方文檔了解一下,這里就不過多介紹了。
好了,通過以上介紹,我們可以看到,Redis
的 Stream
幾乎覆蓋到了消息隊(duì)列的各種場景,是不是覺得很完美?
既然它的功能這么強(qiáng)大,這是不是意味著,Redis
真的可以作為專業(yè)的消息隊(duì)列中間件來使用呢?
但是還差一點(diǎn)
,就算 Redis
能做到以上這些,也只是趨近于專業(yè)的消息隊(duì)列。
原因在于 Redis
本身的一些問題,如果把其定位成消息隊(duì)列,還是有些欠缺的。
到這里,就不得不把 Redis
與專業(yè)的隊(duì)列中間件做對(duì)比了。
1.4 與專業(yè)消息對(duì)比
與專業(yè)的消息隊(duì)列對(duì)比
其實(shí),一個(gè)專業(yè)的消息隊(duì)列,必須要做到兩大塊:
- 消息不丟
- 消息可堆積
前面我們討論的重點(diǎn),很大篇幅圍繞的是第一點(diǎn)展開的。
這里我們換個(gè)角度,從一個(gè)消息隊(duì)列的「使用模型」來分析一下,怎么做,才能保證數(shù)據(jù)不丟?
使用一個(gè)消息隊(duì)列,其實(shí)就分為三大塊:生產(chǎn)者、隊(duì)列中間件、消費(fèi)者
消息是否會(huì)發(fā)生丟失,其重點(diǎn)也就在于以下 3 個(gè)環(huán)節(jié):
- 生產(chǎn)者會(huì)不會(huì)丟消息?
- 消費(fèi)者會(huì)不會(huì)丟消息?
- 隊(duì)列中間件會(huì)不會(huì)丟消息?
1.4.1 生產(chǎn)者會(huì)不會(huì)丟消息
當(dāng)生產(chǎn)者在發(fā)布消息時(shí),可能發(fā)生以下異常情況:
- 消息沒發(fā)出去:網(wǎng)絡(luò)故障或其它問題導(dǎo)致發(fā)布失敗,中間件直接返回失敗
- 不確定是否發(fā)布成功:網(wǎng)絡(luò)問題導(dǎo)致發(fā)布超時(shí),可能數(shù)據(jù)已發(fā)送成功,但讀取響應(yīng)結(jié)果超時(shí)了
如果是情況 1,消息根本沒發(fā)出去,那么重新發(fā)一次就好了。
如果是情況 2,生產(chǎn)者沒辦法知道消息到底有沒有發(fā)成功?所以,為了避免消息丟失,它也只能繼續(xù)重試,直到發(fā)布成功為止。
生產(chǎn)者一般會(huì)設(shè)定一個(gè)最大重試次數(shù),超過上限依舊失敗,需要記錄日志報(bào)警處理。
也就是說,生產(chǎn)者為了避免消息丟失,只能采用失敗重試的方式來處理。
但發(fā)現(xiàn)沒有?這也意味著消息可能會(huì)重復(fù)發(fā)送。
是的,在使用消息隊(duì)列時(shí),要保證消息不丟,寧可重發(fā),也不能丟棄。
那消費(fèi)者這邊,就需要多做一些邏輯了。
對(duì)于敏感業(yè)務(wù),當(dāng)消費(fèi)者收到重復(fù)數(shù)據(jù)數(shù)據(jù)時(shí),要設(shè)計(jì)冪等邏輯,保證業(yè)務(wù)的正確性。
從這個(gè)角度來看,生產(chǎn)者會(huì)不會(huì)丟消息,取決于生產(chǎn)者對(duì)于異常情況的處理是否合理。
所以,無論是 Redis
還是專業(yè)的隊(duì)列中間件,生產(chǎn)者在這一點(diǎn)上都是可以保證消息不丟的。
1.4.2 消費(fèi)者會(huì)不會(huì)丟消息
這種情況就是我們前面提到的,消費(fèi)者拿到消息后,還沒處理完成,就異常宕機(jī)了,那消費(fèi)者還能否重新消費(fèi)失敗的消息?
要解決這個(gè)問題,消費(fèi)者在處理完消息后,必須「告知」隊(duì)列中間件,隊(duì)列中間件才會(huì)把標(biāo)記已處理,否則仍舊把這些數(shù)據(jù)發(fā)給消費(fèi)者。
這種方案需要消費(fèi)者和中間件互相配合,才能保證消費(fèi)者這一側(cè)的消息不丟。
無論是 Redis
的 Stream
,還是專業(yè)的隊(duì)列中間件,例如 RabbitMQ、Kafka
,其實(shí)都是這么做的。
所以,從這個(gè)角度來看,Redis
也是合格的。
1.4.3 隊(duì)列中間件會(huì)不會(huì)丟消息
前面 2 個(gè)問題都比較好處理,只要客戶端和服務(wù)端配合好,就能保證生產(chǎn)端、消費(fèi)端都不丟消息。
但是,如果隊(duì)列中間件本身就不可靠呢?
畢竟生產(chǎn)者和消費(fèi)這都依賴它,如果它不可靠,那么生產(chǎn)者和消費(fèi)者無論怎么做,都無法保證數(shù)據(jù)不丟。
在這個(gè)方面,Redis
其實(shí)沒有達(dá)到要求。
Redis
在以下 2 個(gè)場景下,都會(huì)導(dǎo)致數(shù)據(jù)丟失。
AOF
持久化配置為每秒寫盤,但這個(gè)寫盤過程是異步的,Redis
宕機(jī)時(shí)會(huì)存在數(shù)據(jù)丟失的可能
主從復(fù)制也是異步的,主從切換時(shí),也存在丟失數(shù)據(jù)的可能(從庫還未同步完成主庫發(fā)來的數(shù)據(jù),就被提成主庫)
基于以上原因我們可以看到,Redis
本身的無法保證嚴(yán)格的數(shù)據(jù)完整性。
所以,如果把 Redis
當(dāng)做消息隊(duì)列,在這方面是有可能導(dǎo)致數(shù)據(jù)丟失的。
再來看那些專業(yè)的消息隊(duì)列中間件是如何解決這個(gè)問題的?
像 RabbitMQ
或 Kafka
這類專業(yè)的隊(duì)列中間件,在使用時(shí),一般是部署一個(gè)集群,生產(chǎn)者在發(fā)布消息時(shí),隊(duì)列中間件通常會(huì)寫多個(gè)節(jié)點(diǎn)
,
以此保證消息的完整性。這樣一來,即便其中一個(gè)節(jié)點(diǎn)掛了,也能保證集群的數(shù)據(jù)不丟失。
也正因?yàn)槿绱耍?code>RabbitMQ、Kafka在設(shè)計(jì)時(shí)也更復(fù)雜。畢竟,它們是專門針對(duì)隊(duì)列場景設(shè)計(jì)的。
但 Redis
的定位則不同,它的定位更多是當(dāng)作緩存來用,它們兩者在這個(gè)方面肯定是存在差異的。
1.4.4 消息積壓怎么辦
因?yàn)?nbsp;Redis
的數(shù)據(jù)都存儲(chǔ)在內(nèi)存中,這就意味著一旦發(fā)生消息積壓,則會(huì)導(dǎo)致 Redis
的內(nèi)存持續(xù)增長,如果超過機(jī)器內(nèi)存上限,就會(huì)面臨被 OOM
的風(fēng)險(xiǎn)。
所以,Redis
的 Stream
提供了可以指定隊(duì)列最大長度的功能,就是為了避免這種情況發(fā)生。
但 Kafka、RabbitMQ
這類消息隊(duì)列就不一樣了,它們的數(shù)據(jù)都會(huì)存儲(chǔ)在磁盤上,磁盤的成本要比內(nèi)存小得多,當(dāng)消息積壓時(shí),無非就是多占用一些磁盤空間,相比于內(nèi)存,在面對(duì)積壓時(shí)也會(huì)更加坦然
綜上,我們可以看到,把 Redis
當(dāng)作隊(duì)列來使用時(shí),始終面臨的 2 個(gè)問題:
Redis
本身可能會(huì)丟數(shù)據(jù)- 面對(duì)消息積壓,
Redis
內(nèi)存資源緊張
到這里,Redis
是否可以用作隊(duì)列,我想這個(gè)答案你應(yīng)該會(huì)比較清晰了。
如果你的業(yè)務(wù)場景足夠簡單,對(duì)于數(shù)據(jù)丟失不敏感,而且消息積壓概率比較小的情況下,把 Redis 當(dāng)作隊(duì)列是完全可以的。
而且,Redis
相比于 Kafka、RabbitMQ
,部署和運(yùn)維也更加輕量。
如果你的業(yè)務(wù)場景對(duì)于數(shù)據(jù)丟失非常敏感,而且寫入量非常大,消息積壓時(shí)會(huì)占用很多的機(jī)器資源,那么我建議你使用專業(yè)的消息隊(duì)列中間件。
總結(jié)
到此這篇關(guān)于消息隊(duì)列中為什么不用redis作為隊(duì)列的文章就介紹到這了,更多相關(guān)消息隊(duì)列不用redis作隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- redis?消息隊(duì)列完成秒殺過期訂單處理方法(一)
- 如何使用?redis?消息隊(duì)列完成秒殺過期訂單處理操作(二)
- Redis高階使用消息隊(duì)列分布式鎖排行榜等(高階用法)
- Redis消息隊(duì)列的三種實(shí)現(xiàn)方式
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列的項(xiàng)目實(shí)踐
- Redis使用ZSET實(shí)現(xiàn)消息隊(duì)列使用小結(jié)
- python使用redis實(shí)現(xiàn)消息隊(duì)列(異步)的實(shí)現(xiàn)完整例程
- 詳解Redis Stream做消息隊(duì)列
- 基于Redis實(shí)現(xiàn)消息隊(duì)列的示例代碼
相關(guān)文章
redis中RedissonLock如何實(shí)現(xiàn)等待鎖的
本文主要介紹了redis中RedissonLock如何實(shí)現(xiàn)等待鎖的,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11Redis遍歷海量數(shù)據(jù)集的幾種實(shí)現(xiàn)方法
Redis作為一個(gè)高性能的鍵值存儲(chǔ)數(shù)據(jù)庫,廣泛應(yīng)用于各種場景,包括緩存、消息隊(duì)列、排行榜,本文主要介紹了Redis遍歷海量數(shù)據(jù)集的幾種實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-02-02