LinkedBlockingQueue鏈?zhǔn)阶枞?duì)列的使用和原理解析
概覽
1. 基于鏈表的可選有界阻塞隊(duì)列。根據(jù)FIFO的出入隊(duì)順序,從隊(duì)列頭部檢索和獲取元素,在隊(duì)列尾部插入新元素。
2. 當(dāng)作為有界阻塞隊(duì)列,在隊(duì)列空間不足時(shí),put方法將會(huì)一直阻塞直到有多余空間才會(huì)執(zhí)行插入元素操作,take方法則相反,只到隊(duì)列內(nèi)元素不為空時(shí),才將隊(duì)列元素逐個(gè)取出。
3. 隊(duì)列容量不指定時(shí),默認(rèn)為Integer.MAX_VALUE,此時(shí)可以看作無(wú)界隊(duì)列。
4. 使用非公平鎖進(jìn)行并發(fā)控制。所有方法都是線程安全的。
使用方法
下面的文章給出了阻塞隊(duì)列的四種基本用法:
了解BlockingQueue 大體框架和實(shí)現(xiàn)思路
LinkedBlockingQueue實(shí)現(xiàn)了BlockingQueue類。
在BlockingQueue中,方法被分為如下四類:
Throws exception
:操作未實(shí)現(xiàn)時(shí)(正常流程下的執(zhí)行)拋出異常Special value
:根據(jù)操作的實(shí)際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊(duì)列為空引起的)Blocks
:阻塞當(dāng)前線程,直到當(dāng)前線程可以成功執(zhí)行Times out
:嘗試指定時(shí)間后,放棄執(zhí)行
Throws exception | Special value | Blocks | Times out | |
新增 | add(E e) | offer(E e) | put(E e) | offer(E e, long timeout, TimeUnit unit) |
刪除 | remove() | poll() | take() | poll(long timeout, TimeUnit unit) |
查詢 | element() | peek() |
1. add | remove | element
這三個(gè)方法在BlockingQueue的定義中,都會(huì)在操作未實(shí)現(xiàn)時(shí),拋出異常。
add(E e)
:在隊(duì)尾添加元素e,add內(nèi)部調(diào)用offer方法實(shí)現(xiàn)。因此,元素e為空時(shí),拋出NullPointerException異常;插入失敗時(shí),拋出IllegalStateException異常。remove
:刪除隊(duì)首元素,內(nèi)部調(diào)用poll方法。隊(duì)首無(wú)數(shù)據(jù)時(shí),拋出NoSuchElementException異常。element
:檢索隊(duì)首元素。隊(duì)首無(wú)數(shù)據(jù)時(shí),拋出NoSuchElementException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); blockingQue.add(1); blockingQue.remove(1); blockingQue.remove(); // NoSuchElementException blockingQue.element(); // NoSuchElementException
2. offer | poll | peek
根據(jù)操作的實(shí)際情況,返回特定值,例如null、false(這些失敗可能是線程中斷、隊(duì)列為空引起的)
offer(E e)
:在隊(duì)尾添加元素e,元素e為空時(shí),拋出NullPointerException異常;插入失敗時(shí)返回false。poll
:刪除隊(duì)首元素。刪除失敗時(shí)返回false。peek
:檢索隊(duì)首元素。隊(duì)首無(wú)數(shù)據(jù)時(shí),返回null。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); blockingQue.offer(1); blockingQue.poll(); Integer peek = blockingQue.peek(); // 返回null
3. put | take
阻塞當(dāng)前線程,直到當(dāng)前線程可以成功執(zhí)行。
put(E e)
:在隊(duì)尾添加元素e,元素e為空時(shí),拋出NullPointerException異常。當(dāng)隊(duì)列滿時(shí),阻塞put線程,等待隊(duì)列被消費(fèi)后,隊(duì)列容量不滿時(shí),該阻塞線程繼續(xù)嘗試在隊(duì)尾插入元素。該方法在阻塞時(shí)可以被中斷,并拋出InterruptedException異常。take
:刪除并獲取隊(duì)首元素。隊(duì)首元素不為空時(shí)返回。隊(duì)首元素為空,阻塞take線程,等待隊(duì)列不為空時(shí),再次嘗試消費(fèi)隊(duì)首元素。該方法在阻塞時(shí)可以被中斷,并拋出InterruptedException異常。
注意:阻塞時(shí),不會(huì)解除鎖占用。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); try { blockingQue.put(1); blockingQue.take(); } catch (InterruptedException e) { // 線程被中斷 e.printStackTrace(); }
4. offer | poll (timeout)
嘗試指定時(shí)間后,放棄執(zhí)行
offer(E e, long timeout, TimeUnit unit)
:在隊(duì)尾添加元素e,元素e為空時(shí),拋出NullPointerException異常;當(dāng)隊(duì)列容量滿時(shí),線程休眠一定時(shí)間后再次查看隊(duì)列容量,當(dāng)該休眠時(shí)間大于等于timeout后,此時(shí)隊(duì)列還滿則返回false。不滿時(shí),嘗試入隊(duì)。需要注意的是,由于偽喚醒機(jī)制的存在,線程可能在timeout這個(gè)時(shí)間段內(nèi)的任意一點(diǎn)被喚醒,如果隊(duì)列容易不滿,則會(huì)直接執(zhí)行入隊(duì)操作。阻塞時(shí),當(dāng)前線程被中斷拋出InterruptedException異常。poll(long timeout, TimeUnit unit)
:刪除隊(duì)首元素。poll與offer對(duì)應(yīng)的,當(dāng)隊(duì)列為空的時(shí)候,線程休眠一定時(shí)間。休眠時(shí),當(dāng)前線程被中斷拋出InterruptedException異常。
LinkedBlockingQueue<Integer> blockingQue = new LinkedBlockingQueue<>(); try { blockingQue.offer(1, 100, TimeUnit.MILLISECONDS); blockingQue.poll(100, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }
當(dāng)然除了上述的阻塞隊(duì)列的基本操作外,LinkedBlockingQueue還具有集合Collection的性質(zhì)。因此集合中的通用方法也可以使用。
源碼解析
說(shuō)明
本次源碼分析主要按照下面幾個(gè)步驟進(jìn)行:
1. 保存隊(duì)列數(shù)據(jù)的容器以及出入隊(duì)方法
2. 主要成員變量以及作用
3. 主要方法分析
隊(duì)列容器
結(jié)構(gòu)圖
僅有數(shù)據(jù)item和后繼next的單向節(jié)點(diǎn),結(jié)構(gòu)簡(jiǎn)單。
static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }
next三種情況
A 普通節(jié)點(diǎn)的真實(shí)后繼
B 真正的隊(duì)首節(jié)點(diǎn),item=null(隊(duì)首節(jié)點(diǎn)恒為head.next)
C 隊(duì)尾節(jié)點(diǎn),next=null
- item: null -> first -> …… -> last
- next: first -> second -> ……-> null
入隊(duì)操作
// 入隊(duì) private void enqueue(Node<E> node) { // assert putLock.isHeldByCurrentThread(); // assert last.next == null; last = last.next = node; // last.next = node; last = node; }
步驟1
步驟2
出隊(duì)操作
// 出隊(duì) private E dequeue() { // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; // 形成引用鏈閉環(huán),JVM根據(jù)可達(dá)性分析時(shí),GC root的引用鏈與該對(duì)象之間不可達(dá),進(jìn)行GC h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
步驟1
步驟2
關(guān)鍵成員變量
隊(duì)列入隊(duì)和出隊(duì)鎖分離,都使用了非公平鎖。
這里的count屬性需要注意下,這里使用了原子類保證操作的原子性。后面的入隊(duì)和出隊(duì),將會(huì)頻繁使用它。
/** 容量, 初始化不設(shè)置時(shí)默認(rèn)為Integer.MAX_VALUE*/ private final int capacity; /** 當(dāng)前隊(duì)列內(nèi)的元素?cái)?shù)量 */ private final AtomicInteger count = new AtomicInteger(); /** * 隊(duì)首 * 不變量: head.item == null */ transient Node<E> head; /** * 隊(duì)尾 * 不變量: last.next == null */ private transient Node<E> last; /** 出隊(duì)操作公用鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 用于出隊(duì)操作的阻塞和喚醒 出隊(duì)的話,只需要考慮隊(duì)列是否為空 */ private final Condition notEmpty = takeLock.newCondition(); /** 入隊(duì)操作公用鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 用于入隊(duì)操作的阻塞和喚醒 入隊(duì)只需要考慮隊(duì)列空間是否足夠*/ private final Condition notFull = putLock.newCondition();
初始化
三個(gè)構(gòu)造函數(shù)
public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } // 自定義容量 public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } // 初始化時(shí),批量添加集合中的元素 public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } }
put方法
put方法幾個(gè)關(guān)注點(diǎn)
- 釋放鎖的時(shí)機(jī)
- 執(zhí)行入隊(duì)操作
- 喚醒生產(chǎn)者的時(shí)機(jī)
- 喚醒消費(fèi)者的時(shí)機(jī)
這幾點(diǎn)是整個(gè)阻塞操作的核心,可以在下面的分析中仔細(xì)觀察。
注:由于阻塞隊(duì)列就是基于生產(chǎn)者-消費(fèi)者模型的,因此,下文中都把調(diào)用put方法的線程稱為生產(chǎn)者,調(diào)用take方法的線程稱為消費(fèi)者。
總體分析
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; // -1代表操作異常 Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; // 如果線程沒(méi)有被標(biāo)記為中斷,則獲取鎖 putLock.lockInterruptibly(); try { while (count.get() == capacity) { // 這里是線程在執(zhí)行put操作時(shí)唯一一個(gè)執(zhí)行過(guò)程中釋放鎖的地方 notFull.await(); // 容量已滿,等待被消費(fèi)后喚醒 } // 添加元素,更新容量 enqueue(node); c = count.getAndIncrement(); // 隊(duì)列容量有余時(shí),在這里再次喚醒一個(gè)其他的生產(chǎn)者線程(或者說(shuō)消費(fèi)者消費(fèi)速度大于生產(chǎn)) if (c + 1 < capacity) notFull.signal(); } finally { // 釋放鎖 putLock.unlock(); } // 喚醒一個(gè)消費(fèi)者 if (c == 0) signalNotEmpty(); }
count屬性并發(fā)問(wèn)題
這里需要重點(diǎn)關(guān)注count,由于有兩把鎖,count可以同時(shí)被putLock、takeLock操作,那么這里是否會(huì)產(chǎn)生并發(fā)問(wèn)題。
分析如下:
A. 只有putLock或takeLock一把鎖操作:就是單線程操作,沒(méi)影響,不產(chǎn)生并發(fā)問(wèn)題。
其他所有put操作都處于await的狀態(tài)或者競(jìng)爭(zhēng)鎖狀態(tài),其他線程也因?yàn)楂@取不到鎖而無(wú)法執(zhí)行,只有等該節(jié)點(diǎn)添加完成釋放鎖,其他線程才有機(jī)會(huì)繼續(xù)執(zhí)行。
while (count.get() == capacity) { notFull.await(); // 容量已滿,等待被消費(fèi)后喚醒 }
B. putLock和takeLock同時(shí)操作:我們假設(shè)兩個(gè)線程一個(gè)獲取到putLock,一個(gè)獲取到了takeLock(同時(shí)最多也只有兩個(gè)線程操作count)。
// put while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); // take while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal();
由于count是原子類那么count的所有讀寫操作必然是一個(gè)串聯(lián)的操作,而非并行操作,因此也不存在并發(fā)問(wèn)題,如下圖(順序可能不同):
喚醒消費(fèi)者
代碼的最后一段,會(huì)有喚醒一個(gè)消費(fèi)者的操作。
// 喚醒一個(gè)等待中的消費(fèi)者 private void signalNotEmpty() { final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
剛開(kāi)始看到的時(shí)候很疑惑,為什么是c == 0才喚醒。如果生產(chǎn)者入隊(duì)成功,那么c應(yīng)該為如下值:
c = count.getAndIncrement();
后面看了一下count.getAndIncrement()方法定義才發(fā)現(xiàn)自己記混了,count.getAndIncrement()是一個(gè)原子操作,且返回值的是操作前的值。
ok,現(xiàn)在沒(méi)問(wèn)題了。
count >= 0,也就是說(shuō),只有在生產(chǎn)者入隊(duì)前隊(duì)列為空,入隊(duì)成功之后才會(huì)喚醒一個(gè)消費(fèi)者消費(fèi)。
take方法
take方法與put方法大致相似,只是與put做相反操作。
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 隊(duì)列元素為空,停止消費(fèi),讓出鎖并等待被喚醒 while (count.get() == 0) { notEmpty.await(); } // 移除隊(duì)首元素,并更新容量 x = dequeue(); c = count.getAndDecrement(); // 生產(chǎn)速度大于消費(fèi)速度,喚醒一個(gè)其他消費(fèi)者進(jìn)行消費(fèi) if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } // 消費(fèi)之前隊(duì)列已滿,消費(fèi)后喚醒一個(gè)生產(chǎn)者 if (c == capacity) signalNotFull(); return x; } // 喚醒生產(chǎn)者 /** * Signals a waiting put. Called only from take/poll. */ private void signalNotFull() { final ReentrantLock putLock = this.putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
總結(jié)
總體上看LinkedBlockingQueue類不難,整個(gè)生產(chǎn)-消費(fèi)的流程實(shí)現(xiàn)也比較簡(jiǎn)單。源碼已經(jīng)把該介紹的東西都講得很明白了,我這屬于依葫蘆畫(huà)瓢順著源碼注釋寫出來(lái)的。這么一寫,自己這個(gè)類的印象就很深刻了。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java定時(shí)任務(wù)cron表達(dá)式每周執(zhí)行一次的坑及解決
這篇文章主要介紹了java定時(shí)任務(wù)cron表達(dá)式每周執(zhí)行一次的坑及解決,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-06-06Java數(shù)據(jù)結(jié)構(gòu)之圖的兩種搜索算法詳解
在很多情況下,我們需要遍歷圖,得到圖的一些性質(zhì)。有關(guān)圖的搜索,最經(jīng)典的算法有深度優(yōu)先搜索和廣度優(yōu)先搜索,接下來(lái)我們分別講解這兩種搜索算法,需要的可以參考一下2022-11-11SpringBoot 如何使用RestTemplate來(lái)調(diào)用接口
這篇文章主要介紹了SpringBoot 如何使用RestTemplate來(lái)調(diào)用接口方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10Java創(chuàng)建可執(zhí)行JAR文件的多種方式
本文主要介紹了Java創(chuàng)建可執(zhí)行JAR文件的多種方式,使用JDK的jar工具、IDE、Maven和Gradle來(lái)創(chuàng)建和配置可執(zhí)行JAR文件,具有一定的參考價(jià)值,感興趣的可以了解一下2024-07-07