JAVA中的延遲隊列DelayQueue應(yīng)用解析
前言
最近在開發(fā)CRM管理系統(tǒng)時遇到一個需求:銷售部門的人員在使用該系統(tǒng)時,可以從【線索公?!磕K中 “領(lǐng)取” 潛在的客戶線索到自己的【線索私海】模塊中,成為自己私有的潛在客戶線索,以便后期進行跟蹤、開發(fā),同時,也可以主動放棄該線索,將線索 “釋放” 回【線索公?!恐?,若開發(fā)成功,則客戶進入【客戶私?!磕K中,成為自己的潛在客戶,若這時不想繼續(xù)開發(fā)這個客戶了,進行 “釋放”,則該客戶進入【客戶公?!恐幸怨┧袖N售進行 “領(lǐng)取”,誰領(lǐng)取到了,就進入相應(yīng)銷售的【客戶私?!恐?/p>
在這個基礎(chǔ)上,我們希望實現(xiàn)這樣一個功能: 用戶在領(lǐng)取了線索后,若24小時內(nèi)沒有將線索成功開發(fā)為自己的潛在客戶,則自動釋放使之成為公海線索,并且48小時內(nèi)凍結(jié)該線索(無法領(lǐng)?。?,同樣,潛在客戶60天內(nèi)沒有開發(fā)成正式客戶,則自動釋放該客戶資源到公海中,同樣是48小時內(nèi)不能被重新認領(lǐng)
在這個場景下,我想到了DelayQueue
DelayQueue介紹
簡單來說,DelayQueue是一個根據(jù)元素的到期時間來排序的隊列,而并非是一般的隊列那樣先進先出,最快過期的元素排在隊首,越晚到期的元素排得越后 使用時,元素必須實現(xiàn)Delayed接口,生產(chǎn)者線程往隊列里添加元素時,會觸發(fā)Delayed接口中的compareTo方法進行排序,消費者線索獲取元素時,會調(diào)用Delayed接口中的getDelay方法來檢查隊首元素是否到期,getDelay方法返回的是離到期時間剩余的時間值,若getDelay返回的值?。盎蛘叩扔冢埃瑒t表示已到期,消費者線程取出進行消費,若getDelay方法返回的值大于0,則消費者線程會被阻塞,wait返回的時間值后,再從隊列頭部取出元素進行消費
數(shù)據(jù)結(jié)構(gòu)
閱讀DelayQueue的源碼
可以看到它包含了: 一個PriorityQueue——PriorityQueue是一個優(yōu)先級隊列,它是一個沒有阻塞功能的Queue,也就是說DelayQueue底層通過PriorityQueue來實現(xiàn)元素的存儲
一個ReentrantLock鎖
一個線程leader——DelayQueue使用類似Leader-Followr模式,即消費者線程要獲取元素時,若元素還沒過期,則消費者線程阻塞等待的時間即元素的剩余過期時間,即消費者線程等待的元素保證是最先過期的元素,這樣消費者線程可以盡量把時間花在處理任務(wù)上,最小化空等的時間,以提高線程的利用效率
一個阻塞的條件Condition——實現(xiàn)出隊時阻塞的功能
特性
DelayQueue是一個無界隊列,因此入隊時不會阻塞,與優(yōu)先級隊列入隊相同 DelayQueue的特性主要在出隊上 出隊時: 1.若隊列為空,則阻塞 2.若不為空,則檢查堆頂?shù)脑厥欠襁^期,剩余過期時間小于等于0則出隊,若大于0,則:判斷當前有無消費者線程作為leader正在等待獲取元素,若leader不為null,則直接阻塞,若leader為null,則將當前消費者線程設(shè)為leader,并按照最早過期的時間進行阻塞
示意圖:
過了2s后,元素5到期了,喚醒消費者線程1并獲取元素5進行消費 同時把消費者線程2設(shè)為leader,此時元素4為堆頂元素,2s后到期,所以消費者線程2的阻塞時間設(shè)置為2s
又過了2s,元素4到期,喚醒消費者線程2并獲取元素4進行消費 消費者線程1繼續(xù)處理元素5
繼續(xù)過2s后,若此時消費者線程1或者消費者線程2處理完任務(wù),則繼續(xù)獲取元素進行消費,并且元素3剛剛好到期了 若此時兩個線程都沒有處理完任務(wù),則會出現(xiàn)元素3到期了,但是沒有消費者來取出消費,同時,隊列中不斷有新的元素入隊,就會造成任務(wù)延期,隊列會越來越大,元素延遲處理的時間會越來越長
假設(shè)此時又過了2s,還是沒有消費者線程空下來:
因此,若任務(wù)處理時間較長,任務(wù)增長速度快,且到期時間較集中,則需要加快消費者線程處理任務(wù)的速度和增加消費者線程數(shù)量,否則就會造成任務(wù)延期越來越長,反之,也不能盲目增加消費者線程數(shù)量,數(shù)量太多導致資源浪費
實例
結(jié)合項目需求,使用DelayQueue來實現(xiàn)線索、客戶的超時功能 (1)創(chuàng)建任務(wù)類:DelayTask.java,實現(xiàn)Delayed接口,作為延遲隊列中的元素,然后只需將線索類、客戶類繼承該類
@Data public class DelayTask implements Delayed { /** * 開始計時時間 不設(shè)置則默認為當前系統(tǒng)時間 */ private transient Date taskStartTime = new Date(); /** * 過期時間 不設(shè)置則默認1分鐘 */ private transient long taskExpiredTime = 60 * 1000; /** * 初始設(shè)置開始計時時間 * taskStartTime 開始時間 [String] [yyyy-MM-dd HH:mm:ss] * taskExpiredTime 過期時間 [long] 單位:s * @param taskStartTime * @param taskExpiredTime */ public void initTaskTime(String taskStartTime, long taskExpiredTime) { if(Assert.notEmpty(taskStartTime)) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); try { this.taskStartTime = sdf.parse(taskStartTime); } catch (ParseException e) { e.printStackTrace(); } } this.taskExpiredTime = taskExpiredTime; this.taskExpiredTime += this.taskStartTime.getTime(); } @Override public long getDelay(TimeUnit unit) { return unit.convert(taskExpiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (this.getDelay(TimeUnit.MILLISECONDS) - ((DelayTask) o).getDelay(TimeUnit.MILLISECONDS)) > 0 ? 1:0; } }
(2)創(chuàng)建一個單例的延遲隊列工具類:DelayQueueHelper 聲明了一個延遲隊列,并且對外提供一個統(tǒng)一、全局的操作延遲隊列的入口(入隊、刪除元素操作)
public class DelayQueueHelper { private volatile static DelayQueueHelper delayQueueHelper = null; //私海線索過期時間:24h public static final long CLUE_EXPIRED_TIME = 24 * 60 * 60 * 1000; //私海客戶過期時間:60天 public static final long CUS_EXPIRED_TIME = 60L * 24 * 60 * 60 * 1000; //線索、客戶釋放后冷凍時間:48h public static final long BLOCK_TIME = 48 * 60 * 60 * 1000; private DelayQueue<DelayTask> queue = new DelayQueue<>(); private DelayQueueHelper() { } public static DelayQueueHelper getInstance() { if(delayQueueHelper == null) { synchronized(DelayQueueHelper.class) { delayQueueHelper = new DelayQueueHelper(); } } return delayQueueHelper; } public void addTask(DelayTask task) { queue.put(task); } public void removeTask(DelayTask task) { if(task == null){ return; } for(Iterator<DelayTask> iterator = queue.iterator(); iterator.hasNext();) { if(task instanceof Clue) { Clue clue = (Clue) task; Clue queueObj = (Clue) iterator.next(); if(clue.getId().equals(queueObj.getId())){ queue.remove(queueObj); } } } } public DelayQueue<DelayTask> getQueue() { return queue; } }
(3)創(chuàng)建一個初始化類:DelayQueueRunner,實現(xiàn)ApplicationRunner接口
1.系統(tǒng)啟動時,首先將所有任務(wù)入隊 (DelayQueue的缺點:宕機、系統(tǒng)重啟后數(shù)據(jù)會被清空,因此系統(tǒng)初始化時需將所有滿足條件的元素入隊)
2.開啟一個消費者線程,循環(huán)從延遲隊列中獲取到期的線索、客戶進行消費(將線索、客戶狀態(tài)修改為釋放狀態(tài)、解除凍結(jié)狀態(tài))
@Slf4j @Component public class DelayQueueRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { DelayQueueHelper queueHelper = DelayQueueHelper.getInstance(); //1.將所有未到期的線程、客戶入隊 //...... //2.開啟一個消費者線程 run(queueHelper.getQueue()); } public void run(DelayQueue queue) { new Thread() { @Override public void run() { try { while (true) { DelayTask task = (DelayTask) queue.take(); executeTask(task); } } catch (InterruptedException e) { log.error(e.getMessage()); e.printStackTrace(); } } }.start(); } private void executeTask(DelayTask task) { if(task instanceof Clue) { Clue clue = (Clue) task; //修改狀態(tài) clue.update(); } } }
(4)在添加、釋放線索記錄、客戶記錄時,通過DelayQueueHelper對隊列中的元素進行相應(yīng)的入隊、出隊操作
/** * 將線索\客戶加入超時自動更新狀態(tài)隊列 * @param clue 線索\客戶對象 * @param type 0:私海線索 1:私??蛻?3:釋放后元素 * @param startTime 開始計時時間 */ public void addToTimeoutAutoUpdateQueue(Clue clue, int type, Date startTime) { long expireTime = 0; if(type == CLUE) { //線索隊列 expireTime = DelayQueueHelper.CLUE_EXPIRED_TIME; }else if(type == CUS) { //客戶隊列 expireTime = DelayQueueHelper.CUS_EXPIRED_TIME; }else if(type == LOCK) { //凍結(jié)隊列 expireTime = DelayQueueHelper.BLOCK_TIME; } SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); DelayQueueHelper queueHelper = DelayQueueHelper.getInstance(); clue.initTaskTime(sdf.format(startTime), expireTime); queueHelper.addTask(clue); } /** * 將線索從超時自動更新狀態(tài)隊列中刪除 * @param clue */ public void removeFromTimeoutAutoUpdateQueue(Clue clue) { DelayQueueHelper queueHelper = DelayQueueHelper.getInstance(); queueHelper.removeTask(clue); }
到此這篇關(guān)于JAVA中的延遲隊列DelayQueue應(yīng)用解析的文章就介紹到這了,更多相關(guān)JAVA的DelayQueue應(yīng)用內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于swagger配置及踩坑@Api參數(shù)postion無效解決接口排序問題
這篇文章主要介紹了關(guān)于swagger配置及踩坑@Api參數(shù)postion無效解決接口排序問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06quartz實現(xiàn)定時功能實例詳解(servlet定時器配置方法)
Quartz是一個完全由java編寫的開源作業(yè)調(diào)度框架,下面提供一個小例子供大家參考,還有在servlet配置的方法2013-12-12@RequestMapping對不同參數(shù)的接收方式示例詳解
Spring?MVC框架中,@RequestMapping注解用于映射URL到控制器方法,不同的參數(shù)類型如簡單參數(shù)、實體參數(shù)、數(shù)組參數(shù)、集合參數(shù)、日期參數(shù)和JSON參數(shù),本文給大家介紹@RequestMapping對不同參數(shù)的接收方式,感興趣的朋友一起看看吧2024-10-10記錄一個使用Spring?Data?JPA設(shè)置默認值的問題
這篇文章主要介紹了使用Spring?Data?JPA設(shè)置默認值的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11