欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

redis和rabbitmq實現(xiàn)延時隊列的示例代碼

 更新時間:2024年03月20日 11:03:52   作者:開心就好啦啦啦  
在高并發(fā)場景下,延遲隊列顯得尤為重要,本文主要介紹了兩種方式,redis和rabbitmq實現(xiàn)延時隊列,具有一定的參考價值,感興趣的可以了解一下

延遲隊列使用場景

1. 訂單超時處理:延遲隊列可以用于處理訂單超時問題。當(dāng)用戶下單后,將訂單信息放入延遲隊列,并設(shè)置一定的超時時間。如果在超時時間內(nèi)用戶未支付訂單,消費者會從延遲隊列中獲取到該訂單,并執(zhí)行相應(yīng)的處理操作,如取消訂單、釋放庫存等。

2. 優(yōu)惠券過期提醒:延遲隊列可以用于優(yōu)惠券的過期提醒功能。將即將過期的優(yōu)惠券信息放入延遲隊列,并設(shè)置合適的延遲時間。當(dāng)延遲時間到達時,消費者將提醒用戶優(yōu)惠券即將過期,引導(dǎo)用戶盡快使用。

3. 異步通知與提醒:延遲隊列可以用于異步通知和提醒功能。例如,當(dāng)用戶完成某個操作后,系統(tǒng)可以將相關(guān)通知消息放入延遲隊列,并設(shè)置一定的延遲時間,以便在合適的時機發(fā)送通知給用戶。

Redis中zset實現(xiàn)延時隊列

1. 創(chuàng)建延遲隊列服務(wù)類

  • 創(chuàng)建一個延遲隊列的服務(wù)類,例如DelayQueueService,用于操作Redis中的ZSet。這個服務(wù)類需要完成以下功能:
  • 將消息放入延遲隊列:將消息作為元素添加到ZSet中,設(shè)置對應(yīng)的延遲時間作為分數(shù)。輪詢并處理已到期的消息:定時任務(wù)或者消息消費者輪詢檢查ZSet中的元素,獲取到達指定時間的消息進行處理。刪除已處理的消息:處理完消息后,從ZSet中將其刪除。
@Service
public class DelayQueueService {
    private static final String DELAY_QUEUE_KEY = "delay_queue";


    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void addToDelayQueue(String message,long delayTime){
        redisTemplate.opsForZSet().add(DELAY_QUEUE_KEY,message,System.currentTimeMillis()+delayTime);
    }

    public void processDelayedMessage(){
        //reverseRangeByScore 從高到低
        //rangeByScore 從低到高
        Set<String> messages = redisTemplate.opsForZSet().rangeByScore(DELAY_QUEUE_KEY, 0, System.currentTimeMillis());
        for(String message:messages){
            //處理消息
            System.out.println(message);
            redisTemplate.opsForZSet().remove(DELAY_QUEUE_KEY,message);
        }

    }
}

2. 配置定時任務(wù)或消息消費者

使用Spring Boot的定時任務(wù)或消息隊列框架,定時調(diào)用延遲隊列服務(wù)類的輪詢方法或監(jiān)聽指定的消息隊列,可以將輪訓(xùn)粒度放到1s一次。

@Component
public class DelayQueueSchedule {
    @Autowired
    private DelayQueueService delayQueueService;


    // 每隔一段時間進行輪詢并處理延遲消息
    @Scheduled(fixedDelay = 1000)
    public void pollAndProcessDelayedMessages() {
        delayQueueService.pollAndProcessDelayedMessages();
    }
}

然后在啟動類上通過@EnableScheduling注解開啟任務(wù)調(diào)度能力。

