Redisson延時(shí)隊(duì)列RedissonDelayed的具體使用
一、案例場(chǎng)景
定時(shí)調(diào)度基本是每個(gè)項(xiàng)目都會(huì)遇到的業(yè)務(wù)場(chǎng)景,一般地,都會(huì)通過(guò)任務(wù)調(diào)度工具執(zhí)行定時(shí)任務(wù)完成,定時(shí)任務(wù)有兩點(diǎn)缺陷:
- 定時(shí)任務(wù)執(zhí)行頻度限制,實(shí)際執(zhí)行的時(shí)間可能會(huì)晚于理想的設(shè)定時(shí)間,例如,如果要通過(guò)定時(shí)任務(wù)實(shí)現(xiàn)在下單后15分鐘仍未支付則取消訂單的功能,假設(shè)定時(shí)任務(wù)的執(zhí)行頻度為每分鐘執(zhí)行一次,對(duì)于有些訂單而言,其實(shí)際取消時(shí)間是介于15-16分鐘之間,不夠精確;
- 定時(shí)任務(wù)執(zhí)行需要時(shí)間,定時(shí)任務(wù)的執(zhí)行也需要時(shí)間,如果業(yè)務(wù)場(chǎng)景的數(shù)據(jù)量較大,執(zhí)行一次定時(shí)任務(wù)需要足夠長(zhǎng)的時(shí)間,進(jìn)一步放大了缺點(diǎn)一。
二、技術(shù)選型
Redis實(shí)現(xiàn)延時(shí)隊(duì)列有兩種實(shí)現(xiàn)方式:
- key失效監(jiān)聽(tīng)回調(diào);
key失效監(jiān)聽(tīng)存在兩個(gè)問(wèn)題:① Redis的pubsub不會(huì)被持久化,服務(wù)器宕機(jī)就會(huì)被丟棄,這點(diǎn)就很致命,因?yàn)檎l(shuí)也無(wú)法保證redis服務(wù)一直不宕機(jī);②沒(méi)有高級(jí)特性,沒(méi)有ack機(jī)制,可靠性不高。 - zset分?jǐn)?shù)存時(shí)間戳。
zset的實(shí)現(xiàn)是,輪詢隊(duì)列頭部來(lái)獲取超期的時(shí)間戳,實(shí)現(xiàn)延時(shí)效果,可靠性更高,并且數(shù)據(jù)會(huì)被持久化,這就很好的規(guī)避了key失效監(jiān)聽(tīng)回調(diào)的問(wèn)題,如果redis服務(wù)崩潰,還是有丟失數(shù)據(jù)的可能。
Redisson的RDelayedQueue是一個(gè)封裝好的zset實(shí)現(xiàn)的延時(shí)隊(duì)列,最終選擇了這個(gè)方案。其實(shí)還有一些優(yōu)秀的方案可供選擇,例如rocketmq、pulsar等擁有定時(shí)投遞功能的消息隊(duì)列;我這邊優(yōu)先考慮在不引入新的中間鍵的情況下使用RDelayedQueue技術(shù)進(jìn)行實(shí)現(xiàn)。
注意:在不方便獲得專業(yè)消息隊(duì)列時(shí)可以考慮使用redissondelayqueue等基于redis的延時(shí)隊(duì)列方案,但要為redis崩潰等情況設(shè)計(jì)補(bǔ)償保護(hù)機(jī)制。
三、編碼實(shí)現(xiàn)
1、引入依賴
<!--redisson--> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.20.0</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-data-27</artifactId> <version>3.20.0</version> </dependency>
2、創(chuàng)建配置類
import com.geovis.common.redis.utils.RedisUtils; import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @date 2023/8/30 15:05 */ @Configuration public class RedissonQueueConfig { private final String queueName = "orderQueue"; @Bean public RBlockingQueue<String> blockingQueue() { return RedisUtils.getClient().getBlockingQueue(queueName); } @Bean public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockQueue) { return RedisUtils.getClient().getDelayedQueue(blockQueue); } }
其中RedisUtils.getClient()是為了獲取RedissonClient 對(duì)象,這里我使用Redis工具類直接獲取,我把工具類也簡(jiǎn)單展示出來(lái)吧。
import org.redisson.api.*; /** *Redis工具類 */ public class RedisUtils { private static final RedissonClient CLIENT = SpringUtils.getBean(RedissonClient.class); /** * 獲取客戶端實(shí)例 */ public static RedissonClient getClient() { return CLIENT; } }
3、持續(xù)監(jiān)聽(tīng)線程
import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingQueue; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * @date 2023/8/30 15:09 */ @Slf4j @Component public class OrderTask { @Resource private RBlockingQueue<Object> blockingQueue; @PostConstruct public void take() { new Thread(() -> { while (true) { try { log.info(blockingQueue.take().toString()); //將到期的數(shù)據(jù)取出來(lái),如果一直沒(méi)有到期數(shù)據(jù),就一直等待。 } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } }
4、編寫controller進(jìn)行測(cè)試調(diào)用
import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RDelayedQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; /** * 測(cè)試接口類 * @date 2023/8/30 16:56 */ @Validated @RequiredArgsConstructor @RestController @RequestMapping("/forest") @Slf4j public class ForestController { @Autowired private RDelayedQueue delayedQueue; @GetMapping(value = "/offerAsync") public void offerAsync() { //20秒后到期,在監(jiān)聽(tīng)現(xiàn)成哪里可以打印出 1234567890 delayedQueue.offerAsync("1234567890", 20, TimeUnit.SECONDS); } }
到這里基本就完成了Demo編碼,具體要根據(jù)業(yè)務(wù)修改對(duì)應(yīng)的代碼,本demo親測(cè)沒(méi)有問(wèn)題。
四、原理
用戶傳進(jìn)來(lái)的延遲時(shí)間必須大于0,小于0拋出異常代碼結(jié)束。將用戶傳進(jìn)來(lái)的時(shí)間轉(zhuǎn)換為毫秒,并加上系統(tǒng)當(dāng)前時(shí)間,計(jì)算出來(lái)的就是過(guò)期時(shí)間。到了過(guò)期時(shí)間消費(fèi)者就可以把該任務(wù)取出來(lái)消費(fèi)了。
結(jié)合上圖所示,首先創(chuàng)建了一個(gè)Redisson實(shí)現(xiàn)的阻塞隊(duì)列RBlockingQueue的實(shí)例blockingQueue,然后又使用該阻塞隊(duì)列blockingQueue創(chuàng)建了一個(gè)延時(shí)隊(duì)列RDelayedQueue的實(shí)例delayedQueue。延時(shí)消息添加后并不是立即進(jìn)入到阻塞隊(duì)列blockingQueue中,而是到達(dá)了設(shè)定的延時(shí)時(shí)間之后才會(huì)從延時(shí)隊(duì)列delayedQueue進(jìn)入到阻塞隊(duì)列blockingQueue;因此,延時(shí)消息的添加由延時(shí)隊(duì)列delayedQueue完成,而延時(shí)隊(duì)列的消費(fèi)則由阻塞隊(duì)列blockingQueue完成。注意,這里如果直接對(duì)延時(shí)隊(duì)列delayedQueue進(jìn)行監(jiān)聽(tīng),則延時(shí)消息剛加入時(shí)就會(huì)被消費(fèi),達(dá)不到延時(shí)的效果。
相比于Redisson官網(wǎng)文檔延時(shí)隊(duì)列中給出的代碼示例,這里被包裝隊(duì)列使用阻塞隊(duì)列RBlockingQueue的好處是blockingQueue.take()會(huì)一直阻塞直至隊(duì)列內(nèi)有可消費(fèi)延時(shí)消息,避免無(wú)意義的循環(huán)占用CPU。
到此這篇關(guān)于Redisson延時(shí)隊(duì)列RedissonDelayed的具體使用的文章就介紹到這了,更多相關(guān)Redisson延時(shí)隊(duì)列RedissonDelayed內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))
- 基于Redis實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)化方案小結(jié)
- 生產(chǎn)redisson延時(shí)隊(duì)列不消費(fèi)問(wèn)題排查解決
- Redisson 分布式延時(shí)隊(duì)列 RedissonDelayedQueue 運(yùn)行流程
- redis使用zset實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
- redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列的示例代碼
- Redis消息隊(duì)列、阻塞隊(duì)列、延時(shí)隊(duì)列的實(shí)現(xiàn)
- Redis簡(jiǎn)易延時(shí)隊(duì)列的實(shí)現(xiàn)示例
- redis和rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
相關(guān)文章
Redis?鍵值對(duì)(key-value)數(shù)據(jù)庫(kù)實(shí)現(xiàn)方法
Redis 的鍵值對(duì)中的 key 就是字符串對(duì)象,而 value 可以是字符串對(duì)象,也可以是集合數(shù)據(jù)類型的對(duì)象,比如 List 對(duì)象,Hash 對(duì)象、Set 對(duì)象和 Zset 對(duì)象,這篇文章主要介紹了Redis?鍵值對(duì)數(shù)據(jù)庫(kù)是怎么實(shí)現(xiàn)的,需要的朋友可以參考下2024-05-05Redis使用RedisTemplate導(dǎo)致key亂碼問(wèn)題解決
本文主要介紹了Redis使用RedisTemplate導(dǎo)致key亂碼問(wèn)題解決,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-06-06Redis報(bào)錯(cuò)UnrecognizedPropertyException: Unrecognized 
在使用SpringBoot訪問(wèn)Redis時(shí),報(bào)錯(cuò)提示識(shí)別不了屬性headPart,經(jīng)過(guò)排查,發(fā)現(xiàn)并非Serializable或getset方法問(wèn)題,而是存在一個(gè)方法getHeadPart,但無(wú)headPart屬性,解決方案是將getHeadPart改為makeHeadPart2024-10-10Redis進(jìn)行緩存操作的實(shí)現(xiàn)
本文主要介紹了Redis進(jìn)行緩存操作,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2025-03-03淺談Redis?中的過(guò)期刪除策略和內(nèi)存淘汰機(jī)制
本文主要介紹了Redis?中的過(guò)期刪除策略和內(nèi)存淘汰機(jī)制,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04redis服務(wù)器允許遠(yuǎn)程主機(jī)訪問(wèn)的方法
今天小編就為大家分享一篇redis服務(wù)器允許遠(yuǎn)程主機(jī)訪問(wèn)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-05-05