欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

JAVA中通過(guò)Redis實(shí)現(xiàn)延時(shí)任務(wù)demo實(shí)例

 更新時(shí)間:2024年08月24日 11:24:16   作者:MuShanYu  
Redis在2.0版本時(shí)引入了發(fā)布訂閱(pub/sub)功能,在發(fā)布訂閱中有一個(gè)channel(頻道),與消息隊(duì)列中的topic(主題)類(lèi)似,可以通過(guò)redis的發(fā)布訂閱者模式實(shí)現(xiàn)延時(shí)任務(wù)功能,實(shí)例中會(huì)議室預(yù)約系統(tǒng),用戶預(yù)約管理員審核后生效,如未審批,需要自動(dòng)變超期未處理,使用延時(shí)任務(wù)

先說(shuō)結(jié)論,有兩種方式可以實(shí)現(xiàn):

  1. 通過(guò)Redis監(jiān)聽(tīng)過(guò)期key實(shí)現(xiàn)。
  2. 使用Redisson 內(nèi)置的延時(shí)隊(duì)列實(shí)現(xiàn)。

1.監(jiān)聽(tīng)key過(guò)期事件實(shí)現(xiàn)

1.1 實(shí)現(xiàn)原理

Redis在2.0版本時(shí)引入了發(fā)布訂閱(pub/sub)功能,在發(fā)布訂閱中有一個(gè)channel(頻道),與消息隊(duì)列中的topic(主題)類(lèi)似。可以通過(guò)redis的發(fā)布訂閱者模式實(shí)現(xiàn)延時(shí)任務(wù)功能。

pub/sub即發(fā)布者publisher和訂閱者subscriber,也可以叫生產(chǎn)者和消費(fèi)者。發(fā)布者通過(guò)PUBLISH投遞消息給指定的channel,訂閱者通過(guò)SUBSCRIBER訂閱自己關(guān)心的channel,訂閱者可以訂閱一個(gè)或者多個(gè)不同的channel。

在發(fā)布訂閱模式下生產(chǎn)者需要將消息發(fā)送到指定的channel中,消費(fèi)者需要訂閱對(duì)應(yīng)channel拿到想要的消息。Redis中有很多默認(rèn)的channel,這些channel是由Redis本身向他們發(fā)送消息的,這不是我們自己編寫(xiě)的代碼,其中keyevent@:expired 是其中的一個(gè)默認(rèn)channel,db表示的是redis的哪一個(gè)數(shù)據(jù)庫(kù)。這個(gè)channel負(fù)責(zé)監(jiān)聽(tīng)過(guò)期的key,也就是說(shuō)如果有一個(gè)key過(guò)期了,那么redis會(huì)將這個(gè)key過(guò)期的信息發(fā)送到這個(gè)頻道,我們只需要監(jiān)聽(tīng)這個(gè)頻道就可以拿到對(duì)應(yīng)的過(guò)期key信息,這樣我們就能實(shí)現(xiàn)一個(gè)延遲任務(wù)功能了。

舉個(gè)列子:比如我現(xiàn)在需要實(shí)現(xiàn)一個(gè)郵件提醒功能,需要在任務(wù)發(fā)布后的前24小時(shí)通過(guò)郵件通知未完成的用戶。我們可以在任務(wù)發(fā)布時(shí)設(shè)置一個(gè)key,這個(gè)key的過(guò)期時(shí)間是當(dāng)前時(shí)間到任務(wù)前24小時(shí),監(jiān)聽(tīng)對(duì)應(yīng)的key過(guò)期channel,當(dāng)key過(guò)期后拿到對(duì)應(yīng)的key,去執(zhí)行你自定義的業(yè)務(wù)邏輯即可,當(dāng)然這個(gè)key需要你進(jìn)行設(shè)計(jì),比如可以為任務(wù)id等等。

1.2 實(shí)現(xiàn)Demo

現(xiàn)在有一個(gè)會(huì)議室預(yù)約的系統(tǒng),用戶可以通過(guò)該系統(tǒng)填寫(xiě)預(yù)約理由進(jìn)行預(yù)約,該預(yù)約請(qǐng)求需要管理員完成審核后才能生效。有一個(gè)需求,如果該預(yù)約沒(méi)有被審批,那么需要自動(dòng)將該預(yù)約申請(qǐng)置為超期未處理。這里我們就可以使用延時(shí)任務(wù)實(shí)現(xiàn)這個(gè)功能。

