Java無界阻塞隊(duì)列DelayQueue詳細(xì)解析
概述
DelayQueue是一個(gè)支持時(shí)延獲取元素的無界阻塞隊(duì)列。
隊(duì)列使用PriorityQueue來實(shí)現(xiàn)。
隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。
只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。
DelayQueue可以運(yùn)用在以下兩個(gè)應(yīng)用場(chǎng)景:
緩存系統(tǒng)的設(shè)計(jì):使用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時(shí),就表示有緩存到期了。
定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天要執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如Tiner就是使用DelayQueue實(shí)現(xiàn)的。
用法實(shí)例
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author admin */ public class Message implements Delayed { /** *觸發(fā)時(shí)間 */ private long time; /** *名稱 */ String name; public Message(String name,long time,TimeUnit unit){ this.name = name; this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0); } @Override public long getDelay(TimeUnit unit) { return time - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { Message item = (Message) o; long diff = this.time - item.time; if (diff <= 0){ return -1; }else{ return 1; } } @Override public String toString() { return DelayQueueDemo.printDate() + "Message{" + "time=" + time + ", name=" + name + "/" + "}"; } }
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.DelayQueue; import java.util.concurrent.TimeUnit; /** * @author admin */ public class DelayQueueDemo { public static void main(String[] args) throws InterruptedException { Message item1 = new Message("消息1",5, TimeUnit.SECONDS); Message item2 = new Message("消息2",10, TimeUnit.SECONDS); Message item3 = new Message("消息3",15, TimeUnit.SECONDS); DelayQueue<Message> queue = new DelayQueue<Message>(); queue.add(item1); queue.add(item2); queue.add(item3); int queueLengh = queue.size(); System.out.println(printDate() + "開始!"); for (int i = 0; i < queueLengh; i++) { Message take = queue.take(); System.out.format(printDate() + " 消息出隊(duì),屬性name=%s%n",take.name); } System.out.println(printDate() + "結(jié)束!"); } static String printDate(){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); return sdf.format(new Date()); } }
DelayQueue聲明
DelayQueue聲明如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
從DelayQueue聲明可以看出,DelayQueue中的元素必須是Delayed接口的子類。
Delayed聲明如下:
public interface Delayed extends Comparable<Delayed> { /** *以給定的時(shí)間單位返回與此對(duì)象關(guān)聯(lián)的剩余延遲 */ long getDelay(TimeUnit unit); }
DelayQueue屬性
/** *可重入鎖 */ private final transient ReentrantLock lock = new ReentrantLock(); /** *緩存元素的優(yōu)先級(jí)隊(duì)列 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** *特定的用于等待隊(duì)列頭中元素的線程 *Leader-Follower模式的變體形式 *用于最小化不必要的定時(shí)等待 */ private Thread leader = null; /** *當(dāng)更新的元素在隊(duì)列的開頭變得可用時(shí) *或在新線程可能需要成為領(lǐng)導(dǎo)者時(shí),會(huì)發(fā)出條件信號(hào) */ private final Condition available = lock.newCondition();
以上可以看出,延時(shí)隊(duì)列主要使用優(yōu)先級(jí)隊(duì)列來實(shí)現(xiàn),并輔以重入鎖和條件來控制并發(fā)安全。
DelayQueue構(gòu)造器
/** *默認(rèn)構(gòu)造器 */ public DelayQueue() {} /** *添加集合c中所有元素到隊(duì)列中 */ public DelayQueue(Collection<? extends E> c) { this.addAll(c); }
DelayQueue入隊(duì)
/* *將指定元素插入此延時(shí)隊(duì)列 */ public boolean add(E e) { return offer(e); } /* *將指定元素插入此延時(shí)隊(duì)列 *由于隊(duì)列是無界的,因此該方法將永遠(yuǎn)不會(huì)被阻塞 */ public void put(E e) { offer(e); } /* *將指定元素插入此延時(shí)隊(duì)列 *由于隊(duì)列是無界的,因此該方法將永遠(yuǎn)不會(huì)被阻塞 */ public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); }
以上幾個(gè)方法都會(huì)調(diào)用offer()方法。
public boolean offer(E e) { //獲取可重入鎖 final ReentrantLock lock = this.lock; //可重入鎖加鎖 lock.lock(); try { //調(diào)用優(yōu)先級(jí)隊(duì)列的offer()方法入隊(duì) q.offer(e); //如果入隊(duì)元素在隊(duì)首,則喚醒一個(gè)出隊(duì)線程 if (q.peek() == e) { leader = null; available.signal(); } //返回入隊(duì)成功 return true; } finally { //解鎖 lock.unlock(); } }
leader是等待獲取隊(duì)列頭元素的線程,應(yīng)用主從式設(shè)計(jì)減少不必要的等待。如果leader不為空,表示已經(jīng)有線程在等待獲取隊(duì)列的頭元素。
所以,通過await()方法讓出當(dāng)前線程等待信號(hào)。
如果leader為空,則把當(dāng)前線程設(shè)置為leader,當(dāng)一個(gè)線程為leader,它使用awaitNanos()方法讓當(dāng)前線程等待接收信號(hào)或等待delay時(shí)間。
DelayQueue出隊(duì)
poll()方法
/* *檢索并刪除次隊(duì)列的頭 *如果此隊(duì)列沒有延遲過期的元素,則返回null */ public E poll() { //獲取可重入鎖 final ReentrantLock lock = this.lock; //可重入鎖加鎖 lock.lock(); try { //檢索但不刪除隊(duì)列頭部元素 E first = q.peek(); //如果first為null或者返回與此對(duì)象關(guān)聯(lián)的剩余延遲時(shí)間大于0 //返回null if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else //否則通過優(yōu)先隊(duì)列poll()方法出隊(duì) return q.poll(); } finally { //可重入鎖解鎖 lock.unlock(); } }
take()方法
/* *檢索并除去此隊(duì)列的頭 *等待直到該隊(duì)列上具有過期延遲的元素可用 */ public E take() throws InterruptedException { //獲取可重入鎖 final ReentrantLock lock = this.lock; //可重入鎖加鎖 lock.lockInterruptibly(); try { for (;;) { //檢索但不刪除隊(duì)列頭部元素 E first = q.peek(); //如果first為空 if (first == null) //在available條件上等待 available.await(); else { //如果first非空 //獲取first的剩余延遲時(shí)間 long delay = first.getDelay(NANOSECONDS); //如果delay小于等于0 if (delay <= 0) //延遲時(shí)間到期,獲取并刪除頭部元素 return q.poll(); //如果delay大于0,即延遲時(shí)間未到期 //將first置為null first = null; //如果leader線程非空 if (leader != null) //當(dāng)前線程無限期阻塞 //等待leader線程喚醒 available.await(); else { //如果leader線程為空 //獲取當(dāng)前線程 Thread thisThread = Thread.currentThread(); //是當(dāng)前線程成為leader線程 leader = thisThread; try { //當(dāng)前線程等待剩余延遲時(shí)間 available.awaitNanos(delay); } finally { //如果當(dāng)前線程是leader線程 //釋放leader線程 if (leader == thisThread) leader = null; } } } } } finally { //如果leader線程為null并且隊(duì)列不為空 //說明沒有其他線程在等待,那就通知條件隊(duì)列 if (leader == null && q.peek() != null) //通過signal()方法喚醒一個(gè)出隊(duì)線程 available.signal(); //解鎖 lock.unlock(); } }
take()方法總結(jié):
- 獲取鎖。
- 取出優(yōu)先級(jí)隊(duì)列q的首元素。
- 如果元素q的隊(duì)首為空則阻塞。
- 如果元素q的隊(duì)首(first)不為空;獲取這個(gè)元素的delay時(shí)間值,如果first的延遲delay時(shí)間小于等于0,說明元素已經(jīng)到了可以使用的時(shí)間,調(diào)用poll()方法彈出該元素,跳出方法。
- 如果first的延遲delay時(shí)間大于0,釋放元素first的引用,避免內(nèi)存泄漏。
- 如果延遲delay時(shí)間大于0,leader非空,當(dāng)前線程等待。
- 如果延遲delay時(shí)間大于0,leader為空,將當(dāng)前線程設(shè)置為leader線程,等待剩余時(shí)間。
- 自旋,循環(huán)以上操作,直到return。
重載poll()方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException { //獲取等待時(shí)間 long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //如果first為空 if (first == null) { //如果nanos小于等于0 if (nanos <= 0) //返回null return null; else //如果nanos大于0 //等待nanos時(shí)間 nanos = available.awaitNanos(nanos); } else { //如果隊(duì)首非空 //獲取first的剩余延遲時(shí)間 long delay = first.getDelay(NANOSECONDS); //如果delay小于等于0 if (delay <= 0) //延遲時(shí)間到期,獲取并刪除頭部元素 return q.poll(); //如果delay大于0 //如果nanos小于等于0 if (nanos <= 0) //返回null return null; //如果delay大于0且nanos大于0 //first置為null first = null; //如果nanos小于delay或者leader非空 if (nanos < delay || leader != null) //等待delay時(shí)間 nanos = available.awaitNanos(nanos); else { //如果nanos大于等于delay或者leader為空 //獲取當(dāng)前線程 Thread thisThread = Thread.currentThread(); //設(shè)置當(dāng)前線程為leader leader = thisThread; try { //等待delay時(shí)間 long timeLeft = available.awaitNanos(delay); //修改nanos nanos -= delay - timeLeft; } finally { //如果當(dāng)前線程為leader線程 //釋放leader線程 if (leader == thisThread) leader = null; } } } } } finally { //如果leader為null并且隊(duì)列不為空 //說明沒有其他線程在等待,那就通知條件隊(duì)列 if (leader == null && q.peek() != null) //通過singnal()方法喚醒一個(gè)出隊(duì)線程 available.signal(); //解鎖 lock.unlock(); } }
到此這篇關(guān)于Java無界阻塞隊(duì)列DelayQueue詳細(xì)解析的文章就介紹到這了,更多相關(guān)Java無界阻塞隊(duì)列DelayQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
聊一聊new對(duì)象與Spring對(duì)bean的初始化的差別
這篇文章主要介紹了聊一聊new對(duì)象與Spring對(duì)bean的初始化的差別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02SpringCloud如何搭建一個(gè)多模塊項(xiàng)目
這篇文章主要介紹了SpringCloud如何搭建一個(gè)多模塊項(xiàng)目,記錄下使用SpringCloud創(chuàng)建多模塊項(xiàng)目,一步一步記錄搭建的過程,感興趣的可以了解一下2021-05-05springboot 默認(rèn)靜態(tài)路徑實(shí)例解析
這篇文章主要介紹了springboot 默認(rèn)靜態(tài)路徑實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11Spring AI TikaDocumentReader詳解
TikaDocumentReader是SpringAI中用于從多種格式文檔中提取文本內(nèi)容的組件,支持PDF、DOC/DOCX、PPT/PPTX和HTML等格式,它在構(gòu)建知識(shí)庫、文檔處理和數(shù)據(jù)清洗等任務(wù)中非常有用2025-01-01Java獲取年月日(格式:xxxx年xx月xx日)的方法詳解
在開發(fā)應(yīng)用程序時(shí),經(jīng)常需要獲取當(dāng)前的年、月、日,并以特定格式進(jìn)行展示或處理,本文將介紹如何獲取年月日,并將其格式化為“xxxx年xx月xx日”的形式,幫助你在應(yīng)用程序中處理日期信息,需要的朋友可以參考下2023-10-10idea創(chuàng)建spring boot項(xiàng)目及java版本只能選擇17和21的問題
這篇文章主要介紹了idea創(chuàng)建spring boot項(xiàng)目及java版本只能選擇17和21的問題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01Java并發(fā)LinkedBlockingQueue源碼分析
這篇文章主要為大家介紹了Java并發(fā)LinkedBlockingQueue源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02SpringBoot使用classfinal-maven-plugin插件加密Jar包的示例代碼
這篇文章給大家介紹了SpringBoot使用classfinal-maven-plugin插件加密Jar包的實(shí)例,文中通過代碼示例和圖文講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-02-02