欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java阻塞隊列BlockingQueue詳解

 更新時間:2022年07月29日 15:39:58   作者:Hz488  
這篇文章主要介紹了Java阻塞隊列BlockingQueue,文章通過隊列的類型展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的朋友可以參考一下

隊列的類型

  • 無限隊列(unbounded queue) 無容量限定,只隨存儲變化
  • 有限隊列(bounded queue) 定義了最大容量

向無限隊列添加元素的所有操作都將永遠不會阻塞(也是線程安全的),因此它可以增長到非常大的容量。 使用無限阻塞隊列 BlockingQueue 設(shè)計生產(chǎn)者 - 消費者模型時最重要的是消費者應(yīng)該能夠像生產(chǎn)者向隊列添加消息一樣快地消費消息 。否則可能內(nèi)存不足而拋出 OutOfMemory 異常。

數(shù)據(jù)結(jié)構(gòu)

  • 1.通常使用鏈表或數(shù)組實現(xiàn)
  • 2.一般具有 FIFO(先進先出) 特性,也可以設(shè)計為雙端隊列
  • 3.隊列的主要操作:入隊和出隊

阻塞隊列 BlockingQueue

定義:線程通信中,在任意時刻,無論并發(fā)有多高,在單個 JVM 上,同一時間永遠只有一個線程能對隊列進行入隊或出隊操作。BlockingQueue 可以在線程之間共享而無需任何顯式同步

阻塞隊列的類型:  

JAVA中的應(yīng)用場景 : 線程池、SpringCloud-Eureka 三級緩存、Nacos、MQ、Netty 等

常見的阻塞隊列

  • ArrayBlockingQueue : 由數(shù)組支持的有界隊列
    • 應(yīng)用場景: 線程池中有比較多的應(yīng)用、生產(chǎn)者消費者模型
    • 工作原理: 基于 ReentrantLock 保證線程安全,根據(jù)Condition實現(xiàn)隊列滿時的阻塞
  • LinkedBlockingQueue : 基于鏈表的無界隊列(理論上有界)
  • PriorityBlockingQueue : 由優(yōu)先級堆支持的無界優(yōu)先級隊列
  • DelayQueue : 由優(yōu)先級堆支持的、基于時間的調(diào)度隊列,內(nèi)部基于無界隊列PriorityQueue 實現(xiàn),而無界隊列基于數(shù)組的擴容實現(xiàn)
    • 使用方法: 入隊的對象必須要實現(xiàn) Delayed 接口,而 Delayed 集成自 Comparable 接口
    • 應(yīng)用場景: 售賣電影票等
    • 工作原理: 隊列內(nèi)部會根據(jù)時間優(yōu)先級進行排序。延遲類線程池周期執(zhí)行。

它們都實現(xiàn)了BlockingQueue接口,都有put()和take()等方法,創(chuàng)建方式如下:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);

BlockingQueue API

添加元素:

方法含義
add()如果插入成功則返回 true,否則拋出 IllegalStateException 異常
put()將指定的元素插入隊列,如果隊列滿了,會阻塞直到有空間插入
offer()如果插入成功則返回 true,否則返回 false
offer(E e, long timeout, TimeUnit unit)嘗試將元素插入隊列,如果隊列已滿,會阻塞直到有空間插入,阻塞有時間控制

檢索元素:

方法含義
take()獲取隊列的頭部元素并將其刪除,如果隊列為空,則阻塞并等待元素變?yōu)榭捎?/td>
poll(long timeout, TimeUnit unit)檢索并刪除隊列的頭部,如有必要,等待指定的等待時間以使元素可用,如果超時,則返回 null

ArrayBlockingQueue 源碼簡解

實現(xiàn):同步等待隊列(CLH)+ 條件等待隊列滿足條件的元素在CLH隊列中等待鎖,不滿足條件的隊列挪到條件等待隊列,滿足條件后再從 tail 插入 CLH 隊列

線程獲取鎖的條件: 在 CLH 隊列里等待的 Node 節(jié)點,并且 Node 節(jié)點的前驅(qū)節(jié)點是 Singal。條件等待隊列里的線程是無法獲取鎖的。

