欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

springboot整合redisson實(shí)現(xiàn)延時(shí)隊(duì)列(附倉(cāng)庫(kù)地址)

 更新時(shí)間:2024年10月16日 09:28:03   作者:ゞ註﹎錠oo  
延時(shí)隊(duì)列用于管理需要定時(shí)執(zhí)行的任務(wù),對(duì)于大數(shù)據(jù)量和高實(shí)時(shí)性需求,使用延時(shí)隊(duì)列比定時(shí)掃庫(kù)更高效,Redisson提供一種高效的延時(shí)隊(duì)列實(shí)現(xiàn)方式,本文就來詳細(xì)的介紹一下,感興趣都可以了解學(xué)習(xí)

應(yīng)用場(chǎng)景

通常在一些需要經(jīng)歷一段時(shí)間或者到達(dá)某個(gè)指定時(shí)間節(jié)點(diǎn)才會(huì)執(zhí)行的功能,比如以下這些場(chǎng)景:

  • 訂單超時(shí)提醒
  • 收貨自動(dòng)確認(rèn)
  • 會(huì)議提醒
  • 代辦事項(xiàng)提醒

為什么使用延時(shí)隊(duì)列

對(duì)于數(shù)據(jù)量小且實(shí)時(shí)性要求不高的需求來說,最簡(jiǎn)單的方法就是定時(shí)掃描數(shù)據(jù)庫(kù)。

但是,當(dāng)數(shù)量達(dá)到數(shù)百萬、上千萬級(jí)別且時(shí),定時(shí)掃庫(kù)就顯得非常低效且消耗資源,

甚至有些時(shí)間間隔小實(shí)時(shí)性要求高的情況,上一次掃描還沒結(jié)束,下一次就又開始了,

這時(shí)候如果使用延時(shí)隊(duì)列就會(huì)比較合適

延時(shí)隊(duì)列的幾種方式:

  • Quartz 定時(shí)任務(wù)實(shí)現(xiàn)掃庫(kù)
  • DelayQueue JDK中提供了一組實(shí)現(xiàn)延遲隊(duì)列的API
  • Redis sorted set
  • Redis 過期鍵監(jiān)聽回調(diào)
  • RabbitMQ 死信隊(duì)列
  • RabbitMQ 基于插件實(shí)現(xiàn)延遲隊(duì)列
  • Wheel 時(shí)間輪訓(xùn)算法

Redisson 實(shí)現(xiàn)延時(shí)隊(duì)列

顧名思義 Redis son 就是 Redis 的兒子,舉個(gè)栗子先:

1.引入 pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>${lastest.version}</version>
</dependency>

2.封裝一個(gè) RedissonQueue 類

@Service
public class RedissonQueue {

    public static final String QUEUE = "delayQueue";

    // 默認(rèn)超時(shí)時(shí)間,30秒
    public static final Integer DEFAULT_TIMEOUT = 30;

    @Resource
    private RedissonClient redissonClient;

    // 加入任務(wù)并設(shè)置到期時(shí)間
    public void offer(String taskId, Integer timeout) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);
    }

    // 移除任務(wù)
    public void remove(String taskId) {
        RDelayedQueue<String> delayedQueue = delayedQueue();
        delayedQueue.removeIf(messageId -> messageId.equals(taskId));
    }

    // 任務(wù)列表
    public RDelayedQueue<String> delayedQueue() {
        RBlockingDeque<String> blockingDeque = blockingDeque();
        return redissonClient.getDelayedQueue(blockingDeque);
    }

    public RBlockingDeque<String> blockingDeque() {
        return redissonClient.getBlockingDeque(QUEUE);
    }

    public boolean isShutdown() {
        return redissonClient.isShutdown();
    }

    public void shutdown() {
        redissonClient.shutdown();
    }

}

3.交給 Spring 管理

@Slf4j
@Service
public class RedissonService implements ApplicationRunner {

    @Resource
    private RedissonQueue redissonQueue;

