springboot整合redisson實現(xiàn)延時隊列(附倉庫地址)
應(yīng)用場景
通常在一些需要經(jīng)歷一段時間或者到達某個指定時間節(jié)點才會執(zhí)行的功能,比如以下這些場景:
- 訂單超時提醒
- 收貨自動確認
- 會議提醒
- 代辦事項提醒
為什么使用延時隊列
對于數(shù)據(jù)量小且實時性要求不高的需求來說,最簡單的方法就是定時掃描數(shù)據(jù)庫。
但是,當數(shù)量達到數(shù)百萬、上千萬級別且時,定時掃庫就顯得非常低效且消耗資源,
甚至有些時間間隔小實時性要求高的情況,上一次掃描還沒結(jié)束,下一次就又開始了,
這時候如果使用延時隊列就會比較合適
延時隊列的幾種方式:
- Quartz 定時任務(wù)實現(xiàn)掃庫
- DelayQueue JDK中提供了一組實現(xiàn)延遲隊列的API
- Redis sorted set
- Redis 過期鍵監(jiān)聽回調(diào)
- RabbitMQ 死信隊列
- RabbitMQ 基于插件實現(xiàn)延遲隊列
- Wheel 時間輪訓算法
Redisson 實現(xiàn)延時隊列
顧名思義 Redis son 就是 Redis 的兒子,舉個栗子先:
1.引入 pom
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>${lastest.version}</version> </dependency>
2.封裝一個 RedissonQueue 類
@Service public class RedissonQueue { public static final String QUEUE = "delayQueue"; // 默認超時時間,30秒 public static final Integer DEFAULT_TIMEOUT = 30; @Resource private RedissonClient redissonClient; // 加入任務(wù)并設(shè)置到期時間 public void offer(String taskId, Integer timeout) { RDelayedQueue<String> delayedQueue = delayedQueue(); delayedQueue.offer(taskId, Objects.isNull(timeout) ? DEFAULT_TIMEOUT : timeout, TimeUnit.SECONDS); } // 移除任務(wù) public void remove(String taskId) { RDelayedQueue<String> delayedQueue = delayedQueue(); delayedQueue.removeIf(messageId -> messageId.equals(taskId)); } // 任務(wù)列表 public RDelayedQueue<String> delayedQueue() { RBlockingDeque<String> blockingDeque = blockingDeque(); return redissonClient.getDelayedQueue(blockingDeque); } public RBlockingDeque<String> blockingDeque() { return redissonClient.getBlockingDeque(QUEUE); } public boolean isShutdown() { return redissonClient.isShutdown(); } public void shutdown() { redissonClient.shutdown(); } }
3.交給 Spring 管理
@Slf4j @Service public class RedissonService implements ApplicationRunner { @Resource private RedissonQueue redissonQueue; @Resource(name = "threadPoolTaskExecutor") private ThreadPoolTaskExecutor executor; @Override public void run(ApplicationArguments args) { RBlockingDeque<String> blockingDeque = redissonQueue.blockingDeque(); executor.execute(() -> { while (true) { if (redissonQueue.isShutdown()) { return; } else { String messageId = null; try { messageId = blockingDeque.take(); } catch (InterruptedException e) { log.warn("RedissonConsumer error:{}", e.getMessage()); } if (!Objects.isNull(messageId) && !messageId.isEmpty()) { log.warn("timeout messageId : {}", messageId); } } } }); } // 初始化,啟動服務(wù)就執(zhí)行一次 @PostConstruct public void init() { redissonQueue.delayedQueue(); } @PreDestroy public void shutdown() { redissonQueue.shutdown(); } }
4.測試接口
@Operation(summary = "添加任務(wù)", description = "添加任務(wù)") @PostMapping public ResponseEntity<?> add(@RequestParam(value = "taskId", required = false) String taskId, @RequestParam(value = "timeout", required = false) Integer timeout) { taskId = StringUtils.isEmpty(taskId) ? String.valueOf(snowflake.nextId()) : taskId; redissonQueue.offer(taskId, timeout); return ResponseEntity.ok().body(redissonQueue.delayedQueue()); } @Operation(summary = "移除任務(wù)", description = "移除任務(wù)") @DeleteMapping(value = "/{taskId}") public ResponseEntity<?> remove(@PathVariable("taskId") String taskId) { redissonQueue.remove(taskId); return ResponseEntity.ok().body(redissonQueue.delayedQueue()); }
5.測試結(jié)果
添加10個任務(wù)
刪除第1個任務(wù)
可以看到第一個任務(wù)刪除后沒有被執(zhí)行(沒有設(shè)置到期時間,默認為30秒到期)
實現(xiàn)原理
- redisson_delay_queue_timeout:delayQueue,sorted set 數(shù)據(jù)類型,存放所有延遲任務(wù),按延遲任務(wù)的到期時間戳(提交任務(wù)時間戳 +
延遲時間)排序,所以列表最前面第一個元素就是整個延遲隊列中最早被執(zhí)行的任務(wù)。 - redisson_delay_queue:delayQueue,list 數(shù)據(jù)類型,也是存放所有任務(wù)。
- delayQueue,list 數(shù)據(jù)類型,被稱為目標隊列,這個里面存放的任務(wù)都是已經(jīng)到延遲時間的,可以被消費者獲取的任務(wù),所以上面示例中
RBlockingQueue 的 take 方法是從此目標隊列中獲取任務(wù)的。 - redisson_delay_queue_channel:delayQueue,是一個 channel,用來通知客戶端開啟一個延遲任務(wù)
- 生產(chǎn)者提交任務(wù)時將任務(wù)放到 redisson_delay_queue_timeout:delayQueue 中,提交任務(wù)的時間戳+延遲時間
- 客戶端會有一個延遲任務(wù),這個延遲任務(wù)會向 Redis Server 發(fā)送一段 lua 腳本,Redis 執(zhí)行 lua 腳本中的命令,此操作是原子性的
lua 腳本主要干兩件事
- 將到了延遲時間的任務(wù)從 redisson_delay_queue_timeout:delayQueue 中移除,存到 delayQueue 這個目標隊列
- 獲取到 redisson_delay_queue_timeout:delayQueue 中最早到期時間的任務(wù)的到期時間戳,發(fā)布到 redisson_delay_queue_channel:
delayQueue channel 中
當客戶端監(jiān)聽到 redisson_delay_queue_channel:delayQueue 這個 channel 的消息時,會再次提交一個客戶端延遲任務(wù),延遲時間就是消息(最早到期時間任務(wù)的到期時間戳)當前時間戳
這個時間其實也就是 redisson_delay_queue_channel:delayQueue 中最早到期時間的任務(wù)的剩余的延遲時間。
一旦時間來到最早到期時間任務(wù)的到期時間戳,redisson_delay_queue_timeout:delayQueue 中最早到期時間的任務(wù)已經(jīng)到期,客戶端的延遲任務(wù)也同時到期,
于是開始執(zhí)行 lua 腳本操作,及時將到期任務(wù)放到目標隊列中。然后再次發(fā)布剩余的延遲任務(wù)中最早到期任務(wù)的到期時間戳到 channel
中,
如此循環(huán)運行下去,保證 redisson_delay_queue_timeout:delayQueue 中到期數(shù)據(jù)能及時放到目標隊列中。
這里存在一個特殊情況,需要項目啟動時就執(zhí)行一次延時隊列。因為由于沒有客戶端延遲任務(wù)的執(zhí)行,
可能會出現(xiàn) redisson_delay_queue_timeout:delayQueue 隊列中有到期但是沒有被放到目標隊列的可能,啟動就執(zhí)行一次是為了保證到期的數(shù)據(jù)能被及時放到目標隊列中。
結(jié)論
Redisson 方案理論上沒有延遲,但當消息數(shù)量劇增,消費者消費緩慢這種情況下,可能會導致延遲任務(wù)消費的延遲。
消息丟失問題 Redisson 方案最大程度上減輕消息丟失的可能性,因為所有任務(wù)都是存在 list 和 sorted set 兩種數(shù)據(jù)類型中,Redis
有持久化機制。除非整個 redis 集群宕機,可能丟失一小部分數(shù)據(jù)。廣播任務(wù)問題,是不會出現(xiàn)的,因為每個客戶端都是從同一個目標隊列中獲取任務(wù)。
Redisson 這種實現(xiàn)方案是比較合適且靠譜的,一般中小型項目建議用 Redisson 實現(xiàn)延遲隊列,規(guī)模較大的項目直接上 MQ。
到此這篇關(guān)于springboot整合redisson實現(xiàn)延時隊列(附倉庫地址)的文章就介紹到這了,更多相關(guān)springboot redisson延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實現(xiàn)二分查找BinarySearch算法
這篇文章主要介紹了Java實現(xiàn)二分查找BinarySearch算法,二分查找針對的是一個有序的數(shù)據(jù)集合,每次都通過跟區(qū)間的中間元素對比,將待查找的區(qū)間縮小為之前的一半,直到找到要查找的元素,或者區(qū)間被縮小為 0,需要的朋友可以參考下2023-12-12java處理轉(zhuǎn)義字符↑ → ↓ 保存后的展示還原操作
這篇文章主要介紹了java處理轉(zhuǎn)義字符↑ → ↓ 保存后的展示還原操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06Springmvc自定義參數(shù)轉(zhuǎn)換實現(xiàn)代碼解析
這篇文章主要介紹了Springmvc自定義參數(shù)轉(zhuǎn)換實現(xiàn)代碼解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-07-07