???? SSI ????????

欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

java
???? SSI ????????
首頁(yè) > 軟件編程 > java > java延時(shí)隊(duì)列超時(shí)訂單處理

Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的場(chǎng)景

作者:趕路人兒

這篇文章主要介紹了Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

1、延時(shí)隊(duì)列使用場(chǎng)景:

那么什么時(shí)候需要用延時(shí)隊(duì)列呢?常見的延時(shí)任務(wù)場(chǎng)景 舉栗子:

  1. 訂單在30分鐘之內(nèi)未支付則自動(dòng)取消。
  2. 重試機(jī)制實(shí)現(xiàn),把調(diào)用失敗的接口放入一個(gè)固定延時(shí)的隊(duì)列,到期后再重試。
  3. 新創(chuàng)建的店鋪,如果在十天內(nèi)都沒(méi)有上傳過(guò)商品,則自動(dòng)發(fā)送消息提醒。
  4. 用戶發(fā)起退款,如果三天內(nèi)沒(méi)有得到處理則通知相關(guān)運(yùn)營(yíng)人員。
  5. 預(yù)定會(huì)議后,需要在預(yù)定的時(shí)間點(diǎn)前十分鐘通知各個(gè)與會(huì)人員參加會(huì)議。
  6. 關(guān)閉空閑連接,服務(wù)器中,有很多客戶端的連接,空閑一段時(shí)間之后需要關(guān)閉之。
  7. 清理過(guò)期數(shù)據(jù)業(yè)務(wù)。比如緩存中的對(duì)象,超過(guò)了空閑時(shí)間,需要從緩存中移出。

解決方案也非常多:

SchedulerX(阿里)

對(duì)于單機(jī)服務(wù)優(yōu)選DelayQueue,對(duì)于分布式環(huán)境,可以使用mq、zk、redis之類的。接下來(lái),介紹DelayQueue的使用。

一句話介紹:DelayQueue = BlockingQueue + PriorityQueue + Delayed

2、示例:

實(shí)戰(zhàn)以訂單下單后三十分鐘內(nèi)未支付則自動(dòng)取消 為業(yè)務(wù)場(chǎng)景,該場(chǎng)景的代碼邏輯分析如下:

  1. 下單后將訂單直接放入未支付的延時(shí)隊(duì)列中
  2. 如果超時(shí)未支付,則從隊(duì)列中取出,進(jìn)行修改為取消狀態(tài)的訂單
  3. 如果支付了,則不去進(jìn)行取消,或者取消的時(shí)候做個(gè)狀態(tài)篩選,即可避免更新
  4. 或者支付完成后,做個(gè)主動(dòng)出隊(duì)

還有就是用戶主動(dòng)取消訂單,也做個(gè)主動(dòng)出隊(duì)

1)先來(lái)寫個(gè)通用的??Delayed?? :

import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

@Setter
@Getter
public class ItemDelayed<T> implements Delayed {

    /**默認(rèn)延遲30分鐘*/
    private final static long DELAY = 30 * 60 * 1000L;
    /**數(shù)據(jù)id*/
    private Long dataId;
    /**開始時(shí)間*/
    private long startTime;
    /**到期時(shí)間*/
    private long expire;
    /**創(chuàng)建時(shí)間*/
    private Date now;
    /**泛型data*/
    private T data;
    
    public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + (secondsDelay * 1000);
        this.now = new Date();
    }

    public ItemDelayed(Long dataId, long startTime) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + DELAY;
        this.now = new Date();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}

2)再寫個(gè)通用的接口,用于規(guī)范和方便統(tǒng)一實(shí)現(xiàn) 這樣任何類型的訂單都可以實(shí)現(xiàn)這個(gè)接口 進(jìn)行延時(shí)任務(wù)的處理:

public interface DelayOrder<T> {

    /**
     * 添加延遲對(duì)象到延時(shí)隊(duì)列
     *
     * @param itemDelayed 延遲對(duì)象
     * @return boolean
     */
    boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);

    /**
     * 根據(jù)對(duì)象添加到指定延時(shí)隊(duì)列
     *
     * @param data 數(shù)據(jù)對(duì)象
     * @return boolean
     */
    boolean addToDelayQueue(T data);

    /**
     * 移除指定的延遲對(duì)象從延時(shí)隊(duì)列中
     *
     * @param data
     */
    void removeToOrderDelayQueue(T data);
}

