Java阻塞隊列BlockingQueue詳解
隊列的類型
- 無限隊列(unbounded queue) 無容量限定,只隨存儲變化
- 有限隊列(bounded queue) 定義了最大容量
向無限隊列添加元素的所有操作都將永遠不會阻塞(也是線程安全的),因此它可以增長到非常大的容量。 使用無限阻塞隊列 BlockingQueue 設(shè)計生產(chǎn)者 - 消費者模型時最重要的是消費者應(yīng)該能夠像生產(chǎn)者向隊列添加消息一樣快地消費消息 。否則可能內(nèi)存不足而拋出 OutOfMemory 異常。
數(shù)據(jù)結(jié)構(gòu)
- 1.通常使用鏈表或數(shù)組實現(xiàn)
- 2.一般具有 FIFO(先進先出) 特性,也可以設(shè)計為雙端隊列
- 3.隊列的主要操作:入隊和出隊
阻塞隊列 BlockingQueue
定義:線程通信中,在任意時刻,無論并發(fā)有多高,在單個 JVM 上,同一時間永遠只有一個線程能對隊列進行入隊或出隊操作。BlockingQueue 可以在線程之間共享而無需任何顯式同步
阻塞隊列的類型:
JAVA中的應(yīng)用場景 : 線程池、SpringCloud-Eureka 三級緩存、Nacos、MQ、Netty 等
常見的阻塞隊列
- ArrayBlockingQueue : 由數(shù)組支持的有界隊列
- 應(yīng)用場景: 線程池中有比較多的應(yīng)用、生產(chǎn)者消費者模型
- 工作原理: 基于 ReentrantLock 保證線程安全,根據(jù)Condition實現(xiàn)隊列滿時的阻塞
- LinkedBlockingQueue : 基于鏈表的無界隊列(理論上有界)
- PriorityBlockingQueue : 由優(yōu)先級堆支持的無界優(yōu)先級隊列
- DelayQueue : 由優(yōu)先級堆支持的、基于時間的調(diào)度隊列,內(nèi)部基于無界隊列PriorityQueue 實現(xiàn),而無界隊列基于數(shù)組的擴容實現(xiàn)
- 使用方法: 入隊的對象必須要實現(xiàn) Delayed 接口,而 Delayed 集成自 Comparable 接口
- 應(yīng)用場景: 售賣電影票等
- 工作原理: 隊列內(nèi)部會根據(jù)時間優(yōu)先級進行排序。延遲類線程池周期執(zhí)行。
它們都實現(xiàn)了BlockingQueue接口,都有put()和take()等方法,創(chuàng)建方式如下:
BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);
BlockingQueue API
添加元素:
方法 | 含義 |
---|---|
add() | 如果插入成功則返回 true,否則拋出 IllegalStateException 異常 |
put() | 將指定的元素插入隊列,如果隊列滿了,會阻塞直到有空間插入 |
offer() | 如果插入成功則返回 true,否則返回 false |
offer(E e, long timeout, TimeUnit unit) | 嘗試將元素插入隊列,如果隊列已滿,會阻塞直到有空間插入,阻塞有時間控制 |
檢索元素:
方法 | 含義 |
---|---|
take() | 獲取隊列的頭部元素并將其刪除,如果隊列為空,則阻塞并等待元素變?yōu)榭捎?/td> |
poll(long timeout, TimeUnit unit) | 檢索并刪除隊列的頭部,如有必要,等待指定的等待時間以使元素可用,如果超時,則返回 null |
ArrayBlockingQueue 源碼簡解
實現(xiàn):同步等待隊列(CLH)+ 條件等待隊列滿足條件的元素在CLH隊列中等待鎖,不滿足條件的隊列挪到條件等待隊列,滿足條件后再從 tail 插入 CLH 隊列
線程獲取鎖的條件: 在 CLH 隊列里等待的 Node 節(jié)點,并且 Node 節(jié)點的前驅(qū)節(jié)點是 Singal。條件等待隊列里的線程是無法獲取鎖的。
/** * 構(gòu)造方法 * 還有兩個構(gòu)造函數(shù),一個無fair參數(shù),一個可傳入集合,創(chuàng)建時插入隊列 * @param capacity 固定容量 * @param fair 默認是false:訪問順序未指定; true:按照FIFO順序處理 */ public ArrayBlockingQueue(int capacity, boolean fair) { ? if (capacity <= 0) ? ? ? ?throw new IllegalArgumentException(); ? ?this.items = new Object[capacity]; ? ?lock = new ReentrantLock(fair); // 根據(jù)fair創(chuàng)建對應(yīng)的鎖 ? ?// 條件對象,配合容器能滿足業(yè)務(wù) ? ?notEmpty = lock.newCondition(); // 出隊條件對象 ? ?notFull = ?lock.newCondition(); // 入隊條件對象 } /** * 入隊方法 * 在隊列的尾部插入指定的元素,如果隊列已滿,則等待空間可用 */ public void put(E e) throws InterruptedException { ? ?checkNotNull(e); // 檢查put對象是否為空,空拋出異常 ? ?final ReentrantLock lock = this.lock; ? ?lock.lockInterruptibly(); // 若未被中斷嘗試獲取鎖,詳見下文 ? ?try { ? // 隊列中元素的數(shù)量 等于 排隊元素的長度 ? ? ? ?while (count == items.length) ? ? ? ? ? ?notFull.await(); // 見下文 ? ? ? ?enqueue(e); // 元素入隊 ? } finally { ? ? ? ?lock.unlock(); ? } } /** * 出隊方法 * 獲取隊列的頭部元素并將其刪除,如果隊列為空,則阻塞并等待元素變?yōu)榭捎? */ public E take() throws InterruptedException { ? ?final ReentrantLock lock = this.lock; ? ?lock.lockInterruptibly(); // 見下文 ? ?try { ? ? ? ?while (count == 0) ? ? ? ? ? ?notEmpty.await(); // 見下文 ? ? ? ?return dequeue(); // 元素出隊 ? } finally { ? ? ? ?lock.unlock(); ? } }
令當(dāng)前線程等待,直到收到信號或被中斷詳:與此 Condition 關(guān)聯(lián)的鎖被自動釋放,進入等待,并且處于休眠狀態(tài),直到發(fā)生以下四種情況之一:
- ①其他線程調(diào)用這個Condition的 signal 方法,當(dāng)前線程恰好被選為要被喚醒的線程;
- ②其他線程調(diào)用這個條件的 signalAll 方法
- ③其他線程中斷當(dāng)前線程,支持中斷線程掛起;
- ④一個“虛假的喚醒”發(fā)生了。
在這些情況下,在此方法返回之前,當(dāng)前線程必須重新獲得與此條件相關(guān)聯(lián)的鎖。當(dāng)線程返回時,保證它持有這個鎖。
如果當(dāng)前線程有以下兩種情況之一:
- ①在進入該方法時設(shè)置中斷狀態(tài);
- ②在等待時被中斷,支持線程掛起的中斷 拋出InterruptedException
生產(chǎn)者消費者模式
BlockingQueue 可以在線程之間共享而無需任何顯式同步,在生產(chǎn)者消費者之間,只需要將阻塞隊列以參數(shù)的形式進行傳遞即可。它內(nèi)部的機制會自動保證線程的安全性。
生產(chǎn)者:實現(xiàn)了 Runnable 接口,每個生產(chǎn)者生產(chǎn)100種商品和1個中斷標記后完成線程任務(wù)
@Slf4j @Slf4j public class Producer implements Runnable{ ? ?// 作為參數(shù)的阻塞隊列 ? ?private BlockingQueue<Integer> blockingQueue; ? ?private final int stopTag; ? ?/** ? ? * 構(gòu)造方法 ? ? * @param blockingQueue ? ? * @param stopTag ? ? */ ? ?public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) { ? ? ? ?this.blockingQueue = blockingQueue; ? ? ? ?this.stopTag = stopTag; ? } ? ?@Override ? ?public void run() { ? ? ? ?try { ? ? ? ? ? ?generateNumbers(); ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?Thread.currentThread().interrupt(); ? ? ? } ? } ? ?private void generateNumbers() throws InterruptedException { ? ? ? ?// 每個生產(chǎn)者都隨機生產(chǎn)10種商品 ? ? ? ?for (int i = 0; i < 10; i++) { ? ? ? ? ? ?int product = ThreadLocalRandom.current().nextInt(1000,1100); ? ? ? ? ? ?log.info("生產(chǎn)者{}號,生產(chǎn)了商品,編號為{}",Thread.currentThread().getId(),product); ? ? ? ? ? ?blockingQueue.put(product); ? ? ? } ? ? ? ?// 生產(chǎn)終止標記 ? ? ? ?blockingQueue.put(stopTag); ? ? ? ?log.info("生產(chǎn)者{}號,生產(chǎn)了第終止標記編號{}",Thread.currentThread().getId(),Thread.currentThread().getId()); ? } }
消費者:消費者拿到終止消費標記終止消費,否則消費商品,拿到終止標記后完成線程任務(wù)
@Slf4j public class Consumer implements Runnable{ ? ?// 作為參數(shù)的阻塞隊列 ? ?private BlockingQueue<Integer> queue; ? ?private final int stopTage; ? ?public Consumer(BlockingQueue<Integer> queue, int stopTage) { ? ? ? ?this.queue = queue; ? ? ? ?this.stopTage = stopTage; ? } ? ?@Override ? ?public void run() { ? ? ? ?try { ? ? ? ? ? ?while (true) { ? ? ? ? ? ? ? ?Integer product = queue.take(); ? ? ? ? ? ? ? ?if (product.equals(stopTage)) { ? ? ? ? ? ? ? ? ? ?log.info("{}號消費者,停止消費,因為拿到了停止消費標記",Thread.currentThread().getId()); ? ? ? ? ? ? ? ? ? ?return; ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ?log.info("{}號消費者,拿到的商品編號:{}",Thread.currentThread().getId(),product); ? ? ? ? ? } ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?Thread.currentThread().interrupt(); ? ? ? } ? } }
客戶端類: 創(chuàng)建與計算機 CPU 核數(shù)相同的線程數(shù),與 16個生產(chǎn)者
public class ProductConsumerTest { ? ?public static void main(String[] args) { ? ? ? ?// 阻塞隊列容量 ? ? ? ?int blockingQueueSize = 10; ? ? ? ?// 生產(chǎn)者數(shù)量 ? ? ? ?int producerSize = 16; ? ? ? ?// 消費者數(shù)量 = 計算機線程核數(shù) 8 ? ? ? ?int consumerSize = Runtime.getRuntime().availableProcessors(); ? ? ? ?// 終止消費標記 ? ? ? ?int stopTag = Integer.MAX_VALUE; ? ? ? ?BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize); ? ? ? ?// 創(chuàng)建16個生產(chǎn)者線程 ? ? ? ?for (int i = 0; i < producerSize; i++) { ? ? ? ? ? ?new Thread(new Producer(blockingQueue, stopTag)).start(); ? ? ? } ? ? ? ?// 創(chuàng)建8個消費者線程 ? ? ? ?for (int j = 0; j < consumerSize; j++) { ? ? ? ? ? ?new Thread(new Consumer(blockingQueue, stopTag)).start(); ? ? ? } ? } }
延遲隊列 DelayQueue
定義: Java 延遲隊列提供了在指定時間才能獲取隊列元素的功能,隊列頭元素是最接近過期的元素。沒有過期元素的話,使用 poll() 方法會返回 null 值,超時判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 來判斷。延時隊列不能存放空元素。
/** * 電影票類,實現(xiàn)了Delayed接口,重寫 compareTo 和 getDelay方法 */ public class MovieTicket implements Delayed { ? ?//延遲時間 ? ?private final long delay; ? ?//到期時間 ? ?private final long expire; ? ?//數(shù)據(jù) ? ?private final String msg; ? ?//創(chuàng)建時間 ? ?private final long now; ? ?public long getDelay() { ? ? ? ?return delay; ? } ? ?public long getExpire() { ? ? ? ?return expire; ? } ? ?public String getMsg() { ? ? ? ?return msg; ? } ? ?public long getNow() { ? ? ? ?return now; ? } ? ?/** ? ? * @param msg 消息 ? ? * @param delay 延期時間 ? ? */ ? ?public MovieTicket(String msg , long delay) { ? ? ? ?this.delay = delay; ? ? ? ?this.msg = msg; ? ? ? ?expire = System.currentTimeMillis() + delay; ? ?//到期時間 = 當(dāng)前時間+延遲時間 ? ? ? ?now = System.currentTimeMillis(); ? } ? ?/** ? ? * @param msg ? ? */ ? ?public MovieTicket(String msg){ ? ? ? ?this(msg,1000); ? } ? ?public MovieTicket(){ ? ? ? ?this(null,1000); ? } ? ?/** ? ? * 獲得延遲時間 ? 用過期時間-當(dāng)前時間,時間單位毫秒 ? ? * @param unit ? ? * @return ? ? */ ? ?@Override ? ?public long getDelay(TimeUnit unit) { ? ? ? ?return unit.convert(this.expire ? ? ? ? ? ? ? ?- System.currentTimeMillis() , TimeUnit.MILLISECONDS); ? } ? ?/** ? ? * 用于延遲隊列內(nèi)部比較排序 當(dāng)前時間的延遲時間 - 比較對象的延遲時間 ? ? * 越早過期的時間在隊列中越靠前 ? ? * @param delayed ? ? * @return ? ? */ ? ?@Override ? ?public int compareTo(Delayed delayed) { ? ? ? ?return (int) (this.getDelay(TimeUnit.MILLISECONDS) ? ? ? ? ? ? ? ?- delayed.getDelay(TimeUnit.MILLISECONDS)); ? } }
測試類:
public static void main(String[] args) { ? ?DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>(); ? ?MovieTicket ticket = new MovieTicket("電影票1",10000); ? ?delayQueue.put(ticket); ? ?MovieTicket ticket1 = new MovieTicket("電影票2",5000); ? ?delayQueue.put(ticket1); ? ?MovieTicket ticket2 = new MovieTicket("電影票3",8000); ? ?delayQueue.put(ticket2); ? ?log.info("message:--->入隊完畢"); ? ?while( delayQueue.size() > 0 ){ ? ? ? ?try { ? ? ? ? ? ?ticket = delayQueue.take(); ? ? ? ? ? ?log.info("電影票出隊:{}",ticket.getMsg()); ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ?e.printStackTrace(); ? ? ? } ? } }
從運行結(jié)果可以看出隊列是延遲出隊,間隔和我們所設(shè)置的時間相同
到此這篇關(guān)于Java阻塞隊列BlockingQueue詳解的文章就介紹到這了,更多相關(guān)Java BlockingQueue 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- 關(guān)于Java中阻塞隊列BlockingQueue的詳解
- Java阻塞隊列BlockingQueue基礎(chǔ)與使用
- Java阻塞隊列必看類:BlockingQueue快速了解大體框架和實現(xiàn)思路
- Java?阻塞隊列BlockingQueue詳解
- Java并發(fā)編程之阻塞隊列(BlockingQueue)詳解
- Java源碼解析阻塞隊列ArrayBlockingQueue介紹
- Java源碼解析阻塞隊列ArrayBlockingQueue常用方法
- Java源碼解析阻塞隊列ArrayBlockingQueue功能簡介
- 詳解Java阻塞隊列(BlockingQueue)的實現(xiàn)原理
- java 中 阻塞隊列BlockingQueue詳解及實例
- 一文簡介Java中BlockingQueue阻塞隊列
相關(guān)文章
Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)
這篇文章主要介紹了Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07Spring Boot console log 格式自定義方式
這篇文章主要介紹了Spring Boot console log 格式自定義方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07對Mybatis?Plus中@TableField的使用正解
這篇文章主要介紹了對Mybatis?Plus中@TableField的使用正解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01java.sql.SQLException問題解決以及注意事項
這篇文章主要給大家介紹了關(guān)于java.sql.SQLException問題解決以及注意事項的相關(guān)資料,這個問題其實很好解決,文中通過圖文將解決的辦法介紹的很詳細,需要的朋友可以參考下2023-07-07使用Mybatis-plus清空表數(shù)據(jù)的操作方法
MyBatis 是一個基于 java 的持久層框架,它內(nèi)部封裝了 jdbc,極大提高了我們的開發(fā)效率,文中給大家介紹了MybatisPlus常用API-增刪改查功能,感興趣的朋友跟隨小編一起看看吧2022-11-11淺談@FeignClient中name和value屬性的區(qū)別
這篇文章主要介紹了@FeignClient中name和value屬性的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-07-07詳解Spring Data JPA動態(tài)條件查詢的寫法
本篇文章主要介紹了Spring Data JPA動態(tài)條件查詢的寫法 ,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-06-06