Redis 數(shù)據(jù)類型Streams詳解
Redis Streams 是 Redis 5.0 引入的一種新的數(shù)據(jù)類型,它提供了一種強(qiáng)大的日志結(jié)構(gòu)化數(shù)據(jù)存儲方式。Streams 類型非常適合用于構(gòu)建消息隊列、事件日志以及其他需要持久化和高效處理時間序列數(shù)據(jù)的應(yīng)用場景。
1 基本特性
- 持久性:與傳統(tǒng)的發(fā)布/訂閱不同,Streams 中的消息是持久化的,即使客戶端斷開連接后重新連接,仍然可以訪問到之前的消息。
- 多消費者支持:支持多個消費者組(consumer groups),每個組可以獨立地消費流中的消息。消費者組允許不同的消費者處理相同的消息,但每個消息在一個組內(nèi)只能被一個消費者處理一次。
- 消息 ID 和范圍查詢:每條消息都有一個唯一的 ID,由時間戳和序列號組成??梢酝ㄟ^指定消息 ID 范圍來獲取特定時間段內(nèi)的消息。
- 阻塞讀?。褐С肿枞x取(
XREAD
和XREADGROUP
命令的BLOCK
選項),使得客戶端可以在沒有新消息時等待一段時間。 - 自動刪除:可以設(shè)置最大長度(
MAXLEN
選項)來限制流的大小,超過長度的消息會自動被刪除。 - 靈活的消息格式:每條消息可以包含多個字段-值對,類似于哈希表,這使得消息可以攜帶豐富的信息。
2 主要操作命令
2.1 XADD key ID field value [field value ...]
向指定的流中添加一條新消息,ID 可以是 *(表示自動生成)或指定的時間戳和序列號。
127.0.0.1:6379> xadd mystream * sensor_id 123 temmperature 22.5 "1729306027171-0"
返回的結(jié)構(gòu)可以分為兩部分:
- 時間戳:
1729306027171 (
表示條目被添加的時間,單位是毫秒。你可以將這個時間戳轉(zhuǎn)換為可讀的日期和時間格式。)
- 序列號:
0 (
表示在同一毫秒內(nèi)這是第一個條目。如果在同一毫秒內(nèi)添加了多個條目,序列號將會遞增,例如1729306027171-1
、1729306027171-2
等。)
2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- 從一個或多個 Stream 中讀取數(shù)據(jù)。
COUNT
指定返回的最大條目數(shù)。BLOCK
指定在沒有新消息時阻塞的時間(毫秒)。STREAMS
指定要讀取的 Stream 和起始 ID。
COUNT
指定返回的最大條目數(shù)。BLOCK
指定在沒有新消息時阻塞的時間(毫秒)。STREAMS
指定要讀取的 Stream 和起始 ID。
127.0.0.1:6379> xread count 2 streams mystream 0-0 1) 1) "mystream" 2) 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5"
2.3 XRANGE key start end [COUNT count]
- 返回指定 ID 范圍內(nèi)的條目。
start
和end
是 ID,可以使用-
表示最小 ID,+
表示最大 ID。
127.0.0.1:6379> xrange mystream - + 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5"
2.4 XREVRANGE key end start [COUNT count]
返回指定 ID 范圍內(nèi)的條目,但按逆序排列。
127.0.0.1:6379> xadd mystream * sensor_id 234 temmperature 23.5 "1729329067777-0" 127.0.0.1:6379> xadd mystream * sensor_id 345 "1729329079135-0" 127.0.0.1:6379> xrevrange mystream + - count 2 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 2) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5"
2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]
- 創(chuàng)建一個新的消費者組。
id-or-$
是起始位置,可以是具體的 ID 或$
表示只消費新的條目。MKSTREAM
如果 Stream 不存在則創(chuàng)建它。
127.0.0.1:6379> xgroup create mystream mygroup 0 OK
2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- 從消費者組中讀取數(shù)據(jù)。
- GROUP:指定消費者組的名稱。
- consumer:指定消費者的名稱。
- COUNT count:可選參數(shù),指定一次最多讀取的消息數(shù)量。
- BLOCK milliseconds:可選參數(shù),如果當(dāng)前沒有可用的消息,命令將阻塞指定的時間(以毫秒為單位),等待新消息的到來。
- NOACK: 表示不確認(rèn)消息,通常用于快速消費。
- STREAMS:指定要讀取的流及其對應(yīng)的 ID。
- ID 通常是一個特殊值
>
,表示只讀取新的消息;也可以是具體的 ID,表示從該 ID 開始讀取。
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream > 1) 1) "mystream" 2) 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" 2) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5"
2.7 XACK key group ID [ID ...]
確認(rèn)已處理的消息。XACK
命令用于確認(rèn)消費者組中的消息已經(jīng)被成功處理。當(dāng)你使用 XACK
命令時,Redis 會將指定的消息從“待處理”狀態(tài)轉(zhuǎn)換為“已確認(rèn)”狀態(tài),并從消費者的待處理列表中移除。
127.0.0.1:6379> xack mystream mygroup 1729329067777-0 (integer) 1 127.0.0.1:6379> xack mystream mygroup 1729329079135-0 (integer) 0
當(dāng) XACK 命令成功確認(rèn)一條消息時,返回值為 1,表示該消息已經(jīng)被確認(rèn)并且從待處理列表中移除。例如,如果消息 1729329067777-0 是由 consumer1 處理的,并且現(xiàn)在調(diào)用 XACK 確認(rèn)它,那么這條消息將不再出現(xiàn)在 consumer1 的待處理列表中。
2.8 XPENDING key group [start end count] [IDLE idle]
查看待處理的消息。
127.0.0.1:6379> xpending mystream mygroup 1) (integer) 1 2) "1729306027171-0" 3) "1729306027171-0" 4) 1) 1) "consumer1" 2) "1" 127.0.0.1:6379> xack mystream mygroup 1729306027171-0 (integer) 1 127.0.0.1:6379> xpending mystream mygroup 1) (integer) 0 2) (nil) 3) (nil) 4) (nil)
2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE idle] [TIME time] [RETries count] [FORCE]
用于將一個或多個消息從一個消費者轉(zhuǎn)移到另一個消費者。這個命令通常用于處理消息超時或重新分配消息的情況。XCLAIM
允許你手動將消息從一個消費者的待處理列表移動到另一個消費者的待處理列表。
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream > 1) 1) "mystream" 2) 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xclaim mystream mygroup consumer2 10000 1729329079135-0 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345"
- mystream:流的名稱。mygroup:消費者組的名稱。
- consumer2:目標(biāo)消費者的名稱,即消息將被轉(zhuǎn)移給這個消費者。
- 10000:消息的空閑時間(以毫秒為單位)。只有那些空閑時間超過這個值的消息才會被轉(zhuǎn)移。
- 1729329079135-0:要轉(zhuǎn)移的消息 ID。
2.10 XINFO
獲取 Stream 或消費者組的信息。
127.0.0.1:6379> xinfo stream mystream 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "groups" 8) (integer) 1 9) "last-generated-id" 10) "1729329079135-0" 11) "first-entry" 12) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" 13) "last-entry" 14) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xinfo groups mystream 1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 2 5) "pending" 6) (integer) 1 7) "last-delivered-id" 8) "1729329079135-0" 127.0.0.1:6379> xinfo consumers mystream mygroup 1) 1) "name" 2) "consumer1" 3) "pending" 4) (integer) 0 5) "idle" 6) (integer) 255317 2) 1) "name" 2) "consumer2" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 191940
XINFO STREAM mystream
length
:
流中的消息總數(shù):3 條。
radix-tree-keys
:
用于存儲流數(shù)據(jù)的 radix tree 中的鍵的數(shù)量:1 個。
radix-tree-nodes
:
用于存儲流數(shù)據(jù)的 radix tree 中的節(jié)點數(shù)量:2 個。
groups
:
與該流關(guān)聯(lián)的消費者組數(shù)量:1 個。
last-generated-id
:
流中最后生成的消息 ID:1729329079135-0
。
first-entry
:
流中的第一條消息: 消息 ID: 1729306027171-0
消息內(nèi)容: sensor_id
: 123
temmperature
: 22.5
last-entry
:
流中的最后一條消息: 消息 ID: 1729329079135-0
消息內(nèi)容: sensor_id
: 345
XINFO GROUPS mystream
name
:
消費者組的名稱:mygroup
。
consumers
:
該組中的消費者數(shù)量:2 個。
pending
:
該組中待處理的消息數(shù)量:1 條。
last-delivered-id
:
該組中最后一個被交付的消息 ID:1729329079135-0
。
XINFO CONSUMERS mystream mygroup
第一個消費者:
第一個消費者:
name
:consumer1
pending
: 待處理的消息數(shù)量:0 條idle
: 空閑時間(以毫秒為單位):255,317 毫秒(約 4 分鐘 15 秒)
第二個消費者:
name
:consumer2
pending
: 待處理的消息數(shù)量:1 條idle
: 空閑時間(以毫秒為單位):191,940 毫秒(約 3 分鐘 12 秒)
2.11 XDEL key ID [ID ...]
從 Stream 中刪除一個或多個條目。
127.0.0.1:6379> xdel mystream 1729306027171-0 (integer) 1 127.0.0.1:6379> xrange mystrea - + (empty list or set) 127.0.0.1:6379> xrange mystream - + 1) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" 2) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345"
2.12 XTRIM key MAXLEN [~] len
修剪 Stream,保留最多 len 個條目,~ 表示近似長度。
127.0.0.1:6379> xrange mystream - + 1) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" 2) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xtrim mystream maxlen 1 (integer) 1 127.0.0.1:6379> xrange mystream - + 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345"
3 使用場景
- 日志記錄:可以用來存儲系統(tǒng)的日志信息,方便后續(xù)分析和處理。
- 事件流:處理實時事件,如傳感器數(shù)據(jù)、用戶行為等。
- 消息隊列:實現(xiàn)可靠的消息傳遞系統(tǒng),支持多個消費者組。
- 任務(wù)隊列:管理后臺任務(wù),確保任務(wù)被正確處理。
更多命令請參考:Commands | Docs
到此這篇關(guān)于Redis 數(shù)據(jù)類型Streams的文章就介紹到這了,更多相關(guān)Redis 數(shù)據(jù)類型Streams內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
將音頻文件轉(zhuǎn)二進(jìn)制分包存儲到Redis的實現(xiàn)方法(奇淫技巧操作)
這篇文章主要介紹了將音頻文件轉(zhuǎn)二進(jìn)制分包存儲到Redis的實現(xiàn)方法(奇淫技巧操作),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07redis主從切換導(dǎo)致的數(shù)據(jù)丟失與陷入只讀狀態(tài)故障解決方案
這篇文章主要介紹了redis主從切換導(dǎo)致的數(shù)據(jù)丟失與陷入只讀狀態(tài)故障解決方案的相關(guān)資料,需要的朋友可以參考下2023-05-05詳解redis在微服務(wù)領(lǐng)域的貢獻(xiàn)
本文以dubbo為例看下redis是如何利用自身特性來完成注冊中心的功能,對redis微服務(wù)相關(guān)知識感興趣的朋友一起看看吧2021-10-10