第一步我們需要在房間進(jìn)行預(yù)約操作的時(shí)候,同時(shí)去緩存一個(gè)key,這個(gè)key就緩存成房間預(yù)約申請(qǐng)的id,這樣當(dāng)key過(guò)期時(shí),我們就能拿到對(duì)應(yīng)的申請(qǐng)信息,從而去通知對(duì)應(yīng)的審核人。

房間預(yù)約操作時(shí)設(shè)置對(duì)應(yīng)緩存key:

private void setRoomApplyNotifyCache(RoomReservation roomReservation, String userId) {
        // 記錄當(dāng)前時(shí)間->房間預(yù)約起始時(shí)間,redis緩存,用于判斷是否管理員超期未處理,自動(dòng)更改狀態(tài),通知用戶房間預(yù)約超期未處理,防止占用時(shí)間段,用戶可以重新預(yù)約
        long cacheTimeSecond = DateUtil.between(new Date(), new Date(roomReservation.getStartTime()), DateUnit.SECOND);
        String roomOccupancyApplyKey = "record_reserve_key:" + roomReservation.getId();
        redisCacheUtil.setCacheObject(roomOccupancyApplyKey, userId, cacheTimeSecond, TimeUnit.SECONDS);
        // 前一個(gè)小時(shí)提醒負(fù)責(zé)人審核。 預(yù)約間隔最少是30分鐘
        long cacheNotifyChargerSecond = cacheTimeSecond - (60 * 60);
        // 當(dāng)前時(shí)間距離預(yù)約起始時(shí)間小于一個(gè)小時(shí)
        if (cacheTimeSecond <= 3600L && cacheTimeSecond > 1800L) {
            // 不足一個(gè)小時(shí),但是大于半個(gè)小時(shí)
            cacheNotifyChargerSecond = cacheTimeSecond - (30 * 60);
        } else if (cacheTimeSecond < 1800L) {
            // 不設(shè)置通知審核人
            return;
        }
        // 緩存
        String notifyChargerKey = RedisCacheKey.ROOM_APPLY_TIMEOUT_NOTIFY_KEY.concatKey(roomReservation.getId());
        redisCacheUtil.setCacheObject(notifyChargerKey, userId, cacheNotifyChargerSecond, TimeUnit.SECONDS);
    }

監(jiān)聽(tīng)key過(guò)期channel并作出處理

@Component
public class RedisExpiredKeyListenerComponent extends KeyExpirationEventMessageListener {
	// 通過(guò)構(gòu)造函數(shù)注入 RedisMessageListenerContainer 給 KeyExpirationEventMessageListener
    public RedisExpiredKeyListenerComponent(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, new PatternTopic("__keyevent@0__:expired"));
    }
    
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        if (expiredKey.startsWith("record_reserve_key:")) {
            String reserveId = expiredKey.substring("record_reserve_key:".length());
            // 根據(jù)id查詢房間預(yù)約信息,發(fā)送給審核人通知郵件。
            // ....
        }
    }
}

這樣就非常簡(jiǎn)單的實(shí)現(xiàn)了延時(shí)任務(wù)的功能。

1.3 有什么缺陷?

  1. 時(shí)效性差
    為什么這么說(shuō)?因?yàn)檫^(guò)期事件消息是在Redis刪除key時(shí)才發(fā)布的,而不是key過(guò)期時(shí)就發(fā)布了。
    Redis中常用的過(guò)期策略有:
  • 惰性刪除
    只會(huì)在取出key時(shí)判斷key是否已經(jīng)過(guò)期,這樣對(duì)cpu比較友好,因?yàn)椴挥妙l繁的去掃描所有的key。
  • 定期刪除
    每隔一段時(shí)間抽取一批key執(zhí)行過(guò)期key刪除操作。并且,Redis 底層會(huì)通過(guò)限制刪除操作執(zhí)行的時(shí)長(zhǎng)和頻率來(lái)減少刪除操作對(duì) CPU 時(shí)間的影響。

定期刪除對(duì)內(nèi)存更加友好,惰性刪除對(duì) CPU 更加友好。兩者各有千秋,所以 Redis 采用的是 定期刪除+惰性/懶漢式刪除 。

