欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot死信隊(duì)列?DLX?配置和使用思路分析

 更新時(shí)間:2022年03月01日 09:44:20   作者:專注寫bug  
死信隊(duì)列簡(jiǎn)稱就是DLX,死信交換機(jī)和死信隊(duì)列和普通的沒(méi)有區(qū)別,當(dāng)消息成為死信后,如果該隊(duì)列綁定了死信交換機(jī),則消息會(huì)被死信交換機(jī)重新路由到死信隊(duì)列,本文給大家介紹Springboot死信隊(duì)列?DLX的相關(guān)知識(shí),感興趣的朋友一起看看吧

前言

上一篇博客 Springboot——整合RabbitMq測(cè)試TTL中,針對(duì)設(shè)置單個(gè)消息期限或者整個(gè)隊(duì)列消息期限,進(jìn)行了一些配置和說(shuō)明。同時(shí)也都列舉了一些區(qū)別關(guān)系。

但考慮過(guò)一個(gè)問(wèn)題了沒(méi)有?

不管是設(shè)置哪種方式,如果消息期限到了,隊(duì)列都會(huì)將該消息進(jìn)行丟棄處理。
這么做合適么?

假設(shè)是某個(gè)設(shè)備的重要信息,或者某個(gè)重要的訂單信息,因?yàn)?code>規(guī)定時(shí)間內(nèi)未被及時(shí)消費(fèi)就將其舍棄,是否會(huì)造成很嚴(yán)重的后果?

有人會(huì)說(shuō),設(shè)置消息永不過(guò)期!等著消費(fèi)者能夠成功監(jiān)聽到該隊(duì)列,將消息消費(fèi)不就可以了嘛!

但這里需要考慮另外一個(gè)問(wèn)題:
每個(gè)服務(wù)器的容量是有上限的!如果消息一直存在隊(duì)列,如果一直不會(huì)被消費(fèi),豈不是很占用服務(wù)器資源?

如何解決這個(gè)問(wèn)題,就是今天這篇文章需要說(shuō)到的死信隊(duì)列。

什么是死信

說(shuō)道死信,可能大部分觀眾大姥爺會(huì)有懵逼的想法,什么是死信?

死信隊(duì)列,俗稱DLX,翻譯過(guò)來(lái)的名稱為Dead Letter Exchange 死信交換機(jī)

當(dāng)消息限定時(shí)間內(nèi)未被消費(fèi),成為 Dead Message后,可以被重新發(fā)送另一個(gè)交換機(jī)中,發(fā)揮其應(yīng)有的價(jià)值!

配置和測(cè)試死信

思路分析

需要測(cè)試死信隊(duì)列,則需要先梳理整體的思路,如可以采取如下方式進(jìn)行配置:

在這里插入圖片描述

從上面的邏輯圖中,可以發(fā)現(xiàn)大致的思路:

1、消息隊(duì)列分為正常交換機(jī)正常消息隊(duì)列;以及死信交換機(jī)死信隊(duì)列。

2、正常隊(duì)列針對(duì)死信信息,需要將數(shù)據(jù) 重新 發(fā)送至死信交換機(jī)中。

配置類編寫

結(jié)合上面的思路,編寫具體的配置類。如下所示:

package cn.linkpower.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * 死信隊(duì)列配置
 */
