Redisson延時隊列RedissonDelayed的具體使用
一、案例場景
定時調(diào)度基本是每個項目都會遇到的業(yè)務(wù)場景,一般地,都會通過任務(wù)調(diào)度工具執(zhí)行定時任務(wù)完成,定時任務(wù)有兩點(diǎn)缺陷:
- 定時任務(wù)執(zhí)行頻度限制,實(shí)際執(zhí)行的時間可能會晚于理想的設(shè)定時間,例如,如果要通過定時任務(wù)實(shí)現(xiàn)在下單后15分鐘仍未支付則取消訂單的功能,假設(shè)定時任務(wù)的執(zhí)行頻度為每分鐘執(zhí)行一次,對于有些訂單而言,其實(shí)際取消時間是介于15-16分鐘之間,不夠精確;
 - 定時任務(wù)執(zhí)行需要時間,定時任務(wù)的執(zhí)行也需要時間,如果業(yè)務(wù)場景的數(shù)據(jù)量較大,執(zhí)行一次定時任務(wù)需要足夠長的時間,進(jìn)一步放大了缺點(diǎn)一。
 
二、技術(shù)選型
Redis實(shí)現(xiàn)延時隊列有兩種實(shí)現(xiàn)方式:
- key失效監(jiān)聽回調(diào);
key失效監(jiān)聽存在兩個問題:① Redis的pubsub不會被持久化,服務(wù)器宕機(jī)就會被丟棄,這點(diǎn)就很致命,因?yàn)檎l也無法保證redis服務(wù)一直不宕機(jī);②沒有高級特性,沒有ack機(jī)制,可靠性不高。 - zset分?jǐn)?shù)存時間戳。
zset的實(shí)現(xiàn)是,輪詢隊列頭部來獲取超期的時間戳,實(shí)現(xiàn)延時效果,可靠性更高,并且數(shù)據(jù)會被持久化,這就很好的規(guī)避了key失效監(jiān)聽回調(diào)的問題,如果redis服務(wù)崩潰,還是有丟失數(shù)據(jù)的可能。 
Redisson的RDelayedQueue是一個封裝好的zset實(shí)現(xiàn)的延時隊列,最終選擇了這個方案。其實(shí)還有一些優(yōu)秀的方案可供選擇,例如rocketmq、pulsar等擁有定時投遞功能的消息隊列;我這邊優(yōu)先考慮在不引入新的中間鍵的情況下使用RDelayedQueue技術(shù)進(jìn)行實(shí)現(xiàn)。
注意:在不方便獲得專業(yè)消息隊列時可以考慮使用redissondelayqueue等基于redis的延時隊列方案,但要為redis崩潰等情況設(shè)計補(bǔ)償保護(hù)機(jī)制。
三、編碼實(shí)現(xiàn)
1、引入依賴
            <!--redisson-->
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson-spring-boot-starter</artifactId>
                <version>3.20.0</version>
            </dependency>
            <dependency>
                <groupId>org.redisson</groupId>
                <artifactId>redisson-spring-data-27</artifactId>
                <version>3.20.0</version>
            </dependency>
2、創(chuàng)建配置類
import com.geovis.common.redis.utils.RedisUtils;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * @date 2023/8/30 15:05
 */
@Configuration
public class RedissonQueueConfig {
    private final String queueName = "orderQueue";
    @Bean
    public RBlockingQueue<String> blockingQueue() {
        return RedisUtils.getClient().getBlockingQueue(queueName);
    }
    @Bean
    public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockQueue) {
        return RedisUtils.getClient().getDelayedQueue(blockQueue);
    }
}
其中RedisUtils.getClient()是為了獲取RedissonClient 對象,這里我使用Redis工具類直接獲取,我把工具類也簡單展示出來吧。
import org.redisson.api.*;
/**
*Redis工具類
*/
public class RedisUtils {
    private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class);
    /**
     * 獲取客戶端實(shí)例
     */
    public static RedissonClient getClient() {
        return CLIENT;
    }
}
3、持續(xù)監(jiān)聽線程
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 * @date 2023/8/30 15:09
 */
