Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的場(chǎng)景
作者:趕路人兒
這篇文章主要介紹了Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
1、延時(shí)隊(duì)列使用場(chǎng)景:
那么什么時(shí)候需要用延時(shí)隊(duì)列呢?常見的延時(shí)任務(wù)場(chǎng)景 舉栗子:
- 訂單在30分鐘之內(nèi)未支付則自動(dòng)取消。
- 重試機(jī)制實(shí)現(xiàn),把調(diào)用失敗的接口放入一個(gè)固定延時(shí)的隊(duì)列,到期后再重試。
- 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒。
- 用戶發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
- 預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議。
- 關(guān)閉空閑連接,服務(wù)器中,有很多客戶端的連接,空閑一段時(shí)間之后需要關(guān)閉之。
- 清理過(guò)期數(shù)據(jù)業(yè)務(wù)。比如緩存中的對(duì)象,超過(guò)了空閑時(shí)間,需要從緩存中移出。
解決方案也非常多:
- 定期輪詢(數(shù)據(jù)庫(kù)等)
- JDK DelayQueue
- JDK Timer
- ScheduledExecutorService 周期性線程池
- 時(shí)間輪(kafka)
- 時(shí)間輪(Netty的HashedWheelTimer)
- Redis有序集合(zset)
- zookeeper之curator
- RabbitMQ
- Quartz,xxljob等定時(shí)任務(wù)框架
- Koala(考拉)
- JCronTab(仿crontab的java調(diào)度器)
SchedulerX(阿里)
對(duì)于單機(jī)服務(wù)優(yōu)選DelayQueue,對(duì)于分布式環(huán)境,可以使用mq、zk、redis之類的。接下來(lái),介紹DelayQueue的使用。
一句話介紹:DelayQueue = BlockingQueue + PriorityQueue + Delayed
2、示例:
實(shí)戰(zhàn)以訂單下單后三十分鐘內(nèi)未支付則自動(dòng)取消 為業(yè)務(wù)場(chǎng)景,該場(chǎng)景的代碼邏輯分析如下:
- 下單后將訂單直接放入未支付的延時(shí)隊(duì)列中
- 如果超時(shí)未支付,則從隊(duì)列中取出,進(jìn)行修改為取消狀態(tài)的訂單
- 如果支付了,則不去進(jìn)行取消,或者取消的時(shí)候做個(gè)狀態(tài)篩選,即可避免更新
- 或者支付完成后,做個(gè)主動(dòng)出隊(duì)
還有就是用戶主動(dòng)取消訂單,也做個(gè)主動(dòng)出隊(duì)
1)先來(lái)寫個(gè)通用的??Delayed?
? :
import lombok.Getter; import lombok.Setter; import java.util.Date; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; @Setter @Getter public class ItemDelayed<T> implements Delayed { /**默認(rèn)延遲30分鐘*/ private final static long DELAY = 30 * 60 * 1000L; /**數(shù)據(jù)id*/ private Long dataId; /**開始時(shí)間*/ private long startTime; /**到期時(shí)間*/ private long expire; /**創(chuàng)建時(shí)間*/ private Date now; /**泛型data*/ private T data; public ItemDelayed(Long dataId, long startTime, long secondsDelay) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + (secondsDelay * 1000); this.now = new Date(); } public ItemDelayed(Long dataId, long startTime) { super(); this.dataId = dataId; this.startTime = startTime; this.expire = startTime + DELAY; this.now = new Date(); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } }
2)再寫個(gè)通用的接口,用于規(guī)范和方便統(tǒng)一實(shí)現(xiàn) 這樣任何類型的訂單都可以實(shí)現(xiàn)這個(gè)接口 進(jìn)行延時(shí)任務(wù)的處理:
public interface DelayOrder<T> { /** * 添加延遲對(duì)象到延時(shí)隊(duì)列 * * @param itemDelayed 延遲對(duì)象 * @return boolean */ boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed); /** * 根據(jù)對(duì)象添加到指定延時(shí)隊(duì)列 * * @param data 數(shù)據(jù)對(duì)象 * @return boolean */ boolean addToDelayQueue(T data); /** * 移除指定的延遲對(duì)象從延時(shí)隊(duì)列中 * * @param data */ void removeToOrderDelayQueue(T data); }
具體業(yè)務(wù)邏輯實(shí)現(xiàn):
@Slf4j @Lazy(false) @Component public class DelayOwnOrderImpl implements DelayOrder<Order> { @Autowired private OrderService orderService; @Autowired private ExecutorService delayOrderExecutor; private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>(); /** * 初始化時(shí)加載數(shù)據(jù)庫(kù)中需處理超時(shí)的訂單 * 系統(tǒng)啟動(dòng):掃描數(shù)據(jù)庫(kù)中未支付(要在更新時(shí):加上已支付就不用更新了),未過(guò)期的的訂單 */ @PostConstruct public void init() { log.info("系統(tǒng)啟動(dòng):掃描數(shù)據(jù)庫(kù)中未支付,未過(guò)期的的訂單"); List<Order> orderList = orderService.selectFutureOverTimeOrder(); for (Order order : orderList) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); this.addToOrderDelayQueue(orderDelayed); } log.info("系統(tǒng)啟動(dòng):掃描數(shù)據(jù)庫(kù)中未支付的訂單,總共掃描了" + orderList.size() + "個(gè)訂單,推入檢查隊(duì)列,準(zhǔn)備到期檢查..."); /*啟動(dòng)一個(gè)線程,去取延遲訂單*/ delayOrderExecutor.execute(() -> { log.info("啟動(dòng)處理的訂單線程:" + Thread.currentThread().getName()); ItemDelayed<Order> orderDelayed; while (true) { try { orderDelayed = DELAY_QUEUE.take(); //處理超時(shí)訂單 orderService.updateCloseOverTimeOrder(orderDelayed.getDataId()); } catch (Exception e) { log.error("執(zhí)行自營(yíng)超時(shí)訂單的_延遲隊(duì)列_異常:" + e); } } }); } /** * 加入延遲消息隊(duì)列 **/ @Override public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) { return DELAY_QUEUE.add(orderDelayed); } /** * 加入延遲消息隊(duì)列 **/ @Override public boolean addToDelayQueue(Order order) { ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime()); return DELAY_QUEUE.add(orderDelayed); } /** * 從延遲隊(duì)列中移除 主動(dòng)取消就主動(dòng)從隊(duì)列中取出 **/ @Override public void removeToOrderDelayQueue(Order order) { if (order == null) { return; } for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) { ItemDelayed<Order> queue = iterator.next(); if (queue.getDataId().equals(order.getId())) { DELAY_QUEUE.remove(queue); } } } }
分析:
- delayOrderExecutor是注入的一個(gè)專門處理出隊(duì)的一個(gè)線程
- @PostConstruct是啥呢,是在容器啟動(dòng)后只進(jìn)行一次初始化動(dòng)作的一個(gè)注解,相當(dāng)實(shí)用
- 啟動(dòng)后呢,我們?nèi)?shù)據(jù)庫(kù)掃描一遍,防止有漏網(wǎng)之魚,因?yàn)閱螜C(jī)版嗎,隊(duì)列的數(shù)據(jù)是在內(nèi)存中的,重啟后肯定原先的數(shù)據(jù)會(huì)丟失,所以為保證服務(wù)質(zhì)量,我們可能會(huì)錄音.....所以為保證重啟后數(shù)據(jù)的恢復(fù),我們需要重新掃描數(shù)據(jù)庫(kù)把未支付的數(shù)據(jù)重新裝載到內(nèi)存的隊(duì)列中
- 接下來(lái)就是用這個(gè)線程去一直不停的訪問(wèn)隊(duì)列的take()方法,當(dāng)隊(duì)列無(wú)數(shù)據(jù)就一直阻塞,或者數(shù)據(jù)沒(méi)到期繼續(xù)阻塞著,直到到期出隊(duì),然后獲取訂單的信息,去處理訂單的更新操作
到此這篇關(guān)于Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的文章就介紹到這了,更多相關(guān)java延時(shí)隊(duì)列超時(shí)訂單處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!