Redis?使用?List?實(shí)現(xiàn)消息隊(duì)列的優(yōu)缺點(diǎn)
分布式系統(tǒng)中必備的一個(gè)中間件就是消息隊(duì)列,通過消息隊(duì)列我們能對服務(wù)間進(jìn)行異步解耦、流量消峰、實(shí)現(xiàn)最終一致性。
目前市面上已經(jīng)有 RabbitMQ、RochetMQ、ActiveMQ、Kafka
等,有人會問:“Redis 適合做消息隊(duì)列么?”
在回答這個(gè)問題之前,我們先從本質(zhì)思考:
- 消息隊(duì)列提供了什么特性?
- Redis 如何實(shí)現(xiàn)消息隊(duì)列?是否滿足存取需求?
今天,碼哥結(jié)合消息隊(duì)列的特點(diǎn)一步步帶大家分析使用 Redis 的 List 作為消息隊(duì)列的實(shí)現(xiàn)原理,并分享如何把 SpringBoot 與 Redission 整合運(yùn)用到項(xiàng)目中。
什么是消息隊(duì)列
消息隊(duì)列是一種異步的服務(wù)間通信方式,適用于分布式和微服務(wù)架構(gòu)。消息在被處理和刪除之前一直存儲在隊(duì)列上。
每條消息僅可被一位用戶處理一次。消息隊(duì)列可被用于分離重量級處理、緩沖或批處理工作以及緩解高峰期工作負(fù)載。
- Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到 Broker;
- Broker:消息處理中心。負(fù)責(zé)消息存儲、確認(rèn)、重試等,一般其中會包含多個(gè) queue;
- Consumer:消息消費(fèi)者,負(fù)責(zé)從 Broker 中獲取消息,并進(jìn)行相應(yīng)處理;
- 消息隊(duì)列的使用場景有哪些呢?
- 消息隊(duì)列在實(shí)際應(yīng)用中包括如下四個(gè)場景:
- 應(yīng)用耦合:發(fā)送方、接收方系統(tǒng)之間不需要了解雙方,只需要認(rèn)識消息。多應(yīng)用間通過消息隊(duì)列對同一消息進(jìn)行處理,避免調(diào)用接口失敗導(dǎo)致整個(gè)過程失??;
- 異步處理:多應(yīng)用對消息隊(duì)列中同一消息進(jìn)行處理,應(yīng)用間并發(fā)處理消息,相比串行處理,減少處理時(shí)間;
- 限流削峰:廣泛應(yīng)用于秒殺或搶購活動(dòng)中,避免流量過大導(dǎo)致應(yīng)用系統(tǒng)掛掉的情況;
- 消息驅(qū)動(dòng)的系統(tǒng):系統(tǒng)分為消息隊(duì)列、消息生產(chǎn)者、消息消費(fèi)者,生產(chǎn)者負(fù)責(zé)產(chǎn)生消息,消費(fèi)者(可能有多個(gè))負(fù)責(zé)對消息進(jìn)行處理;
消息隊(duì)列滿足哪些特性
消息有序性
消息是異步處理的,但是消費(fèi)者需要按照生產(chǎn)者發(fā)送消息的順序來消費(fèi),避免出現(xiàn)后發(fā)送的消息被先處理的情況。
重復(fù)消息處理
生產(chǎn)者可能因?yàn)榫W(wǎng)絡(luò)問題出現(xiàn)消息重傳導(dǎo)致消費(fèi)者可能會收到多條重復(fù)消息。
同樣的消息重復(fù)多次的話可能會造成一業(yè)務(wù)邏輯多次執(zhí)行,需要確保如何避免重復(fù)消費(fèi)問題。
可靠性
一次保證消息的傳遞。如果發(fā)送消息時(shí)接收者不可用,消息隊(duì)列會保留消息,直到成功地傳遞它。
當(dāng)消費(fèi)者重啟后,可以繼續(xù)讀取消息進(jìn)行處理,防止消息遺漏。
List 實(shí)現(xiàn)消息隊(duì)列
Redis 的列表(List)是一種線性的有序結(jié)構(gòu),可以按照元素被推入列表中的順序來存儲元素,能滿足「先進(jìn)先出」的需求,這些元素既可以是文字?jǐn)?shù)據(jù),又可以是二進(jìn)制數(shù)據(jù)。
LPUSH
生產(chǎn)者使用 LPUSH key element[element...]
將消息插入到隊(duì)列的頭部,如果 key 不存在則會創(chuàng)建一個(gè)空的隊(duì)列再插入消息。
如下,生產(chǎn)者向隊(duì)列 queue 先后插入了 「Java」「碼哥字節(jié)」「Go」,返回值表示消息插入隊(duì)列后的個(gè)數(shù)。
> LPUSH queue Java 碼哥字節(jié) Go (integer) 3
RPOP
消費(fèi)者使用 RPOP key
依次讀取隊(duì)列的消息,先進(jìn)先出,所以 「Java」會先讀取消費(fèi):
> RPOP queue "Java" > RPOP queue "碼哥字節(jié)" > RPOP queue "Go"
實(shí)時(shí)消費(fèi)問題
65 哥:這么簡單就實(shí)現(xiàn)了么?
別高興的太早, LPUSH、RPOP
存在一個(gè)性能風(fēng)險(xiǎn),生產(chǎn)者向隊(duì)列插入數(shù)據(jù)的時(shí)候,List 并不會主動(dòng)通知消費(fèi)者及時(shí)消費(fèi)。
我們需要寫一個(gè) while(true)
不停地調(diào)用 RPOP
指令,當(dāng)有新消息就會返回消息,否則返回空。
程序需要不斷輪詢并判斷是否為空再執(zhí)行消費(fèi)邏輯,這就會導(dǎo)致即使沒有新消息寫入到隊(duì)列,消費(fèi)者也要不停地調(diào)用 RPOP
命令占用 CPU
資源。
65 哥:要如何避免循環(huán)調(diào)用導(dǎo)致的 CPU 性能損耗呢?
Redis 提供了 BLPOP、BRPOP
阻塞讀取的命令, 消費(fèi)者在在讀取隊(duì)列沒有數(shù)據(jù)的時(shí)候自動(dòng)阻塞,直到有新的消息寫入隊(duì)列,才會繼續(xù)讀取新消息執(zhí)行業(yè)務(wù)邏輯。
BRPOP queue 0
參數(shù) 0 表示阻塞等待時(shí)間無無限制
重復(fù)消費(fèi)
- 消息隊(duì)列為每一條消息生成一個(gè)「全局 ID」;
- 生產(chǎn)者為每一條消息創(chuàng)建一條「全局 ID」,消費(fèi)者把一件處理過的消息 ID 記錄下來判斷是否重復(fù)。
其實(shí)這就是冪等,對于同一條消息,消費(fèi)者收到后處理一次的結(jié)果和多次的結(jié)果是一致的。
消息可靠性
65 哥:消費(fèi)者從 List 中讀取一條在消息處理過程中宕機(jī)了就會導(dǎo)致消息沒有處理完成,可是數(shù)據(jù)已經(jīng)沒有保存在 List 中了咋辦?
本質(zhì)就是消費(fèi)者在處理消息的時(shí)候崩潰了,就無法再還原消息,缺乏一個(gè)消息確認(rèn)機(jī)制。
Redis 提供了 RPOPLPUSH、BRPOPLPUSH(阻塞)
兩個(gè)指令,含義是從 List 從讀取消息的同時(shí)把這條消息復(fù)制到另一個(gè) List 中(備份),并且是原子操作。
我們就可以在業(yè)務(wù)流程正確處理完成后再刪除隊(duì)列消息實(shí)現(xiàn)消息確認(rèn)機(jī)制。如果在處理消息的時(shí)候宕機(jī)了,重啟后再從備份 List 中讀取消息處理。
LPUSH redisMQ 公眾號 碼哥字節(jié) BRPOPLPUSH redisMQ redisMQBack
生產(chǎn)者用 LPUSH
把消息插入到 redisMQ 隊(duì)列中,消費(fèi)者使用 BRPOPLPUSH
讀取消息「公眾號」,同時(shí)該消息會被插入到 「redisMQBack」隊(duì)列中。
如果消費(fèi)成功則把「redisMQBack」的消息刪除即可,異常的話可以繼續(xù)從 「redisMQBack」再次讀取消息處理。
需要注意的是
如果生產(chǎn)者消息發(fā)送的很快,而消費(fèi)者處理速度慢就會導(dǎo)致消息堆積,給 Redis 的內(nèi)存帶來過大壓力。
Redission 實(shí)戰(zhàn)
在 Java 中,我們可以利用 Redission 封裝的 API 來快速實(shí)現(xiàn)隊(duì)列,接下來碼哥基于 SpringBoot 2.1.4 版本來交大家如何整合并實(shí)戰(zhàn)。
詳細(xì) API 文檔大家可查閱: https://github.com/redisson/redisson/wiki/7.-Distributed-collections
添加依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.16.7</version> </dependency>
添加 Redis 配置,碼哥的 Redis 沒有配置密碼,大家根據(jù)實(shí)際情況配置即可。
spring: application: name: redission redis: host: 127.0.0.1 port: 6379 ssl: false
Java 代碼實(shí)戰(zhàn)
RBlockingDeque 繼承 java.util.concurrent.BlockingDeque
,在使用過程中我們完全可以根據(jù)接口文檔來選擇合適的 API 去實(shí)現(xiàn)業(yè)務(wù)邏輯。
主要方法如下
碼哥采用了雙端隊(duì)列來舉例
@Slf4j @Service public class QueueService { @Autowired private RedissonClient redissonClient; private static final String REDIS_MQ = "redisMQ"; /** * 發(fā)送消息到隊(duì)列頭部 * * @param message */ public void sendMessage(String message) { RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ); try { blockingDeque.putFirst(message); log.info("將消息: {} 插入到隊(duì)列。", message); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 從隊(duì)列尾部阻塞讀取消息,若沒有消息,線程就會阻塞等待新消息插入,防止 CPU 空轉(zhuǎn) */ public void onMessage() { RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDIS_MQ); while (true) { try { String message = blockingDeque.takeLast(); log.info("從隊(duì)列 {} 中讀取到消息:{}.", REDIS_MQ, message); } catch (InterruptedException e) { e.printStackTrace(); } } }
單元測試
@RunWith(SpringRunner.class) @SpringBootTest(classes = RedissionApplication.class) public class RedissionApplicationTests { @Autowired private QueueService queueService; @Test public void testQueue() throws InterruptedException { new Thread(() -> { for (int i = 0; i < 1000; i++) { queueService.sendMessage("消息" + i); } }).start(); new Thread(() -> queueService.onMessage()).start(); Thread.currentThread().join(); } }
總結(jié)
可以使用 List 數(shù)據(jù)結(jié)構(gòu)來實(shí)現(xiàn)消息隊(duì)列,滿足先進(jìn)先出。為了實(shí)現(xiàn)消息可靠性,Redis 提供了 BRPOPLPUSH 命令是解決。
Redis 是一個(gè)非常輕量級的鍵值數(shù)據(jù)庫,部署一個(gè) Redis 實(shí)例就是啟動(dòng)一個(gè)進(jìn)程,部署 Redis 集群,也就是部署多個(gè) Redis 實(shí)例。
而 Kafka、RabbitMQ 部署時(shí),涉及額外的組件,例如 Kafka 的運(yùn)行就需要再部署 ZooKeeper。相比 Redis 來說,Kafka 和 RabbitMQ 一般被認(rèn)為是重量級的消息隊(duì)列。
需要注意的是,我們要避免生產(chǎn)者過快,消費(fèi)者過慢導(dǎo)致的消息堆積占用 Redis 的內(nèi)存。
在消息量不大的情況下使用 Redis 作為消息隊(duì)列,他能給我們帶來高性能的消息讀寫,這似乎也是一個(gè)很好消息隊(duì)列解決方案。
到此這篇關(guān)于Redis 使用 List 實(shí)現(xiàn)消息隊(duì)列有哪些利弊的文章就介紹到這了,更多相關(guān)redis list消息隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
redis?for?windows?6.2.6安裝包最新步驟詳解
這篇文章主要介紹了redis?for?windows?6.2.6安裝包全網(wǎng)首發(fā),使用Windows計(jì)劃任務(wù)自動(dòng)運(yùn)行redis服務(wù),文章給大家講解的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-04-04淺析Redis中紅鎖RedLock的實(shí)現(xiàn)原理
RedLock?是一種分布式鎖的實(shí)現(xiàn)算法,由?Redis?的作者?Salvatore?Sanfilippo(也稱為?Antirez)提出,本文主要為大家詳細(xì)介紹了紅鎖RedLock的實(shí)現(xiàn)原理,感興趣的可以了解下2024-02-02Redis在項(xiàng)目中的使用(JedisPool方式)
項(xiàng)目操作redis是使用的RedisTemplate方式,另外還可以完全使用JedisPool和Jedis來操作redis,本文給大家介紹Redis在項(xiàng)目中的使用,JedisPool方式,感興趣的朋友跟隨小編一起看看吧2021-12-12redis的五大數(shù)據(jù)類型應(yīng)用場景分析
這篇文章主要介紹了redis的五大數(shù)據(jù)類型實(shí)現(xiàn)原理,本文給大家分享五大數(shù)據(jù)類型的應(yīng)用場景分析,需要的朋友可以參考下2021-08-08Redis 的查詢很快的原因解析及Redis 如何保證查詢的高效
由于redis是內(nèi)存數(shù)據(jù)庫,歸功于它的數(shù)據(jù)結(jié)構(gòu)所以查詢效率非常高,今天通過本文給大家介紹下Redis 的查詢很快的原因解析及Redis 如何保證查詢的高效,感興趣的朋友一起看看吧2022-03-03深入解析Redis的LRU與LFU算法實(shí)現(xiàn)
這篇文章主要重點(diǎn)介紹了Redis的LRU與LFU算法實(shí)現(xiàn),并分析總結(jié)了兩種算法的實(shí)現(xiàn)效果以及存在的問題,并闡述其優(yōu)劣特性,感興趣的小伙伴跟著小編一起來看看吧2023-07-07