Java多線程實(shí)現(xiàn)阻塞隊(duì)列的示例代碼
1. 阻塞隊(duì)列簡(jiǎn)介
1.1 阻塞隊(duì)列概念
阻塞隊(duì)列:是一種特殊的隊(duì)列,具有隊(duì)列"先進(jìn)先出"的特性,同時(shí)相較于普通隊(duì)列,阻塞隊(duì)列是線程安全
的,并且?guī)в?code>阻塞功能,表現(xiàn)形式如下:
- 當(dāng)隊(duì)列滿時(shí),繼續(xù)入隊(duì)列就會(huì)阻塞,直到有其他線程從隊(duì)列中取出元素
- 當(dāng)隊(duì)列空時(shí),繼續(xù)出隊(duì)列就會(huì)阻塞,直到有其他線程往隊(duì)列中插入元素
基于阻塞隊(duì)列我們可以實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
,這在后端開發(fā)場(chǎng)景中是相當(dāng)重要的!
1.2 生產(chǎn)者-消費(fèi)者模型優(yōu)勢(shì)
基于阻塞隊(duì)列實(shí)現(xiàn)的 生產(chǎn)者消費(fèi)者模型 具有以下兩大優(yōu)勢(shì):
- 解耦合:
以搜狗搜索的服務(wù)器舉例,用戶輸入搜索關(guān)鍵字 **美容,**客戶端的請(qǐng)求到達(dá)搜狗的"入口服務(wù)器"時(shí),會(huì)將請(qǐng)求轉(zhuǎn)發(fā)到 廣告服務(wù)器 和 大搜索服務(wù)器,此時(shí)廣告服務(wù)器返回相關(guān)廣告內(nèi)容,大搜索服務(wù)器根據(jù)搜索算法匹配對(duì)應(yīng)結(jié)果返回,如果按照這種方式通信,那么入口服務(wù)器需要編寫兩套代碼分別同廣告服務(wù)器和大搜索服務(wù)器進(jìn)行交互,并且一個(gè)嚴(yán)重問(wèn)題是如果其中廣告服務(wù)器宕機(jī)了,會(huì)導(dǎo)致入口服務(wù)器無(wú)法正常工作進(jìn)而影響大搜索服務(wù)器也無(wú)法正常工作??!
而引入阻塞隊(duì)列后,入口服務(wù)器不需要知曉廣告服務(wù)器和大搜索服務(wù)器的存在,只需要往阻塞隊(duì)列中發(fā)送請(qǐng)求即可,而廣告服務(wù)器和大搜索服務(wù)器也不需要知道入口服務(wù)器的存在,只需要從阻塞隊(duì)列中取出請(qǐng)求處理完畢返回給阻塞隊(duì)列即可,并且當(dāng)其中大搜索服務(wù)器宕機(jī)時(shí),不影響其他服務(wù)器以及入口服務(wù)器的正常運(yùn)作!
- 削峰填谷:
如果沒(méi)有阻塞隊(duì)列,當(dāng)遇到一些突發(fā)場(chǎng)景例如"雙十一"大促等客戶請(qǐng)求量激增的時(shí)候,入口服務(wù)器轉(zhuǎn)發(fā)的請(qǐng)求量增多,壓力就會(huì)變大,同理廣告服務(wù)器和大搜索服務(wù)器處理過(guò)程復(fù)雜繁多,消耗的硬件資源就會(huì)激增,達(dá)到硬件瓶頸之后服務(wù)器就宕機(jī)了(直觀現(xiàn)象就是客戶端發(fā)送請(qǐng)求,服務(wù)器不會(huì)響應(yīng)了)
而引入阻塞隊(duì)列/消息隊(duì)列之后,由于阻塞隊(duì)列只負(fù)責(zé)存儲(chǔ)相應(yīng)的請(qǐng)求或者響應(yīng),無(wú)需額外的業(yè)務(wù)處理,因此抗壓能力比廣告服務(wù)器和大搜索服務(wù)器更強(qiáng),當(dāng)客戶請(qǐng)求量激增的時(shí)候交由阻塞隊(duì)列承受,而廣告服務(wù)器和大搜索服務(wù)器只需要按照特定的速率進(jìn)行讀取并返回處理結(jié)果即可,就起到了 削峰填谷 的作用!
注意:此處的阻塞隊(duì)列在現(xiàn)實(shí)場(chǎng)景中并不是一個(gè)單純的數(shù)據(jù)結(jié)構(gòu),往往是一個(gè)基于阻塞隊(duì)列的服務(wù)器程序,例如消息隊(duì)列(MQ)
2. 標(biāo)準(zhǔn)庫(kù)中的阻塞隊(duì)列
2.1 基本介紹
Java標(biāo)準(zhǔn)庫(kù)提供了現(xiàn)成的阻塞隊(duì)列數(shù)據(jù)結(jié)構(gòu)供開發(fā)者使用,即BlockingQueue
接口
BlockingQueue:該接口具有以下實(shí)現(xiàn)類:
- ArrayBlockingQueue:基于數(shù)組實(shí)現(xiàn)的阻塞隊(duì)列
- LinkedBlockingQueue:基于鏈表實(shí)現(xiàn)的阻塞隊(duì)列
- PriorityBlockingQueue:帶有優(yōu)先級(jí)的阻塞隊(duì)列
BlockingQueue方法:該接口具有以下常用方法
- 帶有阻塞功能:
put
:向隊(duì)列中入元素,隊(duì)列滿則阻塞等待take
:向隊(duì)列中取出元素,隊(duì)列空則阻塞等待
- 不帶有阻塞功能:
peek
:返回隊(duì)頭元素(不取出)poll
:返回隊(duì)頭元素(取出)offer
:向隊(duì)列中插入元素
2.2 代碼示例
/** * 測(cè)試Java標(biāo)準(zhǔn)庫(kù)提供的阻塞隊(duì)列實(shí)現(xiàn) */ public class TestStandardBlockingQueue { private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10); public static void main(String[] args) { // 生產(chǎn)者 Thread t1 = new Thread(() -> { int i = 0; while (true) { try { queue.put(i); System.out.println("生產(chǎn)數(shù)據(jù):" + i); i++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); // 消費(fèi)者 Thread t2 = new Thread(() -> { while (true) { try { Thread.sleep(1000); int ele = queue.take(); System.out.println("消費(fèi)數(shù)據(jù):" + ele); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
運(yùn)行效果:
我們?cè)谥骶€程中創(chuàng)建了兩個(gè)線程,其中t1
線程作為生產(chǎn)者不斷循環(huán)生產(chǎn)元素,而線程t2
作為消費(fèi)者每隔1s消費(fèi)一個(gè)數(shù)據(jù),所以我們很快看到當(dāng)生產(chǎn)數(shù)據(jù)個(gè)數(shù)達(dá)到容量capacity
時(shí)就會(huì)繼續(xù)生產(chǎn)就會(huì)阻塞等待,直到消費(fèi)者線程消費(fèi)數(shù)據(jù)后才可以繼續(xù)入隊(duì)列,這樣就實(shí)現(xiàn)了一個(gè) 生產(chǎn)者-消費(fèi)者模型 !
3. 自定義實(shí)現(xiàn)阻塞隊(duì)列
首先我們需要明確實(shí)現(xiàn)一個(gè)阻塞隊(duì)列需要哪些步驟?
- 首先我們需要實(shí)現(xiàn)一個(gè)普通隊(duì)列
- 使用鎖機(jī)制將普通隊(duì)列變成線程安全的
- 通過(guò)特殊機(jī)制讓該隊(duì)列能夠帶有"阻塞"功能
3.1 實(shí)現(xiàn)普通隊(duì)列
相信大家如果學(xué)過(guò) 數(shù)據(jù)結(jié)構(gòu)與算法 相關(guān)課程,應(yīng)該對(duì)隊(duì)列這種數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)并不陌生!實(shí)現(xiàn)隊(duì)列有基于數(shù)組的也有基于鏈表的,我們此處采用基于數(shù)組實(shí)現(xiàn)的,基于數(shù)組實(shí)現(xiàn)的循環(huán)隊(duì)列也有以下兩種方式:
- 騰出一個(gè)空間用來(lái)判斷隊(duì)列空或者滿
- 使用額外的變量
size
用來(lái)記錄當(dāng)前元素的個(gè)數(shù)
我們使用第二種方式實(shí)現(xiàn),實(shí)現(xiàn)代碼如下:
/** * 自定義實(shí)現(xiàn)阻塞隊(duì)列 */ public class MyBlockingQueue { private int head = 0; // 頭指針 private int tail = 0; // 尾指針 private int size = 0; // 當(dāng)前元素個(gè)數(shù) private String[] array = null; private int capacity; // 容量 public MyBlockingQueue(int capacity) { this.capacity = capacity; this.array = new String[capacity]; } /** * 入隊(duì)列方法 */ public void put(String elem) { if (size == capacity) { // 隊(duì)列已經(jīng)滿了 return; } array[tail] = elem; tail++; if (tail >= capacity) { tail = 0; } size++; } /** * 出隊(duì)列方法 */ public String take() { // 判斷隊(duì)列是否為空 if (size == 0) { return null; } String topElem = array[head]; head++; if (head >= capacity) { head = 0; } size--; return topElem; } public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(3); queue.put("11"); queue.put("22"); queue.put("33"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } }
3.2 引入鎖機(jī)制實(shí)現(xiàn)線程安全
引入synchronized
關(guān)鍵字在原有隊(duì)列實(shí)現(xiàn)的基礎(chǔ)上實(shí)現(xiàn)線程安全,代碼如下:
/** * 自定義實(shí)現(xiàn)阻塞隊(duì)列 */ public class MyBlockingQueue { private int head = 0; // 頭指針 private int tail = 0; // 尾指針 private int size = 0; // 當(dāng)前元素個(gè)數(shù) private String[] array = null; private int capacity; // 容量 private Object locker = new Object(); // 鎖對(duì)象 public MyBlockingQueue(int capacity) { this.capacity = capacity; this.array = new String[capacity]; } /** * 入隊(duì)列方法 */ public void put(String elem) { synchronized (locker) { if (size == capacity) { // 隊(duì)列已經(jīng)滿了 return; } array[tail] = elem; tail++; if (tail >= capacity) { tail = 0; } size++; } } /** * 出隊(duì)列方法 */ public String take() { String topElem = ""; synchronized (locker) { // 判斷隊(duì)列是否為空 if (size == 0) { return null; } topElem = array[head]; head++; if (head >= capacity) { head = 0; } size--; } return topElem; } public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(3); queue.put("11"); queue.put("22"); queue.put("33"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } }
我們?cè)?code>put、take
等關(guān)鍵方法上將 多個(gè)線程修改同一個(gè)變量 部分的操作進(jìn)行加鎖處理,實(shí)現(xiàn)線程安全!
3.3 加入阻塞功能
在普通隊(duì)列的實(shí)現(xiàn)中,如果隊(duì)列滿或者空我們直接使用return
關(guān)鍵字返回,但是在多線程環(huán)境下我們希望實(shí)現(xiàn)阻塞等待的功能,這就可以使用Object類提供的wait/notify
這組方法實(shí)現(xiàn)阻塞與喚醒機(jī)制了!我們就需要考慮阻塞與喚醒的時(shí)機(jī)了!
何時(shí)阻塞:這個(gè)問(wèn)題非常簡(jiǎn)單,當(dāng)隊(duì)列滿時(shí)入隊(duì)列操作就應(yīng)該阻塞等待,而當(dāng)隊(duì)列為空時(shí)出隊(duì)列操作就需要阻塞等待
何時(shí)喚醒:想必大家都可以想到,對(duì)于入隊(duì)列操作來(lái)說(shuō),只要隊(duì)列不滿就可以被喚醒,而對(duì)于出隊(duì)列操作來(lái)說(shuō),隊(duì)列不為空就可以被喚醒,因此,只要有線程調(diào)用take
操作出隊(duì)列,那么入隊(duì)列的線程就可以被喚醒,而只要有線程調(diào)用put
操作入隊(duì)列,那么出隊(duì)列的線程就可以被喚醒
/** * 自定義實(shí)現(xiàn)阻塞隊(duì)列 */ public class MyBlockingQueue { private int head = 0; // 頭指針 private int tail = 0; // 尾指針 private int size = 0; // 當(dāng)前元素個(gè)數(shù) private String[] array = null; private int capacity; // 容量 private Object locker = new Object(); // 鎖對(duì)象 public MyBlockingQueue(int capacity) { this.capacity = capacity; this.array = new String[capacity]; } /** * 入隊(duì)列方法 */ public void put(String elem) { synchronized (locker) { while (size == capacity) { // 隊(duì)列已經(jīng)滿了(進(jìn)行阻塞) try { locker.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } array[tail] = elem; tail++; if (tail >= capacity) { tail = 0; } size++; locker.notifyAll(); } } /** * 出隊(duì)列方法 */ public String take() { String topElem = ""; synchronized (locker) { // 判斷隊(duì)列是否為空 while (size == 0) { try { locker.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } topElem = array[head]; head++; if (head >= capacity) { head = 0; } size--; locker.notifyAll(); } return topElem; } public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(10); // 生產(chǎn)者 Thread producer = new Thread(() -> { int i = 0; while (true) { queue.put(i + ""); System.out.println("生產(chǎn)元素:" + i); i++; } }); // 消費(fèi)者 Thread consumer = new Thread(() -> { while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } String elem = queue.take(); System.out.println("消費(fèi)元素" + elem); } }); producer.start(); consumer.start(); } }
我們使用wait/notify
這組操作實(shí)現(xiàn)了阻塞/喚醒功能,并且滿足必須使用在synchronized
關(guān)鍵字內(nèi)部的使用條件,這里有一個(gè)注意點(diǎn)
為什么我們將if判斷條件改成了while循環(huán)呢???這是需要考慮清楚的!
如圖所示:一開始由于隊(duì)列滿所以生產(chǎn)者1進(jìn)入阻塞狀態(tài),釋放鎖,然后生產(chǎn)者2也進(jìn)入阻塞狀態(tài)釋放鎖,此時(shí)消費(fèi)者消費(fèi)一個(gè)元素后喚醒生產(chǎn)者1,然后生產(chǎn)者1生產(chǎn)一個(gè)元素后(記住此時(shí)隊(duì)列已滿)繼續(xù)喚醒,但是此時(shí)喚醒的恰恰是 生產(chǎn)者2 ,生產(chǎn)者2繼續(xù)執(zhí)行生產(chǎn)元素,于是就出現(xiàn)問(wèn)題,我們總結(jié)一下出現(xiàn)問(wèn)題的原因:
notifyAll
是隨機(jī)喚醒,無(wú)法指定喚醒線程,因此可能出現(xiàn)生產(chǎn)者喚醒生產(chǎn)者,消費(fèi)者喚醒消費(fèi)者的情況if
判定條件一經(jīng)執(zhí)行就無(wú)法繼續(xù)判定,所以生產(chǎn)者2被喚醒后沒(méi)有再次判斷當(dāng)前隊(duì)列是否滿
于是我們的應(yīng)對(duì)策略就是使用while
循環(huán),當(dāng)線程被喚醒使重新判斷,如果隊(duì)列仍滿,入隊(duì)列操作繼續(xù)阻塞,而隊(duì)列仍空,出隊(duì)列操作繼續(xù)阻塞!Java標(biāo)準(zhǔn)也推薦我們使用 while 關(guān)鍵字和 wait 關(guān)鍵字一起使用!
4. 應(yīng)用場(chǎng)景(實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型)
我們繼續(xù)基于我們自定義實(shí)現(xiàn)的阻塞隊(duì)列再來(lái)實(shí)現(xiàn) 生產(chǎn)者-消費(fèi)者模型代碼示例(主函數(shù)):
public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(10); // 生產(chǎn)者 Thread producer = new Thread(() -> { int i = 0; while (true) { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } queue.put(i + ""); System.out.println("生產(chǎn)元素:" + i); i++; } }); // 消費(fèi)者 Thread consumer = new Thread(() -> { while (true) { String elem = queue.take(); System.out.println("消費(fèi)元素" + elem); } }); producer.start(); consumer.start(); }
運(yùn)行效果:
此時(shí)我們創(chuàng)建兩個(gè)兩個(gè)線程,producer
作為生產(chǎn)者線程每隔1s生產(chǎn)一個(gè)元素,consumer
作為消費(fèi)者線程不斷消費(fèi)元素,此時(shí)我們看到的就是消費(fèi)者消費(fèi)很快,當(dāng)阻塞隊(duì)列空時(shí)就進(jìn)入阻塞狀態(tài),直到生產(chǎn)者線程生產(chǎn)元素后才被喚醒繼續(xù)執(zhí)行!此時(shí)我們真正模擬實(shí)現(xiàn)了 阻塞隊(duì)列 這樣的數(shù)據(jù)結(jié)構(gòu)!
到此這篇關(guān)于Java多線程實(shí)現(xiàn)阻塞隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)Java 阻塞隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決springboot啟動(dòng)報(bào)錯(cuò)bean找不到的問(wèn)題
這篇文章主要介紹了解決springboot啟動(dòng)報(bào)錯(cuò)bean找不到原因,本文給大家分享完美解決方案,通過(guò)圖文相結(jié)合給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-03-03兩種JAVA實(shí)現(xiàn)短網(wǎng)址服務(wù)算法
這篇文章介紹了兩種JAVA實(shí)現(xiàn)短網(wǎng)址服務(wù)算法,一種是基于MD5碼的,一種是基于自增序列的,需要的朋友可以參考下2015-07-07Java中的HashSet詳解和使用示例_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
HashSet 是一個(gè)沒(méi)有重復(fù)元素的集合。接下來(lái)通過(guò)實(shí)例代碼給大家介紹java中的hashset相關(guān)知識(shí),感興趣的朋友一起看看吧2017-05-05Java中forEach使用lambda表達(dá)式,數(shù)組和集合的區(qū)別說(shuō)明
這篇文章主要介紹了Java中forEach使用lambda表達(dá)式,數(shù)組和集合的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07java中FileOutputStream中文亂碼問(wèn)題解決辦法
這篇文章主要介紹了java中FileOutputStream中文亂碼問(wèn)題解決辦法的相關(guān)資料,需要的朋友可以參考下2017-04-04