缺點:使用ZSET(有序集合,Sorted Set)來實現(xiàn)延遲任務(wù)調(diào)度(如訂單超時取消)是一種有效的方法,但它也有一些缺點和限制:

  • 內(nèi)存消耗:ZSET 在Redis中是一個有序集合,它需要占用一定的內(nèi)存來存儲成員和分數(shù)。如果你需要存儲大量的延遲任務(wù),可能會導(dǎo)致內(nèi)存消耗較大。這可能會對Redis服務(wù)器的性能和成本產(chǎn)生影響,特別是在大規(guī)模應(yīng)用中。
  • 不適用于大規(guī)模延遲任務(wù):ZSET 可以處理相對較小數(shù)量的延遲任務(wù),但當(dāng)需要管理大規(guī)模延遲任務(wù)隊列時,可能會導(dǎo)致性能下降。在這種情況下,需要考慮更高效的延遲隊列解決方案,例如使用分布式消息隊列。
  • 無法動態(tài)修改延遲時間: 一旦將任務(wù)添加到ZSET中,你不能輕松地修改任務(wù)的延遲時間。如果需要在任務(wù)已經(jīng)添加后更改延遲時間,可能需要復(fù)雜的操作。
  • 沒有重試機制:ZSET 只能用于一次性延遲任務(wù),無法自動處理任務(wù)失敗后的重試。如果任務(wù)在執(zhí)行時失敗,你需要自己實現(xiàn)重試邏輯。
  • 沒有持久化: Redis是內(nèi)存數(shù)據(jù)庫,如果Redis服務(wù)器重啟或發(fā)生故障,已添加的延遲任務(wù)數(shù)據(jù)將丟失。雖然可以通過Redis持久化機制來部分解決這個問題,但仍然存在一定風(fēng)險。
  • 復(fù)雜性增加: 使用ZSET來管理延遲任務(wù)隊列需要編寫復(fù)雜的代碼來處理任務(wù)的添加、檢索和刪除。這可能增加應(yīng)用程序的復(fù)雜性。

Rabbitmq實現(xiàn)延遲隊列

死信,顧名思義就是無法被消費的消息。一般來說,producer 將消息投遞到 broker 或者直接到queue 里了,consumer 從 queue 取出消息進行消費,但某些時候由于特定的原因?qū)е聁ueu 中的某些消息無法被消費,這樣的消息如果沒有后續(xù)的處理,就變成了死信,有死信自然就有了死信隊列。

列出2種實現(xiàn)方式。
(1)使用Time To Live(TTL) + Dead Letter Exchanges(DLX)死信隊列組合實現(xiàn)延遲隊列的效果。
(2)使用RabbitMQ官方延遲插件rabbitmq_delayed_message_exchange,實現(xiàn)延時隊列效果。

由于TTL(生存時間)過期導(dǎo)致的死信,就是我們實現(xiàn)延遲隊列的的方式。
我們需要聲明如下形式的交互機和隊列,以及對應(yīng)的routing key,并進行綁定:

請?zhí)砑訄D片描述

上圖綁定的代碼如下所示

@Configuration
public class DeadQueueConfig {
    //普通交換機及隊列
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信交換機及隊列
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";
    //通用隊列
    public static final String QUEUE_C = "QC";


    // 聲明 xExchange
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    //聲明隊列 A ttl 為 10s 并綁定到對應(yīng)的死信交換機
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //聲明當(dāng)前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //聲明當(dāng)前隊列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //聲明隊列的 TTL
        args.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }
    //聲明隊列A綁定X交換機  路由為XA
    @Bean
    public Binding queueABingX(@Qualifier("queueA") Queue queueA,
                               @Qualifier("xExchange") DirectExchange xExchange){
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //聲明隊列 B ttl 為 40s 并綁定到對應(yīng)的死信交換機
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //聲明當(dāng)前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //聲明當(dāng)前隊列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //聲明隊列的 TTL
        args.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //聲明隊列 B 綁定 X 交換機
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }

    //聲明通用隊列C 不設(shè)ttl,由消費者決定ttl
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> args = new HashMap<>(3);
        //聲明當(dāng)前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //聲明當(dāng)前隊列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(args).build();
    }
    // 聲明隊列 C 綁定 X 交換機
    @Bean
    public Binding queuecBindingX(@Qualifier("queueC") Queue queueC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }

    // 聲明 死信隊列交換機
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    //聲明死信隊列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE,true);
    }
    //聲明死信隊列 QD 綁定關(guān)系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

}

其中,QD為死信隊列。當(dāng)QA和QB隊列中的消息,達到設(shè)定的TTL(10s和40s)后,將進入指定的死信隊列QD。該方法缺點就是一個TTL對應(yīng)一個隊列

其中的QC作為通用的隊列,即在消費者處指定消息對應(yīng)的TTL,TTL過期后轉(zhuǎn)入死信隊列。使用該通用隊列可以避免每增加一個新的時間需求,就要新增一個隊列的問題。但該方法由于隊列先進先出的性質(zhì),會導(dǎo)致一定的問題:

即先發(fā)出一個TTL為10s的消息a,進入隊列;再馬上發(fā)出一個TTL為2s的消息b,進入隊列。由于隊列的性質(zhì),會在消息a的TTL結(jié)束后,a進入死信隊列后,b才會進入死信隊列。而不是根據(jù)TTL的時間,b比a先進入死信隊列。