因此,就會(huì)存在我設(shè)置了 key 的過(guò)期時(shí)間,但到了指定時(shí)間 key 還未被刪除,進(jìn)而沒(méi)有發(fā)布過(guò)期事件的情況。

  1. 丟消息
    Redis 的 pub/sub 模式中的消息并不支持持久化,這與消息隊(duì)列不同。在 Redis 的 pub/sub 模式中,發(fā)布者將消息發(fā)送給指定的頻道,訂閱者監(jiān)聽(tīng)相應(yīng)的頻道以接收消息。當(dāng)沒(méi)有訂閱者時(shí),消息會(huì)被直接丟棄,在 Redis 中不會(huì)存儲(chǔ)該消息。
  2. 多服務(wù)實(shí)例的情況下存在消息重復(fù)問(wèn)題
    Redis 的 pub/sub 模式目前只有廣播模式,這意味著當(dāng)生產(chǎn)者向特定頻道發(fā)布一條消息時(shí),所有訂閱相關(guān)頻道的消費(fèi)者都能夠收到該消息。
    這個(gè)時(shí)候,我們需要注意多個(gè)服務(wù)實(shí)例重復(fù)處理消息的問(wèn)題,這會(huì)增加代碼開(kāi)發(fā)量和維護(hù)難度。

2. 通過(guò)Redission實(shí)現(xiàn)

1、引入 Redission 依賴:

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.2</version>
</dependency>

2、創(chuàng)建 Redisson 配置類(lèi):

@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://localhost:6379");
        return Redisson.create(config);
    }
}

3、封裝了一個(gè)延遲隊(duì)列類(lèi) RedissonDelayQueue

@Component
public class RedissonDelayQueue {

    private static final Logger log = LoggerFactory.getLogger(RedissonDelayQueue.class);

    @Autowired
    private RedissonClient redissonClient;
    // 延遲隊(duì)列
    private RDelayedQueue<String> delayQueue;
    // 阻塞隊(duì)列
    private RBlockingQueue<String> blockingQueue;

    private ExecutorService executorService;

    public RedissonDelayQueue() {
        this.executorService = new ThreadPoolExecutor(
                5,
                10,
                0L, TimeUnit.MILLISECONDS,
                new java.util.concurrent.LinkedBlockingQueue<>(),
                new CustomThreadFactory()
        );
    }

    @PostConstruct
    public void init() {
        blockingQueue = redissonClient.getBlockingQueue("myQueue");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
        startConsumer();
    }

    private void startConsumer() {
        executorService.submit(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // 從阻塞隊(duì)列中獲取任務(wù)
                    String task = blockingQueue.take();
                    log.info("Received task: {}", task);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    log.error("Error processing task", e);
                }
            }
        });
    }

    public void addTask(String task, long delay) {
        log.info("Add task: {} with delay: {} seconds", task, delay);
        // 將任務(wù)添加到延遲隊(duì)列
        delayQueue.offer(task, delay, TimeUnit.SECONDS);
    }

    private static class CustomThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, "DelayQueue-Consumer");
            thread.setDaemon(true);
            return thread;
        }
    }
}

RedissonDelayQueue 中的兩個(gè)核心方法:

  • startConsumer():?jiǎn)?dòng)一個(gè)消費(fèi)者線程,從阻塞隊(duì)列 blockingQueue 中獲取任務(wù)并處理。
  • addTask(String task, long delay):將一個(gè)任務(wù)添加到延遲隊(duì)列中,并指定延遲時(shí)間。

4、編一個(gè) Controller 測(cè)試一下:

@RestController
public class TaskController {

    @Autowired
    private RedissonDelayQueue redissonDelayQueue;

    @PostMapping("/addTask")
    public void addTask(@RequestParam String task, @RequestParam long delay) {
        redissonDelayQueue.addTask(task, delay);
    }
}

GET http://localhost:8080/addTask?task=test-task&delay=15

控制臺(tái)輸出:

可以看到任務(wù)的確是延遲了15s后開(kāi)始執(zhí)行的。

3. 為什么用Redisson更好?

Redisson 是一個(gè)開(kāi)源的 Java 語(yǔ)言 Redis 客戶端,提供了很多開(kāi)箱即用的功能,比如多種分布式鎖的實(shí)現(xiàn)、延時(shí)隊(duì)列。

我們可以借助 Redisson 內(nèi)置的延時(shí)隊(duì)列 RDelayedQueue 來(lái)實(shí)現(xiàn)延時(shí)任務(wù)功能。

