Java無(wú)界阻塞隊(duì)列DelayQueue詳細(xì)解析
概述
DelayQueue是一個(gè)支持時(shí)延獲取元素的無(wú)界阻塞隊(duì)列。
隊(duì)列使用PriorityQueue來(lái)實(shí)現(xiàn)。
隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。
只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。
DelayQueue可以運(yùn)用在以下兩個(gè)應(yīng)用場(chǎng)景:
緩存系統(tǒng)的設(shè)計(jì):使用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時(shí),就表示有緩存到期了。
定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天要執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行,比如Tiner就是使用DelayQueue實(shí)現(xiàn)的。
用法實(shí)例
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author admin
*/
public class Message implements Delayed {
/**
*觸發(fā)時(shí)間
*/
private long time;
/**
*名稱
*/
String name;
public Message(String name,long time,TimeUnit unit){
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Message item = (Message) o;
long diff = this.time - item.time;
if (diff <= 0){
return -1;
}else{
return 1;
}
}
@Override
public String toString() {
return DelayQueueDemo.printDate() + "Message{" + "time=" + time + ", name=" + name + "/" + "}";
}
}import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
/**
* @author admin
*/
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Message item1 = new Message("消息1",5, TimeUnit.SECONDS);
Message item2 = new Message("消息2",10, TimeUnit.SECONDS);
Message item3 = new Message("消息3",15, TimeUnit.SECONDS);
DelayQueue<Message> queue = new DelayQueue<Message>();
queue.add(item1);
queue.add(item2);
queue.add(item3);
int queueLengh = queue.size();
System.out.println(printDate() + "開(kāi)始!");
for (int i = 0; i < queueLengh; i++) {
Message take = queue.take();
System.out.format(printDate() + " 消息出隊(duì),屬性name=%s%n",take.name);
}
System.out.println(printDate() + "結(jié)束!");
}
static String printDate(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date());
}
}DelayQueue聲明
DelayQueue聲明如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>從DelayQueue聲明可以看出,DelayQueue中的元素必須是Delayed接口的子類。
Delayed聲明如下:
public interface Delayed extends Comparable<Delayed> {
/**
*以給定的時(shí)間單位返回與此對(duì)象關(guān)聯(lián)的剩余延遲
*/
long getDelay(TimeUnit unit);
}DelayQueue屬性
/** *可重入鎖 */ private final transient ReentrantLock lock = new ReentrantLock(); /** *緩存元素的優(yōu)先級(jí)隊(duì)列 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** *特定的用于等待隊(duì)列頭中元素的線程 *Leader-Follower模式的變體形式 *用于最小化不必要的定時(shí)等待 */ private Thread leader = null; /** *當(dāng)更新的元素在隊(duì)列的開(kāi)頭變得可用時(shí) *或在新線程可能需要成為領(lǐng)導(dǎo)者時(shí),會(huì)發(fā)出條件信號(hào) */ private final Condition available = lock.newCondition();
以上可以看出,延時(shí)隊(duì)列主要使用優(yōu)先級(jí)隊(duì)列來(lái)實(shí)現(xiàn),并輔以重入鎖和條件來(lái)控制并發(fā)安全。
DelayQueue構(gòu)造器
/**
*默認(rèn)構(gòu)造器
*/
public DelayQueue() {}
/**
*添加集合c中所有元素到隊(duì)列中
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}DelayQueue入隊(duì)
/*
*將指定元素插入此延時(shí)隊(duì)列
*/
public boolean add(E e) {
return offer(e);
}
/*
*將指定元素插入此延時(shí)隊(duì)列
*由于隊(duì)列是無(wú)界的,因此該方法將永遠(yuǎn)不會(huì)被阻塞
*/
public void put(E e) {
offer(e);
}
/*
*將指定元素插入此延時(shí)隊(duì)列
*由于隊(duì)列是無(wú)界的,因此該方法將永遠(yuǎn)不會(huì)被阻塞
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}以上幾個(gè)方法都會(huì)調(diào)用offer()方法。
public boolean offer(E e) {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//可重入鎖加鎖
lock.lock();
try {
//調(diào)用優(yōu)先級(jí)隊(duì)列的offer()方法入隊(duì)
q.offer(e);
//如果入隊(duì)元素在隊(duì)首,則喚醒一個(gè)出隊(duì)線程
if (q.peek() == e) {
leader = null;
available.signal();
}
//返回入隊(duì)成功
return true;
} finally {
//解鎖
lock.unlock();
}
}leader是等待獲取隊(duì)列頭元素的線程,應(yīng)用主從式設(shè)計(jì)減少不必要的等待。如果leader不為空,表示已經(jīng)有線程在等待獲取隊(duì)列的頭元素。
所以,通過(guò)await()方法讓出當(dāng)前線程等待信號(hào)。
如果leader為空,則把當(dāng)前線程設(shè)置為leader,當(dāng)一個(gè)線程為leader,它使用awaitNanos()方法讓當(dāng)前線程等待接收信號(hào)或等待delay時(shí)間。
DelayQueue出隊(duì)
poll()方法
/*
*檢索并刪除次隊(duì)列的頭
*如果此隊(duì)列沒(méi)有延遲過(guò)期的元素,則返回null
*/
public E poll() {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//可重入鎖加鎖
lock.lock();
try {
//檢索但不刪除隊(duì)列頭部元素
E first = q.peek();
//如果first為null或者返回與此對(duì)象關(guān)聯(lián)的剩余延遲時(shí)間大于0
//返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//否則通過(guò)優(yōu)先隊(duì)列poll()方法出隊(duì)
return q.poll();
} finally {
//可重入鎖解鎖
lock.unlock();
}
}take()方法
/*
*檢索并除去此隊(duì)列的頭
*等待直到該隊(duì)列上具有過(guò)期延遲的元素可用
*/
public E take() throws InterruptedException {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//可重入鎖加鎖
lock.lockInterruptibly();
try {
for (;;) {
//檢索但不刪除隊(duì)列頭部元素
E first = q.peek();
//如果first為空
if (first == null)
//在available條件上等待
available.await();
else {
//如果first非空
//獲取first的剩余延遲時(shí)間
long delay = first.getDelay(NANOSECONDS);
//如果delay小于等于0
if (delay <= 0)
//延遲時(shí)間到期,獲取并刪除頭部元素
return q.poll();
//如果delay大于0,即延遲時(shí)間未到期
//將first置為null
first = null;
//如果leader線程非空
if (leader != null)
//當(dāng)前線程無(wú)限期阻塞
//等待leader線程喚醒
available.await();
else {
//如果leader線程為空
//獲取當(dāng)前線程
Thread thisThread = Thread.currentThread();
//是當(dāng)前線程成為leader線程
leader = thisThread;
try {
//當(dāng)前線程等待剩余延遲時(shí)間
available.awaitNanos(delay);
} finally {
//如果當(dāng)前線程是leader線程
//釋放leader線程
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader線程為null并且隊(duì)列不為空
//說(shuō)明沒(méi)有其他線程在等待,那就通知條件隊(duì)列
if (leader == null && q.peek() != null)
//通過(guò)signal()方法喚醒一個(gè)出隊(duì)線程
available.signal();
//解鎖
lock.unlock();
}
}take()方法總結(jié):
- 獲取鎖。
- 取出優(yōu)先級(jí)隊(duì)列q的首元素。
- 如果元素q的隊(duì)首為空則阻塞。
- 如果元素q的隊(duì)首(first)不為空;獲取這個(gè)元素的delay時(shí)間值,如果first的延遲delay時(shí)間小于等于0,說(shuō)明元素已經(jīng)到了可以使用的時(shí)間,調(diào)用poll()方法彈出該元素,跳出方法。
- 如果first的延遲delay時(shí)間大于0,釋放元素first的引用,避免內(nèi)存泄漏。
- 如果延遲delay時(shí)間大于0,leader非空,當(dāng)前線程等待。
- 如果延遲delay時(shí)間大于0,leader為空,將當(dāng)前線程設(shè)置為leader線程,等待剩余時(shí)間。
- 自旋,循環(huán)以上操作,直到return。
重載poll()方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//獲取等待時(shí)間
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
//如果first為空
if (first == null) {
//如果nanos小于等于0
if (nanos <= 0)
//返回null
return null;
else
//如果nanos大于0
//等待nanos時(shí)間
nanos = available.awaitNanos(nanos);
} else {
//如果隊(duì)首非空
//獲取first的剩余延遲時(shí)間
long delay = first.getDelay(NANOSECONDS);
//如果delay小于等于0
if (delay <= 0)
//延遲時(shí)間到期,獲取并刪除頭部元素
return q.poll();
//如果delay大于0
//如果nanos小于等于0
if (nanos <= 0)
//返回null
return null;
//如果delay大于0且nanos大于0
//first置為null
first = null;
//如果nanos小于delay或者leader非空
if (nanos < delay || leader != null)
//等待delay時(shí)間
nanos = available.awaitNanos(nanos);
else {
//如果nanos大于等于delay或者leader為空
//獲取當(dāng)前線程
Thread thisThread = Thread.currentThread();
//設(shè)置當(dāng)前線程為leader
leader = thisThread;
try {
//等待delay時(shí)間
long timeLeft = available.awaitNanos(delay);
//修改nanos
nanos -= delay - timeLeft;
} finally {
//如果當(dāng)前線程為leader線程
//釋放leader線程
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader為null并且隊(duì)列不為空
//說(shuō)明沒(méi)有其他線程在等待,那就通知條件隊(duì)列
if (leader == null && q.peek() != null)
//通過(guò)singnal()方法喚醒一個(gè)出隊(duì)線程
available.signal();
//解鎖
lock.unlock();
}
}到此這篇關(guān)于Java無(wú)界阻塞隊(duì)列DelayQueue詳細(xì)解析的文章就介紹到這了,更多相關(guān)Java無(wú)界阻塞隊(duì)列DelayQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
聊一聊new對(duì)象與Spring對(duì)bean的初始化的差別
這篇文章主要介紹了聊一聊new對(duì)象與Spring對(duì)bean的初始化的差別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02
SpringCloud如何搭建一個(gè)多模塊項(xiàng)目
這篇文章主要介紹了SpringCloud如何搭建一個(gè)多模塊項(xiàng)目,記錄下使用SpringCloud創(chuàng)建多模塊項(xiàng)目,一步一步記錄搭建的過(guò)程,感興趣的可以了解一下2021-05-05
springboot 默認(rèn)靜態(tài)路徑實(shí)例解析
這篇文章主要介紹了springboot 默認(rèn)靜態(tài)路徑實(shí)例解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11
Spring AI TikaDocumentReader詳解
TikaDocumentReader是SpringAI中用于從多種格式文檔中提取文本內(nèi)容的組件,支持PDF、DOC/DOCX、PPT/PPTX和HTML等格式,它在構(gòu)建知識(shí)庫(kù)、文檔處理和數(shù)據(jù)清洗等任務(wù)中非常有用2025-01-01
Java獲取年月日(格式:xxxx年xx月xx日)的方法詳解
在開(kāi)發(fā)應(yīng)用程序時(shí),經(jīng)常需要獲取當(dāng)前的年、月、日,并以特定格式進(jìn)行展示或處理,本文將介紹如何獲取年月日,并將其格式化為“xxxx年xx月xx日”的形式,幫助你在應(yīng)用程序中處理日期信息,需要的朋友可以參考下2023-10-10
idea創(chuàng)建spring boot項(xiàng)目及java版本只能選擇17和21的問(wèn)題
這篇文章主要介紹了idea創(chuàng)建spring boot項(xiàng)目及java版本只能選擇17和21的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-01-01
Java并發(fā)LinkedBlockingQueue源碼分析
這篇文章主要為大家介紹了Java并發(fā)LinkedBlockingQueue源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02
SpringBoot使用classfinal-maven-plugin插件加密Jar包的示例代碼
這篇文章給大家介紹了SpringBoot使用classfinal-maven-plugin插件加密Jar包的實(shí)例,文中通過(guò)代碼示例和圖文講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-02-02

