一文簡介Java中BlockingQueue阻塞隊(duì)列
最近在研究一些并發(fā)方面的技術(shù)其中研究到阻塞隊(duì)列(BlockingQueue)的時(shí)候做了寫筆記文檔 大家可以一起探討一下 :
BlockingQueue,是 java.util.concurrent 包提供的用于解決并發(fā)生產(chǎn)者 - 消費(fèi)者問題的最有用的類。
它的特性是在任意時(shí)刻只有一個(gè)線程可以進(jìn)行take或者put操作,并且BlockingQueue提供了超時(shí)return null的機(jī)制,在許多生產(chǎn)場(chǎng)景里都可以看到這個(gè)工具的身影。
一、隊(duì)列類型
1.無限隊(duì)列 (unbounded queue ) - 幾乎可以無限增長
2.有限隊(duì)列 ( bounded queue ) - 定義了最大容量
二、隊(duì)列數(shù)據(jù)結(jié)構(gòu)
隊(duì)列實(shí)質(zhì)就是一種存儲(chǔ)數(shù)據(jù)的結(jié)構(gòu)
通常用鏈表或者數(shù)組實(shí)現(xiàn)
一般而言隊(duì)列具備FIFO先進(jìn)先出的特性,當(dāng)然也有雙端隊(duì)列(Deque)優(yōu)先級(jí)隊(duì)列
主要操作:入隊(duì)(EnQueue)與出隊(duì)(Dequeue)
三、常見的4中阻塞隊(duì)列
- ArrayBlockingQueue 由數(shù)組支持的有界隊(duì)列
- LinkedBlockingQueue 由鏈接節(jié)點(diǎn)支持的可選有界隊(duì)列
- PriorityBlockingQueue 由優(yōu)先級(jí)堆支持的無界優(yōu)先級(jí)隊(duì)列
- DelayQueue 由優(yōu)先級(jí)堆支持的、基于時(shí)間的調(diào)度隊(duì)列
1.ArrayBlockingQueue
隊(duì)列基于數(shù)組實(shí)現(xiàn),容量大小在創(chuàng)建ArrayBlockingQueue對(duì)象時(shí)已定義好數(shù)據(jù)結(jié)構(gòu)如下圖:
①隊(duì)列創(chuàng)建:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>();
②應(yīng)用場(chǎng)景:
在線程池中有比較多的應(yīng)用,生產(chǎn)者消費(fèi)者場(chǎng)景
③工作原理:
基于ReentrantLock保證線程安全,根據(jù)Condition實(shí)現(xiàn)隊(duì)列滿時(shí)的阻塞
2.LinkedBlockingQueue
是一個(gè)基于鏈表的無界隊(duì)列(理論上有界)
BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<>();
上面這段代碼中,blockingQueue 的容量將設(shè)置為 Integer.MAX_VALUE 。
向無限隊(duì)列添加元素的所有操作都將永遠(yuǎn)不會(huì)阻塞,[注意這里不是說不會(huì)加鎖保證線程安全],因此它可以增長到非常大的容量。
使用無限 BlockingQueue 設(shè)計(jì)生產(chǎn)者 - 消費(fèi)者模型時(shí)最重要的是 消費(fèi)者應(yīng)該能夠像生產(chǎn)者向隊(duì)列添加消息一樣快地消費(fèi)消息 。
否則,內(nèi)存可能會(huì)填滿,然后就會(huì)得到一個(gè) OutOfMemory 異常。
3.DelayQueue
由優(yōu)先級(jí)堆支持的、基于時(shí)間的調(diào)度隊(duì)列,內(nèi)部基于無界隊(duì)列PriorityQueue實(shí)現(xiàn),而無界隊(duì)列基于數(shù)組的擴(kuò)容實(shí)現(xiàn)。
①隊(duì)列創(chuàng)建:
BlockingQueue<String> blockingQueue = new DelayQueue();
②要求:
入隊(duì)的對(duì)象必須要實(shí)現(xiàn)Delayed接口,而Delayed集成自Comparable接口
③應(yīng)用場(chǎng)景:
電影票
④工作原理:
隊(duì)列內(nèi)部會(huì)根據(jù)時(shí)間優(yōu)先級(jí)進(jìn)行排序。延遲類線程池周期執(zhí)行。
4.BlockingQueue API
BlockingQueue 接口的所有方法可以分為兩大類:負(fù)責(zé)向隊(duì)列添加元素的方法和檢索這些元素的方法。在隊(duì)列滿/空的情況下,來自這兩個(gè)組的每個(gè)方法的行為都不同。
①添加元素
方法 | 說明 |
add() | 如果插入成功則返回 true,否則拋出 IllegalStateException 異常 |
put() | 將指定的元素插入隊(duì)列,如果隊(duì)列滿了,那么會(huì)阻塞直到有空間插入 |
offer() | 如果插入成功則返回 true,否則返回 false |
offer(E e, long timeout, TimeUnit unit) | 嘗試將元素插入隊(duì)列,如果隊(duì)列已滿,那么會(huì)阻塞直到有空間插入 |
②檢索元素
方法 | 說明 |
take() | 獲取隊(duì)列的頭部元素并將其刪除,如果隊(duì)列為空,則阻塞并等待元素變?yōu)榭捎?/td> |
poll(long timeout, TimeUnit unit) | 檢索并刪除隊(duì)列的頭部,如有必要,等待指定的等待時(shí)間以使元素可用,如果超時(shí),則返回 null |
在構(gòu)建生產(chǎn)者 - 消費(fèi)者程序時(shí),這些方法是 BlockingQueue 接口中最重要的構(gòu)建塊。
四、多線程生產(chǎn)者-消費(fèi)者示例
接下來我們創(chuàng)建一個(gè)由兩部分組成的程序 - 生產(chǎn)者 ( Producer ) 和消費(fèi)者 ( Consumer ) 。
生產(chǎn)者將生成一個(gè) 0 到 100 的隨機(jī)數(shù)(十全大補(bǔ)丸的編號(hào)),并將該數(shù)字放在 BlockingQueue 中。
我們將創(chuàng)建 16 個(gè)線程(潘金蓮)用于生成隨機(jī)數(shù)并使用 put() 方法阻塞,直到隊(duì)列中有可用空間。
需要記住的重要一點(diǎn)是,我們需要阻止我們的消費(fèi)者線程無限期地等待元素出現(xiàn)在隊(duì)列中。
從生產(chǎn)者(潘金蓮)向消費(fèi)者(武大郎)發(fā)出信號(hào)的好方法是,不需要處理消息,而是發(fā)送稱為毒 ( poison ) 丸 ( pill ) 的特殊消息。
我們需要發(fā)送盡可能多的毒 ( poison ) 丸 ( pill ) ,因?yàn)槲覀冇邢M(fèi)者(武大郎)。
然后當(dāng)消費(fèi)者從隊(duì)列中獲取特殊的毒 ( poison ) 丸 ( pill )消息時(shí),它將優(yōu)雅地完成執(zhí)行。
以下生產(chǎn)者的代碼:
@Slf4j public class NumbersProducer implements Runnable { private final int poisonPill; private final int poisonPillPerProducer; private BlockingQueue<Integer> numbersQueue; public NumbersProducer(BlockingQueue<Integer> numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; this.poisonPill = poisonPill; this.poisonPillPerProducer = poisonPillPerProducer; } @Override public void run() { try { generateNumbers (); } catch (InterruptedException e) { Thread.currentThread ().interrupt (); } } private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { numbersQueue.put (ThreadLocalRandom.current ().nextInt (100)); log.info ("潘金蓮-{}號(hào),給武大郎的泡藥!", Thread.currentThread ().getId ()); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put (poisonPill); log.info ("潘金蓮-{}號(hào),往武大郎的藥里放入第{}顆毒丸!", Thread.currentThread ().getId (), j + 1); } } }
我們的生成器構(gòu)造函數(shù)將 BlockingQueue 作為參數(shù),用于協(xié)調(diào)生產(chǎn)者和使用者之間的處理。
我們看到方法 generateNumbers() 將 100 個(gè)元素(生產(chǎn)100副藥給武大郎吃)放入隊(duì)列中。
它還需要有毒 ( poison ) 丸 ( pill ) (潘金蓮給武大郎下毒)消息,以便知道在執(zhí)行完成時(shí)放入隊(duì)列的消息類型。
該消息需要將 poisonPillPerProducer 次放入隊(duì)列中。
每個(gè)消費(fèi)者將使用 take() 方法從 BlockingQueue 獲取一個(gè)元素,因此它將阻塞,直到隊(duì)列中有一個(gè)元素。
從隊(duì)列中取出一個(gè) Integer 后,它會(huì)檢查該消息是否是毒 ( poison ) 丸 ( pill )(武大郎看潘金蓮有沒有下毒) ,如果是,則完成一個(gè)線程的執(zhí)行。
否則,它將在標(biāo)準(zhǔn)輸出上打印出結(jié)果以及當(dāng)前線程的名稱。
@Slf4j public class NumbersConsumer implements Runnable { private final int poisonPill; private BlockingQueue<Integer> queue; public NumbersConsumer(BlockingQueue<Integer> queue, int poisonPill) { this.queue = queue; this.poisonPill = poisonPill; } @Override public void run() { try { while (true) { Integer number = queue.take (); if (number.equals (poisonPill)) { return; } log.info ("武大郎-{}號(hào),喝藥-編號(hào):{}", Thread.currentThread ().getId (), number); } } catch (InterruptedException e) { Thread.currentThread ().interrupt (); } } }
需要注意的重要事項(xiàng)是隊(duì)列的使用。
與生成器構(gòu)造函數(shù)中的相同,隊(duì)列作為參數(shù)傳遞。
我們可以這樣做,是因?yàn)?BlockingQueue 可以在線程之間共享而無需任何顯式同步。
既然我們有生產(chǎn)者和消費(fèi)者,我們就可以開始我們的計(jì)劃。我們需要定義隊(duì)列的容量,并將其設(shè)置為 10個(gè)元素。
我們創(chuàng)建4 個(gè)生產(chǎn)者線程,并且創(chuàng)建等于可用處理器數(shù)量的消費(fèi)者線程:
public class Main { public static void main(String[] args) { int BOUND = 10; int N_PRODUCERS = 16; int N_CONSUMERS = Runtime.getRuntime ().availableProcessors (); int poisonPill = Integer.MAX_VALUE; int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; int mod = N_CONSUMERS % N_PRODUCERS; BlockingQueue<Integer> queue = new LinkedBlockingQueue<> (BOUND); //潘金蓮給武大郎熬藥 for (int i = 1; i < N_PRODUCERS; i++) { new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer)).start (); } //武大郎開始喝藥 for (int j = 0; j < N_CONSUMERS; j++) { new Thread (new NumbersConsumer (queue, poisonPill)).start (); } //潘金蓮開始投毒,武大郎喝完毒藥GG new Thread (new NumbersProducer (queue, poisonPill, poisonPillPerProducer + mod)).start (); } }
BlockingQueue 是使用具有容量的構(gòu)造創(chuàng)建的。我們正在創(chuàng)造 4 個(gè)生產(chǎn)者和 N 個(gè)消費(fèi)者(武大郎)。
我們將我們的毒 ( poison ) 丸 ( pill )消息指定為 Integer.MAX_VALUE,因?yàn)槲覀兊纳a(chǎn)者在正常工作條件下永遠(yuǎn)不會(huì)發(fā)送這樣的值。
這里要注意的最重要的事情是 BlockingQueue 用于協(xié)調(diào)它們之間的工作。
到此這篇關(guān)于一文簡介Java中BlockingQueue阻塞隊(duì)列的文章就介紹到這了,更多相關(guān)Java BlockingQueue阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 關(guān)于Java中阻塞隊(duì)列BlockingQueue的詳解
- Java阻塞隊(duì)列BlockingQueue基礎(chǔ)與使用
- Java阻塞隊(duì)列必看類:BlockingQueue快速了解大體框架和實(shí)現(xiàn)思路
- Java阻塞隊(duì)列BlockingQueue詳解
- Java?阻塞隊(duì)列BlockingQueue詳解
- Java并發(fā)編程之阻塞隊(duì)列(BlockingQueue)詳解
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue功能簡介
- 詳解Java阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn)原理
- java 中 阻塞隊(duì)列BlockingQueue詳解及實(shí)例
相關(guān)文章
SpringCloud2020版本配置與環(huán)境搭建教程詳解
這篇文章主要介紹了SpringCloud2020版本配置與環(huán)境搭建教程詳解,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12jdk動(dòng)態(tài)代理和cglib動(dòng)態(tài)代理詳解
本篇文章主要介紹了深度剖析java中JDK動(dòng)態(tài)代理機(jī)制 ,動(dòng)態(tài)代理避免了開發(fā)人員編寫各個(gè)繁鎖的靜態(tài)代理類,只需簡單地指定一組接口及目標(biāo)類對(duì)象就能動(dòng)態(tài)的獲得代理對(duì)象2021-07-07SpringBoot切面攔截@PathVariable參數(shù)及拋出異常的全局處理方式
這篇文章主要介紹了SpringBoot切面攔截@PathVariable參數(shù)及拋出異常的全局處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08JAVA 創(chuàng)建線程池的注意事項(xiàng)
這篇文章主要介紹了JAVA 創(chuàng)建線程池的注意事項(xiàng),文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07Java org.w3c.dom.Document 類方法引用報(bào)錯(cuò)
這篇文章主要介紹了Java org.w3c.dom.Document 類方法引用報(bào)錯(cuò)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08基于Ajax用戶名驗(yàn)證、服務(wù)條款加載、驗(yàn)證碼生成的實(shí)現(xiàn)方法
本篇文章對(duì)Ajax用戶名驗(yàn)證、服務(wù)條款加載、驗(yàn)證碼生成的實(shí)現(xiàn)方法,進(jìn)行了詳細(xì)的分析介紹。需要的朋友參考下2013-05-05Linux配置jdk1.8與jdk17兼容并存并啟動(dòng)jar包指定jdk版本
JDK是Java語言的軟件開發(fā)工具包,主要用于移動(dòng)設(shè)備、嵌入式設(shè)備上的java應(yīng)用程序,這篇文章主要給大家介紹了關(guān)于Linux配置jdk1.8與jdk17兼容并存并啟動(dòng)jar包指定jdk版本的相關(guān)資料,需要的朋友可以參考下2024-08-08Java通過HttpClient進(jìn)行HTTP請(qǐng)求的代碼詳解
Apache?HttpClient是一個(gè)功能強(qiáng)大且廣泛使用的Java庫,它提供了方便的方法來執(zhí)行HTTP請(qǐng)求并處理響應(yīng)。本文將介紹如何使用HttpClient庫進(jìn)行HTTP請(qǐng)求,包括GET請(qǐng)求、POST請(qǐng)求、添加參數(shù)和請(qǐng)求體、設(shè)置請(qǐng)求頭等操作,需要的朋友可以參考下2023-05-05