Redis 數(shù)據(jù)類型Streams詳解
Redis Streams 是 Redis 5.0 引入的一種新的數(shù)據(jù)類型,它提供了一種強(qiáng)大的日志結(jié)構(gòu)化數(shù)據(jù)存儲(chǔ)方式。Streams 類型非常適合用于構(gòu)建消息隊(duì)列、事件日志以及其他需要持久化和高效處理時(shí)間序列數(shù)據(jù)的應(yīng)用場(chǎng)景。
1 基本特性
- 持久性:與傳統(tǒng)的發(fā)布/訂閱不同,Streams 中的消息是持久化的,即使客戶端斷開(kāi)連接后重新連接,仍然可以訪問(wèn)到之前的消息。
- 多消費(fèi)者支持:支持多個(gè)消費(fèi)者組(consumer groups),每個(gè)組可以獨(dú)立地消費(fèi)流中的消息。消費(fèi)者組允許不同的消費(fèi)者處理相同的消息,但每個(gè)消息在一個(gè)組內(nèi)只能被一個(gè)消費(fèi)者處理一次。
- 消息 ID 和范圍查詢:每條消息都有一個(gè)唯一的 ID,由時(shí)間戳和序列號(hào)組成??梢酝ㄟ^(guò)指定消息 ID 范圍來(lái)獲取特定時(shí)間段內(nèi)的消息。
- 阻塞讀?。褐С肿枞x?。?code>XREAD 和
XREADGROUP
命令的BLOCK
選項(xiàng)),使得客戶端可以在沒(méi)有新消息時(shí)等待一段時(shí)間。 - 自動(dòng)刪除:可以設(shè)置最大長(zhǎng)度(
MAXLEN
選項(xiàng))來(lái)限制流的大小,超過(guò)長(zhǎng)度的消息會(huì)自動(dòng)被刪除。 - 靈活的消息格式:每條消息可以包含多個(gè)字段-值對(duì),類似于哈希表,這使得消息可以攜帶豐富的信息。
2 主要操作命令
2.1 XADD key ID field value [field value ...]
向指定的流中添加一條新消息,ID 可以是 *(表示自動(dòng)生成)或指定的時(shí)間戳和序列號(hào)。
127.0.0.1:6379> xadd mystream * sensor_id 123 temmperature 22.5 "1729306027171-0"
返回的結(jié)構(gòu)可以分為兩部分:
- 時(shí)間戳:
1729306027171 (
表示條目被添加的時(shí)間,單位是毫秒。你可以將這個(gè)時(shí)間戳轉(zhuǎn)換為可讀的日期和時(shí)間格式。)
- 序列號(hào):
0 (
表示在同一毫秒內(nèi)這是第一個(gè)條目。如果在同一毫秒內(nèi)添加了多個(gè)條目,序列號(hào)將會(huì)遞增,例如1729306027171-1
、1729306027171-2
等。)
2.2 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
- 從一個(gè)或多個(gè) Stream 中讀取數(shù)據(jù)。
COUNT
指定返回的最大條目數(shù)。BLOCK
指定在沒(méi)有新消息時(shí)阻塞的時(shí)間(毫秒)。STREAMS
指定要讀取的 Stream 和起始 ID。
COUNT
指定返回的最大條目數(shù)。BLOCK
指定在沒(méi)有新消息時(shí)阻塞的時(shí)間(毫秒)。STREAMS
指定要讀取的 Stream 和起始 ID。
127.0.0.1:6379> xread count 2 streams mystream 0-0 1) 1) "mystream" 2) 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5"
2.3 XRANGE key start end [COUNT count]
- 返回指定 ID 范圍內(nèi)的條目。
start
和end
是 ID,可以使用-
表示最小 ID,+
表示最大 ID。
127.0.0.1:6379> xrange mystream - + 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5"
2.4 XREVRANGE key end start [COUNT count]
返回指定 ID 范圍內(nèi)的條目,但按逆序排列。
127.0.0.1:6379> xadd mystream * sensor_id 234 temmperature 23.5 "1729329067777-0" 127.0.0.1:6379> xadd mystream * sensor_id 345 "1729329079135-0" 127.0.0.1:6379> xrevrange mystream + - count 2 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 2) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5"
2.5 XGROUP CREATE key groupname id-or-$ [MKSTREAM]
- 創(chuàng)建一個(gè)新的消費(fèi)者組。
id-or-$
是起始位置,可以是具體的 ID 或$
表示只消費(fèi)新的條目。MKSTREAM
如果 Stream 不存在則創(chuàng)建它。
127.0.0.1:6379> xgroup create mystream mygroup 0 OK
2.6 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- 從消費(fèi)者組中讀取數(shù)據(jù)。
- GROUP:指定消費(fèi)者組的名稱。
- consumer:指定消費(fèi)者的名稱。
- COUNT count:可選參數(shù),指定一次最多讀取的消息數(shù)量。
- BLOCK milliseconds:可選參數(shù),如果當(dāng)前沒(méi)有可用的消息,命令將阻塞指定的時(shí)間(以毫秒為單位),等待新消息的到來(lái)。
- NOACK: 表示不確認(rèn)消息,通常用于快速消費(fèi)。
- STREAMS:指定要讀取的流及其對(duì)應(yīng)的 ID。
- ID 通常是一個(gè)特殊值
>
,表示只讀取新的消息;也可以是具體的 ID,表示從該 ID 開(kāi)始讀取。
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream > 1) 1) "mystream" 2) 1) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" 2) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5"
2.7 XACK key group ID [ID ...]
確認(rèn)已處理的消息。XACK
命令用于確認(rèn)消費(fèi)者組中的消息已經(jīng)被成功處理。當(dāng)你使用 XACK
命令時(shí),Redis 會(huì)將指定的消息從“待處理”狀態(tài)轉(zhuǎn)換為“已確認(rèn)”狀態(tài),并從消費(fèi)者的待處理列表中移除。
127.0.0.1:6379> xack mystream mygroup 1729329067777-0 (integer) 1 127.0.0.1:6379> xack mystream mygroup 1729329079135-0 (integer) 0
當(dāng) XACK 命令成功確認(rèn)一條消息時(shí),返回值為 1,表示該消息已經(jīng)被確認(rèn)并且從待處理列表中移除。例如,如果消息 1729329067777-0 是由 consumer1 處理的,并且現(xiàn)在調(diào)用 XACK 確認(rèn)它,那么這條消息將不再出現(xiàn)在 consumer1 的待處理列表中。
2.8 XPENDING key group [start end count] [IDLE idle]
查看待處理的消息。
127.0.0.1:6379> xpending mystream mygroup 1) (integer) 1 2) "1729306027171-0" 3) "1729306027171-0" 4) 1) 1) "consumer1" 2) "1" 127.0.0.1:6379> xack mystream mygroup 1729306027171-0 (integer) 1 127.0.0.1:6379> xpending mystream mygroup 1) (integer) 0 2) (nil) 3) (nil) 4) (nil)
2.9 XCLAIM key group consumer min-idle-time ID [ID ...] [IDLE idle] [TIME time] [RETries count] [FORCE]
用于將一個(gè)或多個(gè)消息從一個(gè)消費(fèi)者轉(zhuǎn)移到另一個(gè)消費(fèi)者。這個(gè)命令通常用于處理消息超時(shí)或重新分配消息的情況。XCLAIM
允許你手動(dòng)將消息從一個(gè)消費(fèi)者的待處理列表移動(dòng)到另一個(gè)消費(fèi)者的待處理列表。
127.0.0.1:6379> xreadgroup group mygroup consumer1 count 2 streams mystream > 1) 1) "mystream" 2) 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xclaim mystream mygroup consumer2 10000 1729329079135-0 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345"
- mystream:流的名稱。mygroup:消費(fèi)者組的名稱。
- consumer2:目標(biāo)消費(fèi)者的名稱,即消息將被轉(zhuǎn)移給這個(gè)消費(fèi)者。
- 10000:消息的空閑時(shí)間(以毫秒為單位)。只有那些空閑時(shí)間超過(guò)這個(gè)值的消息才會(huì)被轉(zhuǎn)移。
- 1729329079135-0:要轉(zhuǎn)移的消息 ID。
2.10 XINFO
獲取 Stream 或消費(fèi)者組的信息。
127.0.0.1:6379> xinfo stream mystream 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 1 5) "radix-tree-nodes" 6) (integer) 2 7) "groups" 8) (integer) 1 9) "last-generated-id" 10) "1729329079135-0" 11) "first-entry" 12) 1) "1729306027171-0" 2) 1) "sensor_id" 2) "123" 3) "temmperature" 4) "22.5" 13) "last-entry" 14) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xinfo groups mystream 1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 2 5) "pending" 6) (integer) 1 7) "last-delivered-id" 8) "1729329079135-0" 127.0.0.1:6379> xinfo consumers mystream mygroup 1) 1) "name" 2) "consumer1" 3) "pending" 4) (integer) 0 5) "idle" 6) (integer) 255317 2) 1) "name" 2) "consumer2" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 191940
XINFO STREAM mystream
length
:
流中的消息總數(shù):3 條。
radix-tree-keys
:
用于存儲(chǔ)流數(shù)據(jù)的 radix tree 中的鍵的數(shù)量:1 個(gè)。
radix-tree-nodes
:
用于存儲(chǔ)流數(shù)據(jù)的 radix tree 中的節(jié)點(diǎn)數(shù)量:2 個(gè)。
groups
:
與該流關(guān)聯(lián)的消費(fèi)者組數(shù)量:1 個(gè)。
last-generated-id
:
流中最后生成的消息 ID:1729329079135-0
。
first-entry
:
流中的第一條消息: 消息 ID: 1729306027171-0
消息內(nèi)容: sensor_id
: 123
temmperature
: 22.5
last-entry
:
流中的最后一條消息: 消息 ID: 1729329079135-0
消息內(nèi)容: sensor_id
: 345
XINFO GROUPS mystream
name
:
消費(fèi)者組的名稱:mygroup
。
consumers
:
該組中的消費(fèi)者數(shù)量:2 個(gè)。
pending
:
該組中待處理的消息數(shù)量:1 條。
last-delivered-id
:
該組中最后一個(gè)被交付的消息 ID:1729329079135-0
。
XINFO CONSUMERS mystream mygroup
第一個(gè)消費(fèi)者:
第一個(gè)消費(fèi)者:
name
:consumer1
pending
: 待處理的消息數(shù)量:0 條idle
: 空閑時(shí)間(以毫秒為單位):255,317 毫秒(約 4 分鐘 15 秒)
第二個(gè)消費(fèi)者:
name
:consumer2
pending
: 待處理的消息數(shù)量:1 條idle
: 空閑時(shí)間(以毫秒為單位):191,940 毫秒(約 3 分鐘 12 秒)
2.11 XDEL key ID [ID ...]
從 Stream 中刪除一個(gè)或多個(gè)條目。
127.0.0.1:6379> xdel mystream 1729306027171-0 (integer) 1 127.0.0.1:6379> xrange mystrea - + (empty list or set) 127.0.0.1:6379> xrange mystream - + 1) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" 2) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345"
2.12 XTRIM key MAXLEN [~] len
修剪 Stream,保留最多 len 個(gè)條目,~ 表示近似長(zhǎng)度。
127.0.0.1:6379> xrange mystream - + 1) 1) "1729329067777-0" 2) 1) "sensor_id" 2) "234" 3) "temmperature" 4) "23.5" 2) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345" 127.0.0.1:6379> xtrim mystream maxlen 1 (integer) 1 127.0.0.1:6379> xrange mystream - + 1) 1) "1729329079135-0" 2) 1) "sensor_id" 2) "345"
3 使用場(chǎng)景
- 日志記錄:可以用來(lái)存儲(chǔ)系統(tǒng)的日志信息,方便后續(xù)分析和處理。
- 事件流:處理實(shí)時(shí)事件,如傳感器數(shù)據(jù)、用戶行為等。
- 消息隊(duì)列:實(shí)現(xiàn)可靠的消息傳遞系統(tǒng),支持多個(gè)消費(fèi)者組。
- 任務(wù)隊(duì)列:管理后臺(tái)任務(wù),確保任務(wù)被正確處理。
更多命令請(qǐng)參考:Commands | Docs
到此這篇關(guān)于Redis 數(shù)據(jù)類型Streams的文章就介紹到這了,更多相關(guān)Redis 數(shù)據(jù)類型Streams內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis的setNX分布式鎖超時(shí)時(shí)間失效 -1問(wèn)題及解決
這篇文章主要介紹了Redis的setNX分布式鎖超時(shí)時(shí)間失效 -1問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-01-01RedisTemplate集成+封裝RedisUtil過(guò)程
本文介紹了如何搭建一個(gè)多模塊的Redis項(xiàng)目,包括項(xiàng)目搭建、配置和測(cè)試,通過(guò)使用父項(xiàng)目管理多個(gè)子模塊,可以實(shí)現(xiàn)單點(diǎn)構(gòu)建、統(tǒng)一版本管理和清晰的項(xiàng)目結(jié)構(gòu),文章還提供了在Spring Boot項(xiàng)目中集成RedisTemplate的示例,并解決了編碼問(wèn)題2024-12-12將音頻文件轉(zhuǎn)二進(jìn)制分包存儲(chǔ)到Redis的實(shí)現(xiàn)方法(奇淫技巧操作)
這篇文章主要介紹了將音頻文件轉(zhuǎn)二進(jìn)制分包存儲(chǔ)到Redis的實(shí)現(xiàn)方法(奇淫技巧操作),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07redis主從切換導(dǎo)致的數(shù)據(jù)丟失與陷入只讀狀態(tài)故障解決方案
這篇文章主要介紹了redis主從切換導(dǎo)致的數(shù)據(jù)丟失與陷入只讀狀態(tài)故障解決方案的相關(guān)資料,需要的朋友可以參考下2023-05-05詳解redis在微服務(wù)領(lǐng)域的貢獻(xiàn)
本文以dubbo為例看下redis是如何利用自身特性來(lái)完成注冊(cè)中心的功能,對(duì)redis微服務(wù)相關(guān)知識(shí)感興趣的朋友一起看看吧2021-10-10