欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot死信隊列?DLX?配置和使用思路分析

 更新時間:2022年03月01日 09:44:20   作者:專注寫bug  
死信隊列簡稱就是DLX,死信交換機(jī)和死信隊列和普通的沒有區(qū)別,當(dāng)消息成為死信后,如果該隊列綁定了死信交換機(jī),則消息會被死信交換機(jī)重新路由到死信隊列,本文給大家介紹Springboot死信隊列?DLX的相關(guān)知識,感興趣的朋友一起看看吧

前言

上一篇博客 Springboot——整合RabbitMq測試TTL中,針對設(shè)置單個消息期限或者整個隊列消息期限,進(jìn)行了一些配置和說明。同時也都列舉了一些區(qū)別關(guān)系。

但考慮過一個問題了沒有?

不管是設(shè)置哪種方式,如果消息期限到了,隊列都會將該消息進(jìn)行丟棄處理。
這么做合適么?

假設(shè)是某個設(shè)備的重要信息,或者某個重要的訂單信息,因為規(guī)定時間內(nèi)未被及時消費就將其舍棄,是否會造成很嚴(yán)重的后果?

有人會說,設(shè)置消息永不過期!等著消費者能夠成功監(jiān)聽到該隊列,將消息消費不就可以了嘛!

但這里需要考慮另外一個問題:
每個服務(wù)器的容量是有上限的!如果消息一直存在隊列,如果一直不會被消費,豈不是很占用服務(wù)器資源?

如何解決這個問題,就是今天這篇文章需要說到的死信隊列

什么是死信

說道死信,可能大部分觀眾大姥爺會有懵逼的想法,什么是死信?

死信隊列,俗稱DLX,翻譯過來的名稱為Dead Letter Exchange 死信交換機(jī)。

當(dāng)消息限定時間內(nèi)未被消費,成為 Dead Message后,可以被重新發(fā)送另一個交換機(jī)中,發(fā)揮其應(yīng)有的價值!

配置和測試死信

思路分析

需要測試死信隊列,則需要先梳理整體的思路,如可以采取如下方式進(jìn)行配置:

在這里插入圖片描述

從上面的邏輯圖中,可以發(fā)現(xiàn)大致的思路:

1、消息隊列分為正常交換機(jī)、正常消息隊列;以及死信交換機(jī)死信隊列。

2、正常隊列針對死信信息,需要將數(shù)據(jù) 重新 發(fā)送至死信交換機(jī)中。

配置類編寫

結(jié)合上面的思路,編寫具體的配置類。如下所示:

package cn.linkpower.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 死信隊列配置
 */
