欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息

 更新時(shí)間:2023年01月18日 15:43:03   作者:好大的月亮  
這篇文章主要介紹了如何利用rabbitMq的死信隊(duì)列實(shí)現(xiàn)延時(shí)消息問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

前言

使用mq自帶的死信去實(shí)現(xiàn)延時(shí)消息要注意一個(gè)坑點(diǎn),就是mq只會(huì)檢測(cè)隊(duì)首的消息的過(guò)期時(shí)間,假設(shè)先放入隊(duì)列10s過(guò)期消息,再放入2s過(guò)期。

mq會(huì)檢測(cè)頭部10s是否過(guò)期,10s不過(guò)期的情況下,2s就算過(guò)去也不會(huì)跑到死信.

mq基本的消息模型

mq死信隊(duì)列的消息模型

簡(jiǎn)單的說(shuō)就是先弄一個(gè)正常隊(duì)列,然后不要設(shè)置消費(fèi)者,接著給這個(gè)正常隊(duì)列綁定一個(gè)死信隊(duì)列,這個(gè)死信隊(duì)列設(shè)置方式和正常隊(duì)列沒(méi)啥區(qū)別。

然后監(jiān)聽(tīng)這個(gè)死信隊(duì)列的消費(fèi).

一般死信隊(duì)列由三大核心組件組成:死信交換機(jī)+死信路由+TTL(消息過(guò)期時(shí)間)

通常死信隊(duì)列由“面向生產(chǎn)者的基本交換機(jī)+基本路由”綁定而成,所以生產(chǎn)者首先是將消息發(fā)送至“基本交換機(jī)+基本路由”所綁定而成的消息模型中,即間接性地進(jìn)入到死信隊(duì)列中,當(dāng)過(guò)了TTL,消息將“掛掉”,從而進(jìn)入下一個(gè)中轉(zhuǎn)站,即“面下那個(gè)消費(fèi)者的死信交換機(jī)+死信路由”所綁定而成的消息模型中.

maven依賴

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
 </dependency>

配置普通隊(duì)列和死信隊(duì)列

yml文件

spring:
  application:
    name: ttl-queue
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan
    connection-timeout: 15000
    # 發(fā)送確認(rèn)
    #publisher-confirms: true
    # 路由失敗回調(diào)
    publisher-returns: true
    template:
      # 必須設(shè)置成true 消息路由失敗通知監(jiān)聽(tīng)者,而不是將消息丟棄
      mandatory: true
    listener:
      simple:
        # 每次從RabbitMQ獲取的消息數(shù)量
        prefetch: 1
        default-requeue-rejected: false
        # 每個(gè)隊(duì)列啟動(dòng)的消費(fèi)者數(shù)量
        concurrency: 1
        # 每個(gè)隊(duì)列最大的消費(fèi)者數(shù)量
        max-concurrency: 1
        # 簽收模式為手動(dòng)簽收-那么需要在代碼中手動(dòng)ACK
        acknowledge-mode: manual
package com.fchan.mq.mqDelay.dlx;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabConfig {

    //定義普通任務(wù)隊(duì)列(其實(shí)就是一個(gè)普通隊(duì)列,只是沒(méi)有消費(fèi)者)
    @Bean("delayQue")
    public Queue delayQue(){
        //普通隊(duì)列綁定死信交換機(jī)及路由key

        //delayQueue延時(shí)隊(duì)列
        return QueueBuilder.durable("delayQueue")
                //指定這個(gè)延時(shí)隊(duì)列下的死信交換機(jī)
                //如果消息過(guò)時(shí)則會(huì)被投遞到當(dāng)前對(duì)應(yīng)的my-dlx-exchange
                .withArgument("x-dead-letter-exchange", "my-dlx-exchange")
                //死信交換機(jī)綁定的路由key
                //如果消息過(guò)時(shí),my-dlx-exchange會(huì)根據(jù)routing-key-delay投遞消息到對(duì)應(yīng)的隊(duì)列
                //mq發(fā)送消息的時(shí)候一般指定的是exchange交換機(jī)+這個(gè)routingKey來(lái)找到隊(duì)列,這個(gè)應(yīng)該是出于有時(shí)候需要讓mq的交換機(jī)將一個(gè)消息發(fā)送到多個(gè)隊(duì)列所采用的方式
                .withArgument("x-dead-letter-routing-key", "routing-key-delay")
                .build();
    }

    //定義普通隊(duì)列的交換機(jī)
    @Bean("delayExchange")
    public Exchange delayExchange(){
        return ExchangeBuilder.directExchange("delayExchange").build();
    }

    //綁定普通隊(duì)列的交換機(jī)
    @Bean("delayBinding")
    public Binding delayBinding(@Qualifier("delayExchange") Exchange exchange, @Qualifier("delayQue") Queue queue){
        return BindingBuilder.bind(queue).to(exchange).with("delayBindingRoutingKey").noargs();
    }



    //定義死信隊(duì)列
    @Bean("dlxQueue")
    public Queue dlxQueue(){
        return QueueBuilder.durable("my-dlx-queue").build();
    }


    //定義死信交換機(jī),這里的死信交換機(jī)要和上面普通隊(duì)列所綁定的交換機(jī)名稱一致
    @Bean("dlxExchange")
    public Exchange dlxExchange(){
        return ExchangeBuilder.directExchange("my-dlx-exchange").build();
    }


    //綁定交換機(jī)與死信隊(duì)列
    @Bean("dlxBinding")
    public Binding dlxBinding(@Qualifier("dlxExchange") Exchange exchange, @Qualifier("dlxQueue") Queue queue){
        //這兒這個(gè)routingKey要和上面正常隊(duì)列中所綁定的死信routingKey一致
        return BindingBuilder.bind(queue).to(exchange).with("routing-key-delay").noargs();
    }

}

