欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ消息的延遲隊列詳解

 更新時間:2024年02月11日 10:31:27   作者:云村小威  
這篇文章主要介紹了RabbitMQ消息的延遲隊列,延遲隊列也就是死信交換機,有些隊列的消息成為死信后,消息中間件可以將其從當前隊列發(fā)送到另一個隊列中,這個隊列就是死信隊列,感興趣的同學可以參考下文

Dead Letter Exchange(死信交換機)

在MQ中,當消息成為死信(Dead message 死掉的信息)后,消息中間件可以將其從當前隊列發(fā)送到另一個隊列中,這個隊列就是死信隊列。而 在RabbitMQ中,由于有交換機的概念,實際是將死信發(fā)送給了死信交換機(Dead Letter Exchange,簡稱DLX)。死信交換機和死信隊列和普通的沒有區(qū)別。

消息成為死信的情況

  • 隊列消息長度到達限制
  • 消費者拒簽消息,并且不把消息重新放入原隊列
  • 消息到達存活時間未被消費

有些隊列的消息成為死信后,(比如過期了或者隊列滿了)這些死信一般情況下是會被 RabbitMQ 清理的。但是你可以配置某個交換機為此隊列的死信交換機,該隊列的消息成為死信后會被重新發(fā)送到此 DLX 。至于怎么處理這個DLX中的死信就是看具體的業(yè)務場景了,DLX 中的信息可以被路由到新的隊列。

生產者

    /**
     * 普通交換機綁定普通交換機
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        //信息配置
        Map<String, Object> map = new HashMap<>();
        //message在該隊列queue的存活時間最大為15秒
        map.put("x-message-ttl", 15000);
        //x-dead-letter-exchange參數(shù)是設置該隊列的死信交換器(DLX)
        map.put("x-dead-letter-exchange", "exchangeB");
        //x-dead-letter-routing-key參數(shù)是給這個DLX指定路由鍵
        map.put("x-dead-letter-routing-key", "queueB");
        return new Queue("queueA", true, false, false, map);
    }
    @Bean
    public DirectExchange exchangeA() {
        return new DirectExchange("exchangeA");
    }
    @Bean
    public Binding bindingA() {
        return BindingBuilder
                .bind(queueA())
                .to(exchangeA()).with("queueA");
    }
    /**
     * 死信交換機綁定死信交換機
     *
     * @return
     */
    @Bean
    public Queue queueB() {
        return new Queue("queueB");
    }
    @Bean
    public DirectExchange exchangeB() {
        return new DirectExchange("exchangeB");
    }
    @Bean
    public Binding bindingB() {
        return BindingBuilder
                .bind(queueB())
                .to(exchangeB()).with("queueB");
    }

模擬發(fā)送請求

    @RequestMapping("/send6")
    public String sendSix() throws JsonProcessingException {
        rabbitTemplate.convertAndSend("exchangeA", "queueA", "檢查訂單是否過期");
        return "??";
    }

這時我發(fā)送請求到隊列queueA,并設置了15秒的延遲,將超時的信息調用到死信交換機中。在這里我是沒開啟消費者所有沒有消費者去處理該請求的,信息在queueA隊列等待15秒后將會轉到死信交換機queueB隊列進行處理:

延遲隊列

延遲隊列,即消息進入隊列后不會立即被消費,只有到達指定時間后,才會被消費。經典的應用場景是下單減庫存。

根據(jù)以上結論,在rabbitmq中消費者只要接到信息就會自動確認進行處理。所以在上面并沒有開啟消費者,當請求時效后(如訂單未支付,定時30分鐘自動取消功能)我們不應該再讓它正常處理,而把該請求放到死信交換機中安排對應的處理,所以我們需要打消費者自動處理請求改成手動。

如果手動確認則當消費者調用 ack、nack、reject 幾種方法進行確認,手動確認可以在業(yè)務失敗后進行一些操作,如果消息未被 ACK 則會發(fā)送到下一個消費者

如果某個服務忘記 ACK 了,則 RabbitMQ 不會再發(fā)送數(shù)據(jù)給它,因為 RabbitMQ 認為該服務的處理能力有限

ACK 機制還可以起到限流作用,比如在接收到某條消息時休眠幾秒鐘

消息確認模式有:

  • AcknowledgeMode.NONE:自動確認
  • AcknowledgeMode.AUTO:根據(jù)情況確認
  • AcknowledgeMode.MANUAL:手動確認

確認消息(局部方法處理消息)

默認情況下消息消費者是自動 ack (確認)消息的,如果要手動 ack(確認)則需要修改確認模式為 manual

消費者添加手動確認消息配置配置 :

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manua

消費者接受消息:

package com.ycxw.consumer.demos;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DLXReceiver {
    @RabbitListener(queues = {"queueA"})
    @RabbitHandler
    public void handlerA(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        System.out.println("已接受到隊列queueA傳遞過來的消息:" + msg);
        channel.basicReject(tag, false);// 拒接消息,如果為true則拒絕后又從新回到隊列被接受(循環(huán)),除非消息過期。
        //channel.basicAck(tag, true); 確認消息()一次性全接受,如果為false則接受一次
    }
    /**
     * 接受死信消息
     *
     * @param msg
     */
    @RabbitListener(queues = {"queueB"})
    @RabbitHandler
    public void handlerB(String msg) {
        /**
         * ...接受到信息,去數(shù)據(jù)庫處理
         */
        System.out.println("已接受到隊列queueB傳遞過來的消息:" + msg);
    }
}

第一次進入普通隊列別拒絕后,轉到死信隊列中處理...

需要注意的 basicAck 方法需要傳遞兩個參數(shù)

  • deliveryTag(唯一標識 ID):當一個消費者向 RabbitMQ 注冊后,會建立起一個 Channel ,RabbitMQ 會用 basic.deliver 方法向消費者推送消息,這個方法攜帶了一個 delivery tag, 它代表了 RabbitMQ 向該 Channel 投遞的這條消息的唯一標識 ID,是一個單調遞增的正整數(shù),delivery tag 的范圍僅限于 Channel
  • multiple:為了減少網絡流量,手動確認可以被批處理,當該參數(shù)為 true 時,則可以一次性確認 delivery_tag 小于等于傳入值的所有消息

到此這篇關于RabbitMQ消息的延遲隊列詳解的文章就介紹到這了,更多相關RabbitMQ延遲隊列內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java的內存區(qū)域與內存溢出異常你了解嗎

    Java的內存區(qū)域與內存溢出異常你了解嗎

    這篇文章主要為大家詳細介紹了Java的內存區(qū)域與內存溢出異常,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助
    2022-03-03
  • Java堆內存又溢出了!教你一招必殺技(推薦)

    Java堆內存又溢出了!教你一招必殺技(推薦)

    這篇文章主要介紹了Java內存溢出問題,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-04-04
  • Spring如何使用@Indexed加快啟動速度

    Spring如何使用@Indexed加快啟動速度

    這篇文章主要介紹了Spring如何使用@Indexed加快啟動速度,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java中Base64加密解密舉例詳解

    Java中Base64加密解密舉例詳解

    Base64編碼是我們程序開發(fā)中經常使用到的編碼方法,它是一種基于用64個可打印字符來表示二進制數(shù)據(jù)的表示方法,這篇文章主要給大家介紹了關于Java中Base64加密解密的相關資料,需要的朋友可以參考下
    2024-05-05
  • java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法

    java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法

    下面小編就為大家分享一篇java 通過發(fā)送json,post請求,返回json數(shù)據(jù)的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2018-03-03
  • springboot如何實現(xiàn)自動裝配源碼解讀

    springboot如何實現(xiàn)自動裝配源碼解讀

    這篇文章主要介紹了springboot如何實現(xiàn)自動裝配源碼賞析,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-12-12
  • Spring Boot 3 整合 Spring Cloud Gateway實踐過程

    Spring Boot 3 整合 Spring Cloud 

    本文介紹了如何使用SpringCloudAlibaba2023.0.0.0版本構建一個微服務網關,包括統(tǒng)一路由、限流防刷和登錄鑒權等功能,并通過一個項目實例進行詳細說明,感興趣的朋友一起看看吧
    2025-02-02
  • Java?IDEA集成開發(fā)工具中英文切換圖文教程

    Java?IDEA集成開發(fā)工具中英文切換圖文教程

    相信很多小伙伴們剛接觸IDEA時,看到一堆英文界面不知道如何下手,這篇文章主要給大家介紹了關于Java?IDEA集成開發(fā)工具中英文切換的相關資料,需要的朋友可以參考下
    2024-04-04
  • java中this關鍵字的詳細使用介紹

    java中this關鍵字的詳細使用介紹

    大家好,本篇文章主要講的是java中this關鍵字的詳細使用介紹,感興趣的同學趕快來看一看吧,對你有幫助的話記得收藏一下,方便下次瀏覽
    2022-01-01
  • Fastjson 常用API介紹及下載地址(推薦)

    Fastjson 常用API介紹及下載地址(推薦)

    Fastjson是一個Java語言編寫的高性能功能完善的JSON庫。接下來通過本文給大家分享Fastjson 常用API介紹及下載地址,感興趣的朋友一起看看吧
    2017-11-11

最新評論