具體業(yè)務(wù)邏輯實(shí)現(xiàn):

@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {

    @Autowired
    private OrderService orderService;

    @Autowired
    private ExecutorService delayOrderExecutor;

    private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();

    /**
     * 初始化時(shí)加載數(shù)據(jù)庫(kù)中需處理超時(shí)的訂單
     * 系統(tǒng)啟動(dòng):掃描數(shù)據(jù)庫(kù)中未支付(要在更新時(shí):加上已支付就不用更新了),未過(guò)期的的訂單
     */
    @PostConstruct
    public void init() {
        log.info("系統(tǒng)啟動(dòng):掃描數(shù)據(jù)庫(kù)中未支付,未過(guò)期的的訂單");
        List<Order> orderList = orderService.selectFutureOverTimeOrder();
        for (Order order : orderList) {
            ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
            this.addToOrderDelayQueue(orderDelayed);
        }
        log.info("系統(tǒng)啟動(dòng):掃描數(shù)據(jù)庫(kù)中未支付的訂單,總共掃描了" + orderList.size() + "個(gè)訂單,推入檢查隊(duì)列,準(zhǔn)備到期檢查...");

        /*啟動(dòng)一個(gè)線程,去取延遲訂單*/
        delayOrderExecutor.execute(() -> {
            log.info("啟動(dòng)處理的訂單線程:" + Thread.currentThread().getName());
            ItemDelayed<Order> orderDelayed;
            while (true) {
                try {
                    orderDelayed = DELAY_QUEUE.take();
                    //處理超時(shí)訂單
                    orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
                } catch (Exception e) {
                    log.error("執(zhí)行自營(yíng)超時(shí)訂單的_延遲隊(duì)列_異常:" + e);
                }
            }
        });
    }

    /**
     * 加入延遲消息隊(duì)列
     **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
        return DELAY_QUEUE.add(orderDelayed);
    }

    /**
     * 加入延遲消息隊(duì)列
     **/
    @Override
    public boolean addToDelayQueue(Order order) {
        ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
        return DELAY_QUEUE.add(orderDelayed);
    }

    /**
     * 從延遲隊(duì)列中移除 主動(dòng)取消就主動(dòng)從隊(duì)列中取出
     **/
    @Override
    public void removeToOrderDelayQueue(Order order) {
        if (order == null) {
            return;
        }
        for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayed<Order> queue = iterator.next();
            if (queue.getDataId().equals(order.getId())) {
                DELAY_QUEUE.remove(queue);
            }
        }
    }
}

分析:

  1. delayOrderExecutor是注入的一個(gè)專門處理出隊(duì)的一個(gè)線程
  2. @PostConstruct是啥呢,是在容器啟動(dòng)后只進(jìn)行一次初始化動(dòng)作的一個(gè)注解,相當(dāng)實(shí)用
  3. 啟動(dòng)后呢,我們?nèi)?shù)據(jù)庫(kù)掃描一遍,防止有漏網(wǎng)之魚,因?yàn)閱螜C(jī)版嗎,隊(duì)列的數(shù)據(jù)是在內(nèi)存中的,重啟后肯定原先的數(shù)據(jù)會(huì)丟失,所以為保證服務(wù)質(zhì)量,我們可能會(huì)錄音.....所以為保證重啟后數(shù)據(jù)的恢復(fù),我們需要重新掃描數(shù)據(jù)庫(kù)把未支付的數(shù)據(jù)重新裝載到內(nèi)存的隊(duì)列中
  4. 接下來(lái)就是用這個(gè)線程去一直不停的訪問(wèn)隊(duì)列的take()方法,當(dāng)隊(duì)列無(wú)數(shù)據(jù)就一直阻塞,或者數(shù)據(jù)沒(méi)到期繼續(xù)阻塞著,直到到期出隊(duì),然后獲取訂單的信息,去處理訂單的更新操作

到此這篇關(guān)于Java使用延時(shí)隊(duì)列搞定超時(shí)訂單處理的文章就介紹到這了,更多相關(guān)java延時(shí)隊(duì)列超時(shí)訂單處理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:
閱讀全文
???? SSI ????????
???? SSI ????????