@Configuration
public class DeadMsgMqConfig {
    // 定義正常交換機(jī)和正常隊(duì)列信息(交換機(jī)名、隊(duì)列名、路由key)
    public static final String queue_name = "xj_natural_queue";
    public static final String exchange_name = "xj_natural_exchange";
    public static final String routing_key = "xj_natural_routingKey";
    // 定義死信交換機(jī)名、死信隊(duì)列名、路由key
    public static final String queue_name_dead = "xj_dead_queue";
    public static final String exchange_name_dead = "xj_dead_exchange";
    public static final String routing_key_dead = "xj_dead_routingKey";
    /**
     * 設(shè)置正常的消息隊(duì)列;
     * 正常的消息隊(duì)列具備以下幾種功能:
     * 1、消息正常消費(fèi),需要綁定對(duì)應(yīng)的消費(fèi)者(這里為了測(cè)試死信,不創(chuàng)建消費(fèi)者)
     * 2、當(dāng)消息失效后,需要將指定的消息發(fā)送至 死信交換機(jī) 中
     * @return
     */
    @Bean(value = "getNaturalQueue")
    public Queue getNaturalQueue(){
        return QueueBuilder.durable(queue_name)
                // 正常的隊(duì)列,在消息失效后,需要將消息丟入 死信 交換機(jī)中
                // 這里只需要針對(duì)名稱進(jìn)行綁定
                .withArgument("x-dead-letter-exchange",exchange_name_dead)
                // 丟入 死信交換機(jī),需要設(shè)定指定的 routingkey
                .withArgument("x-dead-letter-routing-key",routing_key_dead)
                // 設(shè)置正常隊(duì)列中消息的存活時(shí)間為 10s,當(dāng)然也可以針對(duì)單個(gè)消息進(jìn)行設(shè)定不同的過(guò)期時(shí)間
                .withArgument("x-message-ttl",10000)
                // 設(shè)定當(dāng)前隊(duì)列中,允許存放的最大消息數(shù)目
                .withArgument("x-max-length",10)
                .build();
    }
     * 設(shè)定正常的消息交換機(jī)
    @Bean(value = "getNaturalExchange")
    public Exchange getNaturalExchange(){
        // 這里為了測(cè)試,采取 direct exchange
        return ExchangeBuilder.directExchange(exchange_name)
                .durable(true) // 設(shè)定持久化
     * 將正常的消息交換機(jī)和正常的消息隊(duì)列進(jìn)行綁定
     * @param queue
     * @param directExchange
    @Bean
    public Binding bindNaturalExchangeAndQueue(
            @Qualifier(value = "getNaturalQueue") Queue queue,
            @Qualifier(value = "getNaturalExchange") Exchange directExchange
    ){
        return BindingBuilder
                // 綁定消息隊(duì)列
                .bind(queue)
                // 至指定的消息交換機(jī)
                .to(directExchange)
                // 匹配 routingkey
                .with(routing_key)
                // 無(wú)參數(shù),不加會(huì)報(bào)錯(cuò)提示
                .noargs();
     * 定義死信隊(duì)列
    @Bean(value = "getDealQueue")
    public Queue getDealQueue(){
        return QueueBuilder.durable(queue_name_dead).build();
     * 定義死信交換機(jī)
    @Bean(value = "getDeadExchange")
    public Exchange getDeadExchange(){
        return ExchangeBuilder.directExchange(exchange_name_dead).durable(true).build();
     * 將死信交換機(jī)和死信隊(duì)列進(jìn)行綁定
     * @param deadQueue
     * @param directDeadExchange
    public Binding bindDeadExchangeAndQueue(
            @Qualifier(value = "getDealQueue") Queue deadQueue,
            @Qualifier(value = "getDeadExchange") Exchange directDeadExchange
        return BindingBuilder.bind(deadQueue).to(directDeadExchange).with(routing_key_dead).noargs();
}

編寫消息發(fā)送服務(wù)

默認(rèn)采取rabbitTemplate.convertAndSend方法,進(jìn)行消息的發(fā)送處理。但為了保證消息生產(chǎn)者能夠成功將數(shù)據(jù)發(fā)送至正常交換機(jī),同時(shí)為了保證正常交換機(jī)能夠?qū)?shù)據(jù)信息,推送至正常消息隊(duì)列。需要對(duì)其增加監(jiān)聽。

package cn.linkpower.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitmqService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 直接發(fā)送消息
     * @param exchange
     * @param routingKey
     * @param msg
     */
    public void sendMessage(String exchange,String routingKey,Object msg) {
        // 設(shè)置交換機(jī)處理失敗消息的模式     true 表示消息由交換機(jī) 到達(dá)不了隊(duì)列時(shí),會(huì)將消息重新返回給生產(chǎn)者
        // 如果不設(shè)置這個(gè)指令,則交換機(jī)向隊(duì)列推送消息失敗后,不會(huì)觸發(fā) setReturnCallback
        rabbitTemplate.setMandatory(true);
        //消息消費(fèi)者確認(rèn)收到消息后,手動(dòng)ack回執(zhí)
        rabbitTemplate.setConfirmCallback(this);
        // return 配置
        rabbitTemplate.setReturnCallback(this);
        //發(fā)送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,msg);
    }
     * 交換機(jī)并未將數(shù)據(jù)丟入指定的隊(duì)列中時(shí),觸發(fā)
     *  channel.basicPublish(exchange_name,next.getKey(), true, properties,next.getValue().getBytes());
     *  參數(shù)三:true  表示如果消息無(wú)法正常投遞,則return給生產(chǎn)者 ;false 表示直接丟棄
     * @param message   消息對(duì)象
     * @param replyCode 錯(cuò)誤碼
     * @param replyText 錯(cuò)誤信息
     * @param exchange 交換機(jī)
     * @param routingKey 路由鍵
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("---- returnedMessage ----replyCode="+replyCode+" replyText="+replyText+" ");
     * 消息生產(chǎn)者發(fā)送消息至交換機(jī)時(shí)觸發(fā),用于判斷交換機(jī)是否成功收到消息
     * @param correlationData  相關(guān)配置信息
     * @param ack exchange 交換機(jī),判斷交換機(jī)是否成功收到消息    true 表示交換機(jī)收到
     * @param cause  失敗原因
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("---- confirm ----ack="+ack+"  cause="+String.valueOf(cause));
        log.info("correlationData -->"+correlationData.toString());
        if(ack){
            // 交換機(jī)接收到
            log.info("---- confirm ----ack==true  cause="+cause);
        }else{
            // 沒(méi)有接收到
            log.info("---- confirm ----ack==false  cause="+cause);
        }
}

測(cè)試

既然說(shuō)到測(cè)試,那么需要編寫一個(gè)測(cè)試類,能夠?qū)a(chǎn)生的消息,推送至指定的正常消息交換機(jī)中去。

package cn.linkpower.controller;

import cn.linkpower.config.DeadMsgMqConfig;
import cn.linkpower.service.RabbitmqService;
import lombok.extern.slf4j.Slf4j;
import org.apache.tomcat.jni.Time;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@Slf4j
@RestController
public class DeadMsgController {
    @Autowired
    private RabbitmqService rabbitmqService;
    @RequestMapping("/deadMsgTest")
    public String deadMsgTest() throws InterruptedException {
        // 向正常的消息隊(duì)列中丟數(shù)據(jù),測(cè)試限定時(shí)間未消費(fèi)后,死信隊(duì)列的情況
        // 配置文件中,針對(duì)于正常隊(duì)列而言,設(shè)置有10條上限大小
        for (int i = 0; i < 20; i++) {
            String msg = "dead msg test "+i;
            log.info("發(fā)送消息,消息信息為:{}",msg);
            // 向正常的消息交換機(jī)中傳遞數(shù)據(jù)
            rabbitmqService.sendMessage(DeadMsgMqConfig.exchange_name,DeadMsgMqConfig.routing_key,msg);
            TimeUnit.SECONDS.sleep(2);
        }
        return "ok";
    }
}

啟動(dòng)項(xiàng)目,訪問(wèn)指定的鏈接,進(jìn)行數(shù)據(jù)產(chǎn)生和將消息發(fā)送交換機(jī)操作:

http://localhost/deadMsgTest

在這里插入圖片描述

 

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

在這里插入圖片描述

控制臺(tái)部分日志展示:

在這里插入圖片描述

消息什么時(shí)候會(huì)成為死信消息?

1、隊(duì)列消息長(zhǎng)度到達(dá)限制;

2、消費(fèi)者拒接消費(fèi)消息,basicNack/basicReject,并且不把消息重新放入原目標(biāo)隊(duì)列,requeue=false

channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);

