欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何通過(guò)RabbitMq實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)詳解

 更新時(shí)間:2022年01月13日 15:59:30   作者:我也是禿頭猿猿  
工作中經(jīng)常會(huì)有定時(shí)任務(wù)的需求,常見(jiàn)的做法可以使用Timer、Quartz、Hangfire等組件,這次想嘗試下新的思路,使用RabbitMQ死信隊(duì)列的機(jī)制來(lái)實(shí)現(xiàn)定時(shí)任務(wù),下面這篇文章主要給大家介紹了關(guān)于如何通過(guò)RabbitMq實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)的相關(guān)資料,需要的朋友可以參考下

一、需求背景

定時(shí)任務(wù)的需求所謂是數(shù)不勝數(shù),其中實(shí)現(xiàn)方式也是百花齊放,用得最多的大概率為Springboot中的 @Scheduled(cron = “0 0 1 1 * ?”) 注解,或者是定時(shí)任務(wù)XXL-JOB框架,這兩者我接觸的比較多,除此之外還有,Quartz 、elastic-job、但這兩個(gè)在分布式領(lǐng)域而言,和XXL-JOBB比較,XXL-JOB更為受歡迎。無(wú)論是這些框架或者是springboot自帶的定時(shí)任務(wù)組件,基本上都能滿足固定定時(shí)任務(wù)的需求。而我們今天討論的是動(dòng)態(tài)定時(shí)任務(wù)的實(shí)現(xiàn)。

動(dòng)態(tài)定時(shí)任務(wù)的需求其實(shí)在現(xiàn)實(shí)生活中隨處可見(jiàn),如花費(fèi)到期多久之后發(fā)送信息提醒用戶?時(shí)間間隔是多少。又或者客戶下單之后多久提醒商家發(fā)貨,提醒的頻率又是多少…。這樣的需求還有很多。今天我們針對(duì)此類需求進(jìn)行探討。

二、方案思考

(1)需求大致分析

對(duì)于此類需求相比于傳統(tǒng)的定時(shí)任務(wù)無(wú)非多了可控性, 其可控性包括了定時(shí)任務(wù)開(kāi)始和結(jié)束時(shí)間的可控性,周期性可控性,只要解決了這兩個(gè)問(wèn)題,實(shí)際上此類的需求也就迎刃而解了。

(2)可嘗試的方案

前面提供的方案只做文字探索性描述。

2.1、 采用重寫(xiě)Springboot 的定時(shí)框架,從數(shù)據(jù)庫(kù)中讀取cron表達(dá)式來(lái)實(shí)現(xiàn)可控性周期。

其本質(zhì)是通過(guò)如下線程進(jìn)行動(dòng)態(tài)定時(shí)任務(wù)的創(chuàng)建,從而實(shí)現(xiàn)對(duì)應(yīng)的周期可控性。

ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();

其具體的細(xì)節(jié)不再說(shuō),其存在的痛點(diǎn)包括了

1 . 需要另外邏輯去實(shí)現(xiàn)可控性開(kāi)始時(shí)間和結(jié)束時(shí)間

2. 此任務(wù)開(kāi)啟的入?yún)⑹莄orn表達(dá)式,需要另外的邏輯將其進(jìn)行轉(zhuǎn)化,太過(guò)于猥瑣

2.2、采用時(shí)間線程池

時(shí)間線程池我忘記叫什么,他是可以指定開(kāi)始時(shí)間,周期時(shí)間的,相對(duì)而言,比第一種方案來(lái)得更為直觀,其我考慮到的痛點(diǎn)如下。其實(shí)上面那種方案也是有這個(gè)痛點(diǎn)的。

  1. 多節(jié)點(diǎn),多服務(wù)的服務(wù)部署情況下,無(wú)法實(shí)現(xiàn)高可用特性
  2. 需要編寫(xiě)過(guò)多的邏輯來(lái)管理任務(wù)線程,如果不夠謹(jǐn)慎,有可能造成內(nèi)存浪費(fèi)。

2.3、采用延時(shí)操作

簡(jiǎn)單言之,實(shí)際上只要實(shí)現(xiàn)了延時(shí)操作 便是實(shí)現(xiàn)了動(dòng)態(tài)的開(kāi)始時(shí)間以及周期性運(yùn)行,可以利用其遞歸的概念實(shí)現(xiàn)所謂的動(dòng)態(tài)周期。

