Java阻塞隊(duì)列BlockingQueue詳解
隊(duì)列的類型
- 無限隊(duì)列(unbounded queue) 無容量限定,只隨存儲(chǔ)變化
- 有限隊(duì)列(bounded queue) 定義了最大容量
向無限隊(duì)列添加元素的所有操作都將永遠(yuǎn)不會(huì)阻塞(也是線程安全的),因此它可以增長到非常大的容量。 使用無限阻塞隊(duì)列 BlockingQueue 設(shè)計(jì)生產(chǎn)者 - 消費(fèi)者模型時(shí)最重要的是消費(fèi)者應(yīng)該能夠像生產(chǎn)者向隊(duì)列添加消息一樣快地消費(fèi)消息 。否則可能內(nèi)存不足而拋出 OutOfMemory 異常。
數(shù)據(jù)結(jié)構(gòu)
- 1.通常使用鏈表或數(shù)組實(shí)現(xiàn)
- 2.一般具有 FIFO(先進(jìn)先出) 特性,也可以設(shè)計(jì)為雙端隊(duì)列
- 3.隊(duì)列的主要操作:入隊(duì)和出隊(duì)
阻塞隊(duì)列 BlockingQueue
定義:線程通信中,在任意時(shí)刻,無論并發(fā)有多高,在單個(gè) JVM 上,同一時(shí)間永遠(yuǎn)只有一個(gè)線程能對(duì)隊(duì)列進(jìn)行入隊(duì)或出隊(duì)操作。BlockingQueue 可以在線程之間共享而無需任何顯式同步
阻塞隊(duì)列的類型:
JAVA中的應(yīng)用場景 : 線程池、SpringCloud-Eureka 三級(jí)緩存、Nacos、MQ、Netty 等
常見的阻塞隊(duì)列
- ArrayBlockingQueue : 由數(shù)組支持的有界隊(duì)列
- 應(yīng)用場景: 線程池中有比較多的應(yīng)用、生產(chǎn)者消費(fèi)者模型
- 工作原理: 基于 ReentrantLock 保證線程安全,根據(jù)Condition實(shí)現(xiàn)隊(duì)列滿時(shí)的阻塞
- LinkedBlockingQueue : 基于鏈表的無界隊(duì)列(理論上有界)
- PriorityBlockingQueue : 由優(yōu)先級(jí)堆支持的無界優(yōu)先級(jí)隊(duì)列
- DelayQueue : 由優(yōu)先級(jí)堆支持的、基于時(shí)間的調(diào)度隊(duì)列,內(nèi)部基于無界隊(duì)列PriorityQueue 實(shí)現(xiàn),而無界隊(duì)列基于數(shù)組的擴(kuò)容實(shí)現(xiàn)
- 使用方法: 入隊(duì)的對(duì)象必須要實(shí)現(xiàn) Delayed 接口,而 Delayed 集成自 Comparable 接口
- 應(yīng)用場景: 售賣電影票等
- 工作原理: 隊(duì)列內(nèi)部會(huì)根據(jù)時(shí)間優(yōu)先級(jí)進(jìn)行排序。延遲類線程池周期執(zhí)行。
它們都實(shí)現(xiàn)了BlockingQueue接口,都有put()和take()等方法,創(chuàng)建方式如下:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);
BlockingQueue API
添加元素:
方法 | 含義 |
---|---|
add() | 如果插入成功則返回 true,否則拋出 IllegalStateException 異常 |
put() | 將指定的元素插入隊(duì)列,如果隊(duì)列滿了,會(huì)阻塞直到有空間插入 |
offer() | 如果插入成功則返回 true,否則返回 false |
offer(E e, long timeout, TimeUnit unit) | 嘗試將元素插入隊(duì)列,如果隊(duì)列已滿,會(huì)阻塞直到有空間插入,阻塞有時(shí)間控制 |
檢索元素:
方法 | 含義 |
---|---|
take() | 獲取隊(duì)列的頭部元素并將其刪除,如果隊(duì)列為空,則阻塞并等待元素變?yōu)榭捎?/td> |
poll(long timeout, TimeUnit unit) | 檢索并刪除隊(duì)列的頭部,如有必要,等待指定的等待時(shí)間以使元素可用,如果超時(shí),則返回 null |
ArrayBlockingQueue 源碼簡解
實(shí)現(xiàn):同步等待隊(duì)列(CLH)+ 條件等待隊(duì)列滿足條件的元素在CLH隊(duì)列中等待鎖,不滿足條件的隊(duì)列挪到條件等待隊(duì)列,滿足條件后再從 tail 插入 CLH 隊(duì)列
線程獲取鎖的條件: 在 CLH 隊(duì)列里等待的 Node 節(jié)點(diǎn),并且 Node 節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是 Singal。條件等待隊(duì)列里的線程是無法獲取鎖的。
/** * 構(gòu)造方法 * 還有兩個(gè)構(gòu)造函數(shù),一個(gè)無fair參數(shù),一個(gè)可傳入集合,創(chuàng)建時(shí)插入隊(duì)列 * @param capacity 固定容量 * @param fair 默認(rèn)是false:訪問順序未指定; true:按照FIFO順序處理 */ public ArrayBlockingQueue(int capacity, boolean fair) { ? if (capacity <= 0) ? ? ? ?throw new IllegalArgumentException(); ? ?this.items = new Object[capacity]; ? ?lock = new ReentrantLock(fair); // 根據(jù)fair創(chuàng)建對(duì)應(yīng)的鎖 ? ?// 條件對(duì)象,配合容器能滿足業(yè)務(wù) ? ?notEmpty = lock.newCondition(); // 出隊(duì)條件對(duì)象 ? ?notFull = ?lock.newCondition(); // 入隊(duì)條件對(duì)象 } /** * 入隊(duì)方法 * 在隊(duì)列的尾部插入指定的元素,如果隊(duì)列已滿,則等待空間可用 */ public void put(E e) throws InterruptedException { ? ?checkNotNull(e); // 檢查put對(duì)象是否為空,空拋出異常 ? ?final ReentrantLock lock = this.lock; ? ?lock.lockInterruptibly(); // 若未被中斷嘗試獲取鎖,詳見下文 ? ?try { ? // 隊(duì)列中元素的數(shù)量 等于 排隊(duì)元素的長度 ? ? ? ?while (count == items.length) ? ? ? ? ? ?notFull.await(); // 見下文 ? ? ? ?enqueue(e); // 元素入隊(duì) ? } finally { ? ? ? ?lock.unlock(); ? } } /** * 出隊(duì)方法 * 獲取隊(duì)列的頭部元素并將其刪除,如果隊(duì)列為空,則阻塞并等待元素變?yōu)榭捎? */ public E take() throws InterruptedException { ? ?final ReentrantLock lock = this.lock; ? ?lock.lockInterruptibly(); // 見下文 ? ?try { ? ? ? ?while (count == 0) ? ? ? ? ? ?notEmpty.await(); // 見下文 ? ? ? ?return dequeue(); // 元素出隊(duì) ? } finally { ? ? ? ?lock.unlock(); ? } }
令當(dāng)前線程等待,直到收到信號(hào)或被中斷詳:與此 Condition 關(guān)聯(lián)的鎖被自動(dòng)釋放,進(jìn)入等待,并且處于休眠狀態(tài),直到發(fā)生以下四種情況之一:
- ①其他線程調(diào)用這個(gè)Condition的 signal 方法,當(dāng)前線程恰好被選為要被喚醒的線程;
- ②其他線程調(diào)用這個(gè)條件的 signalAll 方法
- ③其他線程中斷當(dāng)前線程,支持中斷線程掛起;
- ④一個(gè)“虛假的喚醒”發(fā)生了。
在這些情況下,在此方法返回之前,當(dāng)前線程必須重新獲得與此條件相關(guān)聯(lián)的鎖。當(dāng)線程返回時(shí),保證它持有這個(gè)鎖。
如果當(dāng)前線程有以下兩種情況之一:
- ①在進(jìn)入該方法時(shí)設(shè)置中斷狀態(tài);
- ②在等待時(shí)被中斷,支持線程掛起的中斷 拋出InterruptedException
生產(chǎn)者消費(fèi)者模式
BlockingQueue 可以在線程之間共享而無需任何顯式同步,在生產(chǎn)者消費(fèi)者之間,只需要將阻塞隊(duì)列以參數(shù)的形式進(jìn)行傳遞即可。它內(nèi)部的機(jī)制會(huì)自動(dòng)保證線程的安全性。
生產(chǎn)者:實(shí)現(xiàn)了 Runnable 接口,每個(gè)生產(chǎn)者生產(chǎn)100種商品和1個(gè)中斷標(biāo)記后完成線程任務(wù)
@Slf4j @Slf4j public class Producer implements Runnable{ ? ?// 作為參數(shù)的阻塞隊(duì)列 ? ?private BlockingQueue<Integer> blockingQueue; ? ?private final int stopTag; ? ?/** ? ? * 構(gòu)造方法 ? ? * @param blockingQueue ? ? * @param stopTag ? ? */ ? ?public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) { ? ? ? ?this.blockingQueue = blockingQueue; ? ? ? ?this.stopTag = stopTag; ? } ? ?@Override ? ?public void run() { ? ? ? ?try { ? ? ? ? ? ?generateNumbers(); ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?Thread.currentThread().interrupt(); ? ? ? } ? } ? ?private void generateNumbers() throws InterruptedException { ? ? ? ?// 每個(gè)生產(chǎn)者都隨機(jī)生產(chǎn)10種商品 ? ? ? ?for (int i = 0; i < 10; i++) { ? ? ? ? ? ?int product = ThreadLocalRandom.current().nextInt(1000,1100); ? ? ? ? ? ?log.info("生產(chǎn)者{}號(hào),生產(chǎn)了商品,編號(hào)為{}",Thread.currentThread().getId(),product); ? ? ? ? ? ?blockingQueue.put(product); ? ? ? } ? ? ? ?// 生產(chǎn)終止標(biāo)記 ? ? ? ?blockingQueue.put(stopTag); ? ? ? ?log.info("生產(chǎn)者{}號(hào),生產(chǎn)了第終止標(biāo)記編號(hào){}",Thread.currentThread().getId(),Thread.currentThread().getId()); ? } }
消費(fèi)者:消費(fèi)者拿到終止消費(fèi)標(biāo)記終止消費(fèi),否則消費(fèi)商品,拿到終止標(biāo)記后完成線程任務(wù)
@Slf4j public class Consumer implements Runnable{ ? ?// 作為參數(shù)的阻塞隊(duì)列 ? ?private BlockingQueue<Integer> queue; ? ?private final int stopTage; ? ?public Consumer(BlockingQueue<Integer> queue, int stopTage) { ? ? ? ?this.queue = queue; ? ? ? ?this.stopTage = stopTage; ? } ? ?@Override ? ?public void run() { ? ? ? ?try { ? ? ? ? ? ?while (true) { ? ? ? ? ? ? ? ?Integer product = queue.take(); ? ? ? ? ? ? ? ?if (product.equals(stopTage)) { ? ? ? ? ? ? ? ? ? ?log.info("{}號(hào)消費(fèi)者,停止消費(fèi),因?yàn)槟玫搅送V瓜M(fèi)標(biāo)記",Thread.currentThread().getId()); ? ? ? ? ? ? ? ? ? ?return; ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?log.info("{}號(hào)消費(fèi)者,拿到的商品編號(hào):{}",Thread.currentThread().getId(),product); ? ? ? ? ? } ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?Thread.currentThread().interrupt(); ? ? ? } ? } }
客戶端類: 創(chuàng)建與計(jì)算機(jī) CPU 核數(shù)相同的線程數(shù),與 16個(gè)生產(chǎn)者
public class ProductConsumerTest { ? ?public static void main(String[] args) { ? ? ? ?// 阻塞隊(duì)列容量 ? ? ? ?int blockingQueueSize = 10; ? ? ? ?// 生產(chǎn)者數(shù)量 ? ? ? ?int producerSize = 16; ? ? ? ?// 消費(fèi)者數(shù)量 = 計(jì)算機(jī)線程核數(shù) 8 ? ? ? ?int consumerSize = Runtime.getRuntime().availableProcessors(); ? ? ? ?// 終止消費(fèi)標(biāo)記 ? ? ? ?int stopTag = Integer.MAX_VALUE; ? ? ? ?BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize); ? ? ? ?// 創(chuàng)建16個(gè)生產(chǎn)者線程 ? ? ? ?for (int i = 0; i < producerSize; i++) { ? ? ? ? ? ?new Thread(new Producer(blockingQueue, stopTag)).start(); ? ? ? } ? ? ? ?// 創(chuàng)建8個(gè)消費(fèi)者線程 ? ? ? ?for (int j = 0; j < consumerSize; j++) { ? ? ? ? ? ?new Thread(new Consumer(blockingQueue, stopTag)).start(); ? ? ? } ? } }
延遲隊(duì)列 DelayQueue
定義: Java 延遲隊(duì)列提供了在指定時(shí)間才能獲取隊(duì)列元素的功能,隊(duì)列頭元素是最接近過期的元素。沒有過期元素的話,使用 poll() 方法會(huì)返回 null 值,超時(shí)判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 來判斷。延時(shí)隊(duì)列不能存放空元素。
/** * 電影票類,實(shí)現(xiàn)了Delayed接口,重寫 compareTo 和 getDelay方法 */ public class MovieTicket implements Delayed { ? ?//延遲時(shí)間 ? ?private final long delay; ? ?//到期時(shí)間 ? ?private final long expire; ? ?//數(shù)據(jù) ? ?private final String msg; ? ?//創(chuàng)建時(shí)間 ? ?private final long now; ? ?public long getDelay() { ? ? ? ?return delay; ? } ? ?public long getExpire() { ? ? ? ?return expire; ? } ? ?public String getMsg() { ? ? ? ?return msg; ? } ? ?public long getNow() { ? ? ? ?return now; ? } ? ?/** ? ? * @param msg 消息 ? ? * @param delay 延期時(shí)間 ? ? */ ? ?public MovieTicket(String msg , long delay) { ? ? ? ?this.delay = delay; ? ? ? ?this.msg = msg; ? ? ? ?expire = System.currentTimeMillis() + delay; ? ?//到期時(shí)間 = 當(dāng)前時(shí)間+延遲時(shí)間 ? ? ? ?now = System.currentTimeMillis(); ? } ? ?/** ? ? * @param msg ? ? */ ? ?public MovieTicket(String msg){ ? ? ? ?this(msg,1000); ? } ? ?public MovieTicket(){ ? ? ? ?this(null,1000); ? } ? ?/** ? ? * 獲得延遲時(shí)間 ? 用過期時(shí)間-當(dāng)前時(shí)間,時(shí)間單位毫秒 ? ? * @param unit ? ? * @return ? ? */ ? ?@Override ? ?public long getDelay(TimeUnit unit) { ? ? ? ?return unit.convert(this.expire ? ? ? ? ? ? ? ?- System.currentTimeMillis() , TimeUnit.MILLISECONDS); ? } ? ?/** ? ? * 用于延遲隊(duì)列內(nèi)部比較排序 當(dāng)前時(shí)間的延遲時(shí)間 - 比較對(duì)象的延遲時(shí)間 ? ? * 越早過期的時(shí)間在隊(duì)列中越靠前 ? ? * @param delayed ? ? * @return ? ? */ ? ?@Override ? ?public int compareTo(Delayed delayed) { ? ? ? ?return (int) (this.getDelay(TimeUnit.MILLISECONDS) ? ? ? ? ? ? ? ?- delayed.getDelay(TimeUnit.MILLISECONDS)); ? } }
測試類:
public static void main(String[] args) { ? ?DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>(); ? ?MovieTicket ticket = new MovieTicket("電影票1",10000); ? ?delayQueue.put(ticket); ? ?MovieTicket ticket1 = new MovieTicket("電影票2",5000); ? ?delayQueue.put(ticket1); ? ?MovieTicket ticket2 = new MovieTicket("電影票3",8000); ? ?delayQueue.put(ticket2); ? ?log.info("message:--->入隊(duì)完畢"); ? ?while( delayQueue.size() > 0 ){ ? ? ? ?try { ? ? ? ? ? ?ticket = delayQueue.take(); ? ? ? ? ? ?log.info("電影票出隊(duì):{}",ticket.getMsg()); ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?e.printStackTrace(); ? ? ? } ? } }
從運(yùn)行結(jié)果可以看出隊(duì)列是延遲出隊(duì),間隔和我們所設(shè)置的時(shí)間相同
到此這篇關(guān)于Java阻塞隊(duì)列BlockingQueue詳解的文章就介紹到這了,更多相關(guān)Java BlockingQueue 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 關(guān)于Java中阻塞隊(duì)列BlockingQueue的詳解
- Java阻塞隊(duì)列BlockingQueue基礎(chǔ)與使用
- Java阻塞隊(duì)列必看類:BlockingQueue快速了解大體框架和實(shí)現(xiàn)思路
- Java?阻塞隊(duì)列BlockingQueue詳解
- Java并發(fā)編程之阻塞隊(duì)列(BlockingQueue)詳解
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊(duì)列ArrayBlockingQueue功能簡介
- 詳解Java阻塞隊(duì)列(BlockingQueue)的實(shí)現(xiàn)原理
- java 中 阻塞隊(duì)列BlockingQueue詳解及實(shí)例
- 一文簡介Java中BlockingQueue阻塞隊(duì)列
相關(guān)文章
Zuul 實(shí)現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)
這篇文章主要介紹了Zuul 實(shí)現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07Spring Boot console log 格式自定義方式
這篇文章主要介紹了Spring Boot console log 格式自定義方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07對(duì)Mybatis?Plus中@TableField的使用正解
這篇文章主要介紹了對(duì)Mybatis?Plus中@TableField的使用正解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01java.sql.SQLException問題解決以及注意事項(xiàng)
這篇文章主要給大家介紹了關(guān)于java.sql.SQLException問題解決以及注意事項(xiàng)的相關(guān)資料,這個(gè)問題其實(shí)很好解決,文中通過圖文將解決的辦法介紹的很詳細(xì),需要的朋友可以參考下2023-07-07使用Mybatis-plus清空表數(shù)據(jù)的操作方法
MyBatis 是一個(gè)基于 java 的持久層框架,它內(nèi)部封裝了 jdbc,極大提高了我們的開發(fā)效率,文中給大家介紹了MybatisPlus常用API-增刪改查功能,感興趣的朋友跟隨小編一起看看吧2022-11-11淺談@FeignClient中name和value屬性的區(qū)別
這篇文章主要介紹了@FeignClient中name和value屬性的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07詳解Spring Data JPA動(dòng)態(tài)條件查詢的寫法
本篇文章主要介紹了Spring Data JPA動(dòng)態(tài)條件查詢的寫法 ,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-06-06