欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java阻塞隊(duì)列BlockingQueue詳解

 更新時(shí)間:2022年07月29日 15:39:58   作者:Hz488  
這篇文章主要介紹了Java阻塞隊(duì)列BlockingQueue,文章通過隊(duì)列的類型展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的朋友可以參考一下

隊(duì)列的類型

  • 無限隊(duì)列(unbounded queue) 無容量限定,只隨存儲(chǔ)變化
  • 有限隊(duì)列(bounded queue) 定義了最大容量

向無限隊(duì)列添加元素的所有操作都將永遠(yuǎn)不會(huì)阻塞(也是線程安全的),因此它可以增長到非常大的容量。 使用無限阻塞隊(duì)列 BlockingQueue 設(shè)計(jì)生產(chǎn)者 - 消費(fèi)者模型時(shí)最重要的是消費(fèi)者應(yīng)該能夠像生產(chǎn)者向隊(duì)列添加消息一樣快地消費(fèi)消息 。否則可能內(nèi)存不足而拋出 OutOfMemory 異常。

數(shù)據(jù)結(jié)構(gòu)

  • 1.通常使用鏈表或數(shù)組實(shí)現(xiàn)
  • 2.一般具有 FIFO(先進(jìn)先出) 特性,也可以設(shè)計(jì)為雙端隊(duì)列
  • 3.隊(duì)列的主要操作:入隊(duì)和出隊(duì)

阻塞隊(duì)列 BlockingQueue

定義:線程通信中,在任意時(shí)刻,無論并發(fā)有多高,在單個(gè) JVM 上,同一時(shí)間永遠(yuǎn)只有一個(gè)線程能對(duì)隊(duì)列進(jìn)行入隊(duì)或出隊(duì)操作。BlockingQueue 可以在線程之間共享而無需任何顯式同步

阻塞隊(duì)列的類型:  

JAVA中的應(yīng)用場景 : 線程池、SpringCloud-Eureka 三級(jí)緩存、Nacos、MQ、Netty 等

常見的阻塞隊(duì)列

  • ArrayBlockingQueue : 由數(shù)組支持的有界隊(duì)列
    • 應(yīng)用場景: 線程池中有比較多的應(yīng)用、生產(chǎn)者消費(fèi)者模型
    • 工作原理: 基于 ReentrantLock 保證線程安全,根據(jù)Condition實(shí)現(xiàn)隊(duì)列滿時(shí)的阻塞
  • LinkedBlockingQueue : 基于鏈表的無界隊(duì)列(理論上有界)
  • PriorityBlockingQueue : 由優(yōu)先級(jí)堆支持的無界優(yōu)先級(jí)隊(duì)列
  • DelayQueue : 由優(yōu)先級(jí)堆支持的、基于時(shí)間的調(diào)度隊(duì)列,內(nèi)部基于無界隊(duì)列PriorityQueue 實(shí)現(xiàn),而無界隊(duì)列基于數(shù)組的擴(kuò)容實(shí)現(xiàn)
    • 使用方法: 入隊(duì)的對(duì)象必須要實(shí)現(xiàn) Delayed 接口,而 Delayed 集成自 Comparable 接口
    • 應(yīng)用場景: 售賣電影票等
    • 工作原理: 隊(duì)列內(nèi)部會(huì)根據(jù)時(shí)間優(yōu)先級(jí)進(jìn)行排序。延遲類線程池周期執(zhí)行。

它們都實(shí)現(xiàn)了BlockingQueue接口,都有put()和take()等方法,創(chuàng)建方式如下:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);

BlockingQueue API

添加元素:

方法含義
add()如果插入成功則返回 true,否則拋出 IllegalStateException 異常
put()將指定的元素插入隊(duì)列,如果隊(duì)列滿了,會(huì)阻塞直到有空間插入
offer()如果插入成功則返回 true,否則返回 false
offer(E e, long timeout, TimeUnit unit)嘗試將元素插入隊(duì)列,如果隊(duì)列已滿,會(huì)阻塞直到有空間插入,阻塞有時(shí)間控制

檢索元素:

方法含義
take()獲取隊(duì)列的頭部元素并將其刪除,如果隊(duì)列為空,則阻塞并等待元素變?yōu)榭捎?/td>
poll(long timeout, TimeUnit unit)檢索并刪除隊(duì)列的頭部,如有必要,等待指定的等待時(shí)間以使元素可用,如果超時(shí),則返回 null

ArrayBlockingQueue 源碼簡解

實(shí)現(xiàn):同步等待隊(duì)列(CLH)+ 條件等待隊(duì)列滿足條件的元素在CLH隊(duì)列中等待鎖,不滿足條件的隊(duì)列挪到條件等待隊(duì)列,滿足條件后再從 tail 插入 CLH 隊(duì)列

線程獲取鎖的條件: 在 CLH 隊(duì)列里等待的 Node 節(jié)點(diǎn),并且 Node 節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是 Singal。條件等待隊(duì)列里的線程是無法獲取鎖的。

/**
 * 構(gòu)造方法
 * 還有兩個(gè)構(gòu)造函數(shù),一個(gè)無fair參數(shù),一個(gè)可傳入集合,創(chuàng)建時(shí)插入隊(duì)列
 * @param capacity 固定容量
 * @param fair 默認(rèn)是false:訪問順序未指定; true:按照FIFO順序處理
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
 ? if (capacity <= 0)
 ? ? ? ?throw new IllegalArgumentException();
 ? ?this.items = new Object[capacity];
 ? ?lock = new ReentrantLock(fair); // 根據(jù)fair創(chuàng)建對(duì)應(yīng)的鎖
 ? ?// 條件對(duì)象,配合容器能滿足業(yè)務(wù)
 ? ?notEmpty = lock.newCondition(); // 出隊(duì)條件對(duì)象
 ? ?notFull = ?lock.newCondition(); // 入隊(duì)條件對(duì)象
}
/**
 * 入隊(duì)方法
 * 在隊(duì)列的尾部插入指定的元素,如果隊(duì)列已滿,則等待空間可用
 */
public void put(E e) throws InterruptedException {
 ? ?checkNotNull(e); // 檢查put對(duì)象是否為空,空拋出異常
 ? ?final ReentrantLock lock = this.lock;
 ? ?lock.lockInterruptibly(); // 若未被中斷嘗試獲取鎖,詳見下文
 ? ?try {
 ?      // 隊(duì)列中元素的數(shù)量 等于 排隊(duì)元素的長度
 ? ? ? ?while (count == items.length)
 ? ? ? ? ? ?notFull.await(); // 見下文
 ? ? ? ?enqueue(e); // 元素入隊(duì)
 ?  } finally {
 ? ? ? ?lock.unlock();
 ?  }
}
/**
 * 出隊(duì)方法
 * 獲取隊(duì)列的頭部元素并將其刪除,如果隊(duì)列為空,則阻塞并等待元素變?yōu)榭捎?
 */
public E take() throws InterruptedException {
 ? ?final ReentrantLock lock = this.lock;
 ? ?lock.lockInterruptibly(); // 見下文
 ? ?try {
 ? ? ? ?while (count == 0)
 ? ? ? ? ? ?notEmpty.await(); // 見下文
 ? ? ? ?return dequeue(); // 元素出隊(duì)
 ?  } finally {
 ? ? ? ?lock.unlock();
 ?  }
}

令當(dāng)前線程等待,直到收到信號(hào)或被中斷詳:與此 Condition 關(guān)聯(lián)的鎖被自動(dòng)釋放,進(jìn)入等待,并且處于休眠狀態(tài),直到發(fā)生以下四種情況之一:

  • ①其他線程調(diào)用這個(gè)Condition的 signal 方法,當(dāng)前線程恰好被選為要被喚醒的線程;
  • ②其他線程調(diào)用這個(gè)條件的 signalAll 方法
  • ③其他線程中斷當(dāng)前線程,支持中斷線程掛起;
  • ④一個(gè)“虛假的喚醒”發(fā)生了。

在這些情況下,在此方法返回之前,當(dāng)前線程必須重新獲得與此條件相關(guān)聯(lián)的鎖。當(dāng)線程返回時(shí),保證它持有這個(gè)鎖。