/**
 * 構(gòu)造方法
 * 還有兩個構(gòu)造函數(shù),一個無fair參數(shù),一個可傳入集合,創(chuàng)建時插入隊列
 * @param capacity 固定容量
 * @param fair 默認是false:訪問順序未指定; true:按照FIFO順序處理
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
 ? if (capacity <= 0)
 ? ? ? ?throw new IllegalArgumentException();
 ? ?this.items = new Object[capacity];
 ? ?lock = new ReentrantLock(fair); // 根據(jù)fair創(chuàng)建對應(yīng)的鎖
 ? ?// 條件對象,配合容器能滿足業(yè)務(wù)
 ? ?notEmpty = lock.newCondition(); // 出隊條件對象
 ? ?notFull = ?lock.newCondition(); // 入隊條件對象
}
/**
 * 入隊方法
 * 在隊列的尾部插入指定的元素,如果隊列已滿,則等待空間可用
 */
public void put(E e) throws InterruptedException {
 ? ?checkNotNull(e); // 檢查put對象是否為空,空拋出異常
 ? ?final ReentrantLock lock = this.lock;
 ? ?lock.lockInterruptibly(); // 若未被中斷嘗試獲取鎖,詳見下文
 ? ?try {
 ?      // 隊列中元素的數(shù)量 等于 排隊元素的長度
 ? ? ? ?while (count == items.length)
 ? ? ? ? ? ?notFull.await(); // 見下文
 ? ? ? ?enqueue(e); // 元素入隊
 ?  } finally {
 ? ? ? ?lock.unlock();
 ?  }
}
/**
 * 出隊方法
 * 獲取隊列的頭部元素并將其刪除,如果隊列為空,則阻塞并等待元素變?yōu)榭捎?
 */
public E take() throws InterruptedException {
 ? ?final ReentrantLock lock = this.lock;
 ? ?lock.lockInterruptibly(); // 見下文
 ? ?try {
 ? ? ? ?while (count == 0)
 ? ? ? ? ? ?notEmpty.await(); // 見下文
 ? ? ? ?return dequeue(); // 元素出隊
 ?  } finally {
 ? ? ? ?lock.unlock();
 ?  }
}

令當(dāng)前線程等待,直到收到信號或被中斷詳:與此 Condition 關(guān)聯(lián)的鎖被自動釋放,進入等待,并且處于休眠狀態(tài),直到發(fā)生以下四種情況之一:

  • ①其他線程調(diào)用這個Condition的 signal 方法,當(dāng)前線程恰好被選為要被喚醒的線程;
  • ②其他線程調(diào)用這個條件的 signalAll 方法
  • ③其他線程中斷當(dāng)前線程,支持中斷線程掛起;
  • ④一個“虛假的喚醒”發(fā)生了。

在這些情況下,在此方法返回之前,當(dāng)前線程必須重新獲得與此條件相關(guān)聯(lián)的鎖。當(dāng)線程返回時,保證它持有這個鎖。

如果當(dāng)前線程有以下兩種情況之一:

  • ①在進入該方法時設(shè)置中斷狀態(tài);
  • ②在等待時被中斷,支持線程掛起的中斷 拋出InterruptedException

生產(chǎn)者消費者模式

BlockingQueue 可以在線程之間共享而無需任何顯式同步,在生產(chǎn)者消費者之間,只需要將阻塞隊列以參數(shù)的形式進行傳遞即可。它內(nèi)部的機制會自動保證線程的安全性。

生產(chǎn)者:實現(xiàn)了 Runnable 接口,每個生產(chǎn)者生產(chǎn)100種商品和1個中斷標記后完成線程任務(wù)

@Slf4j
@Slf4j
public class Producer implements Runnable{
 ? ?// 作為參數(shù)的阻塞隊列
 ? ?private BlockingQueue<Integer> blockingQueue;
 ? ?private final int stopTag;
 ? ?/**
 ? ? * 構(gòu)造方法
 ? ? * @param blockingQueue
 ? ? * @param stopTag
 ? ? */
 ? ?public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
 ? ? ? ?this.blockingQueue = blockingQueue;
 ? ? ? ?this.stopTag = stopTag;
 ?  }
 ? ?@Override
 ? ?public void run() {
 ? ? ? ?try {
 ? ? ? ? ? ?generateNumbers();
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ?Thread.currentThread().interrupt();
 ? ? ?  }
 ?  }
? ?private void generateNumbers() throws InterruptedException {
 ? ? ? ?// 每個生產(chǎn)者都隨機生產(chǎn)10種商品
 ? ? ? ?for (int i = 0; i < 10; i++) {
 ? ? ? ? ? ?int product = ThreadLocalRandom.current().nextInt(1000,1100);
 ? ? ? ? ? ?log.info("生產(chǎn)者{}號,生產(chǎn)了商品,編號為{}",Thread.currentThread().getId(),product);
 ? ? ? ? ? ?blockingQueue.put(product);
 ? ? ?  }
 ? ? ? ?// 生產(chǎn)終止標記
 ? ? ? ?blockingQueue.put(stopTag);
 ? ? ? ?log.info("生產(chǎn)者{}號,生產(chǎn)了第終止標記編號{}",Thread.currentThread().getId(),Thread.currentThread().getId());
 ?  }
}

