欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中RabbitMQ延遲隊列實現(xiàn)詳解

 更新時間:2023年09月20日 10:13:11   作者:CD4356  
這篇文章主要介紹了Java中RabbitMQ延遲隊列實現(xiàn)詳解,消息過期后,根據(jù)routing-key的不同,又會被死信交換機路由到不同的死信隊列中,消費者只需要監(jiān)聽對應的死信隊列進行消費即可,需要的朋友可以參考下

一、RabbitMQ延遲隊列實現(xiàn)

1.1、RabbitMQ延遲隊列實現(xiàn)流程

cd

  1. 生產(chǎn)者生產(chǎn)一條延遲消息,根據(jù)延遲時間的不同,利用不同的routing-key將消息路由到不同的延遲隊列,每個隊列都設置了不同的 TTL 屬性 ( TTL ( Time To Live ) 生存時間 ),并綁定到同一個死信交換機中。
  2. 消息過期后,根據(jù)routing-key的不同,又會被死信交換機路由到不同的死信隊列中,消費者只需要監(jiān)聽對應的死信隊列進行消費即可。

1.2、配置RabbitMQ連接

#[ RabbitMQ相關配置 ]
#rabbitmq服務器IP
spring.rabbitmq.host=安裝RabbitMQ的服務器IP
#rabbitmq服務器端口(默認為5672)
spring.rabbitmq.port=5672
#用戶名
spring.rabbitmq.username=guest
#用戶密碼
spring.rabbitmq.password=guest
#虛擬主機(一個RabbitMQ服務可以配置多個虛擬主機,每一個虛擬機主機之間是相互隔離,相互獨立的,授權(quán)用戶到指定的virtual-host就可以發(fā)送消息到指定隊列)
#vhost虛擬主機地址( 默認為/ )
spring.rabbitmq.virtual-host=/

1.3、創(chuàng)建配置類

配置兩個交換機、四個隊列、以及根據(jù)路由鍵配置交換機和隊列的綁定關系

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfiguration {
    //延遲交換機
    public static final String DELAY_EXCHANGE = "delay_exchange";
    //延遲隊列A
    public static final String DELAY_QUEUE_A = "delay_queue_a";
    //延遲隊列B
    public static final String DELAY_QUEUE_B = "delay_queue_b";
    //延遲路由鍵10S
    public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key";
    //延遲路由鍵60S
    public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key";
    //死信交換機
    public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange";
    //死信隊列A
    public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a";
    //死信隊列B
    public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b";
    //死信路由鍵10S
    public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key";
    //死信路由鍵60S
    public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key";
    //延遲交換機
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE, true, false);
    }
    //延遲隊列A
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>();
        //設置延遲隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //設置延遲隊列綁定的死信路由鍵
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
        //設置延遲隊列的 TTL 消息存活時間
        args.put("x-message-ttl", 10*1000);
        return new Queue(DELAY_QUEUE_A, true, false, false, args);
    }
    //延遲隊列B
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>();
        //設置延遲隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        //設置延遲隊列綁定的死信路由鍵
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
        //設置延遲隊列的 TTL 消息存活時間
        args.put("x-message-ttl", 60*1000);
        return new Queue(DELAY_QUEUE_B, true, false, false, args);
    }
    //延遲隊列A的綁定關系
    @Bean("delayBindingA")
    public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY);
    }
    //延遲隊列B的綁定關系
    @Bean("delayBindingB")
    public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue,
                                 @Qualifier("delayExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY);
    }
    //死信交換機
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
    }
    //死信隊列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUE_A, true);
    }
    //死信隊列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUE_B, true);
    }
    //死信隊列A的綁定關系
    @Bean("deadLetterBindingA")
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue,
                                 @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY);
    }
    //死信隊列B的綁定關系
    @Bean("deadLetterBindingB")
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue,
                                      @Qualifier("deadLetterExchange")DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY);
    }
}

1.4、創(chuàng)建一個枚舉類來配置延遲類型

@Getter
@AllArgsConstructor
public enum DelayTypeEnum {
    //10s
    DELAY_10s(1),
    //60s
    DELAY_60s(2);
    private Integer type;
    /**
     * 延遲類型
     * @param type
     * @return 延遲類型
     */
    public static DelayTypeEnum getDelayTypeEnum(Integer type){
        if(Objects.equals(type, DELAY_10s.type)){
            return DELAY_10s;
        }
        if(Objects.equals(type, DELAY_60s.type)){
            return DELAY_60s;
        }
        return null;
    }
}

1.5、創(chuàng)建生產(chǎn)者類發(fā)送消息

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY;
/**
 * 延遲消息生產(chǎn)者
 */