    @Resource(name = "threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor executor;

    @Override
    public void run(ApplicationArguments args) {
        RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();
        executor.execute(() -> {
            while (true) {
                if (redissonQueue.isShutdown()) {
                    return;
                } else {
                    String messageId = null;
                    try {
                        messageId = blockingDeque.take();
                    } catch (InterruptedException e) {
                        log.warn("RedissonConsumer error:{}", e.getMessage());
                    }
                    if (!Objects.isNull(messageId) && !messageId.isEmpty()) {
                        log.warn("timeout messageId : {}", messageId);
                    }
                }
            }
        });

    }

    // 初始化,啟動(dòng)服務(wù)就執(zhí)行一次
    @PostConstruct
    public void init() {
        redissonQueue.delayedQueue();
    }

    @PreDestroy
    public void shutdown() {
        redissonQueue.shutdown();
    }

}

4.測(cè)試接口

@Operation(summary = "添加任務(wù)", description = "添加任務(wù)")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,
                             @RequestParam(value = "timeout", required = false) Integer timeout) {
    taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;
    redissonQueue.offer(taskId, timeout);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

@Operation(summary = "移除任務(wù)", description = "移除任務(wù)")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {
    redissonQueue.remove(taskId);
    return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}

5.測(cè)試結(jié)果

添加10個(gè)任務(wù)

在這里插入圖片描述

刪除第1個(gè)任務(wù)

在這里插入圖片描述

可以看到第一個(gè)任務(wù)刪除后沒有被執(zhí)行(沒有設(shè)置到期時(shí)間,默認(rèn)為30秒到期)

在這里插入圖片描述

實(shí)現(xiàn)原理

  • redisson_delay_queue_timeout:delayQueue,sorted set 數(shù)據(jù)類型,存放所有延遲任務(wù),按延遲任務(wù)的到期時(shí)間戳(提交任務(wù)時(shí)間戳 +
    延遲時(shí)間)排序,所以列表最前面第一個(gè)元素就是整個(gè)延遲隊(duì)列中最早被執(zhí)行的任務(wù)。
  • redisson_delay_queue:delayQueue,list 數(shù)據(jù)類型,也是存放所有任務(wù)。
  • delayQueue,list 數(shù)據(jù)類型,被稱為目標(biāo)隊(duì)列,這個(gè)里面存放的任務(wù)都是已經(jīng)到延遲時(shí)間的,可以被消費(fèi)者獲取的任務(wù),所以上面示例中
    RBlockingQueue 的 take 方法是從此目標(biāo)隊(duì)列中獲取任務(wù)的。
  • redisson_delay_queue_channel:delayQueue,是一個(gè) channel,用來通知客戶端開啟一個(gè)延遲任務(wù)
  • 生產(chǎn)者提交任務(wù)時(shí)將任務(wù)放到 redisson_delay_queue_timeout:delayQueue 中,提交任務(wù)的時(shí)間戳+延遲時(shí)間
  • 客戶端會(huì)有一個(gè)延遲任務(wù),這個(gè)延遲任務(wù)會(huì)向 Redis Server 發(fā)送一段 lua 腳本,Redis 執(zhí)行 lua 腳本中的命令,此操作是原子性的

lua 腳本主要干兩件事

  • 將到了延遲時(shí)間的任務(wù)從 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 這個(gè)目標(biāo)隊(duì)列
  • 獲取到 redisson_delay_queue_timeout:delayQueue 中最早到期時(shí)間的任務(wù)的到期時(shí)間戳,發(fā)布到 redisson_delay_queue_channel:
    delayQueue channel 中

當(dāng)客戶端監(jiān)聽到 redisson_delay_queue_channel:delayQueue 這個(gè) channel 的消息時(shí),會(huì)再次提交一個(gè)客戶端延遲任務(wù),延遲時(shí)間就是消息(最早到期時(shí)間任務(wù)的到期時(shí)間戳)當(dāng)前時(shí)間戳
這個(gè)時(shí)間其實(shí)也就是 redisson_delay_queue_channel:delayQueue 中最早到期時(shí)間的任務(wù)的剩余的延遲時(shí)間。
一旦時(shí)間來到最早到期時(shí)間任務(wù)的到期時(shí)間戳,redisson_delay_queue_timeout:delayQueue 中最早到期時(shí)間的任務(wù)已經(jīng)到期,客戶端的延遲任務(wù)也同時(shí)到期,
于是開始執(zhí)行 lua 腳本操作,及時(shí)將到期任務(wù)放到目標(biāo)隊(duì)列中。然后再次發(fā)布剩余的延遲任務(wù)中最早到期任務(wù)的到期時(shí)間戳到 channel
中,
如此循環(huán)運(yùn)行下去,保證 redisson_delay_queue_timeout:delayQueue 中到期數(shù)據(jù)能及時(shí)放到目標(biāo)隊(duì)列中。
這里存在一個(gè)特殊情況,需要項(xiàng)目啟動(dòng)時(shí)就執(zhí)行一次延時(shí)隊(duì)列。因?yàn)橛捎跊]有客戶端延遲任務(wù)的執(zhí)行,
可能會(huì)出現(xiàn) redisson_delay_queue_timeout:delayQueue 隊(duì)列中有到期但是沒有被放到目標(biāo)隊(duì)列的可能,啟動(dòng)就執(zhí)行一次是為了保證到期的數(shù)據(jù)能被及時(shí)放到目標(biāo)隊(duì)列中。

