Java并發(fā)LinkedBlockingQueue源碼分析
簡介
LinkedBlockingQueue
是一個阻塞的有界隊列,底層是通過一個個的Node
節(jié)點形成的鏈表實現(xiàn)的,鏈表隊列中的頭節(jié)點是一個空的Node
節(jié)點,在多線程下操作時會使用ReentrantLock
鎖來保證數據的安全性,并使用ReentrantLock
下的Condition
對象來阻塞以及喚醒線程。
常量
/** * 鏈表中的節(jié)點類 */ static class Node<E> { //節(jié)點中的元素 E item; //下一個節(jié)點 Node<E> next; Node(E x) { item = x; } } /** 鏈表隊列的容量大小,如果沒有指定則使用Integer最大值 */ private final int capacity; /** 記錄鏈表中的節(jié)點的數量的原子類 */ private final AtomicInteger count = new AtomicInteger(); /**鏈表的頭節(jié)點 */ transient Node<E> head; /** * 鏈表的尾節(jié)點 */ private transient Node<E> last; /** 從鏈表隊列中獲取節(jié)點時防止多個線程同時操作所產生數據安全問題時所加的鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** 添加節(jié)點到鏈表隊列中防止多個線程同時操作所產生數據安全問題時所加的鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
Node
:鏈表隊列中的節(jié)點,用于存放元素。capacity
:鏈表隊列中最多能存放的節(jié)點數量,如果在創(chuàng)建LinkedBlockingQueue
的時候沒有指定.則默認最多存放的節(jié)點的數量為Integer
的最大值。head
:鏈表隊列中的頭節(jié)點,一般來說頭節(jié)點都是一個沒有元素的空節(jié)點。last
:鏈表隊列中的尾節(jié)點。takeLock
:在獲取鏈表隊列中的節(jié)點的時候所加的鎖。putLock
:在添加鏈表隊列中的節(jié)點的時候所加的鎖。Condition
:當線程需要進行等待或者喚醒的時候則會調用該對象下的方法。
構造方法
/** * 創(chuàng)建默認容量大小的鏈表隊列 */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } /** * 創(chuàng)建指定容量大小的鏈表隊列 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; //創(chuàng)建一個空節(jié)點,并將該節(jié)點設置為頭尾節(jié)點 last = head = new Node<E>(null); } /** * 根據指定集合中的元素創(chuàng)建一個默認容量大小的鏈表隊列 */ public LinkedBlockingQueue(Collection<? extends E> c) { //創(chuàng)建默認容量大小的鏈表隊列 this(Integer.MAX_VALUE); //獲取添加元素節(jié)點的鎖 final ReentrantLock putLock = this.putLock; //加鎖 putLock.lock(); try { //鏈表中節(jié)點的數量 int n = 0; //遍歷集合中的元素 for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); //為元素創(chuàng)建一個節(jié)點,并將節(jié)點添加到鏈表的尾部,并設置節(jié)點為尾節(jié)點 enqueue(new Node<E>(e)); //鏈表中節(jié)點的數量自增 ++n; } //記錄鏈表中節(jié)點的數量 count.set(n); } finally { //釋放鎖 putLock.unlock(); } }
第一個和第三個構造方法中都會調用第二個構造方法,而在第二個構造方法中會設置鏈表隊列中容納節(jié)點的數量以及創(chuàng)建一個空的頭節(jié)點來填充,再看第三個構造方法中的代碼,首先會獲取putLock
鎖,代表當前是一個需要添加節(jié)點的線程,再將指定集合中的元素封裝成一個Node
節(jié)點,并依次將封裝的節(jié)點追加到鏈表隊列中的尾部,并使用AtomicInteger
來記錄鏈表隊列中節(jié)點的數量。
put
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; //為指定元素創(chuàng)建節(jié)點 Node<E> node = new Node<E>(e); //獲取添加元素節(jié)點的鎖 final ReentrantLock putLock = this.putLock; //獲取記錄鏈表節(jié)點數量的原子類 final AtomicInteger count = this.count; //加鎖,如果加鎖的線程被中斷了則拋出異常 putLock.lockInterruptibly(); try { //校驗鏈表中的節(jié)點數量是否到達了指定的容量 //如果到達了指定的容量就進行阻塞等待 //如果線程被喚醒了,但是鏈表中的節(jié)點數量還是未改變,則繼續(xù)阻塞等待 //只有當頭節(jié)點出隊,新的節(jié)點才能繼續(xù)添加 while (count.get() == capacity) { notFull.await(); } //將新節(jié)點添加到鏈表的尾部并設置為尾節(jié)點 enqueue(node); //獲取沒有添加當前節(jié)點時鏈表中的節(jié)點數量 //并更新鏈表中的節(jié)點數量 c = count.getAndIncrement(); if (c + 1 < capacity) //喚醒等待添加節(jié)點的線程 //可能當前線程在等待隊列中等待的時候 //有新的線程要執(zhí)行添加節(jié)點的操作 //但是鏈表的容量已經到達最大,所以新的線程也會進行等待 //當前線程被喚醒了并且鏈表的容量沒有到達最大則嘗試去喚醒等待的線程 notFull.signal(); } finally { //釋放鎖 putLock.unlock(); } if (c == 0) //c等于0說明添加當前節(jié)點的時候鏈表中沒有節(jié)點 //可能有線程在獲取節(jié)點,但是鏈表中沒有節(jié)點 //從而一直進行等待,當添加了節(jié)點的時候就需要喚醒獲取節(jié)點的線程 signalNotEmpty(); }
LinkedBlockingQueue
中的代碼都比較簡單,主要是ReentrantLock
下的Condition
中的方法比較復雜,我們先整體的了解一下put
方法,首先通過new Node
為將指定元素封裝成一個節(jié)點,再獲取putLock
鎖,當鏈表隊列中的節(jié)點數量已經到達了capacity
大小,那當前線程就需要調用Condition
下的await
方法進行等待將線程阻塞,直到有節(jié)點出隊或者說有節(jié)點被刪除或者當前線程被中斷了,當前線程被中斷了則會直接退出當前put
方法并拋出異常,如果節(jié)點出隊了或者節(jié)點被刪除了,那當前線程被喚醒了則會繼續(xù)執(zhí)行添加節(jié)點的操作。
enqueue
方法則會將封裝的節(jié)點追加到鏈表隊列中的尾部,通過getAndIncrement
方法先獲取沒有添加當前節(jié)點時鏈表隊列中節(jié)點的數量,然后更新添加了當前節(jié)點之后鏈表隊列中節(jié)點的數量,c
則是沒有添加當前節(jié)點時鏈表隊列中節(jié)點的數量,c+1
則是添加當前節(jié)點后鏈表隊列中節(jié)點的數量,如果說c+1小于capacity
則說明線程在添加節(jié)點的時候,鏈表隊列中的節(jié)點數量已經到達了最大值,后續(xù)添加節(jié)點的線程都需要進行阻塞,當有節(jié)點被刪除或出隊的時候,最開始阻塞的線程被喚醒,被喚醒的線程則會去執(zhí)行添加節(jié)點的操作,當添加完節(jié)點之后鏈表隊列中的節(jié)點數量沒有到達最大值則會去喚醒后續(xù)被阻塞的線程執(zhí)行添加節(jié)點的操作。
c等于0
說明在添加當前節(jié)點之前,可能有線程在獲取鏈表隊列中的節(jié)點,但是鏈表隊列中沒有節(jié)點,導致獲取節(jié)點的線程處于阻塞狀態(tài),當添加完節(jié)點之后,鏈表隊列中有了節(jié)點,此時就需要喚醒阻塞的線程去獲取節(jié)點。
添加元素的方法分為put和offer
,區(qū)別在于阻塞與非阻塞,當鏈表隊列中的節(jié)點數量已經到達最大值,put
方法則會阻塞,而offer
方法不會阻塞則是直接返回。
獲取元素的方法分為take、poll、peek
,take
方法與put
方法相似,只不過一個是入隊,一個是出隊,poll
與peek
都是非阻塞的,但是區(qū)別在于poll
獲取了節(jié)點之后,該節(jié)點會從鏈表隊列中移除,而peek
不會移除節(jié)點。
await
public final void await() throws InterruptedException { if (Thread.interrupted()) //線程被中斷拋出異常 throw new InterruptedException(); //為當前線程創(chuàng)建一個等待模式的節(jié)點并入隊,并將等待隊列中已經取消等待的節(jié)點移除掉 Node node = addConditionWaiter(); //釋放當前線程的鎖,防止當前線程加了鎖,導致其它在等待的線程被喚醒之后不能獲取到鎖從而導致一直阻塞 int savedState = fullyRelease(node); int interruptMode = 0; //如果指定節(jié)點還在等待隊列中等待則掛起 //如果指定節(jié)點被中斷了則會將指定節(jié)點添加到同步等待隊列中 //如果指定節(jié)點被喚醒了則會將指定節(jié)點添加到同步等待隊列中 while (!isOnSyncQueue(node)) { //節(jié)點在等待隊列中則掛起 LockSupport.park(this); //線程在等待隊列中被中斷則會添加到同步等待隊列中 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //acquireQueued 指定節(jié)點中的線程被中斷了或者被喚醒了則會嘗試去獲取鎖 //如果還未到指定節(jié)點中的線程獲取鎖的時候則會繼續(xù)掛起 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) //指定節(jié)點的線程已經獲取到了鎖并且節(jié)點關聯(lián)的下一個節(jié)點不為空 //此時就需要將已經獲取到鎖的節(jié)點從等待隊列中移除 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
首先通過addConditionWaiter
方法將當前線程封裝成一個等待模式的節(jié)點,并將節(jié)點添加到等待隊列中以及會將等待隊列中已經取消等待的線程節(jié)點從隊列中移除,再通過fullyRelease
方法釋放掉當前線程加的所有的鎖,之所以釋放鎖是防止其它線程獲取不到鎖從而一直阻塞,再看isOnSyncQueue
方法,該方法是校驗當前線程節(jié)點是否在等待隊列中,如果在等待隊列中那就將節(jié)點中的線程掛起等待。
isOnSyncQueue
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) //指定節(jié)點還在等待隊列中此時就需要繼續(xù)等待 return false; if (node.next != null) //指定節(jié)點已經不在等待隊列中了 return true; //從等待隊列中的尾節(jié)點開始向頭節(jié)點遍歷,校驗指定的節(jié)點是否在其中 return findNodeFromTail(node); }
當節(jié)點的狀態(tài)為CONDITION
時,則說明該節(jié)點還在等待隊列中,node.prev等于null
為什么說也是在等待隊列中呢?因為等待隊列中的節(jié)點是沒有prev
指針和next
指針的,如果prev
指針和next
指針指向的節(jié)點不為空,那就說明該節(jié)點是在同步等待隊列中的,如果在同步等待隊列中的話,那節(jié)點中的線程就可以嘗試去獲取鎖并執(zhí)行后續(xù)的操作。
當等待隊列中的線程節(jié)點被喚醒和中斷則會添加到同步等待隊列中,如果是被中斷的話則會通過checkInterruptWhileWaiting
方法添加一個中斷標識,再通過acquireQueued
方法來獲取鎖,如果獲取鎖失敗則繼續(xù)等待,當獲取鎖成功之后則會該節(jié)點從等待隊列中移除,如果說你是一個被中斷的線程,最后會通過reportInterruptAfterWait
方法拋出中斷異常。
signal
public final void signal() { if (!isHeldExclusively()) //加鎖的線程不是當前線程則拋出異常 throw new IllegalMonitorStateException(); //頭節(jié)點 Node first = firstWaiter; if (first != null) //喚醒頭節(jié)點 doSignal(first); } /** * 喚醒等待隊列中的頭節(jié)點 * 如果等待隊列中的頭節(jié)點被取消等待或已經被喚醒了 * 此時就需要喚醒頭節(jié)點的后續(xù)的一個節(jié)點 * 直到成功的喚醒一個節(jié)點中的線程 */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * 將指定的節(jié)點添加到同步等待隊列中 * 并根據前一個節(jié)點的等待狀態(tài)來決定是否需要立刻喚醒指定節(jié)點 */ final boolean transferForSignal(Node node) { if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) //更改節(jié)點狀態(tài)失敗說明該節(jié)點已經被喚醒了 return false; //將要喚醒的節(jié)點添加到同步等待隊列中 //并返回前一個節(jié)點 Node p = enq(node); //前一個節(jié)點的等待狀態(tài) int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //如果前一個節(jié)點的等待狀態(tài)大于0則說明已經被取消加鎖,此時就需要喚醒后續(xù)的節(jié)點,就是當前節(jié)點 //前一個節(jié)點的等待狀態(tài)不大于0但是更改前一個節(jié)點的等待狀態(tài)時失敗則說明前一個節(jié)點已經被喚醒了并更改了狀態(tài) //此時就需要嘗試將當前節(jié)點中的線程喚醒 LockSupport.unpark(node.thread); return true; }
喚醒線程節(jié)點的方法主要還是看transferForSignal
方法,首先會通過cas
操作將需要喚醒的節(jié)點的狀態(tài)設置為0,如果更改節(jié)點狀態(tài)失敗則說明該節(jié)點已經被喚醒了,更新節(jié)點狀態(tài)成功則會通過enq
方法將節(jié)點添加到同步等待隊列中,此時就需要根據前一個節(jié)點來決定是否需要立即喚醒當前節(jié)點中的線程。
從下面的圖片中能看出來其實同步等待隊列和等待隊列中使用的節(jié)點是共用的節(jié)點,并不會創(chuàng)建新的節(jié)點,同步等待隊列中的節(jié)點使用next
指針和prev
指針來關聯(lián)節(jié)點,而等待隊列中則是使用nextWaiter
指針來關聯(lián)節(jié)點的。
以上就是Java并發(fā)LinkedBlockingQueue源碼分析的詳細內容,更多關于Java并發(fā)LinkedBlockingQueue的資料請關注腳本之家其它相關文章!
相關文章
java基礎之Collection與Collections和Array與Arrays的區(qū)別
這篇文章主要介紹了java基礎之Collection與Collections和Array與Arrays的區(qū)別的相關資料,本文主要說明兩者的區(qū)別以防大家混淆概念,需要的朋友可以參考下2017-08-08詳解Spring Boot使用系統(tǒng)參數表提升系統(tǒng)的靈活性
Spring Boot項目中常有一些相對穩(wěn)定的參數設置項,其作用范圍是系統(tǒng)級的或模塊級的,這些參數稱為系統(tǒng)參數。這些變量以參數形式進行配置,從而提高變動和擴展的靈活性,保持代碼的穩(wěn)定性2021-06-06Netty分布式ByteBuf使用subPage級別內存分配剖析
這篇文章主要為大家介紹了Netty分布式ByteBuf使用subPage級別內存分配剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03Springboot通過ObjectMapper配置json序列化詳解
SpringBoot默認集成Jackson庫,其中ObjectMapper類是核心,用于Java對象與JSON字符串的互轉,提供配置序列化特性、注冊模塊等方法,在SpringBoot中可以全局配置JSON格式,如日期格式化、將Long轉為字符串,還可以配置序列化時的各種規(guī)則,感興趣的可以了解一下2024-10-10