Redis消息隊列的三種實現方式
前言
為什么要使用Redis的消息隊列?
成本低,對于RabbitMQ或是Kafka來說,已經是重量級的消息隊列。
Redis的三種實現方式:
- List結構:一種有序的雙向鏈表
- PubSub發(fā)布訂閱:基于點對點的消息模型
- Stream:在Redis5.0之后提供的,比較完善的消息隊列模型
List實現消息隊列
我們可以利用Redis中List的命令LPUSH與RPOP來實現消息的發(fā)送與接收,但是需要注意的是,隊列中沒有消息時,RPOP會返回null,不會向JVM中阻塞隊列一樣進行阻塞并等待消息,因此這里應該使用BRPOP來實現阻塞效果。
優(yōu)點:利用Redis存儲,不受限于JVM內存上限。
基于Redis的持久化機制,數據安全性有保證。
可以滿足消息有序性。
缺點:無法避免消息丟失。
只支持單消費者。
PubSub消息隊列
是Redis2.0版本引入的消息傳遞模型,顧名思義,消費者可以訂閱一個或多個channel,生產者向對應的channel發(fā)送消息后,所有訂閱者都能收到相關信息。
PubSub消息隊列的基本命令
# 訂閱一個或多個頻道 SUBSCRIBE channel [channel] # 向一個頻道發(fā)送消息 PUBLISH channel msg # 訂閱與pattern格式匹配的所有頻道 PSUBSCRIBE pattern [pattern]
優(yōu)點:采用發(fā)布訂閱模式,支持多生產者,多消費者。
缺點:不支持數據持久化。
無法避免消息丟失。
消息堆積有上限,超出時數據丟失。
Stream消息隊列
Stream是Redis5.0之后引入新的數據類型,支持持久化,因此相比于PubSub更加安全,可以通過Stream實現一個功能完善的消息隊列
發(fā)送消息的命令:
XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID filed value[filed value]
命令解釋:
- key:隊列名稱
- NOMKSTREAM:如果隊列不存在,是否自動創(chuàng)建隊列,默認是自動創(chuàng)建
- MAXLEN|MINID [=|~] threshold [LIMIT count]:設置消息隊列的最大消息數量
- *|ID:消息的唯一ID,*代表由Redis自動生成,格式是"時間戳-遞增數字"
- field value:發(fā)送到隊列的消息名稱為Entry
讀取消息的第一種方法
命令如下
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
命令解釋:
- COUNT count:每次讀取消息的最大數量
- BLOCK milliseconds:當沒有消息時進行阻塞,并指定阻塞時長,如果為0則指永久阻塞
- STREAMS key:要從哪個隊列讀取消息
- ID:起始ID,只返回大于該ID的消息,0表示從第一個消息開始讀取,$表示從最新消息開始
XREAD命令的特點:
- 消息可回溯
- 一個消息可以被多個消費者拿到
- 可以阻塞讀取
- 有消息漏讀的風險
讀取消息的第二種方法
將多個消費者劃分到一個組(Consumer Group)當中,監(jiān)聽同一個隊列。特點如下
- 消息分流:隊列中的消息會分流給組內的不同消費者,而不是重復消費,從而加快消息處理的速度
- 消息標識:消費者組會維護一個標示記錄最后一個被處理的消息哪怕消費者宕機重啟,還會從標示之后讀取消息。確保每一個消息都會被消費
- 消息確認:消費者獲取消息后,消息處于pending狀態(tài),并存入一個pending-list。當處理完成后需要通過XACK來確認消息,標記消息為已處理,才會從pending-list移除。
創(chuàng)建消費者組命令:
XGROUP CREATE key groupName ID [MKSTREAM]
命令解釋:
- key:隊列名稱
- groupName:消費者組名稱
- ID:起始ID標識,$代表隊列中最后一個消息,0則代表隊列中第一個消息
- MKSTREAM:隊列不存在時自動創(chuàng)建
# 刪除指定的消費者組 XGROUP DESTORY key groupName # 給指定的消費者組添加消費者 XGROUP CREATECONSUMER key groupName consumername # 刪除消費者組中的指定消費者 XGROUP DELCONSUMER key groupname comsumername
從消費者組中讀取消息
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID[ID ...]
命令解釋:
- group:消費組名稱
- consumer:消費者名稱,如果消費者不存在,會自動創(chuàng)建一個消費者
- count:本次查詢的最大數量
- BLOCK milliseconds:當沒有消息時最長等待時間
- NOACK:無需手動ACK,獲取到消息后自動確認
- STREAMS key:指定隊列名稱
- ID:獲取消息的起始ID。" > "表示從下一個未消費的消息開始。其它則是根據指定id從pending-list中獲取已消費但未確認的消息,例如0,是從pending-list中的第一個消息開始
Group類型的消息隊列特點:
- 消息可回溯
- 可以多消費者爭搶消息
- 可以阻塞讀取
- 沒有消息漏讀風險
- 有消息確認機制,保證消息至少被消費一次
三種實現方式對比
LIST | PubSub | Stream | |
消息持久化 | 支持 | 不支持 | 支持 |
阻塞讀取 | 支持 | 支持 | 支持 |
消息堆積處理 | 受限于內存空間,可以利用多消費者加快處理 | 受限于消費者緩沖區(qū) | 受限于隊列長度,可以利用消費者組提高消費速度,減少堆積 |
消息確認機制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |
到此這篇關于Redis消息隊列的三種實現方式的文章就介紹到這了,更多相關Redis消息隊列 內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
通過docker和docker-compose安裝redis兩種方式詳解
這篇文章主要介紹了通過docker和docker-compose安裝redis的兩種方式,Docker安裝方式包括拉取鏡像、查看本地鏡像、運行容器和測試連接,Docker Compose安裝方式包括目錄結構、配置文件、啟動和關閉容器、檢查啟動情況以及查看CPU和內存使用狀態(tài),需要的朋友可以參考下2024-12-12使用lua+redis解決發(fā)多張券的并發(fā)問題
這篇文章主要介紹了使用lua+redis解決發(fā)多張券的并發(fā)問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-01-01使用Jedis線程池returnResource異常注意事項
這篇文章主要介紹了使用Jedis線程池returnResource異常注意事項,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03