深入理解Java并發(fā)編程之LinkedBlockingQueue隊列
前面一篇文章我們介紹了使用CAS算法實現(xiàn)的非阻塞隊列ConcurrentLinedQueue, 下面我們來介紹使用獨占鎖實現(xiàn)的阻塞隊列LinkedBlockingQueue。
LinkedBlockingQueue也是使用單向鏈表實現(xiàn)的,其也有兩個Node,分別用來存放首、尾節(jié)點,并且還有一個初始值為0的原子變量count,用來記錄隊列元素個數(shù)。另外還有兩個ReentrantLock的實例,分別用來控制元素入隊和出隊的原子性,其中takeLock用來控制同時只有一個線程可以從隊列頭獲取元素,其他線程必須等待,putLock控制同時只能有一個線程可以獲取鎖,在隊列尾部添加元素,其他線程必須等待。另外,notEmpty 和 notFull 是條件變量,它們內(nèi)部都有一個條件隊列用來存放進隊和出隊時被阻塞的線程,其實這是生產(chǎn)者-消費者模型。如下是獨占鎖的創(chuàng)建代碼。
private final AtomicInteger count = new AtomicInteger(); /** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
當調(diào)用線程在LinkedBlockingQueue 實例上執(zhí)行take、poll 等操作時需要獲取到 takeLock 鎖,從而保證同時只有一個線程可以操作鏈表頭節(jié)點。另外由于條件變量 notEmpty 內(nèi)部的條件隊列的維護使用的是takeLock的鎖狀態(tài)管理機制,所以在調(diào)用notEmpty的await 和signal方法前調(diào)用線程必須先獲取到 takeLock鎖,否則會拋出IllegalMonitorStateException 異常。notEmpty內(nèi)部則維護著一個條件隊列,當線程獲取到takeLock 鎖后調(diào)用 notEmpty的await 方法時,調(diào)用線程會被阻塞,然后該線程會被放到notEmpty內(nèi)部的條件隊列進行等待,直到有線程調(diào)用了notEmpty的 signal 方法。
在LinkedBlockingQueue實例上執(zhí)行put、offer等操作時需要獲取到putLock鎖,從而保證同時只有一個線程可以操作鏈表尾節(jié)點。同樣由于條件變量 notFull 內(nèi)部的條件隊列的維護使用的是putLock的鎖狀態(tài)管理機制,所以在調(diào)用 notFull 的 await 和 signal 方法前調(diào)用線程必須先獲取到putLock鎖,否則會拋出 IllegalMonitorStateException 異常。notFull 內(nèi)部則維護著一個條件隊列,當線程獲取到 putLock 鎖后調(diào)用notFull的await 方法時,調(diào)用線程會被阻塞,然后該線程會被放到notFull 內(nèi)部的條件隊列進行等待,直到有線程調(diào)用了 notFull 的 signal 方法。如下是LinkedBlockingQueue 的無參構造函數(shù)的代碼。
如下是LinkedBlockingQueue的無參構造代碼
public static final int MAX_VALUE = 0x7fffffff; public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalAgrumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
由該代碼可知,默認隊列容量為0x7fffffff,用戶也可以自己指定容量,所以從一定程度上可以說LinkedBlockingQueue是有界阻塞隊列。
offer操作
public boolean offer(E e) { //(1) if (e == null) throw new NullPointerException(); //(2) final AtomicInteger count = this.count; if (count.get() == capacity) return false; //(3) int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { //(4) if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); //(5) if (c + 1 < capacity) notFull.signal(); } } finally { //(6) putLock.unlock(); } //(7) if (c == 0) signalNotEmpty(); //(8) return c >= 0; }
代碼(2)判斷如果當前隊列已滿則丟棄當前元素并返回false
代碼(3)獲取到 putLock 鎖,當前線程獲取到該鎖后,則其他調(diào)用put和 offer操的線程將會被阻塞(阻塞的線程被放到putLock鎖的AQS阻塞隊列)。
代碼(4)這里重新判斷當前隊列是否滿,這是因為在執(zhí)行代碼(2)和獲取到 putLock 鎖期間可能其他線程通過 put 或者offer 操作向隊列里面添加了新元素。重新判斯隊列確實不滿則新元素入隊,并遞增計數(shù)器。
代碼(5)判斷如果新元素入隊后隊列還有空閑空間,則喚醒notFull的條件隊列里面因為調(diào)用了notFull的await操作(比如執(zhí)行put方法而隊列滿了的時候)而被阻塞的一個線程,因為隊列現(xiàn)在有空閑所以這里可以提前喚醒一個入隊線程。
代碼(6)則釋放獲取的putLock 鎖,這里要注意,鎖的釋放一定要在finally里面做因為即使try塊拋出異常了,finally也是會被執(zhí)行到。另外釋放鎖后其他因為調(diào)用put 操作而被阻塞的線程將會有一個獲取到該鎖。
代碼(7)中的c0說明在執(zhí)行代碼(6)釋放鎖時隊列里面至少有一個元素,隊列里面有元素則執(zhí)行signalNotEmpty操作.
到此這篇關于深入理解Java并發(fā)編程之LinkedBlockingQueue隊列的文章就介紹到這了,更多相關Java LinkedBlockingQueue隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Spring中事務管理方案和事務管理器及事務控制的API詳解
這篇文章主要介紹了Spring中事務管理方案和事務管理器及事務控制的API詳解,事務管理是指對事務進行管理和控制,以確保事務的正確性和完整性,事務管理的作用是保證數(shù)據(jù)庫的數(shù)據(jù)操作的一致性和可靠性,需要的朋友可以參考下2023-08-08SpringBoot整合Spring Data Elasticsearch的過程詳解
這篇文章主要介紹了SpringBoot整合Spring Data Elasticsearch的過程詳解,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-09-09Springboot整合PageOffice 實現(xiàn)word在線編輯保存功能
這篇文章主要介紹了Springboot整合PageOffice 實現(xiàn)word在線編輯保存,本文以Samples5 為示例文件結合示例代碼給大家詳細介紹,需要的朋友可以參考下2021-08-08