Spring?Boot集成Redisson實現(xiàn)延遲隊列
項目場景:
在電商、支付等領(lǐng)域,往往會有這樣的場景,用戶下單后放棄支付了,那這筆訂單會在指定的時間段后進行關(guān)閉操作,細心的你一定發(fā)現(xiàn)了像某寶、某東都有這樣的邏輯,而且時間很準(zhǔn)確,誤差在1s內(nèi);那他們是怎么實現(xiàn)的呢?
一般實現(xiàn)的方法有幾種:使用 redisson、rocketmq、rabbitmq等消息隊列的延時投遞功能。
解決方案:
一般項目集成redis的比較多,所以我這篇文章就說下redisson延遲隊列,如果使用rocketmq或rabbitmq需要額外集成中間件,比較麻煩一點。
1.集成redisson
maven依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.21.1</version> </dependency>
yml配置,單節(jié)點配置可以兼容redis的配置方式
# redis配置 spring: redis: database: 0 host: 127.0.0.1 password: redis@pass port: 6001
更詳細的配置參考:Spring Boot整合Redisson的兩種方式
2.配置多線程
因為延遲隊列可能會多個任務(wù)同時執(zhí)行,所以需要多線程處理。
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; @Configuration @EnableAsync public class ExecutorConfig { /** * 異步任務(wù)自定義線程池 */ @Bean(name = "taskExecutor") public ThreadPoolTaskExecutor asyncServiceExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(50); //配置最大線程數(shù) executor.setMaxPoolSize(500); //配置隊列大小 executor.setQueueCapacity(300); //允許線程空閑時間 executor.setKeepAliveSeconds(60); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("taskExecutor-"); // rejection-policy:當(dāng)pool已經(jīng)達到max size的時候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //調(diào)用shutdown()方法時等待所有的任務(wù)完成后再關(guān)閉 executor.setWaitForTasksToCompleteOnShutdown(true); //等待所有任務(wù)完成后的最大等待時間 executor.setAwaitTerminationSeconds(60); return executor; } }
3.具體業(yè)務(wù)
比如消息通知、關(guān)閉訂單等 ,這里加上了@Async注解,可以異步執(zhí)行
import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.text.SimpleDateFormat; import java.util.Date; @Service public class AsyncService { @Async public void executeQueue(Object value) { System.out.println(); System.out.println("當(dāng)前線程:"+Thread.currentThread().getName()); System.out.println("執(zhí)行任務(wù):"+value); //打印時間方便查看 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("執(zhí)行任務(wù)的時間:"+sdf.format(new Date())); //自己的業(yè)務(wù)邏輯,可以根據(jù)id發(fā)送通知消息等 //...... } }
4.延遲隊列(關(guān)鍵代碼)
這里包括添加延遲隊列,和消費延遲隊列,@PostConstruct注解的意思是服務(wù)啟動加載一次,參考
Spring Boot項目啟動時執(zhí)行指定的方法
Spring Boot中多個PostConstruct注解執(zhí)行順序控制
import org.redisson.api.RBlockingQueue; import org.redisson.api.RDelayedQueue; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; @Service public class TestService { @Resource private AsyncService asyncService; @Resource private ThreadPoolTaskExecutor executor; @Autowired private RedissonClient redissonClient; /** * 添加延遲任務(wù) */ public void addQueue() { //獲取延遲隊列 RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("delayedQueue"); RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue); for (int i = 1; i <= 10; i++) { long delayTime = 5+i; //延遲時間(秒) // long delayTime = 5; //這里時間統(tǒng)一,可以測試并發(fā)執(zhí)行 delayedQueue.offer("延遲任務(wù)"+i, delayTime, TimeUnit.SECONDS); } //打印時間方便查看 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("添加任務(wù)的時間:"+sdf.format(new Date())); } /** * 服務(wù)啟動時加載,開始消費延遲隊列 */ @PostConstruct public void consumer() { System.out.println("服務(wù)啟動時加載>>>>>>"); //獲取延遲隊列 RBlockingQueue<Object> delayedQueue = redissonClient.getBlockingQueue("delayedQueue"); //啟用一個線程來消費這個延遲隊列 executor.execute(() ->{ while (true){ try { // System.out.println("while中的線程:"+Thread.currentThread().getName()); //獲取延遲隊列中的任務(wù) Object value = delayedQueue.poll(); if(value == null){ //如果沒有任務(wù)就休眠1秒,休眠時間根據(jù)業(yè)務(wù)自己定義 Thread.sleep(1000); //這里休眠時間越短,誤差就越小 continue; } //異步處理延遲隊列中的消息 asyncService.executeQueue(value); } catch (Exception e) { e.printStackTrace(); } } }); } }
5.測試接口
import com.test.service.TestService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/test") public class TestController { @Autowired private TestService testService; /* * 添加延遲任務(wù) */ @GetMapping(value = "/addQueue") public String addQueue() { testService.addQueue(); return "success"; } }
6.測試結(jié)果
總結(jié):
- Redisson的的RDelayedQueue是基于Redis實現(xiàn)的,而Redis本身并不保證數(shù)據(jù)的持久性。如果Redis服務(wù)器宕機,那么所有在RDelayedQueue中的數(shù)據(jù)都會丟失。因此,我們需要在應(yīng)用層面進行持久化設(shè)計,例如定期將RDelayedQueue中的數(shù)據(jù)持久化到數(shù)據(jù)庫。
- 在設(shè)計延遲任務(wù)時,我們應(yīng)該根據(jù)實際需求來合理設(shè)置延遲時間,避免設(shè)置過長的延遲時間導(dǎo)致內(nèi)存占用過高。
到此這篇關(guān)于Spring Boot集成Redisson實現(xiàn)延遲隊列的文章就介紹到這了,更多相關(guān)SpringBoot Redisson延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java并發(fā)包之CopyOnWriteArrayList類的深入講解
這篇文章主要給大家介紹了關(guān)于Java并發(fā)包之CopyOnWriteArrayList類的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12如何在Java中創(chuàng)建線程通信的四種方式你知道嗎
開發(fā)中不免會遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場景?;蛘呤蔷€程 A 在執(zhí)行到某個條件通知線程 B 執(zhí)行某個操作。下面我們來一起學(xué)習(xí)如何解決吧2021-09-09解決springboot 無法配置多個靜態(tài)路徑的問題
這篇文章主要介紹了解決springboot 無法配置多個靜態(tài)路徑的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08初探Spring Cloud Gateway實戰(zhàn)
這篇文章主要介紹了創(chuàng)建網(wǎng)關(guān)項目(Spring Cloud Gateway)過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2021-08-08