Java延遲隊(duì)列DelayQueue原理詳解
什么是DelayQueue(延時(shí)隊(duì)列)
DelayQueue 是一個(gè)通過PriorityBlockingQueue實(shí)現(xiàn)延遲獲取元素的無界隊(duì)列無界阻塞隊(duì)列,其中添加進(jìn)該隊(duì)列的元素必須實(shí)現(xiàn)Delayed接口(指定延遲時(shí)間),而且只有在延遲期滿后才能從中提取元素。
什么是PriorityBlockingQueue(優(yōu)先隊(duì)列)
PriorityBlockingQueue是一個(gè)支持優(yōu)先級的無界阻塞隊(duì)列,隊(duì)列的元素默認(rèn)情況下元素采用自然順序升序排列,或者根據(jù)構(gòu)造隊(duì)列時(shí)提供的 Comparator 進(jìn)行排序,具體取決于所使用的構(gòu)造方法。
需要注意的是不能保證同優(yōu)先級元素的順序。
PriorityBlockingQueue也是基于最小二叉堆實(shí)現(xiàn),使用基于CAS實(shí)現(xiàn)的自旋鎖來控制隊(duì)列的動態(tài)擴(kuò)容,保證了擴(kuò)容操作不會阻塞take操作的執(zhí)行。
DelayQueue使用場景
DelayQueue可以運(yùn)用在以下應(yīng)用場景:
- 緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時(shí),表示緩存有效期到了。
- 定時(shí)任務(wù)調(diào)度。使用DelayQueue保存當(dāng)天將會執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,從比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。
DelayQueue原理
DelayQueue的泛型參數(shù)需要實(shí)現(xiàn)Delayed接口,Delayed接口繼承了Comparable接口,DelayQueue內(nèi)部使用非線程安全的優(yōu)先隊(duì)列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待時(shí)間。DelayQueue不允許包含null元素。
Leader/Followers模式:
有若干個(gè)線程(一般組成線程池)用來處理大量的事件
有一個(gè)線程作為領(lǐng)導(dǎo)者,等待事件的發(fā)生;其他的線程作為追隨者,僅僅是睡眠。
假如有事件需要處理,領(lǐng)導(dǎo)者會從追隨者中指定一個(gè)新的領(lǐng)導(dǎo)者,自己去處理事件。
喚醒的追隨者作為新的領(lǐng)導(dǎo)者等待事件的發(fā)生。
處理事件的線程處理完畢以后,就會成為追隨者的一員,直到被喚醒成為領(lǐng)導(dǎo)者。
假如需要處理的事件太多,而線程數(shù)量不夠(能夠動態(tài)創(chuàng)建線程處理另當(dāng)別論),則有的事件可能會得不到處理。
所有線程會有三種身份中的一種:leader和follower,以及一個(gè)干活中的狀態(tài):proccesser。它的基本原則就是,永遠(yuǎn)最多只有一個(gè)leader。而所有follower都在等待成為leader。線程池啟動時(shí)會自動產(chǎn)生一個(gè)Leader負(fù)責(zé)等待網(wǎng)絡(luò)IO事件,當(dāng)有一個(gè)事件產(chǎn)生時(shí),Leader線程首先通知一個(gè)Follower線程將其提拔為新的Leader,然后自己就去干活了,去處理這個(gè)網(wǎng)絡(luò)事件,處理完畢后加入Follower線程等待隊(duì)列,等待下次成為Leader。這種方法可以增強(qiáng)CPU高速緩存相似性,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。
DelayQueue源碼解析
DelayQueue屬性
//可重入同步鎖 private final transient ReentrantLock lock = new ReentrantLock(); //DelayQueue的實(shí)現(xiàn)依賴于PriorityQueue(優(yōu)先隊(duì)列) private final PriorityQueue<E> q = new PriorityQueue<E>(); //第一個(gè)等待某個(gè)延時(shí)對象的線程,在延時(shí)對象還沒有到期時(shí)其他線程看到這個(gè)leader不為null,那么就直接wait //主要是為了避免大量線程在同一時(shí)間點(diǎn)喚醒,導(dǎo)致大量的競爭,反而影響性能 private Thread leader = null; //條件隊(duì)列,用于wait線程 private final Condition available = lock.newCondition();
DelayQueue構(gòu)造方法
//從上面屬性就可以看出,DelayQueue采用了餓漢模式,調(diào)用構(gòu)造方法即創(chuàng)建了隊(duì)列實(shí)例
public DelayQueue() {}
/**
* 創(chuàng)建一個(gè)DelayQueue,最初包含給定的Collection實(shí)例集合。
* @param c 最初包含的元素集合
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}DelayQueue主要方法
offer添加元素
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//調(diào)用優(yōu)先隊(duì)列
q.offer(e);
//檢驗(yàn)元素是否為隊(duì)首,是則設(shè)置 leader 為null, 并喚醒一個(gè)消費(fèi)線程
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}take獲取元素
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//從優(yōu)先隊(duì)列中獲取第一個(gè)元素,peek方法不會刪除元素
E first = q.peek();
//如果獲取不到數(shù)據(jù),則調(diào)用available.await()進(jìn)入阻塞狀態(tài)
if (first == null)
available.await();
else {
//獲取當(dāng)前延時(shí)對象是否到期
long delay = first.getDelay(NANOSECONDS);
//到期那么返回這個(gè)延時(shí)對象
if (delay <= 0)
return q.poll();
first = null; //
//leader不為空,表明已經(jīng)有其他線程在等待這個(gè)延時(shí)對象了
//為什么不available.awaitNanos(delay)呢?這將會導(dǎo)致大量的線程在同一時(shí)間點(diǎn)被喚醒,然后去競爭
//這個(gè)到期的延時(shí)任務(wù),影響性能,還不如直接將他們無時(shí)間限制的wait,leader線程或者其他新進(jìn)來的線程獲取到延時(shí)對象后,去喚醒
//讓他們?nèi)ジ偁幭乱粋€(gè)延時(shí)對象
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//指定納秒級別線程阻塞時(shí)間,當(dāng)前wait住的線程被喚醒后有可能與其他線程競爭失敗,就會進(jìn)入了同步隊(duì)列阻塞,那個(gè)搶到鎖的線程就會取走這個(gè)延時(shí)對象
available.awaitNanos(delay);
} finally {
//leader線程被喚醒并獲取到鎖之后會將leader設(shè)置為空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//leader為空并且隊(duì)列不為空,那么喚醒正在等待的線程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock(); //釋放鎖
}
}從優(yōu)先隊(duì)列中取值,如果取到的延時(shí)節(jié)點(diǎn)已經(jīng)已經(jīng)到期,那么直接返回,如果還沒有到期并且已經(jīng)有其他線程在執(zhí)行delay時(shí)間等待了(也就是leader線程),那么掛起自己(避免延時(shí) 相同時(shí)間造成大量線程同時(shí)喚醒), leader線程在指定delay時(shí)間后主動喚醒,然后取競爭鎖,如果競爭成功,那么很大概率可以獲取到延時(shí)節(jié)點(diǎn),如果競爭失敗,將被阻塞。
remove刪除元素
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}Delayed接口
使用DelayQueue的話,放入該隊(duì)列的對象必須實(shí)現(xiàn)Delayed接口,實(shí)現(xiàn)的接口中有兩個(gè)參數(shù):延遲時(shí)間單位,優(yōu)先級規(guī)則,take方法會根據(jù)規(guī)則按照優(yōu)先級執(zhí)行
Delayed接口源碼:
public interface Delayed extends Comparable<Delayed> {
/**
* 返回與此對象關(guān)聯(lián)的剩余延遲(給定的時(shí)間單位)。
* @param unit 時(shí)間單位
* @返回剩余延遲;零值或負(fù)值表示 延遲已過期
*/
long getDelay(TimeUnit unit);
}因?yàn)镈elayed繼承了Comparable,所以還需要實(shí)現(xiàn)compareTo方法,具體實(shí)現(xiàn)如下:
class MyDelay implements Delayed {
long delayTime; // 延遲時(shí)間
long expire; // 過期時(shí)間
public MyDelay(long delayTime, Thread t) {
this.delayTime = delayTime;
// 過期時(shí)間 = 當(dāng)前時(shí)間 + 延遲時(shí)間
this.expire = System.currentTimeMillis() + delayTime;
}
/**
* 剩余時(shí)間 = 到期時(shí)間 - 當(dāng)前時(shí)間
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 優(yōu)先級規(guī)則:兩個(gè)任務(wù)比較,時(shí)間短的優(yōu)先執(zhí)行
*/
@Override
public int compareTo(Delayed o) {
long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (int) f;
}
}使用示例
實(shí)現(xiàn)Delayed接口:
class MyDelay<T> implements Delayed {
long delayTime; // 延遲時(shí)間
long expire; // 過期時(shí)間
T data;
public MyDelay(long delayTime, T t) {
this.delayTime = delayTime;
// 過期時(shí)間 = 當(dāng)前時(shí)間 + 延遲時(shí)間
this.expire = System.currentTimeMillis() + delayTime;
data = t;
}
/**
* 剩余時(shí)間 = 到期時(shí)間 - 當(dāng)前時(shí)間
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* 優(yōu)先級規(guī)則:兩個(gè)任務(wù)比較,時(shí)間短的優(yōu)先執(zhí)行
*/
@Override
public int compareTo(Delayed o) {
long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
return (int) f;
}
@Override
public String toString() {
return "delayTime=" + delayTime +
", expire=" + expire +
", data=" + data;
}
}測試用例如下:
public class DelayQueueDemo {
static BlockingQueue<Delayed> queue = new DelayQueue();
public static void main(String[] args) throws InterruptedException {
queue.add(new MyDelay(8, "第一次添加任務(wù)"));
queue.add(new MyDelay(3, "第二次添加任務(wù)"));
queue.add(new MyDelay(5, "第三次添加任務(wù)"));
while (!queue.isEmpty()) {
Delayed delayed = queue.take();
System.out.println(delayed);
}
}
}輸出如下:
delayTime=3, expire=1625902338874, data=第二次添加任務(wù)
delayTime=5, expire=1625902338876, data=第三次添加任務(wù)
delayTime=8, expire=1625902338879, data=第一次添加任務(wù)
總結(jié)
DelayQueue其實(shí)采用了裝飾器模式,在對PriorityQueue進(jìn)行包裝下增加了延時(shí)時(shí)間獲取元素的功能,其主要特點(diǎn)歸納如下:
- DelayQueue是一個(gè)無界阻塞隊(duì)列,隊(duì)列內(nèi)部使用PriorityQueue來實(shí)現(xiàn)。
- 進(jìn)入隊(duì)列的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素,只有在延遲期滿時(shí)才能從中提取元素;
- 該隊(duì)列頭部是延遲期滿后保存時(shí)間最長的Delayed元素;
- 如果沒有延遲未過期元素,且隊(duì)列沒有頭部,并且poll將返回null;
- 當(dāng)一個(gè)元素的getDelay(TimeUnit.NANOSECONDS)方法返回一個(gè)小于等于0的值時(shí),表示該元素已過期;
- 無法使用poll或take移除未到期的元素,也不會將這些元素作為正常元素對待;例如:size方法返回到期和未到期元素的計(jì)數(shù)之和。
- 此隊(duì)列不允許使用null元素。
到此這篇關(guān)于Java延遲隊(duì)列DelayQueue原理詳解的文章就介紹到這了,更多相關(guān)延遲隊(duì)列DelayQueue原理內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 詳解Java線程池隊(duì)列中的延遲隊(duì)列DelayQueue
- Java中的延遲隊(duì)列DelayQueue詳細(xì)解析
- Java阻塞延遲隊(duì)列DelayQueue原理及使用詳解
- Java中的延遲隊(duì)列DelayQueue源碼解析
- JAVA中的延遲隊(duì)列DelayQueue應(yīng)用解析
- Java的延遲隊(duì)列之DelayQueue解讀
- java并發(fā)中DelayQueue延遲隊(duì)列原理剖析
- Java的DelayQueue延遲隊(duì)列簡單使用代碼實(shí)例
- Java實(shí)現(xiàn)DelayQueue延遲隊(duì)列示例代碼
相關(guān)文章
Java中ByteBuddy動態(tài)字節(jié)碼操作庫的使用技術(shù)指南
ByteBuddy?是一個(gè)功能強(qiáng)大的?Java?字節(jié)碼操作庫,可以幫助開發(fā)者在運(yùn)行時(shí)動態(tài)生成和修改類,而無需直接接觸復(fù)雜的?ASM?API,本文給大家介紹了Java?ByteBuddy動態(tài)字節(jié)碼操作庫的使用技術(shù)指南,需要的朋友可以參考下2025-04-04
詳解java中List中set方法和add方法的區(qū)別
本文主要介紹了詳解java中List中set方法和add方法的區(qū)別,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-08-08
JDK自帶的序列化方式優(yōu)缺點(diǎn)及實(shí)現(xiàn)原理面試精講
這篇文章主要為大家介紹了JDK自帶的序列化方式優(yōu)缺點(diǎn)及實(shí)現(xiàn)原理面試精講,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
Springboot升級到2.7.2結(jié)合nacos遇到的坑及解決
這篇文章主要介紹了Springboot升級到2.7.2結(jié)合nacos遇到的坑及解決,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06
java static塊和構(gòu)造函數(shù)的實(shí)例詳解
這篇文章主要介紹了java static塊和構(gòu)造函數(shù)的實(shí)例詳解的相關(guān)資料,希望通過本文能幫助到大家,讓大家理解掌握J(rèn)ava static關(guān)鍵字的函數(shù)方法,需要的朋友可以參考下2017-09-09
SpringBoot前后端json數(shù)據(jù)交互的全過程記錄
現(xiàn)在大多數(shù)互聯(lián)網(wǎng)項(xiàng)目都是采用前后端分離的方式開發(fā),下面這篇文章主要給大家介紹了關(guān)于SpringBoot前后端json數(shù)據(jù)交互的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-03-03

