Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的場景
作者:趕路人兒
這篇文章主要介紹了Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理,本文通過示例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
1、延時(shí)隊(duì)列使用場景:
那么什么時(shí)候需要用延時(shí)隊(duì)列呢?常見的延時(shí)任務(wù)場景 舉栗子:
- 訂單在30分鐘之內(nèi)未支付則自動取消。
- 重試機(jī)制實(shí)現(xiàn),把調(diào)用失敗的接口放入一個(gè)固定延時(shí)的隊(duì)列,到期后再重試。
- 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒有上傳過商品,則自動發(fā)送消息提醒。
- 用戶發(fā)起退款,如果三天內(nèi)沒有得到處理則通知相關(guān)運(yùn)營人員。
- 預(yù)定會議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會人員參加會議。
- 關(guān)閉空閑連接,服務(wù)器中,有很多客戶端的連接,空閑一段時(shí)間之后需要關(guān)閉之。
- 清理過期數(shù)據(jù)業(yè)務(wù)。比如緩存中的對象,超過了空閑時(shí)間,需要從緩存中移出。
解決方案也非常多:
- 定期輪詢(數(shù)據(jù)庫等)
- JDK DelayQueue
- JDK Timer
- ScheduledExecutorService 周期性線程池
- 時(shí)間輪(kafka)
- 時(shí)間輪(Netty的HashedWheelTimer)
- Redis有序集合(zset)
- zookeeper之curator
- RabbitMQ
- Quartz,xxljob等定時(shí)任務(wù)框架
- Koala(考拉)
- JCronTab(仿crontab的java調(diào)度器)
SchedulerX(阿里)
對于單機(jī)服務(wù)優(yōu)選DelayQueue,對于分布式環(huán)境,可以使用mq、zk、redis之類的。接下來,介紹DelayQueue的使用。
一句話介紹:DelayQueue = BlockingQueue + PriorityQueue + Delayed
2、示例:
實(shí)戰(zhàn)以訂單下單后三十分鐘內(nèi)未支付則自動取消 為業(yè)務(wù)場景,該場景的代碼邏輯分析如下:
- 下單后將訂單直接放入未支付的延時(shí)隊(duì)列中
- 如果超時(shí)未支付,則從隊(duì)列中取出,進(jìn)行修改為取消狀態(tài)的訂單
- 如果支付了,則不去進(jìn)行取消,或者取消的時(shí)候做個(gè)狀態(tài)篩選,即可避免更新
- 或者支付完成后,做個(gè)主動出隊(duì)
還有就是用戶主動取消訂單,也做個(gè)主動出隊(duì)
1)先來寫個(gè)通用的??Delayed?? :
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@Setter
@Getter
public class ItemDelayed<T> implements Delayed {
/**默認(rèn)延遲30分鐘*/
private final static long DELAY = 30 * 60 * 1000L;
/**數(shù)據(jù)id*/
private Long dataId;
/**開始時(shí)間*/
private long startTime;
/**到期時(shí)間*/
private long expire;
/**創(chuàng)建時(shí)間*/
private Date now;
/**泛型data*/
private T data;
public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + (secondsDelay * 1000);
this.now = new Date();
}
public ItemDelayed(Long dataId, long startTime) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + DELAY;
this.now = new Date();
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}2)再寫個(gè)通用的接口,用于規(guī)范和方便統(tǒng)一實(shí)現(xiàn) 這樣任何類型的訂單都可以實(shí)現(xiàn)這個(gè)接口 進(jìn)行延時(shí)任務(wù)的處理:
public interface DelayOrder<T> {
/**
* 添加延遲對象到延時(shí)隊(duì)列
*
* @param itemDelayed 延遲對象
* @return boolean
*/
boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);
/**
* 根據(jù)對象添加到指定延時(shí)隊(duì)列
*
* @param data 數(shù)據(jù)對象
* @return boolean
*/
boolean addToDelayQueue(T data);
/**
* 移除指定的延遲對象從延時(shí)隊(duì)列中
*
* @param data
*/
void removeToOrderDelayQueue(T data);
}具體業(yè)務(wù)邏輯實(shí)現(xiàn):
@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {
@Autowired
private OrderService orderService;
@Autowired
private ExecutorService delayOrderExecutor;
private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();
/**
* 初始化時(shí)加載數(shù)據(jù)庫中需處理超時(shí)的訂單
* 系統(tǒng)啟動:掃描數(shù)據(jù)庫中未支付(要在更新時(shí):加上已支付就不用更新了),未過期的的訂單
*/
@PostConstruct
public void init() {
log.info("系統(tǒng)啟動:掃描數(shù)據(jù)庫中未支付,未過期的的訂單");
List<Order> orderList = orderService.selectFutureOverTimeOrder();
for (Order order : orderList) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
this.addToOrderDelayQueue(orderDelayed);
}
log.info("系統(tǒng)啟動:掃描數(shù)據(jù)庫中未支付的訂單,總共掃描了" + orderList.size() + "個(gè)訂單,推入檢查隊(duì)列,準(zhǔn)備到期檢查...");
/*啟動一個(gè)線程,去取延遲訂單*/
delayOrderExecutor.execute(() -> {
log.info("啟動處理的訂單線程:" + Thread.currentThread().getName());
ItemDelayed<Order> orderDelayed;
while (true) {
try {
orderDelayed = DELAY_QUEUE.take();
//處理超時(shí)訂單
orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
} catch (Exception e) {
log.error("執(zhí)行自營超時(shí)訂單的_延遲隊(duì)列_異常:" + e);
}
}
});
}
/**
* 加入延遲消息隊(duì)列
**/
@Override
public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
return DELAY_QUEUE.add(orderDelayed);
}
/**
* 加入延遲消息隊(duì)列
**/
@Override
public boolean addToDelayQueue(Order order) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
return DELAY_QUEUE.add(orderDelayed);
}
/**
* 從延遲隊(duì)列中移除 主動取消就主動從隊(duì)列中取出
**/
@Override
public void removeToOrderDelayQueue(Order order) {
if (order == null) {
return;
}
for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayed<Order> queue = iterator.next();
if (queue.getDataId().equals(order.getId())) {
DELAY_QUEUE.remove(queue);
}
}
}
}分析:
- delayOrderExecutor是注入的一個(gè)專門處理出隊(duì)的一個(gè)線程
- @PostConstruct是啥呢,是在容器啟動后只進(jìn)行一次初始化動作的一個(gè)注解,相當(dāng)實(shí)用
- 啟動后呢,我們?nèi)?shù)據(jù)庫掃描一遍,防止有漏網(wǎng)之魚,因?yàn)閱螜C(jī)版嗎,隊(duì)列的數(shù)據(jù)是在內(nèi)存中的,重啟后肯定原先的數(shù)據(jù)會丟失,所以為保證服務(wù)質(zhì)量,我們可能會錄音.....所以為保證重啟后數(shù)據(jù)的恢復(fù),我們需要重新掃描數(shù)據(jù)庫把未支付的數(shù)據(jù)重新裝載到內(nèi)存的隊(duì)列中
- 接下來就是用這個(gè)線程去一直不停的訪問隊(duì)列的take()方法,當(dāng)隊(duì)列無數(shù)據(jù)就一直阻塞,或者數(shù)據(jù)沒到期繼續(xù)阻塞著,直到到期出隊(duì),然后獲取訂單的信息,去處理訂單的更新操作
到此這篇關(guān)于Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的文章就介紹到這了,更多相關(guān)java延時(shí)隊(duì)列超時(shí)訂單處理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!