Java中RabbitMQ延遲隊列實現(xiàn)詳解
一、RabbitMQ延遲隊列實現(xiàn)
1.1、RabbitMQ延遲隊列實現(xiàn)流程
- 生產(chǎn)者生產(chǎn)一條延遲消息,根據(jù)延遲時間的不同,利用不同的routing-key將消息路由到不同的延遲隊列,每個隊列都設置了不同的 TTL 屬性 ( TTL ( Time To Live ) 生存時間 ),并綁定到同一個死信交換機中。
- 消息過期后,根據(jù)routing-key的不同,又會被死信交換機路由到不同的死信隊列中,消費者只需要監(jiān)聽對應的死信隊列進行消費即可。
1.2、配置RabbitMQ連接
#[ RabbitMQ相關配置 ] #rabbitmq服務器IP spring.rabbitmq.host=安裝RabbitMQ的服務器IP #rabbitmq服務器端口(默認為5672) spring.rabbitmq.port=5672 #用戶名 spring.rabbitmq.username=guest #用戶密碼 spring.rabbitmq.password=guest #虛擬主機(一個RabbitMQ服務可以配置多個虛擬主機,每一個虛擬機主機之間是相互隔離,相互獨立的,授權(quán)用戶到指定的virtual-host就可以發(fā)送消息到指定隊列) #vhost虛擬主機地址( 默認為/ ) spring.rabbitmq.virtual-host=/
1.3、創(chuàng)建配置類
配置兩個交換機、四個隊列、以及根據(jù)路由鍵配置交換機和隊列的綁定關系
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class RabbitMQConfiguration { //延遲交換機 public static final String DELAY_EXCHANGE = "delay_exchange"; //延遲隊列A public static final String DELAY_QUEUE_A = "delay_queue_a"; //延遲隊列B public static final String DELAY_QUEUE_B = "delay_queue_b"; //延遲路由鍵10S public static final String DELAY_QUEUE_10S_ROUTING_KEY = "delay_queue_10s_routing_key"; //延遲路由鍵60S public static final String DELAY_QUEUE_60S_ROUTING_KEY = "delay_queue_60s_routing_key"; //死信交換機 public static final String DEAD_LETTER_EXCHANGE = "dead_letter_exchange"; //死信隊列A public static final String DEAD_LETTER_QUEUE_A = "dead_letter_queue_a"; //死信隊列B public static final String DEAD_LETTER_QUEUE_B = "dead_letter_queue_b"; //死信路由鍵10S public static final String DEAD_LETTER_QUEUE_10S_ROUTING_KEY = "dead_letter_queue_10s_routing_key"; //死信路由鍵60S public static final String DEAD_LETTER_QUEUE_60S_ROUTING_KEY = "dead_letter_queue_60s_routing_key"; //延遲交換機 @Bean("delayExchange") public DirectExchange delayExchange(){ return new DirectExchange(DELAY_EXCHANGE, true, false); } //延遲隊列A @Bean("delayQueueA") public Queue delayQueueA(){ Map<String, Object> args = new HashMap<>(); //設置延遲隊列綁定的死信交換機 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //設置延遲隊列綁定的死信路由鍵 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_10S_ROUTING_KEY); //設置延遲隊列的 TTL 消息存活時間 args.put("x-message-ttl", 10*1000); return new Queue(DELAY_QUEUE_A, true, false, false, args); } //延遲隊列B @Bean("delayQueueB") public Queue delayQueueB(){ Map<String, Object> args = new HashMap<>(); //設置延遲隊列綁定的死信交換機 args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); //設置延遲隊列綁定的死信路由鍵 args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_60S_ROUTING_KEY); //設置延遲隊列的 TTL 消息存活時間 args.put("x-message-ttl", 60*1000); return new Queue(DELAY_QUEUE_B, true, false, false, args); } //延遲隊列A的綁定關系 @Bean("delayBindingA") public Binding delayBindingA(@Qualifier("delayQueueA")Queue queue, @Qualifier("delayExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_10S_ROUTING_KEY); } //延遲隊列B的綁定關系 @Bean("delayBindingB") public Binding delayBindingB(@Qualifier("delayQueueB")Queue queue, @Qualifier("delayExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_60S_ROUTING_KEY); } //死信交換機 @Bean("deadLetterExchange") public DirectExchange deadLetterExchange(){ return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false); } //死信隊列A @Bean("deadLetterQueueA") public Queue deadLetterQueueA(){ return new Queue(DEAD_LETTER_QUEUE_A, true); } //死信隊列B @Bean("deadLetterQueueB") public Queue deadLetterQueueB(){ return new Queue(DEAD_LETTER_QUEUE_B, true); } //死信隊列A的綁定關系 @Bean("deadLetterBindingA") public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA")Queue queue, @Qualifier("deadLetterExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_10S_ROUTING_KEY); } //死信隊列B的綁定關系 @Bean("deadLetterBindingB") public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB")Queue queue, @Qualifier("deadLetterExchange")DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_60S_ROUTING_KEY); } }
1.4、創(chuàng)建一個枚舉類來配置延遲類型
@Getter @AllArgsConstructor public enum DelayTypeEnum { //10s DELAY_10s(1), //60s DELAY_60s(2); private Integer type; /** * 延遲類型 * @param type * @return 延遲類型 */ public static DelayTypeEnum getDelayTypeEnum(Integer type){ if(Objects.equals(type, DELAY_10s.type)){ return DELAY_10s; } if(Objects.equals(type, DELAY_60s.type)){ return DELAY_60s; } return null; } }
1.5、創(chuàng)建生產(chǎn)者類發(fā)送消息
import com.cd.springbootrabbitmq.enums.DelayTypeEnum; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_EXCHANGE; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_10S_ROUTING_KEY; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DELAY_QUEUE_60S_ROUTING_KEY; /** * 延遲消息生產(chǎn)者 */ @Component public class DelayMessageProducer { @Autowired private RabbitTemplate rabbitTemplate; /** * 發(fā)送延遲消息 * @param message 要發(fā)送的消息 * @param type 延遲類型(延時10s的延遲隊列 或 延時60s的延遲隊列) */ public void sendDelayMessage(String message, DelayTypeEnum type){ switch (type){ case DELAY_10s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_10S_ROUTING_KEY, message); break; case DELAY_60s: rabbitTemplate.convertAndSend(DELAY_EXCHANGE, DELAY_QUEUE_60S_ROUTING_KEY, message); break; default: break; } } }
1.6、創(chuàng)建消費者類消費消息
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_A; import static com.cd.springbootrabbitmq.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_B; @Slf4j @Component public class DeadLetterQueueConsumer { @Autowired private RabbitTemplate rabbitTemplate; /** * 監(jiān)聽死信隊列A * @param message 接收的信息 */ //@RabbitListener(queues = "dead_letter_queue_a") @RabbitListener(queues = DEAD_LETTER_QUEUE_A) public void receiveA(Message message) { String msg = new String(message.getBody()); // 記錄日志 log.info("當前時間:{},死信隊列A收到的消息:{}", LocalDateTime.now(), msg); } /** * 監(jiān)聽死信隊列B * @param message 接收的信息 */ //@RabbitListener(queues = "dead_letter_queue_b") @RabbitListener(queues = DEAD_LETTER_QUEUE_B) public void receiveB(Message message){ String msg = new String(message.getBody()); // 記錄日志 log.info("當前時間:{},死信隊列B收到的消息:{}", LocalDateTime.now(), msg); } }
1.7、創(chuàng)建控制類
import com.cd.springbootrabbitmq.enums.DelayTypeEnum; import com.cd.springbootrabbitmq.producer.DelayMessageProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.util.Objects; @Slf4j @RestController @RequestMapping("/rabbitmq") public class RabbitMQController { @Autowired private DelayMessageProducer producer; @RequestMapping("/send") public void send(String message, Integer delayType){ // 記錄日志 log.info("當前時間:{},消息:{},延遲類型:{}", LocalDateTime.now(), message, delayType); // 發(fā)送延遲消息 producer.sendDelayMessage(message, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnum(delayType))); } }
1.8、測試
在瀏覽器中先后提交下面兩個請求:
1)localhost:8080/rabbitmq/send?message=測試自定義延遲處理60s&delayType=2
2)localhost:8080/rabbitmq/send?message=測試自定義延遲處理10s&delayType=1
查看idea控制臺:
到此這篇關于Java中RabbitMQ延遲隊列實現(xiàn)詳解的文章就介紹到這了,更多相關RabbitMQ延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java數(shù)據(jù)結(jié)構(gòu) 遞歸之迷宮回溯案例講解
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)遞歸之迷宮回溯案例講解,本篇文章通過簡要的案例,講解了該項技術(shù)的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下2021-08-08使用Java獲取系統(tǒng)信息的常用代碼整理總結(jié)
這篇文章主要介紹了使用Java獲取系統(tǒng)信息的常用代碼整理總結(jié),在服務器端一般經(jīng)常能夠用到,歡迎收藏,需要的朋友可以參考下2015-11-11spring?boot?3使用?elasticsearch?提供搜索建議的實例詳解
這篇文章主要介紹了spring?boot3使用elasticsearch提供搜索建議,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2023-08-08springboot+vue+elementsUI實現(xiàn)分角色注冊登錄界面功能
這篇文章主要給大家介紹了關于springboot+vue+elementsUI實現(xiàn)分角色注冊登錄界面功能的相關資料,Spring?Boot和Vue.js是兩個非常流行的開源框架,可以用來構(gòu)建Web應用程序,需要的朋友可以參考下2023-07-07解決Lombok使用@Builder無法build父類屬性的問題
這篇文章主要介紹了解決Lombok使用@Builder無法build父類屬性的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-09-09