欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

RabbitMQ中的延遲隊(duì)列機(jī)制詳解

 更新時(shí)間:2023年09月20日 11:10:39   作者:煎丶包  
這篇文章主要介紹了RabbitMQ中的延遲隊(duì)列機(jī)制詳解,延時(shí)隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望,在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列,需要的朋友可以參考下

一、延遲隊(duì)列

延時(shí)隊(duì)列內(nèi)部是有序的,最重要的特性就體現(xiàn)在它的延時(shí)屬性上,延時(shí)隊(duì)列中的元素是希望 在指定時(shí)間到了以后或之前取出和處理,簡(jiǎn)單來(lái)說(shuō),延時(shí)隊(duì)列就是用來(lái)存放需要在指定時(shí)間被處理的元素的隊(duì)列。

二、隊(duì)列TTL

在這里插入圖片描述

創(chuàng)建一個(gè)配置類(lèi),聲明并配置交換機(jī)和隊(duì)列

@Configuration
public class TtlQueueConfig {
    //普通交換機(jī)名稱(chēng)
    public static final String NORMAL_EXCHANGE = "X";
    //死信交換機(jī)名稱(chēng)
    public static final String DEAD_EXCHANGE = "Y";
    //普通隊(duì)列名稱(chēng)
    public static final String NORMAL_QUEUE_A = "QA";
    public static final String NORMAL_QUEUE_B = "QA";
    //死信隊(duì)列名稱(chēng)
    public static final String DEAD_QUEUE = "QD";
    //聲明普通交換機(jī)
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(NORMAL_EXCHANGE);
    }
    //聲明死信交換機(jī)
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(DEAD_EXCHANGE);
    }
    //聲明普通隊(duì)列,TTL為10s
    @Bean("QA")
    public Queue qA() {
        Map<String, Object> arguments = new HashMap<>();
        //設(shè)置死信交換機(jī)
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //設(shè)置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //設(shè)置TTL
        arguments.put("x-message-ttl", 10000);
        return QueueBuilder.durable(NORMAL_QUEUE_A).withArguments(arguments).build();
    }
    //聲明普通隊(duì)列,TTL為10s
    @Bean("QB")
    public Queue qB() {
        Map<String, Object> arguments = new HashMap<>();
        //設(shè)置死信交換機(jī)
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //設(shè)置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //設(shè)置TTL
        arguments.put("x-message-ttl", 40000);
        return QueueBuilder.durable(NORMAL_QUEUE_B).withArguments(arguments).build();
    }
    //聲明死信隊(duì)列
    @Bean("QD")
    public Queue qD() {
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }
    //綁定對(duì)應(yīng)的交換機(jī)和隊(duì)列
    @Bean
    public Binding queueABindingX(@Qualifier("QA") Queue QA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(QA).to(xExchange).with("XA");
    }
    @Bean
    public Binding queueBBindingX(@Qualifier("QB") Queue QB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(QB).to(xExchange).with("XB");
    }
    @Bean
    public Binding queueDBindingY(@Qualifier("QD") Queue QD,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(QD).to(yExchange).with("YD");
    }
}

創(chuàng)建一個(gè)生產(chǎn)者

@Slf4j
@RestController
public class SendMsgController {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @GetMapping("/ttl/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條信息給兩個(gè)隊(duì)列:{}", new Date().toString(), message);
        rabbitTemplate.convertAndSend("X","XA","消息來(lái)自TTL為10s的隊(duì)列QA:" + message);
        rabbitTemplate.convertAndSend("X","XB","消息來(lái)自TTL為40s的隊(duì)列QB:" + message);
    }
}

創(chuàng)建一個(gè)消費(fèi)者

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    //接收消息
    @RabbitListener(queues = "QD")
    public void receivedQD(Message message, Channel channel) {
        String msg = new String(message.getBody());
        log.info("當(dāng)前時(shí)間:{}, 收到死信隊(duì)列的消息:{}", new Date().toString(), message);
    }
}

