Java并發(fā)LinkedBlockingQueue源碼分析
簡介
LinkedBlockingQueue是一個阻塞的有界隊列,底層是通過一個個的Node節(jié)點形成的鏈表實現(xiàn)的,鏈表隊列中的頭節(jié)點是一個空的Node節(jié)點,在多線程下操作時會使用ReentrantLock鎖來保證數(shù)據(jù)的安全性,并使用ReentrantLock下的Condition對象來阻塞以及喚醒線程。
常量
/**
* 鏈表中的節(jié)點類
*/
static class Node<E> {
//節(jié)點中的元素
E item;
//下一個節(jié)點
Node<E> next;
Node(E x) { item = x; }
}
/** 鏈表隊列的容量大小,如果沒有指定則使用Integer最大值 */
private final int capacity;
/** 記錄鏈表中的節(jié)點的數(shù)量的原子類 */
private final AtomicInteger count = new AtomicInteger();
/**鏈表的頭節(jié)點
*/
transient Node<E> head;
/**
* 鏈表的尾節(jié)點
*/
private transient Node<E> last;
/** 從鏈表隊列中獲取節(jié)點時防止多個線程同時操作所產(chǎn)生數(shù)據(jù)安全問題時所加的鎖 */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** 添加節(jié)點到鏈表隊列中防止多個線程同時操作所產(chǎn)生數(shù)據(jù)安全問題時所加的鎖 */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
Node:鏈表隊列中的節(jié)點,用于存放元素。capacity:鏈表隊列中最多能存放的節(jié)點數(shù)量,如果在創(chuàng)建LinkedBlockingQueue的時候沒有指定.則默認最多存放的節(jié)點的數(shù)量為Integer的最大值。head:鏈表隊列中的頭節(jié)點,一般來說頭節(jié)點都是一個沒有元素的空節(jié)點。last:鏈表隊列中的尾節(jié)點。takeLock:在獲取鏈表隊列中的節(jié)點的時候所加的鎖。putLock:在添加鏈表隊列中的節(jié)點的時候所加的鎖。Condition:當線程需要進行等待或者喚醒的時候則會調(diào)用該對象下的方法。
構(gòu)造方法
/**
* 創(chuàng)建默認容量大小的鏈表隊列
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
/**
* 創(chuàng)建指定容量大小的鏈表隊列
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//創(chuàng)建一個空節(jié)點,并將該節(jié)點設置為頭尾節(jié)點
last = head = new Node<E>(null);
}
/**
* 根據(jù)指定集合中的元素創(chuàng)建一個默認容量大小的鏈表隊列
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
//創(chuàng)建默認容量大小的鏈表隊列
this(Integer.MAX_VALUE);
//獲取添加元素節(jié)點的鎖
final ReentrantLock putLock = this.putLock;
//加鎖
putLock.lock();
try {
//鏈表中節(jié)點的數(shù)量
int n = 0;
//遍歷集合中的元素
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//為元素創(chuàng)建一個節(jié)點,并將節(jié)點添加到鏈表的尾部,并設置節(jié)點為尾節(jié)點
enqueue(new Node<E>(e));
//鏈表中節(jié)點的數(shù)量自增
++n;
}
//記錄鏈表中節(jié)點的數(shù)量
count.set(n);
} finally {
//釋放鎖
putLock.unlock();
}
}
第一個和第三個構(gòu)造方法中都會調(diào)用第二個構(gòu)造方法,而在第二個構(gòu)造方法中會設置鏈表隊列中容納節(jié)點的數(shù)量以及創(chuàng)建一個空的頭節(jié)點來填充,再看第三個構(gòu)造方法中的代碼,首先會獲取putLock鎖,代表當前是一個需要添加節(jié)點的線程,再將指定集合中的元素封裝成一個Node節(jié)點,并依次將封裝的節(jié)點追加到鏈表隊列中的尾部,并使用AtomicInteger來記錄鏈表隊列中節(jié)點的數(shù)量。
put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
//為指定元素創(chuàng)建節(jié)點
Node<E> node = new Node<E>(e);
//獲取添加元素節(jié)點的鎖
final ReentrantLock putLock = this.putLock;
//獲取記錄鏈表節(jié)點數(shù)量的原子類
final AtomicInteger count = this.count;
//加鎖,如果加鎖的線程被中斷了則拋出異常
putLock.lockInterruptibly();
try {
//校驗鏈表中的節(jié)點數(shù)量是否到達了指定的容量
//如果到達了指定的容量就進行阻塞等待
//如果線程被喚醒了,但是鏈表中的節(jié)點數(shù)量還是未改變,則繼續(xù)阻塞等待
//只有當頭節(jié)點出隊,新的節(jié)點才能繼續(xù)添加
while (count.get() == capacity) {
notFull.await();
}
//將新節(jié)點添加到鏈表的尾部并設置為尾節(jié)點
enqueue(node);
//獲取沒有添加當前節(jié)點時鏈表中的節(jié)點數(shù)量
//并更新鏈表中的節(jié)點數(shù)量
c = count.getAndIncrement();
if (c + 1 < capacity)
//喚醒等待添加節(jié)點的線程
//可能當前線程在等待隊列中等待的時候
//有新的線程要執(zhí)行添加節(jié)點的操作
//但是鏈表的容量已經(jīng)到達最大,所以新的線程也會進行等待
//當前線程被喚醒了并且鏈表的容量沒有到達最大則嘗試去喚醒等待的線程
notFull.signal();
} finally {
//釋放鎖
putLock.unlock();
}
if (c == 0)
//c等于0說明添加當前節(jié)點的時候鏈表中沒有節(jié)點
//可能有線程在獲取節(jié)點,但是鏈表中沒有節(jié)點
//從而一直進行等待,當添加了節(jié)點的時候就需要喚醒獲取節(jié)點的線程
signalNotEmpty();
}
LinkedBlockingQueue中的代碼都比較簡單,主要是ReentrantLock下的Condition中的方法比較復雜,我們先整體的了解一下put方法,首先通過new Node為將指定元素封裝成一個節(jié)點,再獲取putLock鎖,當鏈表隊列中的節(jié)點數(shù)量已經(jīng)到達了capacity大小,那當前線程就需要調(diào)用Condition下的await方法進行等待將線程阻塞,直到有節(jié)點出隊或者說有節(jié)點被刪除或者當前線程被中斷了,當前線程被中斷了則會直接退出當前put方法并拋出異常,如果節(jié)點出隊了或者節(jié)點被刪除了,那當前線程被喚醒了則會繼續(xù)執(zhí)行添加節(jié)點的操作。
enqueue方法則會將封裝的節(jié)點追加到鏈表隊列中的尾部,通過getAndIncrement方法先獲取沒有添加當前節(jié)點時鏈表隊列中節(jié)點的數(shù)量,然后更新添加了當前節(jié)點之后鏈表隊列中節(jié)點的數(shù)量,c則是沒有添加當前節(jié)點時鏈表隊列中節(jié)點的數(shù)量,c+1則是添加當前節(jié)點后鏈表隊列中節(jié)點的數(shù)量,如果說c+1小于capacity則說明線程在添加節(jié)點的時候,鏈表隊列中的節(jié)點數(shù)量已經(jīng)到達了最大值,后續(xù)添加節(jié)點的線程都需要進行阻塞,當有節(jié)點被刪除或出隊的時候,最開始阻塞的線程被喚醒,被喚醒的線程則會去執(zhí)行添加節(jié)點的操作,當添加完節(jié)點之后鏈表隊列中的節(jié)點數(shù)量沒有到達最大值則會去喚醒后續(xù)被阻塞的線程執(zhí)行添加節(jié)點的操作。
c等于0說明在添加當前節(jié)點之前,可能有線程在獲取鏈表隊列中的節(jié)點,但是鏈表隊列中沒有節(jié)點,導致獲取節(jié)點的線程處于阻塞狀態(tài),當添加完節(jié)點之后,鏈表隊列中有了節(jié)點,此時就需要喚醒阻塞的線程去獲取節(jié)點。
添加元素的方法分為put和offer,區(qū)別在于阻塞與非阻塞,當鏈表隊列中的節(jié)點數(shù)量已經(jīng)到達最大值,put方法則會阻塞,而offer方法不會阻塞則是直接返回。
獲取元素的方法分為take、poll、peek,take方法與put方法相似,只不過一個是入隊,一個是出隊,poll與peek都是非阻塞的,但是區(qū)別在于poll獲取了節(jié)點之后,該節(jié)點會從鏈表隊列中移除,而peek不會移除節(jié)點。
await
public final void await() throws InterruptedException {
if (Thread.interrupted())
//線程被中斷拋出異常
throw new InterruptedException();
//為當前線程創(chuàng)建一個等待模式的節(jié)點并入隊,并將等待隊列中已經(jīng)取消等待的節(jié)點移除掉
Node node = addConditionWaiter();
//釋放當前線程的鎖,防止當前線程加了鎖,導致其它在等待的線程被喚醒之后不能獲取到鎖從而導致一直阻塞
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果指定節(jié)點還在等待隊列中等待則掛起
//如果指定節(jié)點被中斷了則會將指定節(jié)點添加到同步等待隊列中
//如果指定節(jié)點被喚醒了則會將指定節(jié)點添加到同步等待隊列中
while (!isOnSyncQueue(node)) {
//節(jié)點在等待隊列中則掛起
LockSupport.park(this);
//線程在等待隊列中被中斷則會添加到同步等待隊列中
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued 指定節(jié)點中的線程被中斷了或者被喚醒了則會嘗試去獲取鎖
//如果還未到指定節(jié)點中的線程獲取鎖的時候則會繼續(xù)掛起
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
//指定節(jié)點的線程已經(jīng)獲取到了鎖并且節(jié)點關聯(lián)的下一個節(jié)點不為空
//此時就需要將已經(jīng)獲取到鎖的節(jié)點從等待隊列中移除
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
首先通過addConditionWaiter方法將當前線程封裝成一個等待模式的節(jié)點,并將節(jié)點添加到等待隊列中以及會將等待隊列中已經(jīng)取消等待的線程節(jié)點從隊列中移除,再通過fullyRelease方法釋放掉當前線程加的所有的鎖,之所以釋放鎖是防止其它線程獲取不到鎖從而一直阻塞,再看isOnSyncQueue方法,該方法是校驗當前線程節(jié)點是否在等待隊列中,如果在等待隊列中那就將節(jié)點中的線程掛起等待。
isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
//指定節(jié)點還在等待隊列中此時就需要繼續(xù)等待
return false;
if (node.next != null)
//指定節(jié)點已經(jīng)不在等待隊列中了
return true;
//從等待隊列中的尾節(jié)點開始向頭節(jié)點遍歷,校驗指定的節(jié)點是否在其中
return findNodeFromTail(node);
}
當節(jié)點的狀態(tài)為CONDITION時,則說明該節(jié)點還在等待隊列中,node.prev等于null為什么說也是在等待隊列中呢?因為等待隊列中的節(jié)點是沒有prev指針和next指針的,如果prev指針和next指針指向的節(jié)點不為空,那就說明該節(jié)點是在同步等待隊列中的,如果在同步等待隊列中的話,那節(jié)點中的線程就可以嘗試去獲取鎖并執(zhí)行后續(xù)的操作。
當?shù)却犃兄械木€程節(jié)點被喚醒和中斷則會添加到同步等待隊列中,如果是被中斷的話則會通過checkInterruptWhileWaiting方法添加一個中斷標識,再通過acquireQueued方法來獲取鎖,如果獲取鎖失敗則繼續(xù)等待,當獲取鎖成功之后則會該節(jié)點從等待隊列中移除,如果說你是一個被中斷的線程,最后會通過reportInterruptAfterWait方法拋出中斷異常。
signal
public final void signal() {
if (!isHeldExclusively())
//加鎖的線程不是當前線程則拋出異常
throw new IllegalMonitorStateException();
//頭節(jié)點
Node first = firstWaiter;
if (first != null)
//喚醒頭節(jié)點
doSignal(first);
}
/**
* 喚醒等待隊列中的頭節(jié)點
* 如果等待隊列中的頭節(jié)點被取消等待或已經(jīng)被喚醒了
* 此時就需要喚醒頭節(jié)點的后續(xù)的一個節(jié)點
* 直到成功的喚醒一個節(jié)點中的線程
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
/**
* 將指定的節(jié)點添加到同步等待隊列中
* 并根據(jù)前一個節(jié)點的等待狀態(tài)來決定是否需要立刻喚醒指定節(jié)點
*/
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
//更改節(jié)點狀態(tài)失敗說明該節(jié)點已經(jīng)被喚醒了
return false;
//將要喚醒的節(jié)點添加到同步等待隊列中
//并返回前一個節(jié)點
Node p = enq(node);
//前一個節(jié)點的等待狀態(tài)
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//如果前一個節(jié)點的等待狀態(tài)大于0則說明已經(jīng)被取消加鎖,此時就需要喚醒后續(xù)的節(jié)點,就是當前節(jié)點
//前一個節(jié)點的等待狀態(tài)不大于0但是更改前一個節(jié)點的等待狀態(tài)時失敗則說明前一個節(jié)點已經(jīng)被喚醒了并更改了狀態(tài)
//此時就需要嘗試將當前節(jié)點中的線程喚醒
LockSupport.unpark(node.thread);
return true;
}
喚醒線程節(jié)點的方法主要還是看transferForSignal方法,首先會通過cas操作將需要喚醒的節(jié)點的狀態(tài)設置為0,如果更改節(jié)點狀態(tài)失敗則說明該節(jié)點已經(jīng)被喚醒了,更新節(jié)點狀態(tài)成功則會通過enq方法將節(jié)點添加到同步等待隊列中,此時就需要根據(jù)前一個節(jié)點來決定是否需要立即喚醒當前節(jié)點中的線程。
從下面的圖片中能看出來其實同步等待隊列和等待隊列中使用的節(jié)點是共用的節(jié)點,并不會創(chuàng)建新的節(jié)點,同步等待隊列中的節(jié)點使用next指針和prev指針來關聯(lián)節(jié)點,而等待隊列中則是使用nextWaiter指針來關聯(lián)節(jié)點的。