消費者:消費者拿到終止消費標記終止消費,否則消費商品,拿到終止標記后完成線程任務(wù)

@Slf4j
public class Consumer implements Runnable{
 ? ?// 作為參數(shù)的阻塞隊列
 ? ?private BlockingQueue<Integer> queue;
 ? ?private final int stopTage;
 ? ?public Consumer(BlockingQueue<Integer> queue, int stopTage) {
 ? ? ? ?this.queue = queue;
 ? ? ? ?this.stopTage = stopTage;
 ?  }
 ? ?@Override
 ? ?public void run() {
 ? ? ? ?try {
 ? ? ? ? ? ?while (true) {
 ? ? ? ? ? ? ? ?Integer product = queue.take();
 ? ? ? ? ? ? ? ?if (product.equals(stopTage)) {
 ? ? ? ? ? ? ? ? ? ?log.info("{}號消費者,停止消費,因為拿到了停止消費標記",Thread.currentThread().getId());
 ? ? ? ? ? ? ? ? ? ?return;
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?log.info("{}號消費者,拿到的商品編號:{}",Thread.currentThread().getId(),product);
 ? ? ? ? ?  }
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ?Thread.currentThread().interrupt();
 ? ? ?  }
 ?  }
}

客戶端類: 創(chuàng)建與計算機 CPU 核數(shù)相同的線程數(shù),與 16個生產(chǎn)者

public class ProductConsumerTest {
 ? ?public static void main(String[] args) {
 ? ? ? ?// 阻塞隊列容量
 ? ? ? ?int blockingQueueSize = 10;
 ? ? ? ?// 生產(chǎn)者數(shù)量
 ? ? ? ?int producerSize = 16;
 ? ? ? ?// 消費者數(shù)量 = 計算機線程核數(shù) 8
 ? ? ? ?int consumerSize = Runtime.getRuntime().availableProcessors();
 ? ? ? ?// 終止消費標記
 ? ? ? ?int stopTag = Integer.MAX_VALUE;
 ? ? ? ?BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
 ? ? ? ?// 創(chuàng)建16個生產(chǎn)者線程
 ? ? ? ?for (int i = 0; i < producerSize; i++) {
 ? ? ? ? ? ?new Thread(new Producer(blockingQueue, stopTag)).start();
 ? ? ?  }
 ? ? ? ?// 創(chuàng)建8個消費者線程
 ? ? ? ?for (int j = 0; j < consumerSize; j++) {
 ? ? ? ? ? ?new Thread(new Consumer(blockingQueue, stopTag)).start();
 ? ? ?  }
 ?  }
}

延遲隊列 DelayQueue

定義: Java 延遲隊列提供了在指定時間才能獲取隊列元素的功能,隊列頭元素是最接近過期的元素。沒有過期元素的話,使用 poll() 方法會返回 null 值,超時判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 來判斷。延時隊列不能存放空元素。

/**
 * 電影票類,實現(xiàn)了Delayed接口,重寫 compareTo 和 getDelay方法
 */
public class MovieTicket implements Delayed {
 ? ?//延遲時間
 ? ?private final long delay;
 ? ?//到期時間
 ? ?private final long expire;
 ? ?//數(shù)據(jù)
 ? ?private final String msg;
 ? ?//創(chuàng)建時間
 ? ?private final long now;
 ? ?public long getDelay() {
 ? ? ? ?return delay;
 ?  }
 ? ?public long getExpire() {
 ? ? ? ?return expire;
 ?  }
 ? ?public String getMsg() {
 ? ? ? ?return msg;
 ?  }
 ? ?public long getNow() {
 ? ? ? ?return now;
 ?  }
 ? ?/**
 ? ? * @param msg 消息
 ? ? * @param delay 延期時間
 ? ? */
 ? ?public MovieTicket(String msg , long delay) {
 ? ? ? ?this.delay = delay;
 ? ? ? ?this.msg = msg;
 ? ? ? ?expire = System.currentTimeMillis() + delay; ? ?//到期時間 = 當(dāng)前時間+延遲時間
 ? ? ? ?now = System.currentTimeMillis();
 ?  }
 ? ?/**
 ? ? * @param msg
 ? ? */
 ? ?public MovieTicket(String msg){
 ? ? ? ?this(msg,1000);
 ?  }
 ? ?public MovieTicket(){
 ? ? ? ?this(null,1000);
 ?  }
 ? ?/**
 ? ? * 獲得延遲時間 ? 用過期時間-當(dāng)前時間,時間單位毫秒
 ? ? * @param unit
 ? ? * @return
 ? ? */
 ? ?@Override
 ? ?public long getDelay(TimeUnit unit) {
 ? ? ? ?return unit.convert(this.expire
 ? ? ? ? ? ? ? ?- System.currentTimeMillis() , TimeUnit.MILLISECONDS);
 ?  }
 ? ?/**
 ? ? * 用于延遲隊列內(nèi)部比較排序  當(dāng)前時間的延遲時間 - 比較對象的延遲時間
 ? ? * 越早過期的時間在隊列中越靠前
 ? ? * @param delayed
 ? ? * @return
 ? ? */
 ? ?@Override
 ? ?public int compareTo(Delayed delayed) {
 ? ? ? ?return (int) (this.getDelay(TimeUnit.MILLISECONDS)
 ? ? ? ? ? ? ? ?- delayed.getDelay(TimeUnit.MILLISECONDS));
 ?  }
}

測試類:

public static void main(String[] args) {
 ? ?DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
 ? ?MovieTicket ticket = new MovieTicket("電影票1",10000);
 ? ?delayQueue.put(ticket);
 ? ?MovieTicket ticket1 = new MovieTicket("電影票2",5000);
 ? ?delayQueue.put(ticket1);
 ? ?MovieTicket ticket2 = new MovieTicket("電影票3",8000);
 ? ?delayQueue.put(ticket2);
 ? ?log.info("message:--->入隊完畢");
 ? ?while( delayQueue.size() > 0 ){
 ? ? ? ?try {
 ? ? ? ? ? ?ticket = delayQueue.take();
 ? ? ? ? ? ?log.info("電影票出隊:{}",ticket.getMsg());
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ?e.printStackTrace();
 ? ? ?  }
 ?  }
}

從運行結(jié)果可以看出隊列是延遲出隊,間隔和我們所設(shè)置的時間相同

到此這篇關(guān)于Java阻塞隊列BlockingQueue詳解的文章就介紹到這了,更多相關(guān)Java BlockingQueue 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)

    Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)

    這篇文章主要介紹了Zuul 實現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Spring Boot console log 格式自定義方式

    Spring Boot console log 格式自定義方式

    這篇文章主要介紹了Spring Boot console log 格式自定義方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 常見的java面試題

    常見的java面試題

    這篇文章主要為大家詳細介紹了常見的java面試題,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-11-11
  • 對Mybatis?Plus中@TableField的使用正解

    對Mybatis?Plus中@TableField的使用正解

    這篇文章主要介紹了對Mybatis?Plus中@TableField的使用正解,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • java.sql.SQLException問題解決以及注意事項

    java.sql.SQLException問題解決以及注意事項

    這篇文章主要給大家介紹了關(guān)于java.sql.SQLException問題解決以及注意事項的相關(guān)資料,這個問題其實很好解決,文中通過圖文將解決的辦法介紹的很詳細,需要的朋友可以參考下
    2023-07-07
  • logback過濾部分日志輸出的操作

    logback過濾部分日志輸出的操作

    這篇文章主要介紹了logback過濾部分日志輸出的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • 使用Mybatis-plus清空表數(shù)據(jù)的操作方法

    使用Mybatis-plus清空表數(shù)據(jù)的操作方法

    MyBatis 是一個基于 java 的持久層框架,它內(nèi)部封裝了 jdbc,極大提高了我們的開發(fā)效率,文中給大家介紹了MybatisPlus常用API-增刪改查功能,感興趣的朋友跟隨小編一起看看吧
    2022-11-11
  • Spring Mybatis 分頁插件使用教程

    Spring Mybatis 分頁插件使用教程

    這篇文章主要介紹了Spring Mybatis分頁插件使用教程,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2018-02-02
  • 淺談@FeignClient中name和value屬性的區(qū)別

    淺談@FeignClient中name和value屬性的區(qū)別

    這篇文章主要介紹了@FeignClient中name和value屬性的區(qū)別,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • 詳解Spring Data JPA動態(tài)條件查詢的寫法

    詳解Spring Data JPA動態(tài)條件查詢的寫法

    本篇文章主要介紹了Spring Data JPA動態(tài)條件查詢的寫法 ,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-06-06

最新評論