欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot使用Redis實現(xiàn)消息隊列的方法小結

 更新時間:2024年04月12日 11:53:02   作者:springdoc.cn  
在應用中把Redis當成消息隊列來使用已經(jīng)屢見不鮮了,我想主要原因是當代應用十有八九都會用到 Redis,因此不用再引入其他消息隊列系統(tǒng),而且Redis提供了好幾種實現(xiàn)消息隊列的方法,用起來也簡單,本文給大家介紹了SpringBoot使用Redis實現(xiàn)消息隊列的方法小結

使用 Redis 實現(xiàn)消息隊列的幾種方式

Redis 提供了多種方式來實現(xiàn)消息隊列。

Pub/Sub

訂閱發(fā)布模式,發(fā)布者把消息發(fā)布到某個 Channel,該 Channel 的所有訂閱者都會收到消息。但是這種方式最大的問題是 「發(fā)布出去的消息,如果沒有被監(jiān)聽消費,或者消費過程中宕機,那么消息就會永遠丟失」。適合用于臨時通知之類的場景,對于需要保證數(shù)據(jù)不丟失的場景不能使用這種方式。

List

List 是 Redis 提供的一種數(shù)據(jù)類型,底層是鏈表,可以用來實現(xiàn)隊列、棧。

Stream

Stream 是一個由 Redis 5 引入的,功能完善的消息隊列。想必也是 Redis 官方團隊看到太多人拿 Redis 當消息隊列使,于是干脆就在 Redis 上設計出一個類似于 Kafka 的消息隊列。

Steam 支持消費組消費,一條消息只能被消費組中的其中一個消費者消費。支持 「消息確認」、支持 「回溯消費」 還支持把未 ACK(確認)的消息轉移給其他消費者進行重新消費,在進行轉移的時候還會累計消息的轉移次數(shù),當次數(shù)達到一定閾值還沒消費成功,就可以放入死信隊列。

這也是 Redis 種最復雜的一種數(shù)據(jù)類型。如果你真的到了需要使用 Redis Steam 作為消息隊列的地步,那不如直接使用 RabbitMQ 等更加成熟且穩(wěn)定的消息隊列系統(tǒng)。

使用 List 實現(xiàn)可靠的消息隊列

目前來說,這是用得最多的一種方式,適用于大多數(shù)簡單的消息隊列應用場景。List 類型有很多指令,但是作為消息隊列來說用到的只有幾個個:

LPUSH key element [element ...]

把元素插入到 List 的首部,如果 List 不存在,會自動創(chuàng)建。

BRPOPLPUSH source destination timeout

移除并且返回 List (source)尾部的最后一個元素,并且同時會把這個元素插入到另一個 List (destination)的首部。

當 source List 中沒有元素時,Redis 會阻塞連接,直到有其他客戶端向其推送元素或超時。超時時間(秒)為 0 表示永遠不超時。

注意,這個命令是 「原子性」 的,也就是說只要客戶端獲取到了返回的元素,那么這個元素一定就會在 destination List 有備份。這是實現(xiàn)可靠消息隊列的關鍵!

RPOPLPUSH source destination

同上,它是 BRPOPLPUSH 命令的 「非阻塞」 版,如果 List 中沒有元素就會立即返回 null。

LREM key count element

從 List 中刪除元素,count 的值不同,刪除的方式也不同:

  • count > 0:從頭到尾開始搜索,刪除與 element 相等的元素,最多刪除 count 個。

  • count < 0:從尾到頭開始搜索,刪除與 element 相等的元素,最多刪除 count (絕對值)個。

  • count = 0:刪除所有與元素相等的元素。

BLMOVE 和 LMOVE 命令」

  • LMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT>

  • BLMOVE source destination <LEFT | RIGHT> <LEFT | RIGHT> timeout

從 Redis 6.2.0 開始,BRPOPLPUSH 和 RPOPLPUSH 命令就被聲明為廢棄了,取而代之的是語義更加明確的 BLMOVE 和 LMOVE 命令。

BLMOVE 和 LMOVE 可以通過參數(shù)指定元素出隊列(source)的方向,和入隊列(destination)的方向,除此以外并無其他區(qū)別。