@Component
public class DelayMessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 發(fā)送延遲消息
     * @param message  要發(fā)送的消息
     * @param type  延遲類型(延時10s的延遲隊列 或 延時60s的延遲隊列)
     */
    public void sendDelayMessage(String message, DelayTypeEnum type){
        switch (type){
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message);
                break;
            default:
                break;
        }
    }
}

1.6、創(chuàng)建消費者類消費消息

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A;
import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B;
@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 監(jiān)聽死信隊列A
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_a")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_A)
    public void receiveA(Message message) {
        String msg = new String(message.getBody());
        // 記錄日志
        log.info("當前時間:{},死信隊列A收到的消息:{}", LocalDateTime.now(), msg);
    }
    /**
     * 監(jiān)聽死信隊列B
     * @param message  接收的信息
     */
    //@RabbitListener(queues = "dead_letter_queue_b")
    @RabbitListener(queues = DEAD_LETTER_QUEUE_B)
    public void receiveB(Message message){
        String msg = new String(message.getBody());
        // 記錄日志
        log.info("當前時間:{},死信隊列B收到的消息:{}", LocalDateTime.now(), msg);
    }
}

1.7、創(chuàng)建控制類

import com.cd.springbootrabbitmq.enums.DelayTypeEnum;
import com.cd.springbootrabbitmq.producer.DelayMessageProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.util.Objects;
@Slf4j
@RestController
@RequestMapping("/rabbitmq")
public class RabbitMQController {
    @Autowired
    private DelayMessageProducer producer;
    @RequestMapping("/send")
    public void send(String message, Integer delayType){
        // 記錄日志
        log.info("當前時間:{},消息:{},延遲類型:{}", LocalDateTime.now(), message, delayType);
        // 發(fā)送延遲消息
        producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType)));
    }
}

1.8、測試

在瀏覽器中先后提交下面兩個請求:

1)localhost:8080/rabbitmq/send?message=測試自定義延遲處理60s&delayType=2

2)localhost:8080/rabbitmq/send?message=測試自定義延遲處理10s&delayType=1

查看idea控制臺:

cd

到此這篇關于Java中RabbitMQ延遲隊列實現(xiàn)詳解的文章就介紹到這了,更多相關RabbitMQ延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 老生常談Scanner的基本用法

    老生常談Scanner的基本用法

    下面小編就為大家?guī)硪黄仙U凷canner的基本用法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-07-07
  • Java數(shù)據(jù)結(jié)構(gòu) 遞歸之迷宮回溯案例講解

    Java數(shù)據(jù)結(jié)構(gòu) 遞歸之迷宮回溯案例講解

    這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)遞歸之迷宮回溯案例講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • 使用Java獲取系統(tǒng)信息的常用代碼整理總結(jié)

    使用Java獲取系統(tǒng)信息的常用代碼整理總結(jié)

    這篇文章主要介紹了使用Java獲取系統(tǒng)信息的常用代碼整理總結(jié),在服務器端一般經(jīng)常能夠用到,歡迎收藏,需要的朋友可以參考下
    2015-11-11
  • spring?boot?3使用?elasticsearch?提供搜索建議的實例詳解

    spring?boot?3使用?elasticsearch?提供搜索建議的實例詳解

    這篇文章主要介紹了spring?boot3使用elasticsearch提供搜索建議,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-08-08
  • Spring boot的上傳圖片功能實例詳解

    Spring boot的上傳圖片功能實例詳解

    Spring Boot是由Pivotal團隊提供的全新框架,其設計目的是用來簡化新Spring應用的初始搭建以及開發(fā)過程。這篇文章主要介紹了Spring boot 上傳圖片,需要的朋友可以參考下
    2018-03-03
  • springboot+vue+elementsUI實現(xiàn)分角色注冊登錄界面功能

    springboot+vue+elementsUI實現(xiàn)分角色注冊登錄界面功能

    這篇文章主要給大家介紹了關于springboot+vue+elementsUI實現(xiàn)分角色注冊登錄界面功能的相關資料,Spring?Boot和Vue.js是兩個非常流行的開源框架,可以用來構(gòu)建Web應用程序,需要的朋友可以參考下
    2023-07-07
  • 聊聊mybatis sql的括號問題

    聊聊mybatis sql的括號問題

    這篇文章主要介紹了mybatis sql的括號問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • 深入理解Java并發(fā)編程之ThreadLocal

    深入理解Java并發(fā)編程之ThreadLocal

    本文主要介紹了Java并發(fā)編程之ThreadLocal,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-08-08
  • 指定jdk啟動jar包的方法總結(jié)

    指定jdk啟動jar包的方法總結(jié)

    這篇文章主要給大家總結(jié)介紹了關于指定jdk啟動jar包的方法,文中通過實例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2023-07-07
  • 解決Lombok使用@Builder無法build父類屬性的問題

    解決Lombok使用@Builder無法build父類屬性的問題

    這篇文章主要介紹了解決Lombok使用@Builder無法build父類屬性的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-09-09

最新評論