如果當(dāng)前線程有以下兩種情況之一:

  • ①在進(jìn)入該方法時(shí)設(shè)置中斷狀態(tài);
  • ②在等待時(shí)被中斷,支持線程掛起的中斷 拋出InterruptedException

生產(chǎn)者消費(fèi)者模式

BlockingQueue 可以在線程之間共享而無需任何顯式同步,在生產(chǎn)者消費(fèi)者之間,只需要將阻塞隊(duì)列以參數(shù)的形式進(jìn)行傳遞即可。它內(nèi)部的機(jī)制會(huì)自動(dòng)保證線程的安全性。

生產(chǎn)者:實(shí)現(xiàn)了 Runnable 接口,每個(gè)生產(chǎn)者生產(chǎn)100種商品和1個(gè)中斷標(biāo)記后完成線程任務(wù)

@Slf4j
@Slf4j
public class Producer implements Runnable{
 ? ?// 作為參數(shù)的阻塞隊(duì)列
 ? ?private BlockingQueue<Integer> blockingQueue;
 ? ?private final int stopTag;
 ? ?/**
 ? ? * 構(gòu)造方法
 ? ? * @param blockingQueue
 ? ? * @param stopTag
 ? ? */
 ? ?public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
 ? ? ? ?this.blockingQueue = blockingQueue;
 ? ? ? ?this.stopTag = stopTag;
 ?  }
 ? ?@Override
 ? ?public void run() {
 ? ? ? ?try {
 ? ? ? ? ? ?generateNumbers();
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ?Thread.currentThread().interrupt();
 ? ? ?  }
 ?  }
? ?private void generateNumbers() throws InterruptedException {
 ? ? ? ?// 每個(gè)生產(chǎn)者都隨機(jī)生產(chǎn)10種商品
 ? ? ? ?for (int i = 0; i < 10; i++) {
 ? ? ? ? ? ?int product = ThreadLocalRandom.current().nextInt(1000,1100);
 ? ? ? ? ? ?log.info("生產(chǎn)者{}號(hào),生產(chǎn)了商品,編號(hào)為{}",Thread.currentThread().getId(),product);
 ? ? ? ? ? ?blockingQueue.put(product);
 ? ? ?  }
 ? ? ? ?// 生產(chǎn)終止標(biāo)記
 ? ? ? ?blockingQueue.put(stopTag);
 ? ? ? ?log.info("生產(chǎn)者{}號(hào),生產(chǎn)了第終止標(biāo)記編號(hào){}",Thread.currentThread().getId(),Thread.currentThread().getId());
 ?  }
}

消費(fèi)者:消費(fèi)者拿到終止消費(fèi)標(biāo)記終止消費(fèi),否則消費(fèi)商品,拿到終止標(biāo)記后完成線程任務(wù)

@Slf4j
public class Consumer implements Runnable{
 ? ?// 作為參數(shù)的阻塞隊(duì)列
 ? ?private BlockingQueue<Integer> queue;
 ? ?private final int stopTage;
 ? ?public Consumer(BlockingQueue<Integer> queue, int stopTage) {
 ? ? ? ?this.queue = queue;
 ? ? ? ?this.stopTage = stopTage;
 ?  }
 ? ?@Override
 ? ?public void run() {
 ? ? ? ?try {
 ? ? ? ? ? ?while (true) {
 ? ? ? ? ? ? ? ?Integer product = queue.take();
 ? ? ? ? ? ? ? ?if (product.equals(stopTage)) {
 ? ? ? ? ? ? ? ? ? ?log.info("{}號(hào)消費(fèi)者,停止消費(fèi),因?yàn)槟玫搅送V瓜M(fèi)標(biāo)記",Thread.currentThread().getId());
 ? ? ? ? ? ? ? ? ? ?return;
 ? ? ? ? ? ? ?  }
 ? ? ? ? ? ? ? ?log.info("{}號(hào)消費(fèi)者,拿到的商品編號(hào):{}",Thread.currentThread().getId(),product);
 ? ? ? ? ?  }
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ?Thread.currentThread().interrupt();
 ? ? ?  }
 ?  }
}

客戶端類: 創(chuàng)建與計(jì)算機(jī) CPU 核數(shù)相同的線程數(shù),與 16個(gè)生產(chǎn)者