結(jié)論

  • Redisson 方案理論上沒有延遲,但當(dāng)消息數(shù)量劇增,消費(fèi)者消費(fèi)緩慢這種情況下,可能會(huì)導(dǎo)致延遲任務(wù)消費(fèi)的延遲。

  • 消息丟失問題 Redisson 方案最大程度上減輕消息丟失的可能性,因?yàn)樗腥蝿?wù)都是存在 list 和 sorted set 兩種數(shù)據(jù)類型中,Redis
    有持久化機(jī)制。除非整個(gè) redis 集群宕機(jī),可能丟失一小部分?jǐn)?shù)據(jù)。

  • 廣播任務(wù)問題,是不會(huì)出現(xiàn)的,因?yàn)槊總€(gè)客戶端都是從同一個(gè)目標(biāo)隊(duì)列中獲取任務(wù)。

Redisson 這種實(shí)現(xiàn)方案是比較合適且靠譜的,一般中小型項(xiàng)目建議用 Redisson 實(shí)現(xiàn)延遲隊(duì)列,規(guī)模較大的項(xiàng)目直接上 MQ。

整合DEMO倉(cāng)庫(kù)地址

到此這篇關(guān)于springboot整合redisson實(shí)現(xiàn)延時(shí)隊(duì)列(附倉(cāng)庫(kù)地址)的文章就介紹到這了,更多相關(guān)springboot redisson延時(shí)隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • HttpClient實(shí)現(xiàn)遠(yuǎn)程調(diào)用

    HttpClient實(shí)現(xiàn)遠(yuǎn)程調(diào)用

    這篇文章主要為大家詳細(xì)介紹了HttpClient實(shí)現(xiàn)遠(yuǎn)程調(diào)用的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-08-08
  • Java實(shí)現(xiàn)二分查找BinarySearch算法

    Java實(shí)現(xiàn)二分查找BinarySearch算法

    這篇文章主要介紹了Java實(shí)現(xiàn)二分查找BinarySearch算法,二分查找針對(duì)的是一個(gè)有序的數(shù)據(jù)集合,每次都通過跟區(qū)間的中間元素對(duì)比,將待查找的區(qū)間縮小為之前的一半,直到找到要查找的元素,或者區(qū)間被縮小為 0,需要的朋友可以參考下
    2023-12-12
  • 運(yùn)行時(shí)常量池和字符串常量池的區(qū)別及說明

    運(yùn)行時(shí)常量池和字符串常量池的區(qū)別及說明

    這篇文章主要介紹了運(yùn)行時(shí)常量池和字符串常量池的區(qū)別及說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • java處理轉(zhuǎn)義字符↑ → ↓ 保存后的展示還原操作

    java處理轉(zhuǎn)義字符↑ → ↓ 保存后的展示還原操作

    這篇文章主要介紹了java處理轉(zhuǎn)義字符↑ → ↓ 保存后的展示還原操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-06-06
  • SpringMVC基于注解方式實(shí)現(xiàn)上傳下載

    SpringMVC基于注解方式實(shí)現(xiàn)上傳下載

    本文主要介紹了SpringMVC基于注解方式實(shí)現(xiàn)上傳下載,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-04-04
  • 使用SpringBoot整合Jpa的過程詳解

    使用SpringBoot整合Jpa的過程詳解

    SpringBoot是一種快速開發(fā)框架,它簡(jiǎn)化了Java應(yīng)用程序的開發(fā)過程,而Jpa是Java持久化規(guī)范的一種實(shí)現(xiàn),將SpringBoot與Jpa整合可以更加方便地進(jìn)行數(shù)據(jù)庫(kù)操作,提高開發(fā)效率,本文將介紹如何使用Spring Boot整合Jpa,幫助讀者快速上手并應(yīng)用于實(shí)際項(xiàng)目中
    2023-12-12
  • Java創(chuàng)建子線程的兩種方法

    Java創(chuàng)建子線程的兩種方法

    這篇文章主要介紹了Java創(chuàng)建子線程的兩種方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • Java自定義異常類的實(shí)例詳解

    Java自定義異常類的實(shí)例詳解

    這篇文章主要介紹了Java自定義異常類的實(shí)例詳解的相關(guān)資料,希望通過本文能幫助到大家,讓大家學(xué)習(xí)理解掌握這部分內(nèi)容,需要的朋友可以參考下
    2017-09-09
  • Springmvc自定義參數(shù)轉(zhuǎn)換實(shí)現(xiàn)代碼解析

    Springmvc自定義參數(shù)轉(zhuǎn)換實(shí)現(xiàn)代碼解析

    這篇文章主要介紹了Springmvc自定義參數(shù)轉(zhuǎn)換實(shí)現(xiàn)代碼解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-07-07
  • SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱

    SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱

    緩存預(yù)熱是指在 Spring Boot 項(xiàng)目啟動(dòng)時(shí),預(yù)先將數(shù)據(jù)加載到緩存系統(tǒng)(如 Redis)中的一種機(jī)制,本文主要介紹了SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱,感興趣的可以了解下
    2024-12-12

最新評(píng)論