在這里插入圖片描述

3、原隊(duì)列存在消息過(guò)期設(shè)置,消息到達(dá)超時(shí)時(shí)間未被消費(fèi);

總結(jié)

此處只是為了進(jìn)行配置和測(cè)試需要,暫未定義任何正常消息隊(duì)列消費(fèi)者死信消息隊(duì)列消費(fèi)者信息。

1、死信交換機(jī)和死信隊(duì)列和普通的沒(méi)有區(qū)別

2、當(dāng)消息成為死信后,如果該隊(duì)列綁定了死信交換機(jī),則消息會(huì)被死信交換機(jī)重新路由到死信隊(duì)列

參考資料

RabbitMQ死信隊(duì)列在SpringBoot中的使用

代碼下載

gitee 代碼下載

到此這篇關(guān)于Springboot死信隊(duì)列 DLX 配置和使用的文章就介紹到這了,更多相關(guān)Springboot死信隊(duì)列 DLX內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java基礎(chǔ)之SpringBoot整合knife4j

    Java基礎(chǔ)之SpringBoot整合knife4j

    Swagger現(xiàn)在已經(jīng)成了最流行的接口文檔生成與管理工具,但是你是否在用的時(shí)候也在吐槽,它是真的不好看,接口測(cè)試的json數(shù)據(jù)沒(méi)法格式化,測(cè)試地址如果更改了還要去改配置,接口測(cè)試時(shí)增加token驗(yàn)證是真的麻煩…針對(duì)Swagger的種種缺點(diǎn),Knife4j就呼之欲出了.需要的朋友可以參考下
    2021-05-05
  • SpringBoot在IDEA中實(shí)現(xiàn)熱部署(JRebel實(shí)用版)

    SpringBoot在IDEA中實(shí)現(xiàn)熱部署(JRebel實(shí)用版)

    這篇文章主要介紹了SpringBoot在IDEA中實(shí)現(xiàn)熱部署(JRebel實(shí)用版),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • java實(shí)現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字

    java實(shí)現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字

    這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)二分法查找出數(shù)組重復(fù)數(shù)字,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2018-11-11
  • JAVA遍歷一個(gè)文件夾中的所有文件的小例子

    JAVA遍歷一個(gè)文件夾中的所有文件的小例子

    在實(shí)際項(xiàng)目中給定一文件夾,得到這個(gè)文件夾下所有的文件這樣的需求并不是很多,更多的是查找或是刪除某一具體的文件
    2013-10-10
  • Servlet輸出一個(gè)驗(yàn)證碼圖片的實(shí)現(xiàn)方法實(shí)例

    Servlet輸出一個(gè)驗(yàn)證碼圖片的實(shí)現(xiàn)方法實(shí)例

    這篇文章主要給大家介紹了關(guān)于Servlet輸出一個(gè)驗(yàn)證碼圖片的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • java短路邏輯運(yùn)算符實(shí)例用法詳解

    java短路邏輯運(yùn)算符實(shí)例用法詳解

    在本篇文章里小編給大家分享的是一篇關(guān)于java短路邏輯運(yùn)算符實(shí)例用法內(nèi)容,有需要的朋友們可以學(xué)習(xí)參考下。
    2021-04-04
  • java  Vector和ArrayList的分析及比較

    java Vector和ArrayList的分析及比較

    這篇文章主要介紹了java Vector和ArrayList的分析及比較的相關(guān)資料,Vector是多線程安全的,而ArrayList不是,本文主要做對(duì)比對(duì)這兩個(gè)方法,需要的朋友可以參考下
    2016-11-11
  • idea引入外部jar包的方法實(shí)現(xiàn)

    idea引入外部jar包的方法實(shí)現(xiàn)

    本文主要介紹了idea引入外部jar包的方法實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • Maven構(gòu)建生命周期詳細(xì)介紹

    Maven構(gòu)建生命周期詳細(xì)介紹

    這篇文章主要介紹了Maven構(gòu)建生命周期詳細(xì)介紹,小編覺(jué)得還是挺不錯(cuò)的,這里分享給大家,需要的朋友可以參考下。
    2017-11-11
  • Java動(dòng)態(tài)代理分析及簡(jiǎn)單實(shí)例

    Java動(dòng)態(tài)代理分析及簡(jiǎn)單實(shí)例

    這篇文章主要介紹了 Java動(dòng)態(tài)代理分析及簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下
    2017-02-02

最新評(píng)論