Java無界阻塞隊列DelayQueue詳細(xì)解析
概述
DelayQueue是一個支持時延獲取元素的無界阻塞隊列。
隊列使用PriorityQueue來實現(xiàn)。
隊列中的元素必須實現(xiàn)Delayed接口,在創(chuàng)建元素時可以指定多久才能從隊列中獲取當(dāng)前元素。
只有在延遲期滿時才能從隊列中提取元素。
DelayQueue可以運用在以下兩個應(yīng)用場景:
緩存系統(tǒng)的設(shè)計:使用DelayQueue保存緩存元素的有效期,使用一個線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時,就表示有緩存到期了。
定時任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天要執(zhí)行的任務(wù)和執(zhí)行時間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行,比如Tiner就是使用DelayQueue實現(xiàn)的。
用法實例
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author admin
*/
public class Message implements Delayed {
/**
*觸發(fā)時間
*/
private long time;
/**
*名稱
*/
String name;
public Message(String name,long time,TimeUnit unit){
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Message item = (Message) o;
long diff = this.time - item.time;
if (diff <= 0){
return -1;
}else{
return 1;
}
}
@Override
public String toString() {
return DelayQueueDemo.printDate() + "Message{" + "time=" + time + ", name=" + name + "/" + "}";
}
}import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
/**
* @author admin
*/
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
Message item1 = new Message("消息1",5, TimeUnit.SECONDS);
Message item2 = new Message("消息2",10, TimeUnit.SECONDS);
Message item3 = new Message("消息3",15, TimeUnit.SECONDS);
DelayQueue<Message> queue = new DelayQueue<Message>();
queue.add(item1);
queue.add(item2);
queue.add(item3);
int queueLengh = queue.size();
System.out.println(printDate() + "開始!");
for (int i = 0; i < queueLengh; i++) {
Message take = queue.take();
System.out.format(printDate() + " 消息出隊,屬性name=%s%n",take.name);
}
System.out.println(printDate() + "結(jié)束!");
}
static String printDate(){
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(new Date());
}
}DelayQueue聲明
DelayQueue聲明如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E>從DelayQueue聲明可以看出,DelayQueue中的元素必須是Delayed接口的子類。
Delayed聲明如下:
public interface Delayed extends Comparable<Delayed> {
/**
*以給定的時間單位返回與此對象關(guān)聯(lián)的剩余延遲
*/
long getDelay(TimeUnit unit);
}DelayQueue屬性
/** *可重入鎖 */ private final transient ReentrantLock lock = new ReentrantLock(); /** *緩存元素的優(yōu)先級隊列 */ private final PriorityQueue<E> q = new PriorityQueue<E>(); /** *特定的用于等待隊列頭中元素的線程 *Leader-Follower模式的變體形式 *用于最小化不必要的定時等待 */ private Thread leader = null; /** *當(dāng)更新的元素在隊列的開頭變得可用時 *或在新線程可能需要成為領(lǐng)導(dǎo)者時,會發(fā)出條件信號 */ private final Condition available = lock.newCondition();
以上可以看出,延時隊列主要使用優(yōu)先級隊列來實現(xiàn),并輔以重入鎖和條件來控制并發(fā)安全。
DelayQueue構(gòu)造器
/**
*默認(rèn)構(gòu)造器
*/
public DelayQueue() {}
/**
*添加集合c中所有元素到隊列中
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}DelayQueue入隊
/*
*將指定元素插入此延時隊列
*/
public boolean add(E e) {
return offer(e);
}
/*
*將指定元素插入此延時隊列
*由于隊列是無界的,因此該方法將永遠(yuǎn)不會被阻塞
*/
public void put(E e) {
offer(e);
}
/*
*將指定元素插入此延時隊列
*由于隊列是無界的,因此該方法將永遠(yuǎn)不會被阻塞
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}以上幾個方法都會調(diào)用offer()方法。
public boolean offer(E e) {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//可重入鎖加鎖
lock.lock();
try {
//調(diào)用優(yōu)先級隊列的offer()方法入隊
q.offer(e);
//如果入隊元素在隊首,則喚醒一個出隊線程
if (q.peek() == e) {
leader = null;
available.signal();
}
//返回入隊成功
return true;
} finally {
//解鎖
lock.unlock();
}
}leader是等待獲取隊列頭元素的線程,應(yīng)用主從式設(shè)計減少不必要的等待。如果leader不為空,表示已經(jīng)有線程在等待獲取隊列的頭元素。
所以,通過await()方法讓出當(dāng)前線程等待信號。
如果leader為空,則把當(dāng)前線程設(shè)置為leader,當(dāng)一個線程為leader,它使用awaitNanos()方法讓當(dāng)前線程等待接收信號或等待delay時間。
DelayQueue出隊
poll()方法
/*
*檢索并刪除次隊列的頭
*如果此隊列沒有延遲過期的元素,則返回null
*/
public E poll() {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//可重入鎖加鎖
lock.lock();
try {
//檢索但不刪除隊列頭部元素
E first = q.peek();
//如果first為null或者返回與此對象關(guān)聯(lián)的剩余延遲時間大于0
//返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//否則通過優(yōu)先隊列poll()方法出隊
return q.poll();
} finally {
//可重入鎖解鎖
lock.unlock();
}
}take()方法
/*
*檢索并除去此隊列的頭
*等待直到該隊列上具有過期延遲的元素可用
*/
public E take() throws InterruptedException {
//獲取可重入鎖
final ReentrantLock lock = this.lock;
//可重入鎖加鎖
lock.lockInterruptibly();
try {
for (;;) {
//檢索但不刪除隊列頭部元素
E first = q.peek();
//如果first為空
if (first == null)
//在available條件上等待
available.await();
else {
//如果first非空
//獲取first的剩余延遲時間
long delay = first.getDelay(NANOSECONDS);
//如果delay小于等于0
if (delay <= 0)
//延遲時間到期,獲取并刪除頭部元素
return q.poll();
//如果delay大于0,即延遲時間未到期
//將first置為null
first = null;
//如果leader線程非空
if (leader != null)
//當(dāng)前線程無限期阻塞
//等待leader線程喚醒
available.await();
else {
//如果leader線程為空
//獲取當(dāng)前線程
Thread thisThread = Thread.currentThread();
//是當(dāng)前線程成為leader線程
leader = thisThread;
try {
//當(dāng)前線程等待剩余延遲時間
available.awaitNanos(delay);
} finally {
//如果當(dāng)前線程是leader線程
//釋放leader線程
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader線程為null并且隊列不為空
//說明沒有其他線程在等待,那就通知條件隊列
if (leader == null && q.peek() != null)
//通過signal()方法喚醒一個出隊線程
available.signal();
//解鎖
lock.unlock();
}
}take()方法總結(jié):
- 獲取鎖。
- 取出優(yōu)先級隊列q的首元素。
- 如果元素q的隊首為空則阻塞。
- 如果元素q的隊首(first)不為空;獲取這個元素的delay時間值,如果first的延遲delay時間小于等于0,說明元素已經(jīng)到了可以使用的時間,調(diào)用poll()方法彈出該元素,跳出方法。
- 如果first的延遲delay時間大于0,釋放元素first的引用,避免內(nèi)存泄漏。
- 如果延遲delay時間大于0,leader非空,當(dāng)前線程等待。
- 如果延遲delay時間大于0,leader為空,將當(dāng)前線程設(shè)置為leader線程,等待剩余時間。
- 自旋,循環(huán)以上操作,直到return。
重載poll()方法
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
//獲取等待時間
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
//如果first為空
if (first == null) {
//如果nanos小于等于0
if (nanos <= 0)
//返回null
return null;
else
//如果nanos大于0
//等待nanos時間
nanos = available.awaitNanos(nanos);
} else {
//如果隊首非空
//獲取first的剩余延遲時間
long delay = first.getDelay(NANOSECONDS);
//如果delay小于等于0
if (delay <= 0)
//延遲時間到期,獲取并刪除頭部元素
return q.poll();
//如果delay大于0
//如果nanos小于等于0
if (nanos <= 0)
//返回null
return null;
//如果delay大于0且nanos大于0
//first置為null
first = null;
//如果nanos小于delay或者leader非空
if (nanos < delay || leader != null)
//等待delay時間
nanos = available.awaitNanos(nanos);
else {
//如果nanos大于等于delay或者leader為空
//獲取當(dāng)前線程
Thread thisThread = Thread.currentThread();
//設(shè)置當(dāng)前線程為leader
leader = thisThread;
try {
//等待delay時間
long timeLeft = available.awaitNanos(delay);
//修改nanos
nanos -= delay - timeLeft;
} finally {
//如果當(dāng)前線程為leader線程
//釋放leader線程
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader為null并且隊列不為空
//說明沒有其他線程在等待,那就通知條件隊列
if (leader == null && q.peek() != null)
//通過singnal()方法喚醒一個出隊線程
available.signal();
//解鎖
lock.unlock();
}
}到此這篇關(guān)于Java無界阻塞隊列DelayQueue詳細(xì)解析的文章就介紹到這了,更多相關(guān)Java無界阻塞隊列DelayQueue內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot 默認(rèn)靜態(tài)路徑實例解析
這篇文章主要介紹了springboot 默認(rèn)靜態(tài)路徑實例解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11
Spring AI TikaDocumentReader詳解
TikaDocumentReader是SpringAI中用于從多種格式文檔中提取文本內(nèi)容的組件,支持PDF、DOC/DOCX、PPT/PPTX和HTML等格式,它在構(gòu)建知識庫、文檔處理和數(shù)據(jù)清洗等任務(wù)中非常有用2025-01-01
Java獲取年月日(格式:xxxx年xx月xx日)的方法詳解
在開發(fā)應(yīng)用程序時,經(jīng)常需要獲取當(dāng)前的年、月、日,并以特定格式進(jìn)行展示或處理,本文將介紹如何獲取年月日,并將其格式化為“xxxx年xx月xx日”的形式,幫助你在應(yīng)用程序中處理日期信息,需要的朋友可以參考下2023-10-10
idea創(chuàng)建spring boot項目及java版本只能選擇17和21的問題
這篇文章主要介紹了idea創(chuàng)建spring boot項目及java版本只能選擇17和21的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01
Java并發(fā)LinkedBlockingQueue源碼分析
這篇文章主要為大家介紹了Java并發(fā)LinkedBlockingQueue源碼分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02
SpringBoot使用classfinal-maven-plugin插件加密Jar包的示例代碼
這篇文章給大家介紹了SpringBoot使用classfinal-maven-plugin插件加密Jar包的實例,文中通過代碼示例和圖文講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-02-02