瀏覽器發(fā)送消息

在這里插入圖片描述

消費(fèi)者分別過(guò)了10s和40s接收到了消息

在這里插入圖片描述

三、延遲隊(duì)列的優(yōu)化

不同的延遲時(shí)間需要設(shè)置不同的 TTL ,可以?xún)?yōu)化聲明一個(gè)通用的 QC 隊(duì)列,具體的延遲時(shí)間有生產(chǎn)者決定

在這里插入圖片描述

在配置類(lèi) TtlQueueConfig 中配置通用隊(duì)列 QC

    //通用隊(duì)列名稱(chēng)
    public static final String Generic_QUEUE_C = "QC";
    //聲明通用隊(duì)列
    @Bean("QC")
    public Queue qC() {
        Map<String, Object> arguments = new HashMap<>();
        //設(shè)置死信交換機(jī)
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //設(shè)置死信RoutingKey
        arguments.put("x-dead-letter-routing-key", "YD");
        //因?yàn)槭峭ㄓ藐?duì)列,所以不設(shè)置TTL,由生產(chǎn)者指定消息的TTL
        return QueueBuilder.durable(Generic_QUEUE_C).withArguments(arguments).build();
    }
    //綁定通用隊(duì)列和普通交換機(jī)
    @Bean
    public Binding queueCBindingX(@Qualifier("QC") Queue QC,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(QC).to(xExchange).with("XC");
    }
    //綁定通用隊(duì)列和死信交換機(jī)
    @Bean
    public Binding queueCBindingY(@Qualifier("QC") Queue QC,
                                  @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(QC).to(yExchange).with("YD");
    }

生產(chǎn)者發(fā)送消息,并指定 TTL 時(shí)長(zhǎng)

    //發(fā)送消息,并指定消息的TTL
    @GetMapping("/ttl/sendExpirationMsg/{message}/{ttlTime}")
    public void sendExpirationMsg(@PathVariable("message") String message, @PathVariable("ttlTime") String ttlTime) {
        log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條TTL為{}ms的消息給隊(duì)列QC:{}", new Date().toString(), ttlTime, message);
        rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
            //設(shè)置消息的TTL時(shí)長(zhǎng)
            msg.getMessageProperties().setExpiration(ttlTime);
            return msg;
        });
    }

發(fā)送兩條消息

在這里插入圖片描述

在這里插入圖片描述

消費(fèi)者接收消息

在這里插入圖片描述

但是,如果連續(xù)發(fā)送兩條消息,如果使用在消息屬性上設(shè)置 TTL 的方式,消息可能并不會(huì)按時(shí)“死亡“,因?yàn)?RabbitMQ 只會(huì)檢查第一個(gè)消息是否過(guò)期,如果過(guò)期則丟到死信隊(duì)列,如果第一個(gè)消息的延時(shí)時(shí)長(zhǎng)很長(zhǎng),而第二個(gè)消息的延時(shí)時(shí)長(zhǎng)很短,第二個(gè)消息并不會(huì)優(yōu)先得到執(zhí)行。結(jié)果會(huì)導(dǎo)致第二條消息消費(fèi)者收到時(shí)間有誤。

在這里插入圖片描述

四、基于 RabbitMQ 插件實(shí)現(xiàn)延遲隊(duì)列

如果不能實(shí)現(xiàn)在消息粒度上的 TTL ,并使其在設(shè)置的 TTL 時(shí)間及時(shí)死亡,就無(wú)法設(shè)計(jì)成一個(gè)通用的延時(shí)隊(duì)列??梢允褂没?RabbitMQ 插件來(lái)實(shí)現(xiàn)延遲隊(duì)列,從而解決這個(gè)問(wèn)題。