?

實現(xiàn)思路

了解了上述幾個命令后,一個簡單易用且可靠的消息隊列就呼之欲出了。

  • 生產(chǎn)者使用 LPUSH 命令往消息隊列生產(chǎn)消息

  • 消費者使用 BRPOPLPUSH 命令從隊列消費消息,并且還會在獲取并返回消息的時候把該消息推送到另一個消息隊列,也就是 Pending 隊列,這個隊列中存儲的就是未被消費者 ACK 的消息

  • 消費者成功消費完畢后,使用 LREM 命令從 Pending 隊列中刪除這條消息,整個消費過程結束

  • 如果消費者在消費過程中出現(xiàn)異常、宕機,那么需要在恢復后從 Pending 隊列中獲取到這條消息,再進行重新消費,從而保證了消息隊列的可靠性,不會丟失消息(可能存在重復消費,需要做好冪等處理)

在 Spring Boot 中實現(xiàn)

首先,創(chuàng)建 Spring Boot 項目,并整合 Redis。

創(chuàng)建一個 OrderConsumer Bean 模擬從隊列中消費訂單 ID。

package cn.springdoc.demo.consumer;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer implements ApplicationRunner, Runnable {

    static final Logger log = LoggerFactory.getLogger(OrderConsumer.class);

    // 消息隊列
    final String queue = "queue_orders";

    // pending 隊列,即待確認消息的隊列
    final String pendingQueue = "pending_queue_orders";

    @Autowired
    StringRedisTemplate stringRedisTemplate;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 應用啟動后,創(chuàng)建新的線程來執(zhí)行消費任務
        Thread thread = new Thread(this);
        thread.setName("order-consumer-thread");
        thread.start();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 1:消費者,從隊列未彈出消息,并推送到 pending 隊列,整個過程是原子性的
                // 最多阻塞 5 秒,超過 5 秒后還沒有消息,則返回 null
                String item = stringRedisTemplate.opsForList().rightPopAndLeftPush(queue, pendingQueue, 5, TimeUnit.SECONDS);
                
                if (item == null) {
                    log.info("等待消息 ...");
                    continue ;
                }
                
                try {
                    
                    // 2:解析為 Long
                    Long orderId = Long.parseLong(item);
                    
                    // 模擬消息消費
                    log.info("消費消息: {}", orderId);
                    
                } catch (Exception e) {
                    log.error("消費異常:{}", e.getMessage());
                    continue;
                }
                
                // 3:消費成功,從 pending 隊列刪除記錄,相當于確認消費
                stringRedisTemplate.opsForList().remove(pendingQueue, 0, item);
            } catch (Exception e) {
                log.error("隊列監(jiān)聽異常:{}", e.getMessage());
                break;
            }
        }
        log.info("退出消費");
    }
}

OrderConsumer 實現(xiàn)了 ApplicationRunner 接口,在應用就緒后創(chuàng)建新的消費線程進行消費。

stringRedisTemplate.opsForList().rightPopAndLeftPush 方法從 queue 隊列消費一條消息,同時把消息添加到  pendingQueue 隊列。該方法底層調用的正是 brpoplpush 命令,最多阻塞 5 秒,超時后返回 null。

得到消息后解析為 Long 類型,模擬消費,即輸出到日志。如果消費成功,則調用 stringRedisTemplate.opsForList().remove 方法(底層正是 LREM 命令)從 pendingQueue 隊列中刪除消息。如果消費失敗,失敗的消息會在 pendingQueue 隊列中繼續(xù)存在,不會丟失,可以重新投遞消費或者是人工處理。

測試

啟動應用后,通過 Redis 客戶端往 queue_orders 隊列推送消息:

> lpush queue_orders 10000
"1"
> lpush queue_orders 10010
"1"
> lpush queue_orders 10011
"1"
> lpush queue_orders Nan
"1"

往 queue_orders 隊列推送了四條訂單的 ID。注意最后一條消息值是 Nan,這會導致 Long.parseLong 異常從而導致消費失敗。

服務端輸出日志如下:

[           main] cn.springdoc.demo.DemoApplication        : Started DemoApplication in 3.769 seconds (process running for 4.18)
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費消息: 10000
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費消息: 10010
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費消息: 10011
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 消費異常:For input string: "Nan"
[consumer-thread] c.springdoc.demo.consumer.OrderConsumer  : 等待消息 ...

符合預期,前面三條消息都成功消費,最后一條消息消費失敗。按照設計,這條消費失敗的消息應該在 Pending 隊列 pending_queue_orders 中存在。且應該只有這一條消息,因為其他三條消息都消費成功。

查看 pending_queue_orders 隊列中的所有元素:

> lrange pending_queue_orders 0 -1
1) "Nan"

一切 OK,該隊列中只有 Nan 這條消息,正是消費失敗的那條消息。

此時,你如果想查看一下 Redis 中的所有 key,你會發(fā)現(xiàn)只有 pending_queue_orders 隊列存在:

> keys *
1) "pending_queue_orders"

queue_orders 隊列呢?這是 Redis List 的一個特性,當從 List 中彈出最后一個元素后,Redis 就會刪除這個 List。queue_orders 中的元素都被彈出了,所以它被刪除了。當再次嘗試往 queue_orders 中壓入消息時,它會自動創(chuàng)建。也就是說 「我們不需要手動預先創(chuàng)建隊列, Redis 會自己創(chuàng)建,也會在合適的時間刪除,而這一切都是線程安全的」。

由于這是線程安全的,所以隊列中的 「一條消息只能被一個消費者(客戶端)進行消費」,這非常適合在分布式或者是集群模式下使用,不必擔心同一條消息被多個消費者消費到。

 

注意,Pending 隊列中的消息可能存在重復消費的可能。例如,消費者成功消費消息后,在調用 remove 方法從 Pending 隊列中刪除消息時失敗,那么 Pending 隊列中的這條刪除失敗的消息其實已經(jīng)是被成功消費了的,需要在業(yè)務中考慮到!

使用 BLMOVE 和 LMOVE 命令

上文說過,從 Redis 6.2.0 開始 BRPOPLPUSH 和 RPOPLPUSH 命令就被聲明為廢棄了,后續(xù)版本中推薦使用 BLMOVE 和 LMOVE 命令。

目前 StringRedisTemplate (Spring Boot 3.2.2)并未直接提供與 BLMOVE 和 LMOVE 命令對應的 API 方法,但是可以獲取到底層連接對象來調用 BLMOVE 和 LMOVE 命令。

String item = this.stringRedisTemplate.execute(new RedisCallback<String>() {
    @Override
    public String doInRedis(RedisConnection connection) throws DataAccessException {
        // 調用 bLMove 命令
        byte[] ret = connection.listCommands().bLMove(queue.getBytes(), pendingQueue.getBytes(), Direction.RIGHT, Direction.LEFT, 5);
        return ret == null ? null : new String(ret);
    }
});

Redis 的持久化方式

Redis 是一個內(nèi)存數(shù)據(jù)庫,為了保證數(shù)據(jù)的安全不丟失,它提供了兩種數(shù)據(jù)備份(持久化)方式,即 「RDB」 和 「AOF」。

  • 「RDB」:生成某一時刻的數(shù)據(jù)快照,通過子進程進行備份,數(shù)據(jù)可能不完整(取決于備份周期)。

  • 「AOF」:通過記錄執(zhí)行的指令到文件來實現(xiàn)數(shù)據(jù)備份,相對完整性較高,但是會記錄每一條執(zhí)行命令,性能會有一定影響。

這就需要根據(jù)你的業(yè)務場景來選擇合適的持久化方式,也可以同時配合使用 「RDB」 和 「AOF」 兩種方式,兼顧性能和數(shù)據(jù)安全。

總結

本文介紹了如何在 Spring Boot 中使用 Redis List 的 BRPOPLPUSH/BLMOVE 命令來實現(xiàn)一個線程安全且可靠的消息隊列。

以上就是SpringBoot使用Redis實現(xiàn)消息隊列的方法小結的詳細內(nèi)容,更多關于SpringBoot Redis消息隊列的資料請關注腳本之家其它相關文章!

相關文章

最新評論