如何使用redis的stream數(shù)據(jù)類型做消息隊列
在redis5.0之前,如果想使用它作為簡單的消息隊列,最好的選擇就是自身提供的pub/sub模式.它支持簡單的發(fā)布/訂閱模式,發(fā)布一個channel綁定一條消息,然后可以有多個消費者監(jiān)聽這個channel,每個消費者都能收到相同的消息。不支持持久化,不支持查詢,不支持分組,不支持分片消費,也沒有提供很好的監(jiān)控手段(有簡單的pubsub容器命令,可以看有哪些channel,訂閱者數(shù)量等)。但是5.0之后,倘若我們?nèi)巳赃x擇redis作為簡單消息隊列,就可以使用新的數(shù)據(jù)類型STREAM
STREAM數(shù)據(jù)類型介紹
數(shù)據(jù)類型基礎(chǔ)說明
- 可以理解為一個有時間序列的一組數(shù)據(jù)集合,每一條新增的數(shù)據(jù)都是追加到數(shù)據(jù)集末尾,每一條數(shù)據(jù)都有自己的唯一id
- 底層數(shù)據(jù)結(jié)構(gòu)是基數(shù)樹
- 一個Stream可以有多個消費者分組group,每一個group也可以有多個消費者consumer,支持分片讀取,全部讀取,按照ID分段讀取
- 隨機訪問時間復(fù)雜度是O(1),向流中添加一個條目的時間為O(1)。 訪問任意一項的時間為O(n),其中n是ID的長度.
常用命令及詳解
XADD 向指定的 Stream 添加一條新消息。
XADD key [MAXLEN [~] count] * field1 value1 [field2 value2 ...]參數(shù)說明:
key:Stream 的名稱。
MAXLEN [~] count:可選,限制 Stream 最大長度,超出自動裁剪最老消息。~ 表示近似修剪,性能更優(yōu)?!緦嶋H上使用要注意,超過最大值直接丟棄,也就是“消失了“】
*:讓 Redis 自動生成消息ID,也可自定義ID。
field value:消息體的鍵值對。
用法舉例:XADD mystream * name Alice age 20XRANGE 按ID范圍讀取 Stream 中的消息
XRANGE key start end [COUNT count]參數(shù)說明:
start、end:起止ID,- 表示最小ID,+ 表示最大ID。
COUNT:可選,限制返回條數(shù)。
用法舉例:XRANGE mystream - + # 讀取所有消息XREAD 從一個或多個 Stream 讀取新消息,可阻塞等待
XREAD [BLOCK milliseconds] STREAMS key [key ...] id [id ...]參數(shù)說明:
BLOCK:可選,阻塞等待新消息的毫秒數(shù)。
STREAMS:后面跟 Stream 名稱和起始ID。
用法舉例:XREAD BLOCK 5000 STREAMS mystream $$ 表示只讀新消息XGROUP 創(chuàng)建、刪除、管理 Stream 的消費者組。
XGROUP CREATE mystream mygroup 0-0 MKSTREAM常用子命令:- 創(chuàng)建組:
XGROUP CREATE mystream mygroup 0-0 MKSTREAM0-0:從頭消費;$:只消費新消息。
MKSTREAM:Stream 不存在時自動創(chuàng)建 - 刪除組:
XGROUP DESTROY mystream mygroup - 創(chuàng)建消費者、刪除消費者。一般不需要,會自動創(chuàng)建
XGROUP CREATECONSUMER mystream mygroup consumer-1XGROUP CREATECONSUMER mystream mygroup consumer-1
- 創(chuàng)建組:
XREADGROUP 以消費者組身份讀取消息,實現(xiàn)分布式并發(fā)消費
XREADGROUP GROUP group consumer [BLOCK milliseconds] STREAMS key [key ...] id [id ...]參數(shù)說明:
GROUP group consumer:指定組名和消費者名。
id:> 表示只讀未分配的新消息,其他ID(如0)可用于補償pending。
舉例:XREADGROUP GROUP mygroup consumer-1 BLOCK 5000 STREAMS mystream >XPENDING 查看某個組下所有未ack的消息(即已分配但未確認(rèn))注意這里不是消息的快照,它只是存儲消息的ID列表,并不會復(fù)制一份消息內(nèi)容
XPENDING key group [start end count [consumer]]舉例:XPENDING mystream mygroup - + 10
XPENDING mystream mygroup - + 10 consumer-1XACK 用于確認(rèn)消息已被消費,也就是從pending狀態(tài)PEL中移除
舉例:XACK mystream mygroup 1680000000000-0XCLAIM/XAUTOCLAIM 將長時間未ack的pending消息轉(zhuǎn)移到其他消費者/實現(xiàn)自動補償。
舉例:XCLAIM mystream mygroup consumer-2 60000 1680000000000-0XAUTOCLAIM mystream mygroup consumer-2 60000 0-0 COUNT 10XTRIM 限制流的最大長度,自動刪除最老的消息。無論是否被ack的消息,都會被裁減。
語法:XTRIM key MAXLEN [~] count
舉例:XTRIM mystream MAXLEN ~ 1000XDEL 從Stream中刪除指定ID的消息,可以一次刪除多個,用空格隔開即可
XDEL mystream 1680000000000-0
實際使用場景
可用作消息隊列
- 當(dāng)需要一個輕量級的、安全性要求比較低、可靠性不要求那么高的一個消息隊列時,使用stream就很合適,性能也非常不錯,單機能支持每秒幾十萬的寫入
- 典型場景:訂單異步處理、短信/郵件通知、日志收集、任務(wù)分發(fā)等
可以作為事件總線
- 作為事件總線,支撐微服務(wù)間的事件發(fā)布與訂閱,作為事件源(例如,跟蹤用戶操作、點擊等)。
- 例如:用戶注冊事件、支付完成事件等,多個服務(wù)可并發(fā)消費
延遲隊列/死信隊列
- 利用 Stream 的 pending/ack/xclaim 機制實現(xiàn)可靠的延遲消息、死信消息補償。
實時數(shù)據(jù)流處理
- IoT、監(jiān)控、風(fēng)控等場景下,設(shè)備/傳感器數(shù)據(jù)實時寫入 Stream,后端實時消費分析。
- 支持高并發(fā)寫入和多消費者并發(fā)處理
重要說明
關(guān)于持久化和消息刪除
- 消息是默認(rèn)就持久化的,并且并不提供設(shè)置過期時間,那么如果在消息量大且請求量大的情況下,會占用很多內(nèi)存
- 如果在新增消息的時候使用maxlen選項限定了stream的長度,那么一定要考慮使用多個consumer,而且要提供一定的處理機制在某些consumer不可用的時候,將消息XCLAIM到可用的消費者。避免超過限定長度后,丟失消息。
- 不推薦每次消費完成后使用Xdel去刪除,而是采用Xtrim收縮,結(jié)合Xinfo、Xlen等命令定期檢測stream的長度,然后根據(jù)實際情況設(shè)置合理的收縮長度,定期的清理不再使用的消息。因為即使使用Xdel取刪除消息,在當(dāng)前的實現(xiàn)中,直到宏節(jié)點完全為空時才真正回收內(nèi)存
讀取的阻塞和非阻塞
- XRANGE 、XREAD 或 XREADGROUP ,沒有BLOCK選項時,像任何其他Redis命令一樣同步調(diào)用,此時他們就是同步命令;如果加上BLOCK選項就時非阻塞的,等待指定的毫秒直到有可以消費的消息并立即返回
插入的性能
- XADD 非???,如果使用流水線,在普通機器中每秒可以輕松插入50萬到100萬項
- 以下是官網(wǎng)提供的延遲測試結(jié)果:【在這里,我們每次迭代最多處理10k條消息,這意味著 XREADGROUP 的 COUNT 參數(shù)被設(shè)置為10000。這增加了大量的延遲,但為了讓緩慢的消費者能夠跟上消息流,這是必需的。因此,你可以預(yù)期真實世界的延遲要小得多】
Results obtained: 結(jié)果: Processed between 0 and 1 ms -> 74.11% Processed between 1 and 2 ms -> 25.80% Processed between 2 and 3 ms -> 0.06% Processed between 3 and 4 ms -> 0.01% Processed between 4 and 5 ms -> 0.02% 因此,99.9%的請求的延遲<= 2毫秒,異常值仍然非常接近平均值。
- 另外需要注意的是,從Redis 6.2.0版本開始,才增加了 IDLE 選項和獨占范圍間隔,雖然5.0就引入了stream數(shù)據(jù)類型
消費者組
- 何時不需要消費者組:如果你有一個數(shù)據(jù)流和多個客戶端,而且你希望所有客戶端都能收到所有信息,那么你就不需要消費者組。
- 如果你有一個數(shù)據(jù)流和多個客戶端,而且你希望在客戶端之間對數(shù)據(jù)流進(jìn)行分區(qū)或分片,以便每個客戶端都能獲得到達(dá)數(shù)據(jù)流的消息的子集,那么你就需要一個消費者組。
- 當(dāng)使用 XREADGROUP 讀取時,服務(wù)器將記錄哪些消息給到了哪些消費者:消息將存儲在使用者組內(nèi)的 Pending Entries List (PEL) 中,該列表是已傳遞但尚未確認(rèn)的消息 ID 列表。
- 當(dāng)實際場景是:可靠性不是必需的,并且偶爾的消息丟失是可以接受的情況下,可以使用 NOACK 子命令來避免將消息添加到 PEL。這相當(dāng)于在讀取消息時確認(rèn)消息(自動ACK)。
- 使用 XREADGROUP 時,在 STREAMS 選項中指定的 ID 可以是以下兩種之一:
特殊的 > ID,表示消費者只想接收從未發(fā)送給其他消費者的信息。它的意思是,給我新郵件。
任何其他 ID,即 0 或任何其他有效 ID 或不完整 ID(僅毫秒時間部分),都將導(dǎo)致返回發(fā)送命令的用戶的待處理條目,且 ID 大于所提供的 ID。因此,基本上如果 ID 不大于,那么命令將只允許客戶訪問其待處理條目:已向其發(fā)送但尚未確認(rèn)的信息。請注意,在這種情況下,BLOCK 和 NOACK 都會被忽略。
屬于PEL中的消息可以刪除嗎
pending狀態(tài)的消息是可以被刪除的,redis并沒有設(shè)計未確認(rèn)的消息不允許刪除。如果采用xdel刪除消息后,pending列表將仍然保留待消費消息的ID,但是消息內(nèi)容沒有了。因此,在讀取此類PEL條目時,Redis會返回一個空值。
一個stream的一個group多個consumer時如何消費的
1. 分區(qū)/競爭消費(Work Queue 模式)
- 每條消息只會被 group 下的一個消費者消費,不會被所有消費者都消費。
- Redis 會將新消息分配給 group 內(nèi)“空閑”的消費者,實現(xiàn)消息的負(fù)載均衡(輪詢或空閑優(yōu)先,具體是由實現(xiàn)的客戶端決定)。
- 多個消費者并發(fā)時,消息會被“分?jǐn)?rdquo;到各個消費者,每個消息只會被其中一個消費。
- 消息被轉(zhuǎn)XCLAIM到另一個消費者時會增加投遞次數(shù),并發(fā)時投遞次數(shù)、時間戳都會變化,因此也只有一個消費者成功獲取。XPENDING命令就可以看到每個消息被投遞的次數(shù)
2. pending 機制
- 消費者用 XREADGROUP 拉取消息后,消息會進(jìn)入該消費者的 pending(未確認(rèn))列表,直到被 XACK。
- 如果某個消費者掛掉,pending 里的消息可以被其他消費者用 XCLAIM/XAUTOCLAIM 方式“搶救”回來,保證消息最終被消費。
3. 分布式環(huán)境下的存儲
- stream的增加數(shù)據(jù)和其他數(shù)據(jù)類型一樣,都是需要一個唯一的key,然后給key綁定指定數(shù)據(jù)類型的一個或者多個值
- 那也就是說,即使在分布式存儲環(huán)境下,它和其他的key一樣,相同的key的數(shù)據(jù)一定存在同一個分片上(因為redis的分片機制就是按照Key來實現(xiàn)的)
- 實際使用時key的設(shè)置就要相對分散,否則數(shù)據(jù)會傾斜到某些節(jié)點上
x. 如果要“廣播”效果(每個消費者都收到同一條消息),需要每個消費者用不同的 group?;蛘叨紡V播了,就使用PUB/SUB吧,,~~
觀測流
- Redis流和消費者組有不同的方式來觀察正在發(fā)生的事情,比如前面說的XPENDING ,它允許我們檢查在給定時刻正在處理的消息列表,以及它們的空閑時間和交付數(shù)量
- XINFO:這個命令使用子命令來顯示流及其消費者組狀態(tài)的不同信息。例如,XINFO流報告有關(guān)流本身的信息。可以用于Stream、Group、CONSUMERS
- 實際項目中可結(jié)合其他命令,直觀的展示流的各種信息,比如有多少個分組、有哪些分組、有哪些消費者、消費者狀態(tài)、消費進(jìn)度、總條目數(shù)據(jù)等。有了這些信息就可以對消息的可靠性進(jìn)行分析,還能及時發(fā)現(xiàn)資源占用情況,結(jié)合定時任務(wù)等作出具體性能調(diào)整。
更加詳細(xì)stream的細(xì)節(jié)介紹,可以參考官網(wǎng):https://redis.io/docs/latest/develop/data-types/streams
稍后我將具體介紹如何在代碼中使用stream來作為消息隊列。
到此這篇關(guān)于使用redis的stream數(shù)據(jù)類型做消息隊列的文章就介紹到這了,更多相關(guān)redis消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis內(nèi)存碎片率調(diào)優(yōu)處理方式
Redis集群因內(nèi)存碎片率超過1.5觸發(fā)告警,分析發(fā)現(xiàn)內(nèi)因與外因?qū)е聝?nèi)存碎片,內(nèi)因為操作系統(tǒng)內(nèi)存分配機制,外因為Redis操作特性,使用Redis內(nèi)置內(nèi)存碎片清理機制可有效降低碎片率,但需注意可能影響性能,建議使用MEMORY命令診斷內(nèi)存使用情況,合理配置參數(shù)以優(yōu)化性能2024-09-09
如何使用docker?compose一鍵部署redis服務(wù)
這篇文章主要介紹了如何使用Docker和docker-compose搭建Redis服務(wù),包括創(chuàng)建安裝目錄、配置文件、啟動服務(wù)、查看狀態(tài)、登錄驗證、連接測試和查看信息等步驟,需要的朋友可以參考下2025-02-02

