Java多線程實現(xiàn)阻塞隊列的示例代碼
1. 阻塞隊列簡介
1.1 阻塞隊列概念
阻塞隊列:是一種特殊的隊列,具有隊列"先進(jìn)先出"的特性,同時相較于普通隊列,阻塞隊列是線程安全的,并且?guī)в?code>阻塞功能,表現(xiàn)形式如下:
- 當(dāng)隊列滿時,繼續(xù)入隊列就會阻塞,直到有其他線程從隊列中取出元素
- 當(dāng)隊列空時,繼續(xù)出隊列就會阻塞,直到有其他線程往隊列中插入元素
基于阻塞隊列我們可以實現(xiàn)生產(chǎn)者消費者模型,這在后端開發(fā)場景中是相當(dāng)重要的!
1.2 生產(chǎn)者-消費者模型優(yōu)勢
基于阻塞隊列實現(xiàn)的 生產(chǎn)者消費者模型 具有以下兩大優(yōu)勢:
- 解耦合:

以搜狗搜索的服務(wù)器舉例,用戶輸入搜索關(guān)鍵字 **美容,**客戶端的請求到達(dá)搜狗的"入口服務(wù)器"時,會將請求轉(zhuǎn)發(fā)到 廣告服務(wù)器 和 大搜索服務(wù)器,此時廣告服務(wù)器返回相關(guān)廣告內(nèi)容,大搜索服務(wù)器根據(jù)搜索算法匹配對應(yīng)結(jié)果返回,如果按照這種方式通信,那么入口服務(wù)器需要編寫兩套代碼分別同廣告服務(wù)器和大搜索服務(wù)器進(jìn)行交互,并且一個嚴(yán)重問題是如果其中廣告服務(wù)器宕機了,會導(dǎo)致入口服務(wù)器無法正常工作進(jìn)而影響大搜索服務(wù)器也無法正常工作??!

而引入阻塞隊列后,入口服務(wù)器不需要知曉廣告服務(wù)器和大搜索服務(wù)器的存在,只需要往阻塞隊列中發(fā)送請求即可,而廣告服務(wù)器和大搜索服務(wù)器也不需要知道入口服務(wù)器的存在,只需要從阻塞隊列中取出請求處理完畢返回給阻塞隊列即可,并且當(dāng)其中大搜索服務(wù)器宕機時,不影響其他服務(wù)器以及入口服務(wù)器的正常運作!
- 削峰填谷:

如果沒有阻塞隊列,當(dāng)遇到一些突發(fā)場景例如"雙十一"大促等客戶請求量激增的時候,入口服務(wù)器轉(zhuǎn)發(fā)的請求量增多,壓力就會變大,同理廣告服務(wù)器和大搜索服務(wù)器處理過程復(fù)雜繁多,消耗的硬件資源就會激增,達(dá)到硬件瓶頸之后服務(wù)器就宕機了(直觀現(xiàn)象就是客戶端發(fā)送請求,服務(wù)器不會響應(yīng)了)

