springboot整合redisson實(shí)現(xiàn)延時隊(duì)列(附倉庫地址)
應(yīng)用場景
通常在一些需要經(jīng)歷一段時間或者到達(dá)某個指定時間節(jié)點(diǎn)才會執(zhí)行的功能,比如以下這些場景:
- 訂單超時提醒
- 收貨自動確認(rèn)
- 會議提醒
- 代辦事項(xiàng)提醒
為什么使用延時隊(duì)列
對于數(shù)據(jù)量小且實(shí)時性要求不高的需求來說,最簡單的方法就是定時掃描數(shù)據(jù)庫。
但是,當(dāng)數(shù)量達(dá)到數(shù)百萬、上千萬級別且時,定時掃庫就顯得非常低效且消耗資源,
甚至有些時間間隔小實(shí)時性要求高的情況,上一次掃描還沒結(jié)束,下一次就又開始了,
這時候如果使用延時隊(duì)列就會比較合適
延時隊(duì)列的幾種方式:
- Quartz 定時任務(wù)實(shí)現(xiàn)掃庫
- DelayQueue JDK中提供了一組實(shí)現(xiàn)延遲隊(duì)列的API
- Redis sorted set
- Redis 過期鍵監(jiān)聽回調(diào)
- RabbitMQ 死信隊(duì)列
- RabbitMQ 基于插件實(shí)現(xiàn)延遲隊(duì)列
- Wheel 時間輪訓(xùn)算法
Redisson 實(shí)現(xiàn)延時隊(duì)列
顧名思義 Redis son 就是 Redis 的兒子,舉個栗子先:
1.引入 pom
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>${lastest.version}</version>
</dependency>
2.封裝一個 RedissonQueue 類
@Service
public class RedissonQueue {
public static final String QUEUE = "delayQueue";
// 默認(rèn)超時時間,30秒
public static final Integer DEFAULT_TIMEOUT = 30;
@Resource
private RedissonClient redissonClient;
// 加入任務(wù)并設(shè)置到期時間
public void offer(String taskId, Integer timeout) {
RDelayedQueue<String> delayedQueue = delayedQueue();
delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS);
}
// 移除任務(wù)
public void remove(String taskId) {
RDelayedQueue<String> delayedQueue = delayedQueue();
delayedQueue.removeIf(messageId -> messageId.equals(taskId));
}
// 任務(wù)列表
public RDelayedQueue<String> delayedQueue() {
RBlockingDeque<String> blockingDeque = blockingDeque();
return redissonClient.getDelayedQueue(blockingDeque);
}
public RBlockingDeque<String> blockingDeque() {
return redissonClient.getBlockingDeque(QUEUE);
}
public boolean isShutdown() {
return redissonClient.isShutdown();
}
public void shutdown() {
redissonClient.shutdown();
}
}
3.交給 Spring 管理
@Slf4j
@Service
public class RedissonService implements ApplicationRunner {
@Resource
private RedissonQueue redissonQueue;
@Resource(name = "threadPoolTaskExecutor")
private ThreadPoolTaskExecutor executor;
@Override
public void run(ApplicationArguments args) {
RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque();
executor.execute(() -> {
while (true) {
if (redissonQueue.isShutdown()) {
return;
} else {
String messageId = null;
try {
messageId = blockingDeque.take();
} catch (InterruptedException e) {
log.warn("RedissonConsumer error:{}", e.getMessage());
}
if (!Objects.isNull(messageId) && !messageId.isEmpty()) {
log.warn("timeout messageId : {}", messageId);
}
}
}
});
}
// 初始化,啟動服務(wù)就執(zhí)行一次
@PostConstruct
public void init() {
redissonQueue.delayedQueue();
}
@PreDestroy
public void shutdown() {
redissonQueue.shutdown();
}
}
4.測試接口
@Operation(summary = "添加任務(wù)", description = "添加任務(wù)")
@PostMapping
public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId,
@RequestParam(value = "timeout", required = false) Integer timeout) {
taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId;
redissonQueue.offer(taskId, timeout);
return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}
@Operation(summary = "移除任務(wù)", description = "移除任務(wù)")
@DeleteMapping(value = "/{taskId}")
public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) {
redissonQueue.remove(taskId);
return ResponseEntity.ok().body(redissonQueue.delayedQueue());
}
5.測試結(jié)果
添加10個任務(wù)

刪除第1個任務(wù)

可以看到第一個任務(wù)刪除后沒有被執(zhí)行(沒有設(shè)置到期時間,默認(rèn)為30秒到期)

