如何通過RabbitMq實現動態(tài)定時任務詳解
一、需求背景
定時任務的需求所謂是數不勝數,其中實現方式也是百花齊放,用得最多的大概率為Springboot中的 @Scheduled(cron = “0 0 1 1 * ?”) 注解,或者是定時任務XXL-JOB框架,這兩者我接觸的比較多,除此之外還有,Quartz 、elastic-job、但這兩個在分布式領域而言,和XXL-JOBB比較,XXL-JOB更為受歡迎。無論是這些框架或者是springboot自帶的定時任務組件,基本上都能滿足固定定時任務的需求。而我們今天討論的是動態(tài)定時任務的實現。
動態(tài)定時任務的需求其實在現實生活中隨處可見,如花費到期多久之后發(fā)送信息提醒用戶?時間間隔是多少。又或者客戶下單之后多久提醒商家發(fā)貨,提醒的頻率又是多少…。這樣的需求還有很多。今天我們針對此類需求進行探討。
二、方案思考
(1)需求大致分析
對于此類需求相比于傳統(tǒng)的定時任務無非多了可控性, 其可控性包括了定時任務開始和結束時間的可控性,周期性可控性,只要解決了這兩個問題,實際上此類的需求也就迎刃而解了。
(2)可嘗試的方案
前面提供的方案只做文字探索性描述。
2.1、 采用重寫Springboot 的定時框架,從數據庫中讀取cron表達式來實現可控性周期。
其本質是通過如下線程進行動態(tài)定時任務的創(chuàng)建,從而實現對應的周期可控性。
ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
其具體的細節(jié)不再說,其存在的痛點包括了
1 . 需要另外邏輯去實現可控性開始時間和結束時間
2. 此任務開啟的入參是corn表達式,需要另外的邏輯將其進行轉化,太過于猥瑣
2.2、采用時間線程池
時間線程池我忘記叫什么,他是可以指定開始時間,周期時間的,相對而言,比第一種方案來得更為直觀,其我考慮到的痛點如下。其實上面那種方案也是有這個痛點的。
- 多節(jié)點,多服務的服務部署情況下,無法實現高可用特性
- 需要編寫過多的邏輯來管理任務線程,如果不夠謹慎,有可能造成內存浪費。
2.3、采用延時操作
簡單言之,實際上只要實現了延時操作 便是實現了動態(tài)的開始時間以及周期性運行,可以利用其遞歸的概念實現所謂的動態(tài)周期。
redis 隊列來實現延時
redis的體量本身定位就不高,在數據量(任務量)過大時,對redis的壓力也很大,redis不一定扛得住。但其實通過redis來實現延時消息這樣的成功案列還是有很多的。在這里就不細說了。
RabbitMq實現延時消息。
通過MQ實現延時消息是本文的重點,在標題三會細說。
三、通過RabbitMQ實現延時任務并間接實現動態(tài)定時任務。
(1)通過死信的方式實現延時信息消費
通過創(chuàng)建死信隊列來實現延時任務,然后再通過遞歸思想實現對應的邏輯,就可以實現對應的動態(tài)延時任務,但是這個會存在以下下幾個痛點。
隊列順序消費
通過死信,我們確實可以動態(tài)的控制消息的消費時間,但是消息在隊列里面,如果隊列里面存在多個信息任務,前一個未到消費時間,后一個已經到了消費時間,這就好導致了,即使后面任務信息消費時間到了,卻沒法被消費的問題。解決方法,對隊列進行排序邏輯,但如果這樣做的話,就有點猥瑣了。
開銷過大。
對于通過死信來實現延時消息,網上有挺多優(yōu)秀的博客介紹,在此就不做說明了。
(2)通過MQ延時插件實現延時任務(重點)
使用延時插件需要MQ在3.6以上(實際上我在嘗試下載的時候并未發(fā)現git上有對應3.6的插件,所以還是選擇較高的版本比較好)。
四、MQ延時任務插件實現動態(tài)定時任務
(1)安裝延時插件
這里不做過多說明,重點在于編碼的實現,主要步驟如下。
去官網下載對應版本的插件,地址為下載地址
插件名字為rabbitmq_delayed_message_exchange
將插件放到MQ插件目錄下,然后cmd命令解壓網(網上有命令),然后重啟mq服務。大概就這樣的一個過程。
(2)編碼實現
創(chuàng)建隊列
這里只弄了對應的核心代碼,大致就是創(chuàng)建延時交換機,延時隊列,以及綁定器,對應的key,value如下
public static final String DELAY_EXCHANGE = "delay.exchange"; public static final String DELAY_ROUTE_KEY = "delay.routeKey"; public static final String DELAY_QUEUE = "delay.queue"; /** * 延時交換機 * @return 延時交換機 */ @Bean public CustomExchange delayExchange() { Map<String, Object> arguments = new HashMap<>(1); arguments.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments); } /** * mq已經安裝了延時插件使用,否則得使用延時插件 * @return 延時發(fā)送隊列。 */ @Bean public Queue delayQueue() { return new Queue(DELAY_QUEUE,true,false,false); } /** * 延時綁定區(qū) * @return 延時綁定區(qū) */ @Bean public Binding delayBind() { return BindingBuilder.bind(this.delayQueue()).to(this.delayExchange()).with(DELAY_ROUTE_KEY).noargs(); }
生產者
這里寫得比較隨意,也直接使用了lombok,還直接用了 @Service ,有點草率,主要為了讓讀者看得清晰。還用了hutool工具類的JSONUtil。
可以清晰的看到主方法里面需要傳一個Integer類型的入參,這個時間我將其轉換成了秒,其MQ實際入參為毫秒,所以讀者不要被誤導。入參time通俗的講就是這個消息多久之后被消費。不需要在乎順序。
package com.linkyoyo.bill.mq.impl; import cn.hutool.core.util.ObjectUtil; import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.linkyoyo.bill.bo.WorkOrderDelaySenMailActionBO; import com.linkyoyo.bill.config.RabbitMQConfig; import com.linkyoyo.bill.mq.DelaySenderService; import lombok.AllArgsConstructor; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; /** * 延時發(fā)送 * @author 鄒 [295006967@qq.com] * @date 2022/1/4 20:33 */ @Slf4j @RequiredArgsConstructor @AllArgsConstructor @Service public class DelaySenderServiceImpl implements DelaySenderService { private final RabbitTemplate rabbitTemplate; @Override @Async public void sendMessageByDelay(JSONObject message, Integer time) { if(ObjectUtil.isNull(message) || ObjectUtil.isNull(time)) { return; } rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, RabbitMQConfig.DELAY_ROUTE_KEY, message, msg -> { msg.getMessageProperties().setHeader("x-delay", time * 1000); return msg; }); log.info("延時發(fā)送成功:延時周期時間{}毫秒,消息內容為{}", time * 1000, message); } @Override public void sendMessageByDelay(WorkOrderDelaySenMailActionBO actionBO) { Integer afterSecond = actionBO.getAfterSecond(); if(ObjectUtil.isNull(afterSecond)) { afterSecond = 0; } this.sendMessageByDelay(JSONUtil.parseObj(actionBO), afterSecond); } }
消費者
消費者的demo不太好寫,只是做了一個簡單的偽代碼。 以定時任務發(fā)郵箱為例
1- 消費者線程開始,先執(zhí)行發(fā)郵箱任務
2- 發(fā)送完郵箱之后判斷是否還需要發(fā)郵箱,如果需要,就再通過生產者發(fā)送延時郵箱 此時可以指定下一次消費的時間,以此流程走下去便是一套動態(tài)任務的流程實現。可以參考后續(xù)的流程圖。
這樣就實現一個簡易的定時任務發(fā)送郵箱的邏輯
private final DelaySenderService delaySenderService; @RabbitHandler @RabbitListener(queues = RabbitMQConfig.DELAY_QUEUE) public void delayConsumer(Message message) { //業(yè)務邏輯 this.sendMail(workOrderDelaySenMailActionBO); // 判斷是否需要遞歸執(zhí)行定時任務(實際上就是使用生產者再發(fā)一次延時消息,確認下一次消費) if(需要進行定時任務) { this.sendDelayMessageToMq(workOrderDelaySenMailActionBO); } log.info("信息為:{}", message.getBody()); }
大致流程就這么多了,以下是整套步驟流閉環(huán)程圖
(3)流程圖
總結
到此這篇關于如何通過RabbitMq實現動態(tài)定時任務的文章就介紹到這了,更多相關RabbitMq實現動態(tài)定時任務內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring?boot?admin?服務監(jiān)控利器詳解
這篇文章主要介紹了Spring?boot?admin?服務監(jiān)控利器詳解,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-08-08springcloud gateway如何實現路由和負載均衡
這篇文章主要介紹了springcloud gateway如何實現路由和負載均衡的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Java中動態(tài)地改變數組長度及數組轉Map的代碼實例分享
這篇文章主要介紹了Java中動態(tài)地改變數組長度及數組轉map的代碼分享,其中轉Map利用到了java.util.Map接口,需要的朋友可以參考下2016-03-03idea指定maven的settings文件不生效的問題解決
本文主要介紹了idea指定maven的settings文件不生效的問題解決,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-06-06slf4j?jcl?jul?log4j1?log4j2?logback各組件系統(tǒng)日志切換
這篇文章主要介紹了slf4j、jcl、jul、log4j1、log4j2、logback的大總結,各個組件的jar包以及目前系統(tǒng)日志需要切換實現方式的方法,有需要的朋友可以借鑒參考下2022-03-03