以上就是Java并發(fā)LinkedBlockingQueue源碼分析的詳細內(nèi)容,更多關于Java并發(fā)LinkedBlockingQueue的資料請關注腳本之家其它相關文章!
相關文章
java基礎之Collection與Collections和Array與Arrays的區(qū)別
這篇文章主要介紹了java基礎之Collection與Collections和Array與Arrays的區(qū)別的相關資料,本文主要說明兩者的區(qū)別以防大家混淆概念,需要的朋友可以參考下2017-08-08
詳解Spring Boot使用系統(tǒng)參數(shù)表提升系統(tǒng)的靈活性
Spring Boot項目中常有一些相對穩(wěn)定的參數(shù)設置項,其作用范圍是系統(tǒng)級的或模塊級的,這些參數(shù)稱為系統(tǒng)參數(shù)。這些變量以參數(shù)形式進行配置,從而提高變動和擴展的靈活性,保持代碼的穩(wěn)定性2021-06-06
Netty分布式ByteBuf使用subPage級別內(nèi)存分配剖析
這篇文章主要為大家介紹了Netty分布式ByteBuf使用subPage級別內(nèi)存分配剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03
Springboot通過ObjectMapper配置json序列化詳解
SpringBoot默認集成Jackson庫,其中ObjectMapper類是核心,用于Java對象與JSON字符串的互轉(zhuǎn),提供配置序列化特性、注冊模塊等方法,在SpringBoot中可以全局配置JSON格式,如日期格式化、將Long轉(zhuǎn)為字符串,還可以配置序列化時的各種規(guī)則,感興趣的可以了解一下2024-10-10