基于 RabbitMQ 插件實(shí)現(xiàn)延遲,是交換機(jī)實(shí)現(xiàn)延遲,而不再是隊(duì)列實(shí)現(xiàn)延遲

在這里插入圖片描述

在這里插入圖片描述

創(chuàng)建一個(gè)基于插件的延遲隊(duì)列配置類(lèi) DelayedQueueConfig

@Configuration
public class DelayedQueueConfig {
    //交換機(jī)名稱(chēng)
    public static final String DELAYED_EXCHANGE_NAME = "delayed_exchange";
    //隊(duì)列名稱(chēng)
    public static final String DELAYED_QUEUE_NAME = "delayed_queue";
    //routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed_routingKey";
    //聲明交換機(jī)
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");  //設(shè)置延遲類(lèi)型
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }
    //聲明隊(duì)列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }
    //綁定隊(duì)列和交換機(jī)
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,
                                                      @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

創(chuàng)建生產(chǎn)者發(fā)送延遲消息

    //基于插件發(fā)送消息
    @GetMapping("/ttl/sendDelayedMsg/{message}/{delayedTime}")
    public void sendDelayedMsg(@PathVariable("message") String message, @PathVariable("delayedTime") Integer delayedTime) {
        log.info("當(dāng)前時(shí)間:{}, 發(fā)送一條時(shí)長(zhǎng)為{}ms的消息給延遲隊(duì)列delayed_queue:{}", new Date().toString(), delayedTime, message);
        rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
            //設(shè)置消息的延遲時(shí)長(zhǎng)
            msg.getMessageProperties().setDelay(delayedTime);
            return msg;
        });
    }

創(chuàng)建消費(fèi)者

@Slf4j
@Component
public class DelayedQueueConsumer {
    //監(jiān)聽(tīng)消息
    @RabbitListener(queues = {DelayedQueueConfig.DELAYED_QUEUE_NAME})
    public void receiveDelayQueue(Message message){
        String msg = new String(message.getBody());
        log.info("當(dāng)前時(shí)間:{}, 收到延遲隊(duì)列的消息:{}", new Date().toString(), msg);
    }
}

當(dāng)連續(xù)發(fā)送兩條不同延遲時(shí)長(zhǎng)的消息時(shí),消費(fèi)者會(huì)先接收到延遲時(shí)長(zhǎng)短的那條消息,再接收延遲時(shí)長(zhǎng)長(zhǎng)的那條消息。

在這里插入圖片描述

實(shí)現(xiàn)延遲隊(duì)列,一種是基于死信隊(duì)列的方式,一種是基于RabbitMQ插件的方式。

延時(shí)隊(duì)列在需要延時(shí)處理的場(chǎng)景下非常有用,使用 RabbitMQ 來(lái)實(shí)現(xiàn)延時(shí)隊(duì)列可以很好的利用RabbitMQ 的特性,如消息可靠發(fā)送、消息可靠投遞、死信隊(duì)列來(lái)保障消息至少被消費(fèi)一次以及未被正確處理的消息不會(huì)被丟棄。另外,通過(guò) RabbitMQ 集群的特性,可以很好的解決單點(diǎn)故障問(wèn)題,不會(huì)因?yàn)閱蝹€(gè)節(jié)點(diǎn)掛掉導(dǎo)致延時(shí)隊(duì)列不可用或者消息丟失。

