詳解java中的阻塞隊列
阻塞隊列簡介
阻塞隊列(BlockingQueue)首先是一個支持先進(jìn)先出的隊列,與普通的隊列完全相同;
其次是一個支持阻塞操作的隊列,即:
- 當(dāng)隊列滿時,會阻塞執(zhí)行插入操作的線程,直到隊列不滿。
- 當(dāng)隊列為空時,會阻塞執(zhí)行獲取操作的線程,直到隊列不為空。
阻塞隊列用在多線程的場景下,因此阻塞隊列使用了鎖機(jī)制來保證同步,這里使用的可重入鎖;
而對于阻塞與喚醒機(jī)制則有與鎖綁定的Condition
實現(xiàn)
應(yīng)用場景:生產(chǎn)者消費者模式
java中的阻塞隊列
java中的阻塞隊列根據(jù)容量可以分為有界隊列和無界隊列:
- 有界隊列:隊列中只能存儲有限個元素,超出后存放元素線程會被阻塞或者失敗。
- 無界隊列:隊列中可以存儲無限個元素。
java8中提供了7種阻塞隊列阻塞隊列供開發(fā)者使用,如下表:
類名 | 描述 |
ArrayBlockingQueue | 一個由數(shù)組結(jié)構(gòu)組成的有界阻塞隊列 |
LinkedBlockingQueue | 由鏈表結(jié)構(gòu)組成的有界阻塞隊列(默認(rèn)大小Integer.MAX_VALUE) |
PriorityBlockingQueue | 支持優(yōu)先級排序的無界阻塞隊列 |
DelayQueue | 使用優(yōu)先級隊列實現(xiàn)的延遲無界阻塞隊列 |
SynchronousQueue | 不存儲元素的阻塞隊列,即單個元素的隊列 |
LinkedTransferQueue | 由鏈表結(jié)構(gòu)組成的無界阻塞隊列 |
LinkedBlockingDeque | 由鏈表結(jié)構(gòu)組成的雙向阻塞隊列 |
另外還有一個在ScheduledThreadPoolExecutor
中實現(xiàn)的DelayedWorkQueue
阻塞隊列,
但這個阻塞隊列開發(fā)者不能使用。它們之間的UML類圖如下圖:
BlockingQueue接口是阻塞隊列對外的訪問接口,所有的阻塞隊列都實現(xiàn)了BlockQueue中的方法
BlockQueue中方法
作為一個隊列的核心方法就是入隊和出隊。由于存在阻塞策略,BlockQueue
將出隊入隊的情況分為了四組,每組提供不同的方法:
- 拋出異常:當(dāng)隊列滿時,如果再往隊列中插入元素,則拋出
IllegalStateException
異常;當(dāng)隊列為空時,從隊列中獲取元素則拋出NoSuchElementException
異常。 - 返回特定值(布爾值):當(dāng)隊列滿時,如果再往隊列中插入元素,則返回false;當(dāng)隊列為空時,從隊列中獲取元素則返回null。
- 一直阻塞:當(dāng)隊列滿時,如果再往隊列中插入元素,阻塞當(dāng)前線程直到隊列中至少一個被移除或者響應(yīng)中斷退出;當(dāng)隊列為空時,則阻塞當(dāng)前線程直到至少一個元素元素入隊或者響應(yīng)中斷退出。
- 超時退出:當(dāng)隊列滿時,如果再往隊列中插入元素,阻塞當(dāng)前線程直到隊列中至少一個被移除或者達(dá)到指定的等待時間退出或者響應(yīng)中斷退出;當(dāng)隊列為空時,則阻塞當(dāng)前線程直到至少一個元素元素入隊或者達(dá)到指定的等待時間退出或者響應(yīng)中斷退出。
對于每種情況BlockingQueue
提供的方法如下表:
方法\處理方式 | 拋出異常 | 返回特定值(布爾值) | 一直阻塞 | 超時退出 |
插入 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除 | remove() | poll() | take() | poll(time.unit) |
檢查 | element() | peek() | 不可用 | 不可用 |
上述方法一般用于生產(chǎn)者-消費者模型中,是其中的生產(chǎn)和消費操作隊列的核心方法。
除了這些方法,BlockingQueue
還提供了一些其他的方法如下表:
方法名稱 | 描述 |
remove(Object o) | 從隊列中移除一個指定值 |
size() | 獲取隊列中元素的個數(shù) |
contains(Object o) | 判斷隊列是否包含指定的元素,但是這個元素在這次判斷完可能就會被消費 |
drainTo(Collection<? super E> c) | 將隊列中元素放在給定的集合中,并返回添加的元素個數(shù) |
drainTo(Collection<? super E> c, int maxElements) | 將隊列中元素取maxElements(不超過隊列中元素個數(shù))個放在給定的集合中,并返回添加的元素個數(shù) |
remainingCapacity() | 計算隊列中還可以存放的元素個數(shù) |
toArray() | 以objetc數(shù)組的形式獲取隊列中所有的元素 |
toArray(T[] a) | 以給定類型數(shù)組的方式獲取隊列中所有的元素 |
clear() | 清空隊列,危險的操作 |
阻塞隊列的實現(xiàn)原理
阻塞隊列的實現(xiàn)依靠通知模式實現(xiàn):當(dāng)生產(chǎn)者向滿了的隊列中添加元素時,會阻塞住生產(chǎn)者,
直到消費者消費了一個隊列中的元素后會通知消費者隊列可用,此時再由生產(chǎn)者向隊列中添加元素。反之亦然。
阻塞隊列的阻塞喚醒依靠Condition
——條件隊列來實現(xiàn)。
以ArrayBlockingQueue
為例說明:
ArrayBlockingQueue
的定義:
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { /** The queued items */ //以數(shù)組的結(jié)構(gòu)存儲隊列的元素,采用的是循環(huán)數(shù)組 final Object[] items; /** items index for next take, poll, peek or remove */ //隊列的隊頭索引 int takeIndex; /** items index for next put, offer, or add */ //隊列的隊尾索引 int putIndex; /** Number of elements in the queue */ //隊列中元素的個數(shù) int count; /** Main lock guarding all access */ //對于ArrayBlockingQueue所有的操作都需要加鎖, final ReentrantLock lock; /** Condition for waiting takes */ //條件隊列,當(dāng)隊列為空時阻塞消費者并在生產(chǎn)者生產(chǎn)后喚醒消費者 private final Condition notEmpty; /** Condition for waiting puts */ //條件隊列,當(dāng)隊列滿時阻塞生產(chǎn)者,并在消費者消費隊列后喚醒生產(chǎn)者 private final Condition notFull; }
根據(jù)類的定義字段可以看到,有兩個Condition
條件隊列,猜測以下過程
- 當(dāng)隊列為空,消費者試圖消費時應(yīng)該調(diào)用
notEmpty.await()
方法阻塞,并在生產(chǎn)者生產(chǎn)后調(diào)用notEmpty.single()
方法 - 當(dāng)隊列已滿,生產(chǎn)者試圖放入元素應(yīng)調(diào)用
notFull.await()
方法阻塞,并在消費者消費隊列后調(diào)用notFull.single()
方法向隊
向隊列中添加元素put()
方法的添加過程。
/** * 向隊列中添加元素 * 當(dāng)隊列已滿時需要阻塞當(dāng)前線程 * 放入元素后喚醒因隊列為空阻塞的消費者 */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //當(dāng)隊列已滿時需要notFull.await()阻塞當(dāng)前線程 //offer(e,time,unit)方法就是阻塞的時候加了超時設(shè)定 while (count == items.length) notFull.await(); //放入元素的過程 enqueue(e); } finally { lock.unlock(); } } /**enqueue實際添加元素的方法*/ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; //如果條件隊列中存在等待的線程 //喚醒 notEmpty.signal(); }
從隊列中獲取元素take()
方法的獲取過程。
/** * 從隊列中獲取元素 * 當(dāng)隊列已空時阻塞當(dāng)前線程 * 從隊列中消費元素后喚醒等待的生產(chǎn)線程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //隊列為空需要阻塞當(dāng)前線程 while (count == 0) notEmpty.await(); //獲取元素的過程 return dequeue(); } finally { lock.unlock(); } } /**dequeue實際消費元素的方法*/ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); //消費元素后從喚醒阻塞的生產(chǎn)者線程 notFull.signal(); return x; }
總結(jié)
阻塞隊列提供了不同于普通隊列的增加、刪除元素的方法,核心在與隊列滿時阻塞生產(chǎn)者和隊列空時阻塞消費者。
這一阻塞過程依靠與鎖綁定的Condition
對象實現(xiàn)。Condition
接口的實現(xiàn)在AQS中實現(xiàn),具體的實現(xiàn)類是
ConditionObject
以上就是詳解java中的阻塞隊列的詳細(xì)內(nèi)容,更多關(guān)于java 阻塞隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringSecurity自定義AuthenticationProvider無法@Autowire的解決
這篇文章主要介紹了SpringSecurity自定義AuthenticationProvider無法@Autowire的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12SpringBoot中的@EnableAutoConfiguration注解解析
這篇文章主要介紹了SpringBoot中的@EnableAutoConfiguration注解解析,@EnableAutoConfiguration也是借助@Import的幫助,將所有符合自動配置條件的bean定義注冊到IoC容器,需要的朋友可以參考下2023-09-09java中關(guān)于轉(zhuǎn)義字符的一個bug
本文主要介紹了java中關(guān)于轉(zhuǎn)義字符的一個bug。具有很好的參考價值,下面跟著小編一起來看下吧2017-02-02