而引入阻塞隊列/消息隊列之后,由于阻塞隊列只負(fù)責(zé)存儲相應(yīng)的請求或者響應(yīng),無需額外的業(yè)務(wù)處理,因此抗壓能力比廣告服務(wù)器和大搜索服務(wù)器更強,當(dāng)客戶請求量激增的時候交由阻塞隊列承受,而廣告服務(wù)器和大搜索服務(wù)器只需要按照特定的速率進(jìn)行讀取并返回處理結(jié)果即可,就起到了 削峰填谷 的作用!
注意:此處的阻塞隊列在現(xiàn)實場景中并不是一個單純的數(shù)據(jù)結(jié)構(gòu),往往是一個基于阻塞隊列的服務(wù)器程序,例如消息隊列(MQ)
2. 標(biāo)準(zhǔn)庫中的阻塞隊列
2.1 基本介紹
Java標(biāo)準(zhǔn)庫提供了現(xiàn)成的阻塞隊列數(shù)據(jù)結(jié)構(gòu)供開發(fā)者使用,即BlockingQueue接口
BlockingQueue:該接口具有以下實現(xiàn)類:
- ArrayBlockingQueue:基于數(shù)組實現(xiàn)的阻塞隊列
- LinkedBlockingQueue:基于鏈表實現(xiàn)的阻塞隊列
- PriorityBlockingQueue:帶有優(yōu)先級的阻塞隊列
BlockingQueue方法:該接口具有以下常用方法
- 帶有阻塞功能:
put:向隊列中入元素,隊列滿則阻塞等待take:向隊列中取出元素,隊列空則阻塞等待
- 不帶有阻塞功能:
peek:返回隊頭元素(不取出)poll:返回隊頭元素(取出)offer:向隊列中插入元素
2.2 代碼示例
/**
* 測試Java標(biāo)準(zhǔn)庫提供的阻塞隊列實現(xiàn)
*/
public class TestStandardBlockingQueue {
private static BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
public static void main(String[] args) {
// 生產(chǎn)者
Thread t1 = new Thread(() -> {
int i = 0;
while (true) {
try {
queue.put(i);
System.out.println("生產(chǎn)數(shù)據(jù):" + i);
i++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
// 消費者
Thread t2 = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
int ele = queue.take();
System.out.println("消費數(shù)據(jù):" + ele);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
運行效果:

我們在主線程中創(chuàng)建了兩個線程,其中t1線程作為生產(chǎn)者不斷循環(huán)生產(chǎn)元素,而線程t2作為消費者每隔1s消費一個數(shù)據(jù),所以我們很快看到當(dāng)生產(chǎn)數(shù)據(jù)個數(shù)達(dá)到容量capacity時就會繼續(xù)生產(chǎn)就會阻塞等待,直到消費者線程消費數(shù)據(jù)后才可以繼續(xù)入隊列,這樣就實現(xiàn)了一個 生產(chǎn)者-消費者模型 !
3. 自定義實現(xiàn)阻塞隊列
首先我們需要明確實現(xiàn)一個阻塞隊列需要哪些步驟?
- 首先我們需要實現(xiàn)一個普通隊列
- 使用鎖機制將普通隊列變成線程安全的
- 通過特殊機制讓該隊列能夠帶有"阻塞"功能
3.1 實現(xiàn)普通隊列
相信大家如果學(xué)過 數(shù)據(jù)結(jié)構(gòu)與算法 相關(guān)課程,應(yīng)該對隊列這種數(shù)據(jù)結(jié)構(gòu)的實現(xiàn)并不陌生!實現(xiàn)隊列有基于數(shù)組的也有基于鏈表的,我們此處采用基于數(shù)組實現(xiàn)的,基于數(shù)組實現(xiàn)的循環(huán)隊列也有以下兩種方式:
- 騰出一個空間用來判斷隊列空或者滿
- 使用額外的變量
size用來記錄當(dāng)前元素的個數(shù)
我們使用第二種方式實現(xiàn),實現(xiàn)代碼如下:
/**
* 自定義實現(xiàn)阻塞隊列
*/
public class MyBlockingQueue {
private int head = 0; // 頭指針
private int tail = 0; // 尾指針
private int size = 0; // 當(dāng)前元素個數(shù)
private String[] array = null;
private int capacity; // 容量
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
this.array = new String[capacity];
}
/**
* 入隊列方法
*/
public void put(String elem) {
if (size == capacity) {
// 隊列已經(jīng)滿了
return;
}
array[tail] = elem;
tail++;
if (tail >= capacity) {
tail = 0;
}
size++;
}
/**
* 出隊列方法
*/
public String take() {
// 判斷隊列是否為空
if (size == 0) {
return null;
}
String topElem = array[head];
head++;
if (head >= capacity) {
head = 0;
}
size--;
return topElem;
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(3);
queue.put("11");
queue.put("22");
queue.put("33");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
3.2 引入鎖機制實現(xiàn)線程安全
引入synchronized關(guān)鍵字在原有隊列實現(xiàn)的基礎(chǔ)上實現(xiàn)線程安全,代碼如下:
/**
* 自定義實現(xiàn)阻塞隊列
*/
public class MyBlockingQueue {
private int head = 0; // 頭指針
private int tail = 0; // 尾指針
private int size = 0; // 當(dāng)前元素個數(shù)
private String[] array = null;
private int capacity; // 容量
private Object locker = new Object(); // 鎖對象
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
this.array = new String[capacity];
}
/**
* 入隊列方法
*/
public void put(String elem) {
synchronized (locker) {
if (size == capacity) {
// 隊列已經(jīng)滿了
return;
}
array[tail] = elem;
tail++;
if (tail >= capacity) {
tail = 0;
}
size++;
}
}
/**
* 出隊列方法
*/
public String take() {
String topElem = "";
synchronized (locker) {
// 判斷隊列是否為空
if (size == 0) {
return null;
}
topElem = array[head];
head++;
if (head >= capacity) {
head = 0;
}
size--;
}
return topElem;
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(3);
queue.put("11");
queue.put("22");
queue.put("33");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
我們在put、take等關(guān)鍵方法上將 多個線程修改同一個變量 部分的操作進(jìn)行加鎖處理,實現(xiàn)線程安全!
3.3 加入阻塞功能
在普通隊列的實現(xiàn)中,如果隊列滿或者空我們直接使用return關(guān)鍵字返回,但是在多線程環(huán)境下我們希望實現(xiàn)阻塞等待的功能,這就可以使用Object類提供的wait/notify這組方法實現(xiàn)阻塞與喚醒機制了!我們就需要考慮阻塞與喚醒的時機了!
何時阻塞:這個問題非常簡單,當(dāng)隊列滿時入隊列操作就應(yīng)該阻塞等待,而當(dāng)隊列為空時出隊列操作就需要阻塞等待
何時喚醒:想必大家都可以想到,對于入隊列操作來說,只要隊列不滿就可以被喚醒,而對于出隊列操作來說,隊列不為空就可以被喚醒,因此,只要有線程調(diào)用take操作出隊列,那么入隊列的線程就可以被喚醒,而只要有線程調(diào)用put操作入隊列,那么出隊列的線程就可以被喚醒
/**
* 自定義實現(xiàn)阻塞隊列
*/
public class MyBlockingQueue {
private int head = 0; // 頭指針
private int tail = 0; // 尾指針
private int size = 0; // 當(dāng)前元素個數(shù)
private String[] array = null;
private int capacity; // 容量
private Object locker = new Object(); // 鎖對象
public MyBlockingQueue(int capacity) {
this.capacity = capacity;
this.array = new String[capacity];
}
/**
* 入隊列方法
*/
public void put(String elem) {
synchronized (locker) {
while (size == capacity) {
// 隊列已經(jīng)滿了(進(jìn)行阻塞)
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
array[tail] = elem;
tail++;
if (tail >= capacity) {
tail = 0;
}
size++;
locker.notifyAll();
}
}
/**
* 出隊列方法
*/
public String take() {
String topElem = "";
synchronized (locker) {
// 判斷隊列是否為空
while (size == 0) {
try {
locker.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
topElem = array[head];
head++;
if (head >= capacity) {
head = 0;
}
size--;
locker.notifyAll();
}
return topElem;
}
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(10);
// 生產(chǎn)者
Thread producer = new Thread(() -> {
int i = 0;
while (true) {
queue.put(i + "");
System.out.println("生產(chǎn)元素:" + i);
i++;
}
});
// 消費者
Thread consumer = new Thread(() -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String elem = queue.take();
System.out.println("消費元素" + elem);
}
});
producer.start();
consumer.start();
}
}
我們使用wait/notify這組操作實現(xiàn)了阻塞/喚醒功能,并且滿足必須使用在synchronized關(guān)鍵字內(nèi)部的使用條件,這里有一個注意點
為什么我們將if判斷條件改成了while循環(huán)呢???這是需要考慮清楚的!

如圖所示:一開始由于隊列滿所以生產(chǎn)者1進(jìn)入阻塞狀態(tài),釋放鎖,然后生產(chǎn)者2也進(jìn)入阻塞狀態(tài)釋放鎖,此時消費者消費一個元素后喚醒生產(chǎn)者1,然后生產(chǎn)者1生產(chǎn)一個元素后(記住此時隊列已滿)繼續(xù)喚醒,但是此時喚醒的恰恰是 生產(chǎn)者2 ,生產(chǎn)者2繼續(xù)執(zhí)行生產(chǎn)元素,于是就出現(xiàn)問題,我們總結(jié)一下出現(xiàn)問題的原因:
notifyAll是隨機喚醒,無法指定喚醒線程,因此可能出現(xiàn)生產(chǎn)者喚醒生產(chǎn)者,消費者喚醒消費者的情況if判定條件一經(jīng)執(zhí)行就無法繼續(xù)判定,所以生產(chǎn)者2被喚醒后沒有再次判斷當(dāng)前隊列是否滿
于是我們的應(yīng)對策略就是使用while循環(huán),當(dāng)線程被喚醒使重新判斷,如果隊列仍滿,入隊列操作繼續(xù)阻塞,而隊列仍空,出隊列操作繼續(xù)阻塞!Java標(biāo)準(zhǔn)也推薦我們使用 while 關(guān)鍵字和 wait 關(guān)鍵字一起使用!

4. 應(yīng)用場景(實現(xiàn)生產(chǎn)者消費者模型)
我們繼續(xù)基于我們自定義實現(xiàn)的阻塞隊列再來實現(xiàn) 生產(chǎn)者-消費者模型代碼示例(主函數(shù)):
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue(10);
// 生產(chǎn)者
Thread producer = new Thread(() -> {
int i = 0;
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
queue.put(i + "");
System.out.println("生產(chǎn)元素:" + i);
i++;
}
});
// 消費者
Thread consumer = new Thread(() -> {
while (true) {
String elem = queue.take();
System.out.println("消費元素" + elem);
}
});
producer.start();
consumer.start();
}
運行效果:

此時我們創(chuàng)建兩個兩個線程,producer作為生產(chǎn)者線程每隔1s生產(chǎn)一個元素,consumer作為消費者線程不斷消費元素,此時我們看到的就是消費者消費很快,當(dāng)阻塞隊列空時就進(jìn)入阻塞狀態(tài),直到生產(chǎn)者線程生產(chǎn)元素后才被喚醒繼續(xù)執(zhí)行!此時我們真正模擬實現(xiàn)了 阻塞隊列 這樣的數(shù)據(jù)結(jié)構(gòu)!
到此這篇關(guān)于Java多線程實現(xiàn)阻塞隊列的示例代碼的文章就介紹到這了,更多相關(guān)Java 阻塞隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
兩種JAVA實現(xiàn)短網(wǎng)址服務(wù)算法
這篇文章介紹了兩種JAVA實現(xiàn)短網(wǎng)址服務(wù)算法,一種是基于MD5碼的,一種是基于自增序列的,需要的朋友可以參考下2015-07-07
Java中的HashSet詳解和使用示例_動力節(jié)點Java學(xué)院整理
HashSet 是一個沒有重復(fù)元素的集合。接下來通過實例代碼給大家介紹java中的hashset相關(guān)知識,感興趣的朋友一起看看吧2017-05-05
Java中forEach使用lambda表達(dá)式,數(shù)組和集合的區(qū)別說明
這篇文章主要介紹了Java中forEach使用lambda表達(dá)式,數(shù)組和集合的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
java中FileOutputStream中文亂碼問題解決辦法
這篇文章主要介紹了java中FileOutputStream中文亂碼問題解決辦法的相關(guān)資料,需要的朋友可以參考下2017-04-04