死信隊(duì)列消費(fèi)者

package com.fchan.mq.mqDelay;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

@Component
public class MyRabbitConsume {
    Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);

    @RabbitListener(queues = {"my-dlx-queue"})
    public void receiveDeadMessage(Map<String,Object> map, Message message, Channel channel) throws Exception {
        log.info("收到死信信息:{}",map);
        log.info("然后進(jìn)行一系列邏輯處理 Thanks?(?ω?)?");
		//手動(dòng)確認(rèn)消息        
		channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

發(fā)送消息測(cè)試

/**
 * 發(fā)送消息到延時(shí)隊(duì)列 delayQueue
 * 消息5秒后會(huì)被投遞到死信隊(duì)列
 * @return
 */
@GetMapping("sendMsgToDelay")
public String sendMsgToDelay(String msg){
    Map<String,Object> map = new HashMap<>();
    map.put("msg", msg);
    rabbitTemplate.convertAndSend("delayExchange","delayBindingRoutingKey", map, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            //設(shè)置消息過(guò)期時(shí)間,5秒過(guò)期
            message.getMessageProperties().setExpiration("5000");
            return message;
        }
    });
    return msg;
}

測(cè)試成功

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • SpringBoot+微信小程序?qū)崿F(xiàn)文件上傳與下載功能詳解

    SpringBoot+微信小程序?qū)崿F(xiàn)文件上傳與下載功能詳解

    這篇文章主要為大家介紹了SpringBoot整合微信小程序?qū)崿F(xiàn)文件上傳與下載功能,文中的實(shí)現(xiàn)步驟講解詳細(xì),快跟隨小編一起學(xué)習(xí)一下吧
    2022-03-03
  • Java微服務(wù)的打包問(wèn)題解決

    Java微服務(wù)的打包問(wèn)題解決

    本文主要介紹了Java微服務(wù)的打包問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • Java后臺(tái)如何處理日期參數(shù)格式

    Java后臺(tái)如何處理日期參數(shù)格式

    這篇文章主要介紹了Java后臺(tái)如何處理日期參數(shù)格式問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • Java中序列化和反序列化的完整講解

    Java中序列化和反序列化的完整講解

    序列化是將對(duì)象轉(zhuǎn)換成二進(jìn)制字節(jié)流的過(guò)程;反序列化是從二進(jìn)制字節(jié)流中恢復(fù)對(duì)象的過(guò)程。文中將為大家詳細(xì)講講二者的原理與實(shí)現(xiàn),需要的可以參考一下
    2022-11-11
  • 解決javaBean規(guī)范導(dǎo)致json傳參首字母大寫(xiě)將永遠(yuǎn)獲取不到問(wèn)題

    解決javaBean規(guī)范導(dǎo)致json傳參首字母大寫(xiě)將永遠(yuǎn)獲取不到問(wèn)題

    這篇文章主要介紹了解決javaBean規(guī)范導(dǎo)致json傳參首字母大寫(xiě)將永遠(yuǎn)獲取不到問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Spring實(shí)戰(zhàn)之FileSystemResource加載資源文件示例

    Spring實(shí)戰(zhàn)之FileSystemResource加載資源文件示例

    這篇文章主要介紹了Spring實(shí)戰(zhàn)之FileSystemResource加載資源文件,結(jié)合實(shí)例形式分析了spring FileSystemResource加載xml資源文件的具體實(shí)現(xiàn)步驟與相關(guān)操作技巧,需要的朋友可以參考下
    2019-12-12
  • IDEA Ultimate2020.2版本配置Tomcat詳細(xì)教程

    IDEA Ultimate2020.2版本配置Tomcat詳細(xì)教程

    這篇文章主要介紹了IDEA Ultimate2020.2版本配置Tomcat教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • 解決feign調(diào)用接口不穩(wěn)定的問(wèn)題

    解決feign調(diào)用接口不穩(wěn)定的問(wèn)題

    這篇文章主要介紹了解決feign調(diào)用接口不穩(wěn)定的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-09-09
  • StackTraceElement獲取方法調(diào)用棧信息實(shí)例詳解

    StackTraceElement獲取方法調(diào)用棧信息實(shí)例詳解

    這篇文章主要介紹了StackTraceElement獲取方法調(diào)用棧信息實(shí)例詳解,分享了相關(guān)代碼示例,小編覺(jué)得還是挺不錯(cuò)的,具有一定借鑒價(jià)值,需要的朋友可以參考下
    2018-02-02
  • Springboot如何使用mybatis實(shí)現(xiàn)攔截SQL分頁(yè)

    Springboot如何使用mybatis實(shí)現(xiàn)攔截SQL分頁(yè)

    這篇文章主要介紹了Springboot使用mybatis實(shí)現(xiàn)攔截SQL分頁(yè),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-06-06

最新評(píng)論