SpringBoot使用Redis實現(xiàn)消息隊列的方法小結
使用 Redis 實現(xiàn)消息隊列的幾種方式
Redis 提供了多種方式來實現(xiàn)消息隊列。
Pub/Sub
訂閱發(fā)布模式,發(fā)布者把消息發(fā)布到某個 Channel,該 Channel 的所有訂閱者都會收到消息。但是這種方式最大的問題是 「發(fā)布出去的消息,如果沒有被監(jiān)聽消費,或者消費過程中宕機,那么消息就會永遠丟失」。適合用于臨時通知之類的場景,對于需要保證數(shù)據(jù)不丟失的場景不能使用這種方式。
List
List 是 Redis 提供的一種數(shù)據(jù)類型,底層是鏈表,可以用來實現(xiàn)隊列、棧。
Stream
Stream 是一個由 Redis 5 引入的,功能完善的消息隊列。想必也是 Redis 官方團隊看到太多人拿 Redis 當消息隊列使,于是干脆就在 Redis 上設計出一個類似于 Kafka 的消息隊列。
Steam 支持消費組消費,一條消息只能被消費組中的其中一個消費者消費。支持 「消息確認」、支持 「回溯消費」 還支持把未 ACK(確認)的消息轉移給其他消費者進行重新消費,在進行轉移的時候還會累計消息的轉移次數(shù),當次數(shù)達到一定閾值還沒消費成功,就可以放入死信隊列。
這也是 Redis 種最復雜的一種數(shù)據(jù)類型。如果你真的到了需要使用 Redis Steam 作為消息隊列的地步,那不如直接使用 RabbitMQ 等更加成熟且穩(wěn)定的消息隊列系統(tǒng)。
使用 List 實現(xiàn)可靠的消息隊列
目前來說,這是用得最多的一種方式,適用于大多數(shù)簡單的消息隊列應用場景。List 類型有很多指令,但是作為消息隊列來說用到的只有幾個個:
「LPUSH key element [element ...]
」
把元素插入到 List 的首部,如果 List 不存在,會自動創(chuàng)建。
「BRPOPLPUSH source destination timeout
」
移除并且返回 List (source)尾部的最后一個元素,并且同時會把這個元素插入到另一個 List (destination)的首部。
當 source List 中沒有元素時,Redis 會阻塞連接,直到有其他客戶端向其推送元素或超時。超時時間(秒)為 0 表示永遠不超時。
注意,這個命令是 「原子性」 的,也就是說只要客戶端獲取到了返回的元素,那么這個元素一定就會在 destination List 有備份。這是實現(xiàn)可靠消息隊列的關鍵!
「RPOPLPUSH source destination
」
同上,它是 BRPOPLPUSH
命令的 「非阻塞」 版,如果 List 中沒有元素就會立即返回 null
。
「LREM key count element
」
從 List 中刪除元素,count 的值不同,刪除的方式也不同:
count > 0:從頭到尾開始搜索,刪除與 element 相等的元素,最多刪除 count 個。
count < 0:從尾到頭開始搜索,刪除與 element 相等的元素,最多刪除 count (絕對值)個。
count = 0:刪除所有與元素相等的元素。
「
BLMOVE
和LMOVE
命令」
LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>
BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout
從 Redis 6.2.0 開始,
BRPOPLPUSH
和RPOPLPUSH
命令就被聲明為廢棄了,取而代之的是語義更加明確的BLMOVE
和LMOVE
命令。?
BLMOVE
和LMOVE
可以通過參數(shù)指定元素出隊列(source)的方向,和入隊列(destination)的方向,除此以外并無其他區(qū)別。
實現(xiàn)思路
了解了上述幾個命令后,一個簡單易用且可靠的消息隊列就呼之欲出了。
生產(chǎn)者使用
LPUSH
命令往消息隊列生產(chǎn)消息消費者使用
BRPOPLPUSH
命令從隊列消費消息,并且還會在獲取并返回消息的時候把該消息推送到另一個消息隊列,也就是 Pending 隊列,這個隊列中存儲的就是未被消費者 ACK 的消息消費者成功消費完畢后,使用
LREM
命令從 Pending 隊列中刪除這條消息,整個消費過程結束如果消費者在消費過程中出現(xiàn)異常、宕機,那么需要在恢復后從 Pending 隊列中獲取到這條消息,再進行重新消費,從而保證了消息隊列的可靠性,不會丟失消息(可能存在重復消費,需要做好冪等處理)
在 Spring Boot 中實現(xiàn)
首先,創(chuàng)建 Spring Boot 項目,并整合 Redis。
創(chuàng)建一個 OrderConsumer
Bean 模擬從隊列中消費訂單 ID。
package cn.springdoc.demo.consumer; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; @Component public class OrderConsumer implements ApplicationRunner, Runnable { static final Logger log = LoggerFactory.getLogger(OrderConsumer.class); // 消息隊列 final String queue = "queue_orders"; // pending 隊列,即待確認消息的隊列 final String pendingQueue = "pending_queue_orders"; @Autowired StringRedisTemplate stringRedisTemplate; @Override public void run(ApplicationArguments args) throws Exception { // 應用啟動后,創(chuàng)建新的線程來執(zhí)行消費任務 Thread thread = new Thread(this); thread.setName("order-consumer-thread"); thread.start(); } @Override public void run() { while (true) { try { // 1:消費者,從隊列未彈出消息,并推送到 pending 隊列,整個過程是原子性的 // 最多阻塞 5 秒,超過 5 秒后還沒有消息,則返回 null String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS); if (item == null) { log.info("等待消息 ..."); continue ; } try { // 2:解析為 Long Long orderId = Long.parseLong(item); // 模擬消息消費 log.info("消費消息: {}", orderId); } catch (Exception e) { log.error("消費異常:{}", e.getMessage()); continue; } // 3:消費成功,從 pending 隊列刪除記錄,相當于確認消費 stringRedisTemplate.opsForList().remove(pendingQueue, 0, item); } catch (Exception e) { log.error("隊列監(jiān)聽異常:{}", e.getMessage()); break; } } log.info("退出消費"); } }
OrderConsumer
實現(xiàn)了 ApplicationRunner
接口,在應用就緒后創(chuàng)建新的消費線程進行消費。
stringRedisTemplate.opsForList().rightPopAndLeftPush
方法從 queue
隊列消費一條消息,同時把消息添加到 pendingQueue
隊列。該方法底層調用的正是 brpoplpush
命令,最多阻塞 5 秒,超時后返回 null
。
得到消息后解析為 Long
類型,模擬消費,即輸出到日志。如果消費成功,則調用 stringRedisTemplate.opsForList().remove
方法(底層正是 LREM
命令)從 pendingQueue
隊列中刪除消息。如果消費失敗,失敗的消息會在 pendingQueue
隊列中繼續(xù)存在,不會丟失,可以重新投遞消費或者是人工處理。
測試
啟動應用后,通過 Redis 客戶端往 queue_orders
隊列推送消息:
> lpush queue_orders 10000 "1" > lpush queue_orders 10010 "1" > lpush queue_orders 10011 "1" > lpush queue_orders Nan "1"
往 queue_orders
隊列推送了四條訂單的 ID。注意最后一條消息值是 Nan
,這會導致 Long.parseLong
異常從而導致消費失敗。
服務端輸出日志如下:
[ main] cn.springdoc.demo.DemoApplication : Started DemoApplication in 3.769 seconds (process running for 4.18) [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費消息: 10000 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費消息: 10010 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ... [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費消息: 10011 [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 消費異常:For input string: "Nan" [consumer-thread] c.springdoc.demo.consumer.OrderConsumer : 等待消息 ...
符合預期,前面三條消息都成功消費,最后一條消息消費失敗。按照設計,這條消費失敗的消息應該在 Pending 隊列 pending_queue_orders
中存在。且應該只有這一條消息,因為其他三條消息都消費成功。
查看 pending_queue_orders
隊列中的所有元素:
> lrange pending_queue_orders 0 -1 1) "Nan"
一切 OK,該隊列中只有 Nan
這條消息,正是消費失敗的那條消息。
此時,你如果想查看一下 Redis 中的所有 key,你會發(fā)現(xiàn)只有 pending_queue_orders
隊列存在:
> keys * 1) "pending_queue_orders"
queue_orders
隊列呢?這是 Redis List
的一個特性,當從 List
中彈出最后一個元素后,Redis 就會刪除這個 List
。queue_orders
中的元素都被彈出了,所以它被刪除了。當再次嘗試往 queue_orders
中壓入消息時,它會自動創(chuàng)建。也就是說 「我們不需要手動預先創(chuàng)建隊列, Redis 會自己創(chuàng)建,也會在合適的時間刪除,而這一切都是線程安全的」。
由于這是線程安全的,所以隊列中的 「一條消息只能被一個消費者(客戶端)進行消費」,這非常適合在分布式或者是集群模式下使用,不必擔心同一條消息被多個消費者消費到。
注意,Pending 隊列中的消息可能存在重復消費的可能。例如,消費者成功消費消息后,在調用 remove
方法從 Pending 隊列中刪除消息時失敗,那么 Pending 隊列中的這條刪除失敗的消息其實已經(jīng)是被成功消費了的,需要在業(yè)務中考慮到!
使用 BLMOVE 和 LMOVE 命令
上文說過,從 Redis 6.2.0 開始 BRPOPLPUSH
和 RPOPLPUSH
命令就被聲明為廢棄了,后續(xù)版本中推薦使用 BLMOVE
和 LMOVE
命令。
目前 StringRedisTemplate
(Spring Boot 3.2.2)并未直接提供與 BLMOVE
和 LMOVE
命令對應的 API 方法,但是可以獲取到底層連接對象來調用 BLMOVE
和 LMOVE
命令。
String item = this.stringRedisTemplate.execute(new RedisCallback<String>() { @Override public String doInRedis(RedisConnection connection) throws DataAccessException { // 調用 bLMove 命令 byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5); return ret == null ? null : new String(ret); } });
Redis 的持久化方式
Redis 是一個內(nèi)存數(shù)據(jù)庫,為了保證數(shù)據(jù)的安全不丟失,它提供了兩種數(shù)據(jù)備份(持久化)方式,即 「RDB」 和 「AOF」。
「RDB」:生成某一時刻的數(shù)據(jù)快照,通過子進程進行備份,數(shù)據(jù)可能不完整(取決于備份周期)。
「AOF」:通過記錄執(zhí)行的指令到文件來實現(xiàn)數(shù)據(jù)備份,相對完整性較高,但是會記錄每一條執(zhí)行命令,性能會有一定影響。
這就需要根據(jù)你的業(yè)務場景來選擇合適的持久化方式,也可以同時配合使用 「RDB」 和 「AOF」 兩種方式,兼顧性能和數(shù)據(jù)安全。
總結
本文介紹了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH
/BLMOVE
命令來實現(xiàn)一個線程安全且可靠的消息隊列。
以上就是SpringBoot使用Redis實現(xiàn)消息隊列的方法小結的詳細內(nèi)容,更多關于SpringBoot Redis消息隊列的資料請關注腳本之家其它相關文章!
相關文章
Mybatis使用JSONObject接收數(shù)據(jù)庫查詢的方法
這篇文章主要介紹了Mybatis使用JSONObject接收數(shù)據(jù)庫查詢,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-12-12Springboot3集成Knife4j的步驟以及使用(最完整版)
這篇文章主要介紹了Springboot3集成Knife4j的步驟以及使用的相關資料,Knife4j是一種增強Swagger的工具,支持黑色主題和更多配置選項,它與swagger-bootstrap-ui相比,提供了更現(xiàn)代的外觀和更多的功能,需要的朋友可以參考下2024-11-11SSH框架網(wǎng)上商城項目第2戰(zhàn)之基本增刪查改、Service和Action的抽取
SSH框架網(wǎng)上商城項目第2戰(zhàn)之基本增刪查改、Service和Action的抽取,感興趣的小伙伴們可以參考一下2016-05-05spring中@autowired、@Qualifier、@Primary注解的使用說明
這篇文章主要介紹了spring中@autowired、@Qualifier、@Primary注解的使用,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11Java和C語言分別實現(xiàn)水仙花數(shù)及拓展代碼
這篇文章主要介紹了分別用Java和C語言實現(xiàn)水仙花數(shù),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-11-11