rabbitmq延遲隊列的使用方式
rabbitmq延遲隊列的使用
1、場景:
1.定時發(fā)布文章
2.秒殺之后,給30分鐘時間進行支付,如果30分鐘后,沒有支付,訂單取消。
3.預約餐廳,提前半個小時發(fā)短信通知用戶。
A -> 13:00 17:00 16:30 延遲時間: 730 * 60 * 1000
B -> 11:00 18:00 17:30 延遲時間: 1330 * 60 * 1000
C -> 8:00 14:00 13:30 延遲時間: 11*30 * 60 * 1000
第一種方式:創(chuàng)建具有超時功能且綁定死信交換機的消息隊列
@Bean public Queue directQueueLong(){ return QueueBuilder.durable("業(yè)務隊列名稱") .deadLetterExchange("死信交換機名稱") .deadLetterRoutingKey("死信隊列 RoutingKey") .ttl(20000) // 消息停留時間 //.maxLength(500) .build(); }
監(jiān)聽死信隊列,即可處理超時的消息隊列
缺點:
上述實現(xiàn)方式中,ttl延時隊列中所有的消息超時時間都是一樣的,如果不同消息想設置不一樣的超時時間,就需要建立多個不同超時時間的消息隊列,比較麻煩,且不利于維護。
第二種方式:創(chuàng)建通用延時消息
rabbitTemplate.convertAndSend("交換機名稱", "RoutingKey","對象", message => { message.getMessageProperties().setExpiration(String.valueOf(5000)) return message; } );
缺點:
該種方式可以創(chuàng)建一個承載不同超時時間消息的消息隊列,但是這種方式有一個問題,如果消息隊列中排在前面的消息沒有到超時時間,即使后面的消息到了超時時間,先到超時時間的消息也不會進入死信隊列,而是先檢查排在最前面的消息隊列是否到了超時時間,如果到了超時時間才會繼續(xù)檢查后面的消息。
第三種方式:使用rabbitmq的延時隊列插件,實現(xiàn)同一個隊列中有多個不同超時時間的消息,并按時間超時順序出隊
1、下載延遲插件
在 RabbitMQ 的 3.5.7 版本之后,提供了一個插件(rabbitmq-delayed-message-exchange)來實現(xiàn)延遲隊列 ,同時需保證 Erlang/OPT 版本為 18.0 之后。
我這里 MQ 的版本是 3.10.0 現(xiàn)在去 GitHub 上根據(jù)版本號下載插件
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
2、安裝插件并啟用
我用的是 Docker 客戶端,下載完成后直接把插件放在 /opt/rabbitmq 目錄,然后拷貝到容器內(nèi)plugins目錄下(rabbitmq是容器的name,也可以使用容器id)
docker cp /opt/rabbitmq/rabbitmq_delayed_message_exchange-3.10.0.ez rabbitmq:/plugins
進入 Docker 容器
docker exec -it rabbitmq /bin/bash
在plugins內(nèi)啟用插件
#先執(zhí)行,解除防火墻限制,增加文件權限 cd plugins umask 0022 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
退出容器
exit
重啟 RabbitMQ
docker restart rabbitmq
通過UI查看
原理
代碼使用
消費者
/* * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved. * */ package com.fpl.consumers; import cn.hutool.core.map.MapUtil; import com.fpl.model.OrderingOk; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Map; /** * <p>Project: spring-rabbitmq - DelayConsumer</p> * <p>Powered by fpl1116 On 2024-04-09 11:34:07</p> * <p>描述:<p> * * @author penglei * @version 1.0 * @since 1.8 */ @Configuration @Slf4j public class DelayConsumer { @Bean public Queue delayQueue1(){ return QueueBuilder.durable("Delay_Q01").lazy().build(); } @Bean public CustomExchange delayExchange(){ //參數(shù)x-delayed-type Map<String, Object> map = MapUtil.of("x-delayed-type","direct"); return new CustomExchange("Delay_E01","x-delayed-message",true,false,map); } @Bean public Binding binding1(Queue delayQueue1, CustomExchange delayExchange){ return BindingBuilder.bind(delayQueue1).to(delayExchange).with("RK01").noargs(); } // @RabbitListener(queues = "Delay_Q01") public void receiveMessage(OrderingOk msg) { log.info("消費者1 收到消息:"+ msg ); } }
生產(chǎn)者
/* * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved. * */ package com.fpl.provider; import com.fpl.model.OrderingOk; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * <p>Project: spring-rabbitmq - DelayProvider</p> * <p>Powered by fpl1116 On 2024-04-09 11:35:51</p> * <p>描述:<p> * * @author penglei * @version 1.0 * @since 1.8 */ @Service public class DelayProvider { @Autowired private RabbitTemplate rabbitTemplate; public void send(OrderingOk orderingOk) { rabbitTemplate.convertAndSend("Delay_E01", "RK01", orderingOk,new MessagePostProcessor(){ @Override public Message postProcessMessage(Message message) throws AmqpException { int id = orderingOk.getId(); int ttl = 0; if(id == 1){ ttl = 50*1000; }else if(id == 2){ ttl = 30*1000; }else if(id ==3){ ttl = 40*1000; }else if(id ==4){ ttl = 10*1000; }else if(id ==5){ ttl = 20*1000; } //延遲交換機使用的delay參數(shù),設置消息的延期時長,單位是微妙 message.getMessageProperties().setDelay(ttl); //延遲交換機消息默認是持久化的 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); return message; } }); } }
測試
@Test void test5() throws IOException { for (int i = 1; i <=5;i++){ OrderingOk orderingOk = OrderingOk.builder().id(i).name("張 " + i).build(); delayProvider.send(orderingOk); System.out.println("發(fā)送成功:"+i); } System.in.read(); }
到此這篇關于rabbitmq延遲隊列的使用的文章就介紹到這了,更多相關rabbitmq延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Springcloud-nacos實現(xiàn)配置和注冊中心的方法
這篇文章主要介紹了Springcloud-nacos實現(xiàn)配置和注冊中心的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-07-07使用mybatis切片實現(xiàn)數(shù)據(jù)權限控制的操作流程
數(shù)據(jù)權限控制需要對查詢出的數(shù)據(jù)進行篩選,對業(yè)務入侵最少的方式就是利用mybatis或者數(shù)據(jù)庫連接池的切片對已有業(yè)務的sql進行修改,本文給大家介紹了使用mybatis切片實現(xiàn)數(shù)據(jù)權限控制的操作流程,需要的朋友可以參考下2024-07-07