到此這篇關(guān)于RabbitMQ中的延遲隊(duì)列機(jī)制詳解的文章就介紹到這了,更多相關(guān)RabbitMQ延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • springboot中如何使用自定義兩級(jí)緩存

    springboot中如何使用自定義兩級(jí)緩存

    本話(huà)題主要就是討論如何在springboot的基礎(chǔ)上,無(wú)縫集成ehcache和redis作為一二級(jí)緩存,并且實(shí)現(xiàn)緩存同步。
    2021-05-05
  • Java如何高效使用OpenCV圖像處理庫(kù)

    Java如何高效使用OpenCV圖像處理庫(kù)

    OpenCV是一個(gè)開(kāi)源的計(jì)算機(jī)視覺(jué)庫(kù),它提供了一系列豐富的圖像處理和計(jì)算機(jī)視覺(jué)算法,包括圖像讀取、顯示、濾波、特征檢測(cè)、目標(biāo)跟蹤等功能,這篇文章主要給大家介紹了關(guān)于Java如何高效使用OpenCV圖像處理庫(kù)的相關(guān)資料,需要的朋友可以參考下
    2024-03-03
  • java接口性能從20s優(yōu)化到500ms示例詳解

    java接口性能從20s優(yōu)化到500ms示例詳解

    這篇文章主要為大家介紹了java接口性能從20s優(yōu)化到500ms的操作技巧示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07
  • jpa多數(shù)據(jù)源時(shí)Hibernate配置自動(dòng)生成表不生效的解決

    jpa多數(shù)據(jù)源時(shí)Hibernate配置自動(dòng)生成表不生效的解決

    這篇文章主要介紹了jpa多數(shù)據(jù)源時(shí)Hibernate配置自動(dòng)生成表不生效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • 詳解Lombok的坑

    詳解Lombok的坑

    這篇文章主要介紹了詳解Lombok的坑,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-11-11
  • java equals和==的區(qū)別詳解

    java equals和==的區(qū)別詳解

    這篇文章主要介紹了java equals和==的區(qū)別詳解的相關(guān)資料,需要的朋友可以參考下
    2016-10-10
  • Java實(shí)現(xiàn)讀取Excel文件功能(EasyExcel初使用)

    Java實(shí)現(xiàn)讀取Excel文件功能(EasyExcel初使用)

    EasyExcel是一款基于Java語(yǔ)言的開(kāi)源Excel解析工具,可以幫助我們快速、高效地讀取和寫(xiě)入Excel文件,這篇文章主要給大家介紹了關(guān)于Java實(shí)現(xiàn)讀取Excel文件功能的相關(guān)資料,使用的是EasyExcel,需要的朋友可以參考下
    2024-07-07
  • 一文搞懂Java?ScheduledExecutorService的使用

    一文搞懂Java?ScheduledExecutorService的使用

    JUC包(java.util.concurrent)中提供了對(duì)定時(shí)任務(wù)的支持,即ScheduledExecutorService接口。本文主要對(duì)ScheduledExecutorService的使用進(jìn)行簡(jiǎn)單的介紹,需要的可以參考一下
    2022-11-11
  • SpringBoot實(shí)現(xiàn)Tomcat集群的會(huì)話(huà)管理功能

    SpringBoot實(shí)現(xiàn)Tomcat集群的會(huì)話(huà)管理功能

    在使用 Tomcat 集群時(shí),由于每個(gè) Tomcat 實(shí)例的 Session 存儲(chǔ)是獨(dú)立的,導(dǎo)致無(wú)法實(shí)現(xiàn) Session 的共享,這可能影響到用戶(hù)跨節(jié)點(diǎn)的訪問(wèn),為了實(shí)現(xiàn)跨 Tomcat 實(shí)例共享 Session,可以使用 Spring Session 配合 Redis 進(jìn)行集中式會(huì)話(huà)管理,需要的朋友可以參考下
    2024-12-12
  • Java數(shù)據(jù)結(jié)構(gòu)之簡(jiǎn)單的連接點(diǎn)(link)實(shí)現(xiàn)方法示例

    Java數(shù)據(jù)結(jié)構(gòu)之簡(jiǎn)單的連接點(diǎn)(link)實(shí)現(xiàn)方法示例

    這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)之簡(jiǎn)單的連接點(diǎn)(link)實(shí)現(xiàn)方法,涉及java指針指向節(jié)點(diǎn)的相關(guān)使用技巧,需要的朋友可以參考下
    2017-10-10

最新評(píng)論