Redisson 的延遲隊(duì)列 RDelayedQueue 是基于 Redis 的 SortedSet 來(lái)實(shí)現(xiàn)的。SortedSet 是一個(gè)有序集合,其中的每個(gè)元素都可以設(shè)置一個(gè)分?jǐn)?shù),代表該元素的權(quán)重。Redisson 利用這一特性,將需要延遲執(zhí)行的任務(wù)插入到 SortedSet 中,并給它們?cè)O(shè)置相應(yīng)的過(guò)期時(shí)間作為分?jǐn)?shù)。

Redisson 在客戶端(即應(yīng)用程序進(jìn)程)中啟動(dòng)一個(gè)定時(shí)任務(wù),到時(shí)間后使用 zrangebyscore 命令掃描 SortedSet 中過(guò)期的元素(即分?jǐn)?shù)小于或等于當(dāng)前時(shí)間的元素),然后將這些過(guò)期元素從 SortedSet 中移除,并將它們加入到就緒消息列表( List 結(jié)構(gòu))中。

當(dāng)任務(wù)被移到實(shí)際的就緒消息列表中時(shí),Redisson 通常還會(huì)通過(guò)發(fā)布/訂閱機(jī)制(Redis 的 Pub/Sub 模型)來(lái)通知消費(fèi)者有新任務(wù)到達(dá)。

就緒消息列表是一個(gè)阻塞隊(duì)列,消費(fèi)者可以使用阻塞操作(如 BLPOP key 0,0 表示無(wú)限等待,直到有消息進(jìn)入隊(duì)列)監(jiān)聽(tīng)。由于 Redis 的 Pub/Sub 機(jī)制是事件驅(qū)動(dòng)的,它避免了輪詢開(kāi)銷(xiāo),只有在有新消息時(shí)才會(huì)觸發(fā)處理邏輯。

注意:Redisson 的定時(shí)任務(wù)調(diào)度器并不是以固定的時(shí)間間隔頻繁調(diào)用 zrangebyscore 命令進(jìn)行掃描,而是根據(jù) SortedSet 中最近的到期時(shí)間來(lái)動(dòng)態(tài)調(diào)整下一次檢查的時(shí)間點(diǎn)。

當(dāng)然對(duì)于幾天或者幾周后才會(huì)執(zhí)行的任務(wù),可以結(jié)合mysql進(jìn)行優(yōu)化??梢酝ㄟ^(guò)定時(shí)任務(wù)(例如 XXL-JOB、Spring Task)定期(如每 15 分鐘或 30 分鐘)掃描 MySQL 中即將到期的任務(wù)(例如在未來(lái) 2 小時(shí)內(nèi)到期的任務(wù))并推送到 Redis 中。

4. 為什么不直接用消息隊(duì)列呢?

在我的項(xiàng)目中(https://github.com/MuShanYu/apply-room-record),由于沒(méi)有其他場(chǎng)景需要使用消息隊(duì)列,因此不想為了單一的延時(shí)任務(wù)場(chǎng)景引入消息隊(duì)列。引入 MQ 會(huì)增加系統(tǒng)的復(fù)雜性,需要維護(hù)額外的組件和配置,還會(huì)增加成本,這是不太可取的。

如果項(xiàng)目將來(lái)確實(shí)有需要引入 MQ 的場(chǎng)景且 Redis 延時(shí)任務(wù)確實(shí)不再滿足項(xiàng)目需求,我會(huì)考慮將延時(shí)任務(wù)的實(shí)現(xiàn)平滑遷移到 MQ 上。

個(gè)人項(xiàng)目中使用的是簡(jiǎn)單的key過(guò)期監(jiān)聽(tīng)策略,正在優(yōu)化。

希望這篇文章能夠?qū)δ阌兴鶐椭?/p>

總結(jié)

Redis在2.0版本時(shí)引入了發(fā)布訂閱(pub/sub)功能,在發(fā)布訂閱中有一個(gè)channel(頻道),與消息隊(duì)列中的topic(主題)類(lèi)似,可以通過(guò)redis的發(fā)布訂閱者模式實(shí)現(xiàn)延時(shí)任務(wù)功能,實(shí)例中會(huì)議室預(yù)約系統(tǒng),用戶預(yù)約管理員審核后生效,如未審批,需要自動(dòng)變超期未處理,使用延時(shí)任務(wù)。

 