redis 隊(duì)列來(lái)實(shí)現(xiàn)延時(shí)

redis的體量本身定位就不高,在數(shù)據(jù)量(任務(wù)量)過(guò)大時(shí),對(duì)redis的壓力也很大,redis不一定扛得住。但其實(shí)通過(guò)redis來(lái)實(shí)現(xiàn)延時(shí)消息這樣的成功案列還是有很多的。在這里就不細(xì)說(shuō)了。

RabbitMq實(shí)現(xiàn)延時(shí)消息。

通過(guò)MQ實(shí)現(xiàn)延時(shí)消息是本文的重點(diǎn),在標(biāo)題三會(huì)細(xì)說(shuō)。

三、通過(guò)RabbitMQ實(shí)現(xiàn)延時(shí)任務(wù)并間接實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)。

(1)通過(guò)死信的方式實(shí)現(xiàn)延時(shí)信息消費(fèi)

通過(guò)創(chuàng)建死信隊(duì)列來(lái)實(shí)現(xiàn)延時(shí)任務(wù),然后再通過(guò)遞歸思想實(shí)現(xiàn)對(duì)應(yīng)的邏輯,就可以實(shí)現(xiàn)對(duì)應(yīng)的動(dòng)態(tài)延時(shí)任務(wù),但是這個(gè)會(huì)存在以下下幾個(gè)痛點(diǎn)。

隊(duì)列順序消費(fèi)

通過(guò)死信,我們確實(shí)可以動(dòng)態(tài)的控制消息的消費(fèi)時(shí)間,但是消息在隊(duì)列里面,如果隊(duì)列里面存在多個(gè)信息任務(wù),前一個(gè)未到消費(fèi)時(shí)間,后一個(gè)已經(jīng)到了消費(fèi)時(shí)間,這就好導(dǎo)致了,即使后面任務(wù)信息消費(fèi)時(shí)間到了,卻沒(méi)法被消費(fèi)的問(wèn)題。解決方法,對(duì)隊(duì)列進(jìn)行排序邏輯,但如果這樣做的話,就有點(diǎn)猥瑣了。

開(kāi)銷過(guò)大。

對(duì)于通過(guò)死信來(lái)實(shí)現(xiàn)延時(shí)消息,網(wǎng)上有挺多優(yōu)秀的博客介紹,在此就不做說(shuō)明了。

(2)通過(guò)MQ延時(shí)插件實(shí)現(xiàn)延時(shí)任務(wù)(重點(diǎn))

使用延時(shí)插件需要MQ在3.6以上(實(shí)際上我在嘗試下載的時(shí)候并未發(fā)現(xiàn)git上有對(duì)應(yīng)3.6的插件,所以還是選擇較高的版本比較好)。

四、MQ延時(shí)任務(wù)插件實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)

(1)安裝延時(shí)插件

這里不做過(guò)多說(shuō)明,重點(diǎn)在于編碼的實(shí)現(xiàn),主要步驟如下。

去官網(wǎng)下載對(duì)應(yīng)版本的插件,地址為下載地址

插件名字為rabbitmq_delayed_message_exchange

將插件放到MQ插件目錄下,然后cmd命令解壓網(wǎng)(網(wǎng)上有命令),然后重啟mq服務(wù)。大概就這樣的一個(gè)過(guò)程。

(2)編碼實(shí)現(xiàn)

創(chuàng)建隊(duì)列

這里只弄了對(duì)應(yīng)的核心代碼,大致就是創(chuàng)建延時(shí)交換機(jī),延時(shí)隊(duì)列,以及綁定器,對(duì)應(yīng)的key,value如下

    public static final String DELAY_EXCHANGE = "delay.exchange";

    public static final String DELAY_ROUTE_KEY = "delay.routeKey";

    public static final String DELAY_QUEUE = "delay.queue";

    /**
     * 延時(shí)交換機(jī)
     * @return 延時(shí)交換機(jī)
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    /**
     * mq已經(jīng)安裝了延時(shí)插件使用,否則得使用延時(shí)插件
     * @return 延時(shí)發(fā)送隊(duì)列。
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE,true,false,false);
    }

    /**
     * 延時(shí)綁定區(qū)
     * @return 延時(shí)綁定區(qū)
     */
    @Bean
    public Binding delayBind() {
        return BindingBuilder.bind(this.delayQueue()).to(this.delayExchange()).with(DELAY_ROUTE_KEY).noargs();
    }

