rabbitmq延遲隊列的使用方式
rabbitmq延遲隊列的使用
1、場景:
1.定時發(fā)布文章
2.秒殺之后,給30分鐘時間進(jìn)行支付,如果30分鐘后,沒有支付,訂單取消。
3.預(yù)約餐廳,提前半個小時發(fā)短信通知用戶。
A -> 13:00 17:00 16:30 延遲時間: 730 * 60 * 1000
B -> 11:00 18:00 17:30 延遲時間: 1330 * 60 * 1000
C -> 8:00 14:00 13:30 延遲時間: 11*30 * 60 * 1000
第一種方式:創(chuàng)建具有超時功能且綁定死信交換機的消息隊列
@Bean
public Queue directQueueLong(){
return QueueBuilder.durable("業(yè)務(wù)隊列名稱")
.deadLetterExchange("死信交換機名稱")
.deadLetterRoutingKey("死信隊列 RoutingKey")
.ttl(20000) // 消息停留時間
//.maxLength(500)
.build();
}監(jiān)聽死信隊列,即可處理超時的消息隊列
缺點:
上述實現(xiàn)方式中,ttl延時隊列中所有的消息超時時間都是一樣的,如果不同消息想設(shè)置不一樣的超時時間,就需要建立多個不同超時時間的消息隊列,比較麻煩,且不利于維護(hù)。
第二種方式:創(chuàng)建通用延時消息
rabbitTemplate.convertAndSend("交換機名稱", "RoutingKey","對象",
message => {
message.getMessageProperties().setExpiration(String.valueOf(5000))
return message;
}
);缺點:
該種方式可以創(chuàng)建一個承載不同超時時間消息的消息隊列,但是這種方式有一個問題,如果消息隊列中排在前面的消息沒有到超時時間,即使后面的消息到了超時時間,先到超時時間的消息也不會進(jìn)入死信隊列,而是先檢查排在最前面的消息隊列是否到了超時時間,如果到了超時時間才會繼續(xù)檢查后面的消息。
第三種方式:使用rabbitmq的延時隊列插件,實現(xiàn)同一個隊列中有多個不同超時時間的消息,并按時間超時順序出隊
1、下載延遲插件
在 RabbitMQ 的 3.5.7 版本之后,提供了一個插件(rabbitmq-delayed-message-exchange)來實現(xiàn)延遲隊列 ,同時需保證 Erlang/OPT 版本為 18.0 之后。
我這里 MQ 的版本是 3.10.0 現(xiàn)在去 GitHub 上根據(jù)版本號下載插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
2、安裝插件并啟用
我用的是 Docker 客戶端,下載完成后直接把插件放在 /opt/rabbitmq 目錄,然后拷貝到容器內(nèi)plugins目錄下(rabbitmq是容器的name,也可以使用容器id)
docker cp /opt/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
進(jìn)入 Docker 容器
docker exec -it rabbitmq /bin/bash
在plugins內(nèi)啟用插件
#先執(zhí)行,解除防火墻限制,增加文件權(quán)限 cd plugins umask 0022 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重啟 RabbitMQ
docker restart rabbitmq
通過UI查看

原理

代碼使用
消費者
/*
* Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
*
*/
package com.fpl.consumers;
import cn.hutool.core.map.MapUtil;
import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* <p>Project: spring-rabbitmq - DelayConsumer</p>
* <p>Powered by fpl1116 On 2024-04-09 11:34:07</p>
* <p>描述:<p>
*
* @author penglei
* @version 1.0
* @since 1.8
*/
@Configuration
@Slf4j
public class DelayConsumer {
@Bean
public Queue delayQueue1(){
return QueueBuilder.durable("Delay_Q01").lazy().build();
}
@Bean
public CustomExchange delayExchange(){
//參數(shù)x-delayed-type
Map<String, Object> map = MapUtil.of("x-delayed-type","direct");
return new CustomExchange("Delay_E01","x-delayed-message",true,false,map);
}
@Bean
public Binding binding1(Queue delayQueue1, CustomExchange delayExchange){
return BindingBuilder.bind(delayQueue1).to(delayExchange).with("RK01").noargs();
}
// @RabbitListener(queues = "Delay_Q01")
public void receiveMessage(OrderingOk msg) {
log.info("消費者1 收到消息:"+ msg );
}
}生產(chǎn)者
/*
* Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
*
*/
package com.fpl.provider;
import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* <p>Project: spring-rabbitmq - DelayProvider</p>
* <p>Powered by fpl1116 On 2024-04-09 11:35:51</p>
* <p>描述:<p>
*
* @author penglei
* @version 1.0
* @since 1.8
*/
@Service
public class DelayProvider {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(OrderingOk orderingOk) {
rabbitTemplate.convertAndSend("Delay_E01", "RK01", orderingOk,new MessagePostProcessor(){
@Override
public Message postProcessMessage(Message message) throws AmqpException {
int id = orderingOk.getId();
int ttl = 0;
if(id == 1){
ttl = 50*1000;
}else if(id == 2){
ttl = 30*1000;
}else if(id ==3){
ttl = 40*1000;
}else if(id ==4){
ttl = 10*1000;
}else if(id ==5){
ttl = 20*1000;
}
//延遲交換機使用的delay參數(shù),設(shè)置消息的延期時長,單位是微妙
message.getMessageProperties().setDelay(ttl);
//延遲交換機消息默認(rèn)是持久化的
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
return message;
}
});
}
}
測試
@Test
void test5() throws IOException {
for (int i = 1; i <=5;i++){
OrderingOk orderingOk = OrderingOk.builder().id(i).name("張 " + i).build();
delayProvider.send(orderingOk);
System.out.println("發(fā)送成功:"+i);
}
System.in.read();
}到此這篇關(guān)于rabbitmq延遲隊列的使用的文章就介紹到這了,更多相關(guān)rabbitmq延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springcloud-nacos實現(xiàn)配置和注冊中心的方法
這篇文章主要介紹了Springcloud-nacos實現(xiàn)配置和注冊中心的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-07-07
使用mybatis切片實現(xiàn)數(shù)據(jù)權(quán)限控制的操作流程
數(shù)據(jù)權(quán)限控制需要對查詢出的數(shù)據(jù)進(jìn)行篩選,對業(yè)務(wù)入侵最少的方式就是利用mybatis或者數(shù)據(jù)庫連接池的切片對已有業(yè)務(wù)的sql進(jìn)行修改,本文給大家介紹了使用mybatis切片實現(xiàn)數(shù)據(jù)權(quán)限控制的操作流程,需要的朋友可以參考下2024-07-07
springBoot service層事務(wù)控制的操作
這篇文章主要介紹了springBoot service層事務(wù)控制的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02