@Configuration
public class DeadMsgMqConfig {
    // 定義正常交換機(jī)和正常隊列信息(交換機(jī)名、隊列名、路由key)
    public static final String queue_name = "xj_natural_queue";
    public static final String exchange_name = "xj_natural_exchange";
    public static final String routing_key = "xj_natural_routingKey";
    // 定義死信交換機(jī)名、死信隊列名、路由key
    public static final String queue_name_dead = "xj_dead_queue";
    public static final String exchange_name_dead = "xj_dead_exchange";
    public static final String routing_key_dead = "xj_dead_routingKey";
    /**
     * 設(shè)置正常的消息隊列;
     * 正常的消息隊列具備以下幾種功能:
     * 1、消息正常消費,需要綁定對應(yīng)的消費者(這里為了測試死信,不創(chuàng)建消費者)
     * 2、當(dāng)消息失效后,需要將指定的消息發(fā)送至 死信交換機(jī) 中
     * @return
     */
    @Bean(value = "getNaturalQueue")
    public Queue getNaturalQueue(){
        return QueueBuilder.durable(queue_name)
                // 正常的隊列,在消息失效后,需要將消息丟入 死信 交換機(jī)中
                // 這里只需要針對名稱進(jìn)行綁定
                .withArgument("x-dead-letter-exchange",exchange_name_dead)
                // 丟入 死信交換機(jī),需要設(shè)定指定的 routingkey
                .withArgument("x-dead-letter-routing-key",routing_key_dead)
                // 設(shè)置正常隊列中消息的存活時間為 10s,當(dāng)然也可以針對單個消息進(jìn)行設(shè)定不同的過期時間
                .withArgument("x-message-ttl",10000)
                // 設(shè)定當(dāng)前隊列中,允許存放的最大消息數(shù)目
                .withArgument("x-max-length",10)
                .build();
    }
     * 設(shè)定正常的消息交換機(jī)
    @Bean(value = "getNaturalExchange")
    public Exchange getNaturalExchange(){
        // 這里為了測試,采取 direct exchange
        return ExchangeBuilder.directExchange(exchange_name)
                .durable(true) // 設(shè)定持久化
     * 將正常的消息交換機(jī)和正常的消息隊列進(jìn)行綁定
     * @param queue
     * @param directExchange
    @Bean
    public Binding bindNaturalExchangeAndQueue(
            @Qualifier(value = "getNaturalQueue") Queue queue,
            @Qualifier(value = "getNaturalExchange") Exchange directExchange
    ){
        return BindingBuilder
                // 綁定消息隊列
                .bind(queue)
                // 至指定的消息交換機(jī)
                .to(directExchange)
                // 匹配 routingkey
                .with(routing_key)
                // 無參數(shù),不加會報錯提示
                .noargs();
     * 定義死信隊列
    @Bean(value = "getDealQueue")
    public Queue getDealQueue(){
        return QueueBuilder.durable(queue_name_dead).build();
     * 定義死信交換機(jī)
    @Bean(value = "getDeadExchange")
    public Exchange getDeadExchange(){
        return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build();
     * 將死信交換機(jī)和死信隊列進(jìn)行綁定
     * @param deadQueue
     * @param directDeadExchange
    public Binding bindDeadExchangeAndQueue(
            @Qualifier(value = "getDealQueue") Queue deadQueue,
            @Qualifier(value = "getDeadExchange") Exchange directDeadExchange
        return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs();
}

編寫消息發(fā)送服務(wù)

默認(rèn)采取rabbitTemplate.convertAndSend方法,進(jìn)行消息的發(fā)送處理。但為了保證消息生產(chǎn)者能夠成功將數(shù)據(jù)發(fā)送至正常交換機(jī),同時為了保證正常交換機(jī)能夠?qū)?shù)據(jù)信息,推送至正常消息隊列。需要對其增加監(jiān)聽。

package cn.linkpower.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 直接發(fā)送消息
     * @param exchange
     * @param routingKey
     * @param msg
     */
    public void sendMessage(String exchange,String routingKey,Object msg) {
        // 設(shè)置交換機(jī)處理失敗消息的模式     true 表示消息由交換機(jī) 到達(dá)不了隊列時,會將消息重新返回給生產(chǎn)者
        // 如果不設(shè)置這個指令,則交換機(jī)向隊列推送消息失敗后,不會觸發(fā) setReturnCallback
        rabbitTemplate.setMandatory(true);
        //消息消費者確認(rèn)收到消息后,手動ack回執(zhí)
        rabbitTemplate.setConfirmCallback(this);
        // return 配置
        rabbitTemplate.setReturnCallback(this);
        //發(fā)送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    }
     * 交換機(jī)并未將數(shù)據(jù)丟入指定的隊列中時,觸發(fā)
     *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
     *  參數(shù)三:true  表示如果消息無法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄
     * @param message   消息對象
     * @param replyCode 錯誤碼
     * @param replyText 錯誤信息
     * @param exchange 交換機(jī)
     * @param routingKey 路由鍵
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
     * 消息生產(chǎn)者發(fā)送消息至交換機(jī)時觸發(fā),用于判斷交換機(jī)是否成功收到消息
     * @param correlationData  相關(guān)配置信息
     * @param ack exchange 交換機(jī),判斷交換機(jī)是否成功收到消息    true 表示交換機(jī)收到
     * @param cause  失敗原因
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
        log.info("correlationData -->"+correlationData.toString());
        if(ack){
            // 交換機(jī)接收到
            log.info("---- confirm ----ack==true  cause="+cause);
        }else{
            // 沒有接收到
            log.info("---- confirm ----ack==false  cause="+cause);
        }
}

測試

既然說到測試,那么需要編寫一個測試類,能夠?qū)a(chǎn)生的消息,推送至指定的正常消息交換機(jī)中去。

package cn.linkpower.controller;

import cn.linkpower.config.DeadMsgMqConfig;
import cn.linkpower.service.RabbitmqService;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.jni.Time;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@Slf4j
@RestController
public class DeadMsgController {
    @Autowired
    private RabbitmqService rabbitmqService;
    @RequestMapping("/deadMsgTest")
    public String deadMsgTest() throws InterruptedException {
        // 向正常的消息隊列中丟數(shù)據(jù),測試限定時間未消費后,死信隊列的情況
        // 配置文件中,針對于正常隊列而言,設(shè)置有10條上限大小
        for (int i = 0; i < 20; i++) {
            String msg = "dead msg test "+i;
            log.info("發(fā)送消息,消息信息為:{}",msg);
            // 向正常的消息交換機(jī)中傳遞數(shù)據(jù)
            rabbitmqService.sendMessage(DeadMsgMqConfig.exchange_name,DeadMsgMqConfig.routing_key,msg);
            TimeUnit.SECONDS.sleep(2);
        }
        return "ok";
    }
}

啟動項目,訪問指定的鏈接,進(jìn)行數(shù)據(jù)產(chǎn)生和將消息發(fā)送交換機(jī)操作:

http://localhost/deadMsgTest

在這里插入圖片描述

 

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

控制臺部分日志展示:

在這里插入圖片描述

消息什么時候會成為死信消息?

1、隊列消息長度到達(dá)限制;

2、消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊列,requeue=false;

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

在這里插入圖片描述

3、原隊列存在消息過期設(shè)置,消息到達(dá)超時時間未被消費;

總結(jié)

此處只是為了進(jìn)行配置和測試需要,暫未定義任何正常消息隊列消費者死信消息隊列消費者信息。

1、死信交換機(jī)和死信隊列和普通的沒有區(qū)別

2、當(dāng)消息成為死信后,如果該隊列綁定了死信交換機(jī),則消息會被死信交換機(jī)重新路由到死信隊列

參考資料

RabbitMQ死信隊列在SpringBoot中的使用

代碼下載

gitee 代碼下載

到此這篇關(guān)于Springboot死信隊列 DLX 配置和使用的文章就介紹到這了,更多相關(guān)Springboot死信隊列 DLX內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java基礎(chǔ)之SpringBoot整合knife4j

    Java基礎(chǔ)之SpringBoot整合knife4j

    Swagger現(xiàn)在已經(jīng)成了最流行的接口文檔生成與管理工具,但是你是否在用的時候也在吐槽,它是真的不好看,接口測試的json數(shù)據(jù)沒法格式化,測試地址如果更改了還要去改配置,接口測試時增加token驗證是真的麻煩…針對Swagger的種種缺點,Knife4j就呼之欲出了.需要的朋友可以參考下
    2021-05-05
  • SpringBoot在IDEA中實現(xiàn)熱部署(JRebel實用版)

    SpringBoot在IDEA中實現(xiàn)熱部署(JRebel實用版)

    這篇文章主要介紹了SpringBoot在IDEA中實現(xiàn)熱部署(JRebel實用版),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • java實現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字

    java實現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字

    這篇文章主要為大家詳細(xì)介紹了java實現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2018-11-11
  • JAVA遍歷一個文件夾中的所有文件的小例子

    JAVA遍歷一個文件夾中的所有文件的小例子

    在實際項目中給定一文件夾,得到這個文件夾下所有的文件這樣的需求并不是很多,更多的是查找或是刪除某一具體的文件
    2013-10-10
  • Servlet輸出一個驗證碼圖片的實現(xiàn)方法實例

    Servlet輸出一個驗證碼圖片的實現(xiàn)方法實例

    這篇文章主要給大家介紹了關(guān)于Servlet輸出一個驗證碼圖片的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • java短路邏輯運算符實例用法詳解

    java短路邏輯運算符實例用法詳解

    在本篇文章里小編給大家分享的是一篇關(guān)于java短路邏輯運算符實例用法內(nèi)容,有需要的朋友們可以學(xué)習(xí)參考下。
    2021-04-04
  • java  Vector和ArrayList的分析及比較

    java Vector和ArrayList的分析及比較

    這篇文章主要介紹了java Vector和ArrayList的分析及比較的相關(guān)資料,Vector是多線程安全的,而ArrayList不是,本文主要做對比對這兩個方法,需要的朋友可以參考下
    2016-11-11
  • idea引入外部jar包的方法實現(xiàn)

    idea引入外部jar包的方法實現(xiàn)

    本文主要介紹了idea引入外部jar包的方法實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • Maven構(gòu)建生命周期詳細(xì)介紹

    Maven構(gòu)建生命周期詳細(xì)介紹

    這篇文章主要介紹了Maven構(gòu)建生命周期詳細(xì)介紹,小編覺得還是挺不錯的,這里分享給大家,需要的朋友可以參考下。
    2017-11-11
  • Java動態(tài)代理分析及簡單實例

    Java動態(tài)代理分析及簡單實例

    這篇文章主要介紹了 Java動態(tài)代理分析及簡單實例的相關(guān)資料,需要的朋友可以參考下
    2017-02-02

最新評論