JAVA中的延遲隊(duì)列DelayQueue應(yīng)用解析
前言
最近在開(kāi)發(fā)CRM管理系統(tǒng)時(shí)遇到一個(gè)需求:銷售部門(mén)的人員在使用該系統(tǒng)時(shí),可以從【線索公?!磕K中 “領(lǐng)取” 潛在的客戶線索到自己的【線索私?!磕K中,成為自己私有的潛在客戶線索,以便后期進(jìn)行跟蹤、開(kāi)發(fā),同時(shí),也可以主動(dòng)放棄該線索,將線索 “釋放” 回【線索公?!恐?,若開(kāi)發(fā)成功,則客戶進(jìn)入【客戶私海】模塊中,成為自己的潛在客戶,若這時(shí)不想繼續(xù)開(kāi)發(fā)這個(gè)客戶了,進(jìn)行 “釋放”,則該客戶進(jìn)入【客戶公海】中以供所有銷售進(jìn)行 “領(lǐng)取”,誰(shuí)領(lǐng)取到了,就進(jìn)入相應(yīng)銷售的【客戶私?!恐?/p>
在這個(gè)基礎(chǔ)上,我們希望實(shí)現(xiàn)這樣一個(gè)功能: 用戶在領(lǐng)取了線索后,若24小時(shí)內(nèi)沒(méi)有將線索成功開(kāi)發(fā)為自己的潛在客戶,則自動(dòng)釋放使之成為公海線索,并且48小時(shí)內(nèi)凍結(jié)該線索(無(wú)法領(lǐng)?。?,同樣,潛在客戶60天內(nèi)沒(méi)有開(kāi)發(fā)成正式客戶,則自動(dòng)釋放該客戶資源到公海中,同樣是48小時(shí)內(nèi)不能被重新認(rèn)領(lǐng)
在這個(gè)場(chǎng)景下,我想到了DelayQueue
DelayQueue介紹
簡(jiǎn)單來(lái)說(shuō),DelayQueue是一個(gè)根據(jù)元素的到期時(shí)間來(lái)排序的隊(duì)列,而并非是一般的隊(duì)列那樣先進(jìn)先出,最快過(guò)期的元素排在隊(duì)首,越晚到期的元素排得越后 使用時(shí),元素必須實(shí)現(xiàn)Delayed接口,生產(chǎn)者線程往隊(duì)列里添加元素時(shí),會(huì)觸發(fā)Delayed接口中的compareTo方法進(jìn)行排序,消費(fèi)者線索獲取元素時(shí),會(huì)調(diào)用Delayed接口中的getDelay方法來(lái)檢查隊(duì)首元素是否到期,getDelay方法返回的是離到期時(shí)間剩余的時(shí)間值,若getDelay返回的值?。盎蛘叩扔冢埃瑒t表示已到期,消費(fèi)者線程取出進(jìn)行消費(fèi),若getDelay方法返回的值大于0,則消費(fèi)者線程會(huì)被阻塞,wait返回的時(shí)間值后,再?gòu)年?duì)列頭部取出元素進(jìn)行消費(fèi)
數(shù)據(jù)結(jié)構(gòu)
閱讀DelayQueue的源碼

