SpringBoot中Redisson延遲隊列的示例
場景:
需求:
支付的二維碼,超過兩個小時以后,如果還未支付,則自動轉為取消支付,或者支付超時的狀態(tài)
需求分析:
1,動態(tài)定時任務:
每個支付的二維碼創(chuàng)建的時候,創(chuàng)建一個動態(tài)的定時任務,兩個小時候自動執(zhí)行,更新支付狀態(tài),可以解決這個問題。
(1)持久化:
如果服務重啟了,動態(tài)定時任務會丟失,導致部分數(shù)據(jù)沒辦法更新狀態(tài)。
(2)分布式:
如果當服務重啟時,自動掃描數(shù)據(jù),重新計算時間,再次創(chuàng)建動態(tài)定時任務。可以解決(1)的問題,但是當分布式,多個節(jié)點的時候,都會重新加載所有的任務,這樣性能上不是最優(yōu)解,只能在數(shù)據(jù)源上加上節(jié)點名稱,不同的服務節(jié)點,加載屬于自己的定時任務,可以解決這個問題??偟南胂?,太麻煩了,還是算了。
2,Redisson延遲隊列
(1)持久化:隊列信息放在Redis上,服務重啟不影響。
(2)分布式:多節(jié)點去Redis拿去數(shù)據(jù),誰搶到算誰的,不會存在同一個任務,多個節(jié)點支持。唯一不足就是過度依賴Redis,萬一Redis崩了,那就涼涼了(那就是要把Redis配置高可用,當前業(yè)務就不用管了)??傮w來說還是比較好用的。
實現(xiàn)
1,創(chuàng)建延遲隊列的監(jiān)聽任務【RedisDelayedQueueListener】,消費延遲隊列
2,創(chuàng)建新增延遲隊列的類,用于創(chuàng)建延遲隊列
3,整體初始化,把監(jiān)聽任務與spring綁定,掃描各個監(jiān)聽延遲隊列的實現(xiàn)類,并開啟單獨線程,監(jiān)聽任務。
4,創(chuàng)建延遲任務(開始測試使用)
連接Redis
不貼代碼了,自己在網(wǎng)上搜
監(jiān)聽延遲隊列
接口:
/**
* 隊列事件監(jiān)聽接口,需要實現(xiàn)這個方法
*
* @module
* @author frank
* @date 2021/8/19 10:50
*/
public interface RedisDelayedQueueListener<T> {
/**
* 執(zhí)行方法
*
* @param t
*/
void invoke(T t);
}實現(xiàn):
import com.sxmaps.netschool.common.redisson.RedisDelayedQueueListener;
import com.sxmaps.netschool.service.vo.school.SchoolAccountPayStateReqVO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* 支付二維碼監(jiān)聽器
*
* @module
* @author frank
* @date 2021/8/19 10:49
*/
@Component
public class PayQCordListener implements RedisDelayedQueueListener<SchoolAccountPayStateReqVO> {
private final Logger logger = LoggerFactory.getLogger(PayQCordListener.class);
@Autowired
private SchoolAccountService schoolAccountService;
@Override
public void invoke(SchoolAccountPayStateReqVO payStateReqVO) {
logger.info("支付二維碼-延遲失效,內(nèi)容:{}", payStateReqVO);
//處理業(yè)務,更新二維碼狀態(tài)
logger.info("支付二維碼-延遲失效,內(nèi)容:{},處理結果:{}", payStateReqVO,respDTO);
}
}增加延遲隊列
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
/**
* 增加延遲信息
*
* @author frank
* @module
* @date 2021/8/19 10:49
*/
@Component
public class RedisDelayedQueue {
private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueue.class);
@Autowired
RedissonClient redissonClient;
/**
* 添加隊列
*
* @param t DTO傳輸類
* @param delay 時間數(shù)量
* @param timeUnit 時間單位
* @param <T> 泛型
*/
private <T> void addQueue(T t, long delay, TimeUnit timeUnit, String queueName) {
logger.info("添加延遲隊列,監(jiān)聽名稱:{},時間:{},時間單位:{},內(nèi)容:{}" , queueName, delay, timeUnit,t);
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
delayedQueue.offer(t, delay, timeUnit);
}
/**
* 添加隊列-秒
*
* @param t DTO傳輸類
* @param delay 時間數(shù)量
* @param <T> 泛型
*/
public <T> void addQueueSeconds(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
addQueue(t, delay, TimeUnit.SECONDS, clazz.getName());
}
/**
* 添加隊列-分
*
* @param t DTO傳輸類
* @param delay 時間數(shù)量
* @param <T> 泛型
*/
public <T> void addQueueMinutes(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
addQueue(t, delay, TimeUnit.MINUTES, clazz.getName());
}
/**
* 添加隊列-時
*
* @param t DTO傳輸類
* @param delay 時間數(shù)量
* @param <T> 泛型
*/
public <T> void addQueueHours(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
addQueue(t, delay, TimeUnit.HOURS, clazz.getName());
}
/**
* 添加隊列-天
*
* @param t DTO傳輸類
* @param delay 時間數(shù)量
* @param <T> 泛型
*/
public <T> void addQueueDays(T t, long delay, Class<? extends RedisDelayedQueueListener> clazz) {
addQueue(t, delay, TimeUnit.DAYS, clazz.getName());
}
}整體初始化
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 初始化隊列監(jiān)聽
*
* @module
* @author frank
* @date 2021/8/19 10:49
*/
@Component
public class RedisDelayedQueueInit implements ApplicationContextAware {
private final Logger logger = LoggerFactory.getLogger(RedisDelayedQueueInit.class);
@Autowired
RedissonClient redissonClient;
/**
* 獲取應用上下文并獲取相應的接口實現(xiàn)類
*
* @param applicationContext
* @throws BeansException
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
Map<String, RedisDelayedQueueListener> map = applicationContext.getBeansOfType(RedisDelayedQueueListener.class);
for (Map.Entry<String, RedisDelayedQueueListener> taskEventListenerEntry : map.entrySet()) {
String listenerName = taskEventListenerEntry.getValue().getClass().getName();
startThread(listenerName, taskEventListenerEntry.getValue());
}
}
/**
* 啟動線程獲取隊列*
*
* @param queueName queueName
* @param redisDelayedQueueListener 任務回調監(jiān)聽
* @param <T> 泛型
* @return
*/
private <T> void startThread(String queueName, RedisDelayedQueueListener redisDelayedQueueListener) {
RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(queueName);
//服務重啟后,無offer,take不到信息。
redissonClient.getDelayedQueue(blockingFairQueue);
//由于此線程需要常駐,可以新建線程,不用交給線程池管理
Thread thread = new Thread(() -> {
logger.info("啟動監(jiān)聽隊列線程" + queueName);
while (true) {
try {
T t = blockingFairQueue.take();
logger.info("監(jiān)聽隊列線程,監(jiān)聽名稱:{},內(nèi)容:{}", queueName, t);
redisDelayedQueueListener.invoke(t);
} catch (Exception e) {
logger.info("監(jiān)聽隊列線程錯誤,", e);
}
}
});
thread.setName(queueName);
thread.start();
}
}創(chuàng)建延遲任務
@Autowired RedisDelayedQueue queue; ................. queue.addQueueHours(new SchoolAccountPayStateReqVO(dto.getPayNo()),2, PayQCordListener.class);
到此這篇關于Redisson延遲隊列的示例的文章就介紹到這了,更多相關Redisson延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
使用spring?data的page和pageable如何實現(xiàn)分頁查詢
這篇文章主要介紹了使用spring?data的page和pageable如何實現(xiàn)分頁查詢,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-12-12
Struts2攔截器Interceptor的原理與配置實例詳解
攔截器是一種AOP(面向切面編程)思想的編程方式.它提供一種機制是開發(fā)者能夠把相對獨立的代碼抽離出來,配置到Action前后執(zhí)行。下面這篇文章主要給大家介紹了關于Struts2攔截器Interceptor的原理與配置的相關資料,需要的朋友可以參考下。2017-11-11
Java利用Geotools從DEM數(shù)據(jù)中讀取指定位置的高程信息全過程
Geotools作為一款功能強大且開源的地理工具庫,為地理數(shù)據(jù)的處理和分析提供了豐富的類庫和便捷的接口,能夠很好地滿足從DEM數(shù)據(jù)中讀取高程信息這一實戰(zhàn)需求,本文將深入講解如何利用Geotools從獲取DEM數(shù)據(jù)到成功讀取指定位置高程信息的全過程,需要的朋友可以參考下2025-03-03
Spring Boot 使用 Swagger 構建 RestAPI 接口文檔
這篇文章主要介紹了Spring Boot 使用 Swagger 構建 RestAPI 接口文檔,幫助大家更好的理解和使用Spring Boot框架,感興趣的朋友可以了解下2020-10-10