實(shí)現(xiàn)原理
- redisson_delay_queue_timeout:delayQueue,sorted set 數(shù)據(jù)類型,存放所有延遲任務(wù),按延遲任務(wù)的到期時間戳(提交任務(wù)時間戳 +
延遲時間)排序,所以列表最前面第一個元素就是整個延遲隊(duì)列中最早被執(zhí)行的任務(wù)。 - redisson_delay_queue:delayQueue,list 數(shù)據(jù)類型,也是存放所有任務(wù)。
- delayQueue,list 數(shù)據(jù)類型,被稱為目標(biāo)隊(duì)列,這個里面存放的任務(wù)都是已經(jīng)到延遲時間的,可以被消費(fèi)者獲取的任務(wù),所以上面示例中
RBlockingQueue 的 take 方法是從此目標(biāo)隊(duì)列中獲取任務(wù)的。 - redisson_delay_queue_channel:delayQueue,是一個 channel,用來通知客戶端開啟一個延遲任務(wù)
- 生產(chǎn)者提交任務(wù)時將任務(wù)放到 redisson_delay_queue_timeout:delayQueue 中,提交任務(wù)的時間戳+延遲時間
- 客戶端會有一個延遲任務(wù),這個延遲任務(wù)會向 Redis Server 發(fā)送一段 lua 腳本,Redis 執(zhí)行 lua 腳本中的命令,此操作是原子性的
lua 腳本主要干兩件事
- 將到了延遲時間的任務(wù)從 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 這個目標(biāo)隊(duì)列
- 獲取到 redisson_delay_queue_timeout:delayQueue 中最早到期時間的任務(wù)的到期時間戳,發(fā)布到 redisson_delay_queue_channel:
delayQueue channel 中
當(dāng)客戶端監(jiān)聽到 redisson_delay_queue_channel:delayQueue 這個 channel 的消息時,會再次提交一個客戶端延遲任務(wù),延遲時間就是消息(最早到期時間任務(wù)的到期時間戳)當(dāng)前時間戳
這個時間其實(shí)也就是 redisson_delay_queue_channel:delayQueue 中最早到期時間的任務(wù)的剩余的延遲時間。
一旦時間來到最早到期時間任務(wù)的到期時間戳,redisson_delay_queue_timeout:delayQueue 中最早到期時間的任務(wù)已經(jīng)到期,客戶端的延遲任務(wù)也同時到期,
于是開始執(zhí)行 lua 腳本操作,及時將到期任務(wù)放到目標(biāo)隊(duì)列中。然后再次發(fā)布剩余的延遲任務(wù)中最早到期任務(wù)的到期時間戳到 channel
中,
如此循環(huán)運(yùn)行下去,保證 redisson_delay_queue_timeout:delayQueue 中到期數(shù)據(jù)能及時放到目標(biāo)隊(duì)列中。
這里存在一個特殊情況,需要項(xiàng)目啟動時就執(zhí)行一次延時隊(duì)列。因?yàn)橛捎跊]有客戶端延遲任務(wù)的執(zhí)行,
可能會出現(xiàn) redisson_delay_queue_timeout:delayQueue 隊(duì)列中有到期但是沒有被放到目標(biāo)隊(duì)列的可能,啟動就執(zhí)行一次是為了保證到期的數(shù)據(jù)能被及時放到目標(biāo)隊(duì)列中。
結(jié)論
Redisson 方案理論上沒有延遲,但當(dāng)消息數(shù)量劇增,消費(fèi)者消費(fèi)緩慢這種情況下,可能會導(dǎo)致延遲任務(wù)消費(fèi)的延遲。
消息丟失問題 Redisson 方案最大程度上減輕消息丟失的可能性,因?yàn)樗腥蝿?wù)都是存在 list 和 sorted set 兩種數(shù)據(jù)類型中,Redis
有持久化機(jī)制。除非整個 redis 集群宕機(jī),可能丟失一小部分?jǐn)?shù)據(jù)。廣播任務(wù)問題,是不會出現(xiàn)的,因?yàn)槊總€客戶端都是從同一個目標(biāo)隊(duì)列中獲取任務(wù)。
Redisson 這種實(shí)現(xiàn)方案是比較合適且靠譜的,一般中小型項(xiàng)目建議用 Redisson 實(shí)現(xiàn)延遲隊(duì)列,規(guī)模較大的項(xiàng)目直接上 MQ。
到此這篇關(guān)于springboot整合redisson實(shí)現(xiàn)延時隊(duì)列(附倉庫地址)的文章就介紹到這了,更多相關(guān)springboot redisson延時隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
HttpClient實(shí)現(xiàn)遠(yuǎn)程調(diào)用
這篇文章主要為大家詳細(xì)介紹了HttpClient實(shí)現(xiàn)遠(yuǎn)程調(diào)用的方法,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-08-08
Java實(shí)現(xiàn)二分查找BinarySearch算法
這篇文章主要介紹了Java實(shí)現(xiàn)二分查找BinarySearch算法,二分查找針對的是一個有序的數(shù)據(jù)集合,每次都通過跟區(qū)間的中間元素對比,將待查找的區(qū)間縮小為之前的一半,直到找到要查找的元素,或者區(qū)間被縮小為 0,需要的朋友可以參考下2023-12-12
java處理轉(zhuǎn)義字符↑ → ↓ 保存后的展示還原操作
這篇文章主要介紹了java處理轉(zhuǎn)義字符↑ → ↓ 保存后的展示還原操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06
SpringMVC基于注解方式實(shí)現(xiàn)上傳下載
本文主要介紹了SpringMVC基于注解方式實(shí)現(xiàn)上傳下載,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04
Springmvc自定義參數(shù)轉(zhuǎn)換實(shí)現(xiàn)代碼解析
這篇文章主要介紹了Springmvc自定義參數(shù)轉(zhuǎn)換實(shí)現(xiàn)代碼解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07
SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱
緩存預(yù)熱是指在 Spring Boot 項(xiàng)目啟動時,預(yù)先將數(shù)據(jù)加載到緩存系統(tǒng)(如 Redis)中的一種機(jī)制,本文主要介紹了SpringBoot如何實(shí)現(xiàn)緩存預(yù)熱,感興趣的可以了解下2024-12-12