可以看到它包含了: 一個(gè)PriorityQueue——PriorityQueue是一個(gè)優(yōu)先級(jí)隊(duì)列,它是一個(gè)沒(méi)有阻塞功能的Queue,也就是說(shuō)DelayQueue底層通過(guò)PriorityQueue來(lái)實(shí)現(xiàn)元素的存儲(chǔ)
一個(gè)ReentrantLock鎖
一個(gè)線程leader——DelayQueue使用類似Leader-Followr模式,即消費(fèi)者線程要獲取元素時(shí),若元素還沒(méi)過(guò)期,則消費(fèi)者線程阻塞等待的時(shí)間即元素的剩余過(guò)期時(shí)間,即消費(fèi)者線程等待的元素保證是最先過(guò)期的元素,這樣消費(fèi)者線程可以盡量把時(shí)間花在處理任務(wù)上,最小化空等的時(shí)間,以提高線程的利用效率
一個(gè)阻塞的條件Condition——實(shí)現(xiàn)出隊(duì)時(shí)阻塞的功能
特性
DelayQueue是一個(gè)無(wú)界隊(duì)列,因此入隊(duì)時(shí)不會(huì)阻塞,與優(yōu)先級(jí)隊(duì)列入隊(duì)相同 DelayQueue的特性主要在出隊(duì)上 出隊(duì)時(shí): 1.若隊(duì)列為空,則阻塞 2.若不為空,則檢查堆頂?shù)脑厥欠襁^(guò)期,剩余過(guò)期時(shí)間小于等于0則出隊(duì),若大于0,則:判斷當(dāng)前有無(wú)消費(fèi)者線程作為leader正在等待獲取元素,若leader不為null,則直接阻塞,若leader為null,則將當(dāng)前消費(fèi)者線程設(shè)為leader,并按照最早過(guò)期的時(shí)間進(jìn)行阻塞
示意圖:

過(guò)了2s后,元素5到期了,喚醒消費(fèi)者線程1并獲取元素5進(jìn)行消費(fèi) 同時(shí)把消費(fèi)者線程2設(shè)為leader,此時(shí)元素4為堆頂元素,2s后到期,所以消費(fèi)者線程2的阻塞時(shí)間設(shè)置為2s

又過(guò)了2s,元素4到期,喚醒消費(fèi)者線程2并獲取元素4進(jìn)行消費(fèi) 消費(fèi)者線程1繼續(xù)處理元素5

繼續(xù)過(guò)2s后,若此時(shí)消費(fèi)者線程1或者消費(fèi)者線程2處理完任務(wù),則繼續(xù)獲取元素進(jìn)行消費(fèi),并且元素3剛剛好到期了 若此時(shí)兩個(gè)線程都沒(méi)有處理完任務(wù),則會(huì)出現(xiàn)元素3到期了,但是沒(méi)有消費(fèi)者來(lái)取出消費(fèi),同時(shí),隊(duì)列中不斷有新的元素入隊(duì),就會(huì)造成任務(wù)延期,隊(duì)列會(huì)越來(lái)越大,元素延遲處理的時(shí)間會(huì)越來(lái)越長(zhǎng)
假設(shè)此時(shí)又過(guò)了2s,還是沒(méi)有消費(fèi)者線程空下來(lái):

