Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的方法
場景
開發(fā)中經(jīng)常需要用到定時(shí)任務(wù),對(duì)于商城來說,定時(shí)任務(wù)尤其多,比如優(yōu)惠券定時(shí)過期、訂單定時(shí)關(guān)閉、微信支付2小時(shí)未支付關(guān)閉訂單等等,都需要用到定時(shí)任務(wù),但是定時(shí)任務(wù)本身有一個(gè)問題,一般來說我們都是通過定時(shí)輪詢查詢數(shù)據(jù)庫來判斷是否有任務(wù)需要執(zhí)行,也就是說不管怎么樣,我們需要先查詢數(shù)據(jù)庫,而且有些任務(wù)對(duì)時(shí)間準(zhǔn)確要求比較高的,需要每秒查詢一次,對(duì)于系統(tǒng)小倒是無所謂,如果系統(tǒng)本身就大而且數(shù)據(jù)也多的情況下,這就不大現(xiàn)實(shí)了,所以需要其他方式的,當(dāng)然實(shí)現(xiàn)的方式有多種多樣的,比如Redis實(shí)現(xiàn)定時(shí)隊(duì)列、基于優(yōu)先級(jí)隊(duì)列的JDK延遲隊(duì)列、時(shí)間輪等。因?yàn)槲覀冺?xiàng)目中本身就使用到了Rabbitmq,所以基于方便開發(fā)和維護(hù)的原則,我們使用了Rabbitmq延遲隊(duì)列來實(shí)現(xiàn)定時(shí)任務(wù),不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看我之前的文章 Spring boot集成RabbitMQ
Rabbitmq延遲隊(duì)列
Rabbitmq本身是沒有延遲隊(duì)列的,只能通過Rabbitmq本身隊(duì)列的特性來實(shí)現(xiàn),想要Rabbitmq實(shí)現(xiàn)延遲隊(duì)列,需要使用Rabbitmq的死信交換機(jī)(Exchange)和消息的存活時(shí)間TTL(Time To Live)
死信交換機(jī)
一個(gè)消息在滿足如下條件下,會(huì)進(jìn)死信交換機(jī),記住這里是交換機(jī)而不是隊(duì)列,一個(gè)交換機(jī)可以對(duì)應(yīng)很多隊(duì)列。
- 一個(gè)消息被Consumer拒收了,并且reject方法的參數(shù)里requeue是false。也就是說不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。
- 上面的消息的TTL到了,消息過期了。
- 隊(duì)列的長度限制滿了。排在前面的消息會(huì)被丟棄或者扔到死信路由上。
死信交換機(jī)就是普通的交換機(jī),只是因?yàn)槲覀儼堰^期的消息扔進(jìn)去,所以叫死信交換機(jī),并不是說死信交換機(jī)是某種特定的交換機(jī)
消息TTL(消息存活時(shí)間)
消息的TTL就是消息的存活時(shí)間。RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。對(duì)隊(duì)列設(shè)置就是隊(duì)列沒有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。如果隊(duì)列設(shè)置了,消息也設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息死亡的時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。這里單講單個(gè)消息的TTL,因?yàn)樗攀菍?shí)現(xiàn)延遲任務(wù)的關(guān)鍵。
byte[] messageBodyBytes = "Hello, world!".getBytes(); AMQP.BasicProperties properties = new AMQP.BasicProperties(); properties.setExpiration("60000"); channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
可以通過設(shè)置消息的expiration字段或者x-message-ttl屬性來設(shè)置時(shí)間,兩者是一樣的效果。只是expiration字段是字符串參數(shù),所以要寫個(gè)int類型的字符串: 當(dāng)上面的消息扔到隊(duì)列中后,過了60秒,如果沒有被消費(fèi),它就死了。不會(huì)被消費(fèi)者消費(fèi)到。這個(gè)消息后面的,沒有“死掉”的消息對(duì)頂上來,被消費(fèi)者消費(fèi)。死信在隊(duì)列中并不會(huì)被刪除和釋放,它會(huì)被統(tǒng)計(jì)到隊(duì)列的消息數(shù)中去
處理流程圖
創(chuàng)建交換機(jī)(Exchanges)和隊(duì)列(Queues)
創(chuàng)建死信交換機(jī)
如圖所示,就是創(chuàng)建一個(gè)普通的交換機(jī),這里為了方便區(qū)分,把交換機(jī)的名字取為:delay
創(chuàng)建自動(dòng)過期消息隊(duì)列
這個(gè)隊(duì)列的主要作用是讓消息定時(shí)過期的,比如我們需要2小時(shí)候關(guān)閉訂單,我們就需要把消息放進(jìn)這個(gè)隊(duì)列里面,把消息過期時(shí)間設(shè)置為2小時(shí)
創(chuàng)建一個(gè)一個(gè)名為delay_queue1的自動(dòng)過期的隊(duì)列,當(dāng)然圖片上面的參數(shù)并不會(huì)讓消息自動(dòng)過期,因?yàn)槲覀儾]有設(shè)置x-message-ttl參數(shù),如果整個(gè)隊(duì)列的消息有消息都是相同的,可以設(shè)置,這里為了靈活,所以并沒有設(shè)置,另外兩個(gè)參數(shù)x-dead-letter-exchange代表消息過期后,消息要進(jìn)入的交換機(jī),這里配置的是delay,也就是死信交換機(jī),x-dead-letter-routing-key是配置消息過期后,進(jìn)入死信交換機(jī)的routing-key,跟發(fā)送消息的routing-key一個(gè)道理,根據(jù)這個(gè)key將消息放入不同的隊(duì)列
創(chuàng)建消息處理隊(duì)列
這個(gè)隊(duì)列才是真正處理消息的隊(duì)列,所有進(jìn)入這個(gè)隊(duì)列的消息都會(huì)被處理
消息隊(duì)列的名字為delay_queue2
消息隊(duì)列綁定到交換機(jī)
進(jìn)入交換機(jī)詳情頁面,將創(chuàng)建的2個(gè)隊(duì)列(delay queue1和delay queue2)綁定到交換機(jī)上面
自動(dòng)過期消息隊(duì)列的routing key 設(shè)置為delay
綁定delay queue2
delay queue2 的key要設(shè)置為創(chuàng)建自動(dòng)過期的隊(duì)列的x-dead-letter-routing-key參數(shù),這樣當(dāng)消息過期的時(shí)候就可以自動(dòng)把消息放入delay_queue2這個(gè)隊(duì)列中了
綁定后的管理頁面如下圖:
當(dāng)然這個(gè)綁定也可以使用代碼來實(shí)現(xiàn),只是為了直觀表現(xiàn),所以本文使用的管理平臺(tái)來操作
發(fā)送消息
String msg = "hello word"; MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration("6000"); messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes()); Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("delay", "delay",message);
主要的代碼就是
messageProperties.setExpiration("6000");
設(shè)置了讓消息6秒后過期
注意:因?yàn)橐屜⒆詣?dòng)過期,所以一定不能設(shè)置delay_queue1的監(jiān)聽,不能讓這個(gè)隊(duì)列里面的消息被接受到,否則消息一旦被消費(fèi),就不存在過期了
接收消息
接收消息配置好delay_queue2的監(jiān)聽就好了
package wang.raye.rabbitmq.demo1; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DelayQueue { /** 消息交換機(jī)的名字*/ public static final String EXCHANGE = "delay"; /** 隊(duì)列key1*/ public static final String ROUTINGKEY1 = "delay"; /** 隊(duì)列key2*/ public static final String ROUTINGKEY2 = "delay_key"; /** * 配置鏈接信息 * @return */ @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672); connectionFactory.setUsername("kberp"); connectionFactory.setPassword("kberp"); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); // 必須要設(shè)置 return connectionFactory; } /** * 配置消息交換機(jī) * 針對(duì)消費(fèi)者配置 FanoutExchange: 將消息分發(fā)到所有的綁定隊(duì)列,無routingkey的概念 HeadersExchange :通過添加屬性key-value匹配 DirectExchange:按照routingkey分發(fā)到指定隊(duì)列 TopicExchange:多關(guān)鍵字匹配 */ @Bean public DirectExchange defaultExchange() { return new DirectExchange(EXCHANGE, true, false); } /** * 配置消息隊(duì)列2 * 針對(duì)消費(fèi)者配置 * @return */ @Bean public Queue queue() { return new Queue("delay_queue2", true); //隊(duì)列持久 } /** * 將消息隊(duì)列2與交換機(jī)綁定 * 針對(duì)消費(fèi)者配置 * @return */ @Bean @Autowired public Binding binding() { return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2); } /** * 接受消息的監(jiān)聽,這個(gè)監(jiān)聽會(huì)接受消息隊(duì)列1的消息 * 針對(duì)消費(fèi)者配置 * @return */ @Bean @Autowired public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory()); container.setQueues(queue()); container.setExposeListenerChannel(true); container.setMaxConcurrentConsumers(1); container.setConcurrentConsumers(1); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設(shè)置確認(rèn)模式手工確認(rèn) container.setMessageListener(new ChannelAwareMessageListener() { public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception { byte[] body = message.getBody(); System.out.println("delay_queue2 收到消息 : " + new String(body)); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認(rèn)消息成功消費(fèi) } }); return container; } }
在消息監(jiān)聽中處理需要定時(shí)處理的任務(wù)就好了,因?yàn)镽abbitmq能發(fā)送消息,所以可以把任務(wù)特征碼發(fā)過來,比如關(guān)閉訂單就把訂單id發(fā)過來,這樣就避免了需要查詢一下那些訂單需要關(guān)閉而加重MySQL負(fù)擔(dān)了,畢竟一旦訂單量大的話,查詢本身也是一件很費(fèi)IO的事情
總結(jié)
基于Rabbitmq實(shí)現(xiàn)定時(shí)任務(wù),就是將消息設(shè)置一個(gè)過期時(shí)間,放入一個(gè)沒有讀取的隊(duì)列中,讓消息過期后自動(dòng)轉(zhuǎn)入另外一個(gè)隊(duì)列中,監(jiān)控這個(gè)隊(duì)列消息的監(jiān)聽處來處理定時(shí)任務(wù)具體的操作
以上就是本文的全部內(nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- RabbitMQ 實(shí)現(xiàn)延遲隊(duì)列的兩種方式詳解
- RabbitMQ死信機(jī)制實(shí)現(xiàn)延遲隊(duì)列的實(shí)戰(zhàn)
- 手把手帶你掌握SpringBoot RabbitMQ延遲隊(duì)列
- 如何通過Python實(shí)現(xiàn)RabbitMQ延遲隊(duì)列
- RabbitMQ延遲隊(duì)列及消息延遲推送實(shí)現(xiàn)詳解
- Spring Boot與RabbitMQ結(jié)合實(shí)現(xiàn)延遲隊(duì)列的示例
- C#實(shí)現(xiàn)rabbitmq 延遲隊(duì)列功能實(shí)例代碼
- rabbitmq延遲隊(duì)列的使用方式
相關(guān)文章
SpringBoot快速入門及起步依賴解析(實(shí)例詳解)
SpringBoot?是由?Pivotal?團(tuán)隊(duì)提供的全新框架,其設(shè)計(jì)目的是用來簡化?Spring?應(yīng)用的初始搭建以及開發(fā)過程,這篇文章主要介紹了SpringBoot快速入門及起步依賴解析,需要的朋友可以參考下2022-10-10Java 中Timer和TimerTask 定時(shí)器和定時(shí)任務(wù)使用的例子
這篇文章主要介紹了Java 中Timer和TimerTask 定時(shí)器和定時(shí)任務(wù)使用的例子,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2017-05-05Java手機(jī)號(hào)碼工具類示例詳解(判斷運(yùn)營商、獲取歸屬地)
這篇文章主要介紹了Java手機(jī)號(hào)碼工具類示例詳解,通過手機(jī)號(hào)碼來判斷運(yùn)營商獲取歸屬地,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-02-02Spring Cloud Gateway 內(nèi)存溢出的解決方案
這篇文章主要介紹了Spring Cloud Gateway 內(nèi)存溢出的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Spring security如何重寫Filter實(shí)現(xiàn)json登錄
這篇文章主要介紹了Spring security 如何重寫Filter實(shí)現(xiàn)json登錄,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09