@Slf4j
@Component
public class OrderTask {
    @Resource
    private RBlockingQueue<Object> blockingQueue;
    @PostConstruct
    public void take() {
        new Thread(() -> {
            while (true) {
                try {
                	log.info(blockingQueue.take().toString());  //將到期的數(shù)據(jù)取出來,如果一直沒有到期數(shù)據(jù),就一直等待。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}
4、編寫controller進(jìn)行測試調(diào)用
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RDelayedQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
/**
 * 測試接口類
 * @date 2023/8/30 16:56
 */
@Validated
@RequiredArgsConstructor
@RestController
@RequestMapping("/forest")
@Slf4j
public class ForestController {
    @Autowired
    private RDelayedQueue delayedQueue;
    
    @GetMapping(value = "/offerAsync")
    public void offerAsync() {
    	//20秒后到期,在監(jiān)聽現(xiàn)成哪里可以打印出  1234567890
        delayedQueue.offerAsync("1234567890", 20, TimeUnit.SECONDS);   
    }
}
到這里基本就完成了Demo編碼,具體要根據(jù)業(yè)務(wù)修改對應(yīng)的代碼,本demo親測沒有問題。
四、原理
用戶傳進(jìn)來的延遲時間必須大于0,小于0拋出異常代碼結(jié)束。將用戶傳進(jìn)來的時間轉(zhuǎn)換為毫秒,并加上系統(tǒng)當(dāng)前時間,計算出來的就是過期時間。到了過期時間消費(fèi)者就可以把該任務(wù)取出來消費(fèi)了。

結(jié)合上圖所示,首先創(chuàng)建了一個Redisson實(shí)現(xiàn)的阻塞隊列RBlockingQueue的實(shí)例blockingQueue,然后又使用該阻塞隊列blockingQueue創(chuàng)建了一個延時隊列RDelayedQueue的實(shí)例delayedQueue。延時消息添加后并不是立即進(jìn)入到阻塞隊列blockingQueue中,而是到達(dá)了設(shè)定的延時時間之后才會從延時隊列delayedQueue進(jìn)入到阻塞隊列blockingQueue;因此,延時消息的添加由延時隊列delayedQueue完成,而延時隊列的消費(fèi)則由阻塞隊列blockingQueue完成。注意,這里如果直接對延時隊列delayedQueue進(jìn)行監(jiān)聽,則延時消息剛加入時就會被消費(fèi),達(dá)不到延時的效果。
相比于Redisson官網(wǎng)文檔延時隊列中給出的代碼示例,這里被包裝隊列使用阻塞隊列RBlockingQueue的好處是blockingQueue.take()會一直阻塞直至隊列內(nèi)有可消費(fèi)延時消息,避免無意義的循環(huán)占用CPU。
到此這篇關(guān)于Redisson延時隊列RedissonDelayed的具體使用的文章就介紹到這了,更多相關(guān)Redisson延時隊列RedissonDelayed內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- redis實(shí)現(xiàn)延時隊列的兩種方式(小結(jié))
 - 基于Redis實(shí)現(xiàn)延時隊列的優(yōu)化方案小結(jié)
 - 生產(chǎn)redisson延時隊列不消費(fèi)問題排查解決
 - Redisson 分布式延時隊列 RedissonDelayedQueue 運(yùn)行流程
 - redis使用zset實(shí)現(xiàn)延時隊列的示例代碼
 - redis實(shí)現(xiàn)分布式延時隊列的示例代碼
 - Redis消息隊列、阻塞隊列、延時隊列的實(shí)現(xiàn)
 - Redis簡易延時隊列的實(shí)現(xiàn)示例
 - redis和rabbitmq實(shí)現(xiàn)延時隊列的示例代碼
 
相關(guān)文章
 Redis?鍵值對(key-value)數(shù)據(jù)庫實(shí)現(xiàn)方法
Redis 的鍵值對中的 key 就是字符串對象,而 value 可以是字符串對象,也可以是集合數(shù)據(jù)類型的對象,比如 List 對象,Hash 對象、Set 對象和 Zset 對象,這篇文章主要介紹了Redis?鍵值對數(shù)據(jù)庫是怎么實(shí)現(xiàn)的,需要的朋友可以參考下2024-05-05
 Redis使用RedisTemplate導(dǎo)致key亂碼問題解決
本文主要介紹了Redis使用RedisTemplate導(dǎo)致key亂碼問題解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06
 Redis報錯UnrecognizedPropertyException: Unrecognized 
在使用SpringBoot訪問Redis時,報錯提示識別不了屬性headPart,經(jīng)過排查,發(fā)現(xiàn)并非Serializable或getset方法問題,而是存在一個方法getHeadPart,但無headPart屬性,解決方案是將getHeadPart改為makeHeadPart2024-10-10
 Redis進(jìn)行緩存操作的實(shí)現(xiàn)
本文主要介紹了Redis進(jìn)行緩存操作,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-03-03
 淺談Redis?中的過期刪除策略和內(nèi)存淘汰機(jī)制
本文主要介紹了Redis?中的過期刪除策略和內(nèi)存淘汰機(jī)制,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04
 redis服務(wù)器允許遠(yuǎn)程主機(jī)訪問的方法
今天小編就為大家分享一篇redis服務(wù)器允許遠(yuǎn)程主機(jī)訪問的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-05-05