到此這篇關(guān)于JAVA中通過(guò)Redis實(shí)現(xiàn)延時(shí)任務(wù)demo實(shí)例的文章就介紹到這了,更多相關(guān)JAVA中Redis實(shí)現(xiàn)延時(shí)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Springboot整合hibernate validator 全局異常處理步驟詳解

    Springboot整合hibernate validator 全局異常處理步驟詳解

    本文分步驟給大家介紹Springboot整合hibernate validator 全局異常處理,補(bǔ)呢文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧
    2024-01-01
  • 新手學(xué)習(xí)java反射基礎(chǔ)

    新手學(xué)習(xí)java反射基礎(chǔ)

    Java反射就是在運(yùn)行狀態(tài)中,對(duì)于任意一個(gè)類(lèi),都能夠知道這個(gè)類(lèi)的所有屬性和方法;對(duì)于任意一個(gè)對(duì)象,都能夠調(diào)用它的任意方法和屬性;并且能改變它的屬性。下面我們來(lái)一起學(xué)習(xí)一下吧
    2019-06-06
  • Java編程之jdk1.4,jdk1.5和jdk1.6的區(qū)別分析(經(jīng)典)

    Java編程之jdk1.4,jdk1.5和jdk1.6的區(qū)別分析(經(jīng)典)

    這篇文章主要介紹了Java編程之jdk1.4,jdk1.5和jdk1.6的區(qū)別分析,結(jié)合實(shí)例形式較為詳細(xì)的分析說(shuō)明了jdk1.4,jdk1.5和jdk1.6版本的使用區(qū)別,需要的朋友可以參考下
    2015-12-12
  • JAVA提高第七篇 類(lèi)加載器解析

    JAVA提高第七篇 類(lèi)加載器解析

    這篇文章主要為大家詳細(xì)介紹了JAVA提高第七篇類(lèi)加載器的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10
  • SpringBoot項(xiàng)目防止反編譯的方案

    SpringBoot項(xiàng)目防止反編譯的方案

    最近項(xiàng)目要求部署到其他公司的服務(wù)器上,但是又不想將源碼泄露出去,要求對(duì)正式環(huán)境的啟動(dòng)包進(jìn)行安全性處理,防止客戶直接通過(guò)反編譯工具將代碼反編譯出來(lái),所以本文給大家介紹了SpringBoot項(xiàng)目如何防止反編譯,需要的朋友可以參考下
    2024-01-01
  • springboot引入druid解析sql的過(guò)程

    springboot引入druid解析sql的過(guò)程

    在開(kāi)發(fā)中,有時(shí)我們可能會(huì)需要獲取SQL中的表名,那么因?yàn)椴煌臄?shù)據(jù)源類(lèi)型SQL會(huì)存在部分差異,那么我們就可以使用alibaba 的druid包實(shí)現(xiàn)不同的數(shù)據(jù)源類(lèi)型的sql解析,需要的朋友可以參考下
    2023-08-08
  • Java真題實(shí)練掌握哈希表的使用

    Java真題實(shí)練掌握哈希表的使用

    哈希表是一種根據(jù)關(guān)鍵碼去尋找值的數(shù)據(jù)映射結(jié)構(gòu),該結(jié)構(gòu)通過(guò)把關(guān)鍵碼映射的位置去尋找存放值的地方,說(shuō)起來(lái)可能感覺(jué)有點(diǎn)復(fù)雜,我想我舉個(gè)例子你就會(huì)明白了,最典型的的例子就是字典
    2022-07-07
  • IntelliJ IDEA像Eclipse一樣打開(kāi)多個(gè)項(xiàng)目的圖文教程

    IntelliJ IDEA像Eclipse一樣打開(kāi)多個(gè)項(xiàng)目的圖文教程

    這篇文章主要介紹了IntelliJ IDEA像Eclipse一樣打開(kāi)多個(gè)項(xiàng)目的方法圖文教程講解,需要的朋友可以參考下
    2018-03-03
  • Java線程池如何實(shí)現(xiàn)精準(zhǔn)控制每秒API請(qǐng)求

    Java線程池如何實(shí)現(xiàn)精準(zhǔn)控制每秒API請(qǐng)求

    這篇文章主要介紹了Java線程池如何實(shí)現(xiàn)精準(zhǔn)控制每秒API請(qǐng)求問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-08-08
  • spring boot定時(shí)任務(wù)接收郵件并且存儲(chǔ)附件的方法講解

    spring boot定時(shí)任務(wù)接收郵件并且存儲(chǔ)附件的方法講解

    今天小編就為大家分享一篇關(guān)于spring boot定時(shí)任務(wù)接收郵件并且存儲(chǔ)附件的方法講解,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧
    2019-03-03

最新評(píng)論