public class ProductConsumerTest {
 ? ?public static void main(String[] args) {
 ? ? ? ?// 阻塞隊(duì)列容量
 ? ? ? ?int blockingQueueSize = 10;
 ? ? ? ?// 生產(chǎn)者數(shù)量
 ? ? ? ?int producerSize = 16;
 ? ? ? ?// 消費(fèi)者數(shù)量 = 計(jì)算機(jī)線程核數(shù) 8
 ? ? ? ?int consumerSize = Runtime.getRuntime().availableProcessors();
 ? ? ? ?// 終止消費(fèi)標(biāo)記
 ? ? ? ?int stopTag = Integer.MAX_VALUE;
 ? ? ? ?BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
 ? ? ? ?// 創(chuàng)建16個(gè)生產(chǎn)者線程
 ? ? ? ?for (int i = 0; i < producerSize; i++) {
 ? ? ? ? ? ?new Thread(new Producer(blockingQueue, stopTag)).start();
 ? ? ?  }
 ? ? ? ?// 創(chuàng)建8個(gè)消費(fèi)者線程
 ? ? ? ?for (int j = 0; j < consumerSize; j++) {
 ? ? ? ? ? ?new Thread(new Consumer(blockingQueue, stopTag)).start();
 ? ? ?  }
 ?  }
}

延遲隊(duì)列 DelayQueue

定義: Java 延遲隊(duì)列提供了在指定時(shí)間才能獲取隊(duì)列元素的功能,隊(duì)列頭元素是最接近過期的元素。沒有過期元素的話,使用 poll() 方法會(huì)返回 null 值,超時(shí)判定是通過getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 來判斷。延時(shí)隊(duì)列不能存放空元素。

/**
 * 電影票類,實(shí)現(xiàn)了Delayed接口,重寫 compareTo 和 getDelay方法
 */
public class MovieTicket implements Delayed {
 ? ?//延遲時(shí)間
 ? ?private final long delay;
 ? ?//到期時(shí)間
 ? ?private final long expire;
 ? ?//數(shù)據(jù)
 ? ?private final String msg;
 ? ?//創(chuàng)建時(shí)間
 ? ?private final long now;
 ? ?public long getDelay() {
 ? ? ? ?return delay;
 ?  }
 ? ?public long getExpire() {
 ? ? ? ?return expire;
 ?  }
 ? ?public String getMsg() {
 ? ? ? ?return msg;
 ?  }
 ? ?public long getNow() {
 ? ? ? ?return now;
 ?  }
 ? ?/**
 ? ? * @param msg 消息
 ? ? * @param delay 延期時(shí)間
 ? ? */
 ? ?public MovieTicket(String msg , long delay) {
 ? ? ? ?this.delay = delay;
 ? ? ? ?this.msg = msg;
 ? ? ? ?expire = System.currentTimeMillis() + delay; ? ?//到期時(shí)間 = 當(dāng)前時(shí)間+延遲時(shí)間
 ? ? ? ?now = System.currentTimeMillis();
 ?  }
 ? ?/**
 ? ? * @param msg
 ? ? */
 ? ?public MovieTicket(String msg){
 ? ? ? ?this(msg,1000);
 ?  }
 ? ?public MovieTicket(){
 ? ? ? ?this(null,1000);
 ?  }
 ? ?/**
 ? ? * 獲得延遲時(shí)間 ? 用過期時(shí)間-當(dāng)前時(shí)間,時(shí)間單位毫秒
 ? ? * @param unit
 ? ? * @return
 ? ? */
 ? ?@Override
 ? ?public long getDelay(TimeUnit unit) {
 ? ? ? ?return unit.convert(this.expire
 ? ? ? ? ? ? ? ?- System.currentTimeMillis() , TimeUnit.MILLISECONDS);
 ?  }
 ? ?/**
 ? ? * 用于延遲隊(duì)列內(nèi)部比較排序  當(dāng)前時(shí)間的延遲時(shí)間 - 比較對(duì)象的延遲時(shí)間
 ? ? * 越早過期的時(shí)間在隊(duì)列中越靠前
 ? ? * @param delayed
 ? ? * @return
 ? ? */
 ? ?@Override
 ? ?public int compareTo(Delayed delayed) {
 ? ? ? ?return (int) (this.getDelay(TimeUnit.MILLISECONDS)
 ? ? ? ? ? ? ? ?- delayed.getDelay(TimeUnit.MILLISECONDS));
 ?  }
}