生產(chǎn)者

這里寫(xiě)得比較隨意,也直接使用了lombok,還直接用了 @Service ,有點(diǎn)草率,主要為了讓讀者看得清晰。還用了hutool工具類的JSONUtil。

可以清晰的看到主方法里面需要傳一個(gè)Integer類型的入?yún)?,這個(gè)時(shí)間我將其轉(zhuǎn)換成了秒,其MQ實(shí)際入?yún)楹撩?,所以讀者不要被誤導(dǎo)。入?yún)ime通俗的講就是這個(gè)消息多久之后被消費(fèi)。不需要在乎順序。

package com.linkyoyo.bill.mq.impl;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.linkyoyo.bill.bo.WorkOrderDelaySenMailActionBO;
import com.linkyoyo.bill.config.RabbitMQConfig;
import com.linkyoyo.bill.mq.DelaySenderService;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 延時(shí)發(fā)送
 * @author 鄒 [295006967@qq.com]
 * @date 2022/1/4 20:33
 */
@Slf4j
@RequiredArgsConstructor
@AllArgsConstructor
@Service
public class DelaySenderServiceImpl implements DelaySenderService {

    private final RabbitTemplate rabbitTemplate;

    @Override
    @Async
    public void sendMessageByDelay(JSONObject message, Integer time) {
        if(ObjectUtil.isNull(message) || ObjectUtil.isNull(time)) {
            return;
        }
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, RabbitMQConfig.DELAY_ROUTE_KEY, message, msg -> {
            msg.getMessageProperties().setHeader("x-delay", time * 1000);
            return msg;
        });
        log.info("延時(shí)發(fā)送成功:延時(shí)周期時(shí)間{}毫秒,消息內(nèi)容為{}", time * 1000, message);
    }

    @Override
    public void sendMessageByDelay(WorkOrderDelaySenMailActionBO actionBO) {
        Integer afterSecond = actionBO.getAfterSecond();
        if(ObjectUtil.isNull(afterSecond)) {
            afterSecond = 0;
        }
        this.sendMessageByDelay(JSONUtil.parseObj(actionBO), afterSecond);
    }
}


消費(fèi)者

消費(fèi)者的demo不太好寫(xiě),只是做了一個(gè)簡(jiǎn)單的偽代碼。 以定時(shí)任務(wù)發(fā)郵箱為例

1- 消費(fèi)者線程開(kāi)始,先執(zhí)行發(fā)郵箱任務(wù)

2- 發(fā)送完郵箱之后判斷是否還需要發(fā)郵箱,如果需要,就再通過(guò)生產(chǎn)者發(fā)送延時(shí)郵箱 此時(shí)可以指定下一次消費(fèi)的時(shí)間,以此流程走下去便是一套動(dòng)態(tài)任務(wù)的流程實(shí)現(xiàn)??梢詤⒖己罄m(xù)的流程圖。

這樣就實(shí)現(xiàn)一個(gè)簡(jiǎn)易的定時(shí)任務(wù)發(fā)送郵箱的邏輯

	private final DelaySenderService delaySenderService;

    @RabbitHandler
    @RabbitListener(queues = RabbitMQConfig.DELAY_QUEUE)
    public void delayConsumer(Message message) {
        //業(yè)務(wù)邏輯
        this.sendMail(workOrderDelaySenMailActionBO);
        // 判斷是否需要遞歸執(zhí)行定時(shí)任務(wù)(實(shí)際上就是使用生產(chǎn)者再發(fā)一次延時(shí)消息,確認(rèn)下一次消費(fèi))
        if(需要進(jìn)行定時(shí)任務(wù)) {
             this.sendDelayMessageToMq(workOrderDelaySenMailActionBO);
        }
        log.info("信息為:{}", message.getBody());
    }

大致流程就這么多了,以下是整套步驟流閉環(huán)程圖

(3)流程圖

總結(jié)