因此,若任務(wù)處理時(shí)間較長(zhǎng),任務(wù)增長(zhǎng)速度快,且到期時(shí)間較集中,則需要加快消費(fèi)者線程處理任務(wù)的速度和增加消費(fèi)者線程數(shù)量,否則就會(huì)造成任務(wù)延期越來(lái)越長(zhǎng),反之,也不能盲目增加消費(fèi)者線程數(shù)量,數(shù)量太多導(dǎo)致資源浪費(fèi)
實(shí)例
結(jié)合項(xiàng)目需求,使用DelayQueue來(lái)實(shí)現(xiàn)線索、客戶的超時(shí)功能 (1)創(chuàng)建任務(wù)類:DelayTask.java,實(shí)現(xiàn)Delayed接口,作為延遲隊(duì)列中的元素,然后只需將線索類、客戶類繼承該類
@Data
public class DelayTask implements Delayed {
/**
* 開(kāi)始計(jì)時(shí)時(shí)間 不設(shè)置則默認(rèn)為當(dāng)前系統(tǒng)時(shí)間
*/
private transient Date taskStartTime = new Date();
/**
* 過(guò)期時(shí)間 不設(shè)置則默認(rèn)1分鐘
*/
private transient long taskExpiredTime = 60 * 1000;
/**
* 初始設(shè)置開(kāi)始計(jì)時(shí)時(shí)間
* taskStartTime 開(kāi)始時(shí)間 [String] [yyyy-MM-dd HH:mm:ss]
* taskExpiredTime 過(guò)期時(shí)間 [long] 單位:s
* @param taskStartTime
* @param taskExpiredTime
*/
public void initTaskTime(String taskStartTime, long taskExpiredTime) {
if(Assert.notEmpty(taskStartTime)) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
this.taskStartTime = sdf.parse(taskStartTime);
} catch (ParseException e) {
e.printStackTrace();
}
}
this.taskExpiredTime = taskExpiredTime;
this.taskExpiredTime += this.taskStartTime.getTime();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(taskExpiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (this.getDelay(TimeUnit.MILLISECONDS) - ((DelayTask) o).getDelay(TimeUnit.MILLISECONDS)) > 0 ? 1:0;
}
}(2)創(chuàng)建一個(gè)單例的延遲隊(duì)列工具類:DelayQueueHelper 聲明了一個(gè)延遲隊(duì)列,并且對(duì)外提供一個(gè)統(tǒng)一、全局的操作延遲隊(duì)列的入口(入隊(duì)、刪除元素操作)
public class DelayQueueHelper {
private volatile static DelayQueueHelper delayQueueHelper = null;
//私海線索過(guò)期時(shí)間:24h
public static final long CLUE_EXPIRED_TIME = 24 * 60 * 60 * 1000;
//私??蛻暨^(guò)期時(shí)間:60天
public static final long CUS_EXPIRED_TIME = 60L * 24 * 60 * 60 * 1000;
//線索、客戶釋放后冷凍時(shí)間:48h
public static final long BLOCK_TIME = 48 * 60 * 60 * 1000;
private DelayQueue<DelayTask> queue = new DelayQueue<>();
private DelayQueueHelper() {
}
public static DelayQueueHelper getInstance() {
if(delayQueueHelper == null) {
synchronized(DelayQueueHelper.class) {
delayQueueHelper = new DelayQueueHelper();
}
}
return delayQueueHelper;
}
public void addTask(DelayTask task) {
queue.put(task);
}
public void removeTask(DelayTask task) {
if(task == null){
return;
}
for(Iterator<DelayTask> iterator = queue.iterator(); iterator.hasNext();) {
if(task instanceof Clue) {
Clue clue = (Clue) task;
Clue queueObj = (Clue) iterator.next();
if(clue.getId().equals(queueObj.getId())){
queue.remove(queueObj);
}
}
}
}
public DelayQueue<DelayTask> getQueue() {
return queue;
}
}(3)創(chuàng)建一個(gè)初始化類:DelayQueueRunner,實(shí)現(xiàn)ApplicationRunner接口
1.系統(tǒng)啟動(dòng)時(shí),首先將所有任務(wù)入隊(duì) (DelayQueue的缺點(diǎn):宕機(jī)、系統(tǒng)重啟后數(shù)據(jù)會(huì)被清空,因此系統(tǒng)初始化時(shí)需將所有滿足條件的元素入隊(duì))
2.開(kāi)啟一個(gè)消費(fèi)者線程,循環(huán)從延遲隊(duì)列中獲取到期的線索、客戶進(jìn)行消費(fèi)(將線索、客戶狀態(tài)修改為釋放狀態(tài)、解除凍結(jié)狀態(tài))
@Slf4j
@Component
public class DelayQueueRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();
//1.將所有未到期的線程、客戶入隊(duì)
//......
//2.開(kāi)啟一個(gè)消費(fèi)者線程
run(queueHelper.getQueue());
}
public void run(DelayQueue queue) {
new Thread() {
@Override
public void run() {
try {
while (true) {
DelayTask task = (DelayTask) queue.take();
executeTask(task);
}
} catch (InterruptedException e) {
log.error(e.getMessage());
e.printStackTrace();
}
}
}.start();
}
private void executeTask(DelayTask task) {
if(task instanceof Clue) {
Clue clue = (Clue) task;
//修改狀態(tài)
clue.update();
}
}
}(4)在添加、釋放線索記錄、客戶記錄時(shí),通過(guò)DelayQueueHelper對(duì)隊(duì)列中的元素進(jìn)行相應(yīng)的入隊(duì)、出隊(duì)操作
/**
* 將線索\客戶加入超時(shí)自動(dòng)更新?tīng)顟B(tài)隊(duì)列
* @param clue 線索\客戶對(duì)象
* @param type 0:私海線索 1:私??蛻?3:釋放后元素
* @param startTime 開(kāi)始計(jì)時(shí)時(shí)間
*/
public void addToTimeoutAutoUpdateQueue(Clue clue, int type, Date startTime) {
long expireTime = 0;
if(type == CLUE) { //線索隊(duì)列
expireTime = DelayQueueHelper.CLUE_EXPIRED_TIME;
}else if(type == CUS) { //客戶隊(duì)列
expireTime = DelayQueueHelper.CUS_EXPIRED_TIME;
}else if(type == LOCK) { //凍結(jié)隊(duì)列
expireTime = DelayQueueHelper.BLOCK_TIME;
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();
clue.initTaskTime(sdf.format(startTime), expireTime);
queueHelper.addTask(clue);
}
/**
* 將線索從超時(shí)自動(dòng)更新?tīng)顟B(tài)隊(duì)列中刪除
* @param clue
*/
public void removeFromTimeoutAutoUpdateQueue(Clue clue) {
DelayQueueHelper queueHelper = DelayQueueHelper.getInstance();
queueHelper.removeTask(clue);
}到此這篇關(guān)于JAVA中的延遲隊(duì)列DelayQueue應(yīng)用解析的文章就介紹到這了,更多相關(guān)JAVA的DelayQueue應(yīng)用內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 詳解Java線程池隊(duì)列中的延遲隊(duì)列DelayQueue
- Java延遲隊(duì)列DelayQueue原理詳解
- Java中的延遲隊(duì)列DelayQueue詳細(xì)解析
- Java阻塞延遲隊(duì)列DelayQueue原理及使用詳解
- Java中的延遲隊(duì)列DelayQueue源碼解析
- Java的延遲隊(duì)列之DelayQueue解讀
- java并發(fā)中DelayQueue延遲隊(duì)列原理剖析
- Java的DelayQueue延遲隊(duì)列簡(jiǎn)單使用代碼實(shí)例
- Java實(shí)現(xiàn)DelayQueue延遲隊(duì)列示例代碼
相關(guān)文章
新建Maven工程出現(xiàn)Process?Terminated的問(wèn)題解決
當(dāng)Maven出現(xiàn)"Process terminated"錯(cuò)誤時(shí),這通常是由于配置文件或路徑錯(cuò)誤導(dǎo)致的,本文主要介紹了新建Maven工程出現(xiàn)Process?Terminated的問(wèn)題解決,感興趣的可以了解一下2024-04-04
關(guān)于任務(wù)調(diào)度框架quartz使用(異常處理,解決恢復(fù)后多次調(diào)度處理)
這篇文章主要介紹了關(guān)于任務(wù)調(diào)度框架quartz使用(異常處理,解決恢復(fù)后多次調(diào)度處理),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12
Spring Boot定制type Formatters實(shí)例詳解
在本篇文章里小編給大家整理的是關(guān)于Spring Boot定制type Formatters實(shí)例知識(shí)點(diǎn),需要的朋友們學(xué)習(xí)下。2019-11-11
java使用lambda表達(dá)式對(duì)List集合進(jìn)行操作技巧(JDK1.8)
這篇文章主要介紹了java使用lambda表達(dá)式對(duì)List集合進(jìn)行操作技巧適用jdk1.8,感興趣的朋友跟著小編一起看看實(shí)現(xiàn)代碼吧2018-06-06
使用HttpClient調(diào)用接口的實(shí)例講解
下面小編就為大家?guī)?lái)一篇使用HttpClient調(diào)用接口的實(shí)例講解。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-10-10
SpringBoot小程序推送信息的項(xiàng)目實(shí)踐
本文主要介紹了SpringBoot小程序推送信息的項(xiàng)目實(shí)踐,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04