測試類:

public static void main(String[] args) {
 ? ?DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
 ? ?MovieTicket ticket = new MovieTicket("電影票1",10000);
 ? ?delayQueue.put(ticket);
 ? ?MovieTicket ticket1 = new MovieTicket("電影票2",5000);
 ? ?delayQueue.put(ticket1);
 ? ?MovieTicket ticket2 = new MovieTicket("電影票3",8000);
 ? ?delayQueue.put(ticket2);
 ? ?log.info("message:--->入隊(duì)完畢");
 ? ?while( delayQueue.size() > 0 ){
 ? ? ? ?try {
 ? ? ? ? ? ?ticket = delayQueue.take();
 ? ? ? ? ? ?log.info("電影票出隊(duì):{}",ticket.getMsg());
 ? ? ?  } catch (InterruptedException e) {
 ? ? ? ? ? ?e.printStackTrace();
 ? ? ?  }
 ?  }
}

從運(yùn)行結(jié)果可以看出隊(duì)列是延遲出隊(duì),間隔和我們所設(shè)置的時(shí)間相同

到此這篇關(guān)于Java阻塞隊(duì)列BlockingQueue詳解的文章就介紹到這了,更多相關(guān)Java BlockingQueue 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Zuul 實(shí)現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)

    Zuul 實(shí)現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié)

    這篇文章主要介紹了Zuul 實(shí)現(xiàn)網(wǎng)關(guān)轉(zhuǎn)發(fā)的五種方式小結(jié),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • Spring Boot console log 格式自定義方式

    Spring Boot console log 格式自定義方式

    這篇文章主要介紹了Spring Boot console log 格式自定義方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-07-07
  • 常見的java面試題

    常見的java面試題

    這篇文章主要為大家詳細(xì)介紹了常見的java面試題,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2016-11-11
  • 對(duì)Mybatis?Plus中@TableField的使用正解

    對(duì)Mybatis?Plus中@TableField的使用正解

    這篇文章主要介紹了對(duì)Mybatis?Plus中@TableField的使用正解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-01-01
  • java.sql.SQLException問題解決以及注意事項(xiàng)

    java.sql.SQLException問題解決以及注意事項(xiàng)

    這篇文章主要給大家介紹了關(guān)于java.sql.SQLException問題解決以及注意事項(xiàng)的相關(guān)資料,這個(gè)問題其實(shí)很好解決,文中通過圖文將解決的辦法介紹的很詳細(xì),需要的朋友可以參考下
    2023-07-07
  • logback過濾部分日志輸出的操作

    logback過濾部分日志輸出的操作

    這篇文章主要介紹了logback過濾部分日志輸出的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • 使用Mybatis-plus清空表數(shù)據(jù)的操作方法

    使用Mybatis-plus清空表數(shù)據(jù)的操作方法

    MyBatis 是一個(gè)基于 java 的持久層框架,它內(nèi)部封裝了 jdbc,極大提高了我們的開發(fā)效率,文中給大家介紹了MybatisPlus常用API-增刪改查功能,感興趣的朋友跟隨小編一起看看吧
    2022-11-11
  • Spring Mybatis 分頁插件使用教程

    Spring Mybatis 分頁插件使用教程

    這篇文章主要介紹了Spring Mybatis分頁插件使用教程,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下
    2018-02-02
  • 淺談@FeignClient中name和value屬性的區(qū)別

    淺談@FeignClient中name和value屬性的區(qū)別

    這篇文章主要介紹了@FeignClient中name和value屬性的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2022-07-07
  • 詳解Spring Data JPA動(dòng)態(tài)條件查詢的寫法

    詳解Spring Data JPA動(dòng)態(tài)條件查詢的寫法

    本篇文章主要介紹了Spring Data JPA動(dòng)態(tài)條件查詢的寫法 ,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2017-06-06

最新評(píng)論