聲明交換機、隊列,并綁定成功后,編寫死信隊列消費者代碼;

@Component
@Slf4j
public class DeadQueueConsumer {

    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當(dāng)前時間:{},收到死信隊列信息:{}", new Date().toString(), msg);
    }
}

在controller中編寫生產(chǎn)者代碼,進行測試:

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMsg/{message}")
    public String sendMsg(@PathVariable String message){
        log.info("當(dāng)前時間:{},發(fā)送一條信息給兩個 TTL 隊列:{}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "消息來自 ttl 為 10S 的隊列: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "消息來自 ttl 為 40S 的隊列: " + message);
        return "finish";
    }

結(jié)果如圖:

請?zhí)砑訄D片描述

測試通用隊列QC的效果:

@GetMapping("/send/{message}/{ttlTime}")
    public void sendMsg(@PathVariable String message, @PathVariable String ttlTime) {
        rabbitTemplate.convertAndSend("X", "XC", message, correlationData -> {
            correlationData.getMessageProperties().setExpiration(ttlTime);
            return correlationData;
        });
        log.info("當(dāng)前時間:{},發(fā)送一條時長{}毫秒 TTL 信息給隊列 C:{}", new Date(), ttlTime, message);
    }

結(jié)果如下圖

請?zhí)砑訄D片描述

可以看到, 兩條消息幾乎同時到達死信隊列,因為TTL為2s的消息由于被堵在TTL為10s的消息后導(dǎo)致。

到此這篇關(guān)于redis和rabbitmq實現(xiàn)延時隊列的示例代碼的文章就介紹到這了,更多相關(guān)redis rabbitmq 延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis遍歷海量數(shù)據(jù)的實現(xiàn)示例

    Redis遍歷海量數(shù)據(jù)的實現(xiàn)示例

    本文主要介紹了 Redis遍歷海量數(shù)據(jù)的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-04-04
  • Redis出現(xiàn)(error)NOAUTH?Authentication?required.報錯的解決辦法(秒懂!)

    Redis出現(xiàn)(error)NOAUTH?Authentication?required.報錯的解決辦法(秒懂!)

    這篇文章主要給大家介紹了關(guān)于Redis出現(xiàn)(error)NOAUTH?Authentication?required.報錯的解決辦法,對于 這個錯誤這通常是因為Redis服務(wù)器需要密碼進行身份驗證,但客戶端沒有提供正確的身份驗證信息導(dǎo)致的,需要的朋友可以參考下
    2024-03-03
  • Linux上安裝Redis詳細教程

    Linux上安裝Redis詳細教程

    這篇文章主要給大家詳細介紹了在Linux上安裝Redis詳細教程,文中有詳細的代碼示例和安裝步驟,對我們學(xué)習(xí)安裝redis有一定的幫助,需要的朋友可以參考下
    2023-07-07
  • 利用Redis實現(xiàn)訪問次數(shù)限流的方法詳解

    利用Redis實現(xiàn)訪問次數(shù)限流的方法詳解

    這篇文章主要給大家介紹了關(guān)于如何利用Redis實現(xiàn)訪問次數(shù)限流的相關(guān)資料,文中通過實例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2022-02-02
  • Windows下Redis安裝配置簡單教程

    Windows下Redis安裝配置簡單教程

    這篇文章主要為大家詳細介紹了Windows下Redis安裝配置簡單教程,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-12-12
  • Redis分布式鎖之紅鎖的實現(xiàn)

    Redis分布式鎖之紅鎖的實現(xiàn)

    本文主要介紹了Redis分布式鎖之紅鎖的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-08-08
  • redis 查看所有的key方式

    redis 查看所有的key方式

    這篇文章主要介紹了redis 查看所有的key方式,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05
  • 深入理解Redis 內(nèi)存管理

    深入理解Redis 內(nèi)存管理

    本文主要介紹了深入理解Redis 內(nèi)存管理,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-02-02
  • redis實現(xiàn)簡單分布式鎖

    redis實現(xiàn)簡單分布式鎖

    這篇文章主要介紹了redis實現(xiàn)簡單分布式鎖,文中通過代碼示例講解的非常詳細,需要的朋友可以參考下
    2013-09-09
  • redis5集群如何主動手工切換主從節(jié)點命令

    redis5集群如何主動手工切換主從節(jié)點命令

    這篇文章主要介紹了redis5集群如何主動手工切換主從節(jié)點命令,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-01-01

最新評論