到此這篇關(guān)于如何通過(guò)RabbitMq實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)的文章就介紹到這了,更多相關(guān)RabbitMq實(shí)現(xiàn)動(dòng)態(tài)定時(shí)任務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java?協(xié)程?Quasar詳解

    Java?協(xié)程?Quasar詳解

    這篇文章主要介紹了Java?協(xié)程?Quasar詳解,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-07-07
  • Spring?boot?admin?服務(wù)監(jiān)控利器詳解

    Spring?boot?admin?服務(wù)監(jiān)控利器詳解

    這篇文章主要介紹了Spring?boot?admin?服務(wù)監(jiān)控利器詳解,文章圍繞主題展開(kāi)詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下
    2022-08-08
  • springcloud gateway如何實(shí)現(xiàn)路由和負(fù)載均衡

    springcloud gateway如何實(shí)現(xiàn)路由和負(fù)載均衡

    這篇文章主要介紹了springcloud gateway如何實(shí)現(xiàn)路由和負(fù)載均衡的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Java中動(dòng)態(tài)地改變數(shù)組長(zhǎng)度及數(shù)組轉(zhuǎn)Map的代碼實(shí)例分享

    Java中動(dòng)態(tài)地改變數(shù)組長(zhǎng)度及數(shù)組轉(zhuǎn)Map的代碼實(shí)例分享

    這篇文章主要介紹了Java中動(dòng)態(tài)地改變數(shù)組長(zhǎng)度及數(shù)組轉(zhuǎn)map的代碼分享,其中轉(zhuǎn)Map利用到了java.util.Map接口,需要的朋友可以參考下
    2016-03-03
  • idea指定maven的settings文件不生效的問(wèn)題解決

    idea指定maven的settings文件不生效的問(wèn)題解決

    本文主要介紹了idea指定maven的settings文件不生效的問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • 詳解Java模擬棧的實(shí)現(xiàn)以及Stack類的介紹

    詳解Java模擬棧的實(shí)現(xiàn)以及Stack類的介紹

    棧是一種數(shù)據(jù)結(jié)構(gòu),它按照后進(jìn)先出的原則來(lái)存儲(chǔ)和訪問(wèn)數(shù)據(jù)。Stack是一個(gè)類,表示棧數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)。本文就來(lái)和大家介紹一下Java模擬棧的實(shí)現(xiàn)以及Stack類的使用,需要的可以參考一下
    2023-04-04
  • Spring?@Bean?修飾方法時(shí)注入?yún)?shù)的操作方法

    Spring?@Bean?修飾方法時(shí)注入?yún)?shù)的操作方法

    對(duì)于 Spring 而言,IOC 容器中的 Bean 對(duì)象的創(chuàng)建和使用是一大重點(diǎn),Spring 也為我們提供了注解方式創(chuàng)建 bean 對(duì)象:使用 @Bean,這篇文章主要介紹了Spring?@Bean?修飾方法時(shí)如何注入?yún)?shù),需要的朋友可以參考下
    2023-10-10
  • 一篇文章帶你入門(mén)java網(wǎng)絡(luò)編程

    一篇文章帶你入門(mén)java網(wǎng)絡(luò)編程

    網(wǎng)絡(luò)編程是指編寫(xiě)運(yùn)行在多個(gè)設(shè)備(計(jì)算機(jī))的程序,這些設(shè)備都通過(guò)網(wǎng)絡(luò)連接起來(lái)。本文介紹了一些網(wǎng)絡(luò)編程基礎(chǔ)的概念,并用Java來(lái)實(shí)現(xiàn)TCP和UDP的Socket的編程,來(lái)讓讀者更好的了解其原理
    2021-08-08
  • slf4j?jcl?jul?log4j1?log4j2?logback各組件系統(tǒng)日志切換

    slf4j?jcl?jul?log4j1?log4j2?logback各組件系統(tǒng)日志切換

    這篇文章主要介紹了slf4j、jcl、jul、log4j1、log4j2、logback的大總結(jié),各個(gè)組件的jar包以及目前系統(tǒng)日志需要切換實(shí)現(xiàn)方式的方法,有需要的朋友可以借鑒參考下
    2022-03-03
  • SpringBoot?替換?if?的參數(shù)校驗(yàn)示例代碼

    SpringBoot?替換?if?的參數(shù)校驗(yàn)示例代碼

    Spring?Validation是對(duì)hibernate?validation的二次封裝,用于支持spring?mvc參數(shù)自動(dòng)校驗(yàn),接下來(lái),我們以spring-boot項(xiàng)目為例,介紹Spring?Validation的使用,需要的朋友可以參考下
    2022-12-12

最新評(píng)論