Spring?Boot集成Redisson實(shí)現(xiàn)延遲隊(duì)列
項(xiàng)目場景:
在電商、支付等領(lǐng)域,往往會有這樣的場景,用戶下單后放棄支付了,那這筆訂單會在指定的時(shí)間段后進(jìn)行關(guān)閉操作,細(xì)心的你一定發(fā)現(xiàn)了像某寶、某東都有這樣的邏輯,而且時(shí)間很準(zhǔn)確,誤差在1s內(nèi);那他們是怎么實(shí)現(xiàn)的呢?
一般實(shí)現(xiàn)的方法有幾種:使用 redisson、rocketmq、rabbitmq等消息隊(duì)列的延時(shí)投遞功能。
解決方案:
一般項(xiàng)目集成redis的比較多,所以我這篇文章就說下redisson延遲隊(duì)列,如果使用rocketmq或rabbitmq需要額外集成中間件,比較麻煩一點(diǎn)。
1.集成redisson
maven依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.21.1</version> </dependency>
yml配置,單節(jié)點(diǎn)配置可以兼容redis的配置方式
# redis配置
spring:
redis:
database: 0
host: 127.0.0.1
password: redis@pass
port: 6001更詳細(xì)的配置參考:Spring Boot整合Redisson的兩種方式
2.配置多線程
因?yàn)檠舆t隊(duì)列可能會多個(gè)任務(wù)同時(shí)執(zhí)行,所以需要多線程處理。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@EnableAsync
public class ExecutorConfig {
/**
* 異步任務(wù)自定義線程池
*/
@Bean(name = "taskExecutor")
public ThreadPoolTaskExecutor asyncServiceExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//配置核心線程數(shù)
executor.setCorePoolSize(50);
//配置最大線程數(shù)
executor.setMaxPoolSize(500);
//配置隊(duì)列大小
executor.setQueueCapacity(300);
//允許線程空閑時(shí)間
executor.setKeepAliveSeconds(60);
//配置線程池中的線程的名稱前綴
executor.setThreadNamePrefix("taskExecutor-");
// rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù)
// CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來執(zhí)行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//調(diào)用shutdown()方法時(shí)等待所有的任務(wù)完成后再關(guān)閉
executor.setWaitForTasksToCompleteOnShutdown(true);
//等待所有任務(wù)完成后的最大等待時(shí)間
executor.setAwaitTerminationSeconds(60);
return executor;
}
}3.具體業(yè)務(wù)
比如消息通知、關(guān)閉訂單等 ,這里加上了@Async注解,可以異步執(zhí)行
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class AsyncService {
@Async
public void executeQueue(Object value) {
System.out.println();
System.out.println("當(dāng)前線程:"+Thread.currentThread().getName());
System.out.println("執(zhí)行任務(wù):"+value);
//打印時(shí)間方便查看
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("執(zhí)行任務(wù)的時(shí)間:"+sdf.format(new Date()));
//自己的業(yè)務(wù)邏輯,可以根據(jù)id發(fā)送通知消息等
//......
}
}4.延遲隊(duì)列(關(guān)鍵代碼)
這里包括添加延遲隊(duì)列,和消費(fèi)延遲隊(duì)列,@PostConstruct注解的意思是服務(wù)啟動加載一次,參考
Spring Boot項(xiàng)目啟動時(shí)執(zhí)行指定的方法
Spring Boot中多個(gè)PostConstruct注解執(zhí)行順序控制
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
@Service
public class TestService {
@Resource
private AsyncService asyncService;
@Resource
private ThreadPoolTaskExecutor executor;
@Autowired
private RedissonClient redissonClient;
/**
* 添加延遲任務(wù)
*/
public void addQueue() {
//獲取延遲隊(duì)列
RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("delayedQueue");
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
for (int i = 1; i <= 10; i++) {
long delayTime = 5+i; //延遲時(shí)間(秒)
// long delayTime = 5; //這里時(shí)間統(tǒng)一,可以測試并發(fā)執(zhí)行
delayedQueue.offer("延遲任務(wù)"+i, delayTime, TimeUnit.SECONDS);
}
//打印時(shí)間方便查看
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("添加任務(wù)的時(shí)間:"+sdf.format(new Date()));
}
/**
* 服務(wù)啟動時(shí)加載,開始消費(fèi)延遲隊(duì)列
*/
@PostConstruct
public void consumer() {
System.out.println("服務(wù)啟動時(shí)加載>>>>>>");
//獲取延遲隊(duì)列
RBlockingQueue<Object> delayedQueue = redissonClient.getBlockingQueue("delayedQueue");
//啟用一個(gè)線程來消費(fèi)這個(gè)延遲隊(duì)列
executor.execute(() ->{
while (true){
try {
// System.out.println("while中的線程:"+Thread.currentThread().getName());
//獲取延遲隊(duì)列中的任務(wù)
Object value = delayedQueue.poll();
if(value == null){
//如果沒有任務(wù)就休眠1秒,休眠時(shí)間根據(jù)業(yè)務(wù)自己定義
Thread.sleep(1000); //這里休眠時(shí)間越短,誤差就越小
continue;
}
//異步處理延遲隊(duì)列中的消息
asyncService.executeQueue(value);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}5.測試接口
import com.test.service.TestService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/test")
public class TestController {
@Autowired
private TestService testService;
/*
* 添加延遲任務(wù)
*/
@GetMapping(value = "/addQueue")
public String addQueue() {
testService.addQueue();
return "success";
}
}6.測試結(jié)果

總結(jié):
- Redisson的的RDelayedQueue是基于Redis實(shí)現(xiàn)的,而Redis本身并不保證數(shù)據(jù)的持久性。如果Redis服務(wù)器宕機(jī),那么所有在RDelayedQueue中的數(shù)據(jù)都會丟失。因此,我們需要在應(yīng)用層面進(jìn)行持久化設(shè)計(jì),例如定期將RDelayedQueue中的數(shù)據(jù)持久化到數(shù)據(jù)庫。
- 在設(shè)計(jì)延遲任務(wù)時(shí),我們應(yīng)該根據(jù)實(shí)際需求來合理設(shè)置延遲時(shí)間,避免設(shè)置過長的延遲時(shí)間導(dǎo)致內(nèi)存占用過高。
到此這篇關(guān)于Spring Boot集成Redisson實(shí)現(xiàn)延遲隊(duì)列的文章就介紹到這了,更多相關(guān)SpringBoot Redisson延遲隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java并發(fā)包之CopyOnWriteArrayList類的深入講解
這篇文章主要給大家介紹了關(guān)于Java并發(fā)包之CopyOnWriteArrayList類的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12
如何在Java中創(chuàng)建線程通信的四種方式你知道嗎
開發(fā)中不免會遇到需要所有子線程執(zhí)行完畢通知主線程處理某些邏輯的場景?;蛘呤蔷€程 A 在執(zhí)行到某個(gè)條件通知線程 B 執(zhí)行某個(gè)操作。下面我們來一起學(xué)習(xí)如何解決吧2021-09-09
解決springboot 無法配置多個(gè)靜態(tài)路徑的問題
這篇文章主要介紹了解決springboot 無法配置多個(gè)靜態(tài)路徑的問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
初探Spring Cloud Gateway實(shí)戰(zhàn)
這篇文章主要介紹了創(chuàng)建網(wǎng)關(guān)項(xiàng)目(Spring Cloud Gateway)過程詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-08-08
Spring異常實(shí)現(xiàn)統(tǒng)一處理的方法
這篇文章主要介紹了Spring異常實(shí)現(xiàn)統(tǒng)一處理的方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-12-12

