JavaEE多線程中阻塞隊(duì)列的項(xiàng)目實(shí)踐
1. 前言
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:
- 在隊(duì)列為空時(shí),獲取元素的線程會等待隊(duì)列變?yōu)榉强?/li>
- 當(dāng)隊(duì)列滿時(shí),存儲元素的線程會等待隊(duì)列可用
阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。
2. 什么是生產(chǎn)者-消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型是一種多線程并發(fā)協(xié)作的模型,由兩類線程和一個(gè)緩沖區(qū)組成:生產(chǎn)者線程生產(chǎn)數(shù)據(jù)并把數(shù)據(jù)放在緩沖區(qū),消費(fèi)者線程從緩沖區(qū)取數(shù)據(jù)并消費(fèi)。生產(chǎn)者和消費(fèi)者在同一時(shí)間段內(nèi)共用同一個(gè)存儲空間,生產(chǎn)者往存儲空間中添加產(chǎn)品,消費(fèi)者從存儲空間中取走產(chǎn)品。當(dāng)存儲空間為空時(shí),消費(fèi)者阻塞;當(dāng)存儲空間滿時(shí),生產(chǎn)者阻塞。

3. 生產(chǎn)者-消費(fèi)者模型的意義
- 解耦合
- 削峰填谷
3.1 解耦合
兩個(gè)模塊的聯(lián)系越緊密,耦合度就越高,耦合度越高就意味著兩個(gè)模塊的影響程度很大,當(dāng)一個(gè)模塊出現(xiàn)問題的時(shí)候,另一個(gè)模塊也會受到影響而導(dǎo)致出現(xiàn)問題,特別是對分布式系統(tǒng)來說:解耦合是非常重要的。

假設(shè)上面是一個(gè)簡單的分布式系統(tǒng),服務(wù)器 A 和服務(wù)器 B 之間直接進(jìn)行交互(服務(wù)器 A 向服務(wù)器 B 發(fā)送請求并接收服務(wù)器 B 返回的信息,服務(wù)器 B 向服務(wù)器 A 發(fā)送請求,以及接收服務(wù)器 A 返回的信息),服務(wù)器 A 和服務(wù)器 B 之間的耦合度比較高,當(dāng)兩個(gè)服務(wù)器之間的一個(gè)發(fā)生故障的時(shí)候就會導(dǎo)致兩個(gè)服務(wù)器都無法使用。
不僅如此,當(dāng)我們想要再添加一個(gè)服務(wù)器 C 與服務(wù)器 A 之間進(jìn)行交互的時(shí)候,不僅需要對服務(wù)器 C 做出修改,還需要對服務(wù)器 A 作出修改。
相比上面的情況,如果我們使用生產(chǎn)者-消費(fèi)者模型的話就可以解決上面的耦合度過高的問題。

服務(wù)器 A 接收到客戶端發(fā)來的請求不是直接發(fā)送給服務(wù)器 B ,而是將接收到的請求加入到阻塞隊(duì)列中,然后服務(wù)器 B 從阻塞隊(duì)列中獲取到請求,這樣就避免了兩個(gè)服務(wù)器之間進(jìn)行直接的交互,降低了耦合性;不僅如此,當(dāng)我們需要額外添加一個(gè)服務(wù)器 C 的時(shí)候,就不需要對服務(wù)器 A 做出修改,而是直接從阻塞隊(duì)列獲取請求信息。

3.2 削峰填谷
當(dāng)客戶端向服務(wù)器 A 短時(shí)間發(fā)出大量請求信息的話,那么當(dāng)服務(wù)器 A 接收到客戶端發(fā)來的請求的時(shí)候,就會立即將收到的所有信息都發(fā)送給服務(wù)器 B ,但是由于雖然服務(wù)器 A 能夠接收的請求量可以很多,但是服務(wù)器 B 卻不能一次接收這么多請求,就會導(dǎo)致服務(wù)器 B 會掛掉。
如果使用生產(chǎn)者-消費(fèi)者莫模型的話,當(dāng)客戶端向服務(wù)器 A 短時(shí)間發(fā)送大量請求的話,服務(wù)器 A 不會將請求發(fā)送給 B ,而是發(fā)送給阻塞隊(duì)列中,當(dāng)阻塞隊(duì)列滿了的時(shí)候,服務(wù)器 A 就會停止向阻塞隊(duì)列中發(fā)送請求,陷入阻塞狀態(tài),等服務(wù)器 B 向阻塞隊(duì)列中受到請求使得阻塞隊(duì)列容量減少的時(shí)候,服務(wù)器 A 才會繼續(xù)向阻塞隊(duì)列中發(fā)送收到的請求,這樣就避免了服務(wù)器 B 短時(shí)間內(nèi)受到大量的請求而掛掉的情況;如果阻塞隊(duì)列中收到的請求信息被讀取完的時(shí)候,服務(wù)器 B 就會停止從阻塞隊(duì)列中讀取請求,進(jìn)入阻塞狀態(tài),直到服務(wù)器 A 向阻塞隊(duì)列中發(fā)送請求。

4. 如何使用Java標(biāo)準(zhǔn)庫提供的阻塞隊(duì)列
當(dāng)知道了生產(chǎn)者-消費(fèi)者模型的意義之后,我們就來看看如何使用阻塞隊(duì)列。在Java標(biāo)準(zhǔn)庫中提供了阻塞隊(duì)列 BlockingQueue 可以直接使用。

因?yàn)?BlockingQueu 是一個(gè)接口,無法實(shí)例化,所以需要?jiǎng)?chuàng)建出實(shí)現(xiàn)了 BlockingQueue 接口的類,而 ArrayBlockingQueue 和 LinkedBlockingQueue 實(shí)現(xiàn)了這個(gè)接口。
我們還可以觀察到,BlockingQueue 還繼承了 Queue ,也就是說我們也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞隊(duì)列中不使用這兩個(gè)方法,因?yàn)檫@兩個(gè)方法不具有阻塞特性,而是使用 put 和 take 方法。
public class Demo1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
queue.put("123");
queue.put("234");
queue.put("345");
queue.put("456");
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
}
}
這里向阻塞隊(duì)列中加入了四個(gè)數(shù)據(jù),但是讀取的時(shí)候讀取了五次,所以看到線程進(jìn)入了阻塞狀態(tài)。
5. 自己實(shí)現(xiàn)一個(gè)阻塞隊(duì)列
阻塞隊(duì)列是建立在隊(duì)列的基礎(chǔ)上的,所以要想實(shí)現(xiàn)一個(gè)阻塞隊(duì)列,首先需要實(shí)現(xiàn)出來一個(gè)隊(duì)列,那么就先來看看如何實(shí)現(xiàn)出一個(gè)循環(huán)隊(duì)列。
5.1 實(shí)現(xiàn)出循環(huán)隊(duì)列
隊(duì)列比較容易實(shí)現(xiàn),但是循環(huán)隊(duì)列該如何實(shí)現(xiàn)呢?當(dāng)數(shù)據(jù)到達(dá)數(shù)組的最后的時(shí)候,將數(shù)組的下標(biāo)修改為0,這樣就可以達(dá)到循環(huán)的目的。

當(dāng) tail == head 的時(shí)候有兩種情況:
- 隊(duì)列中沒有數(shù)據(jù)的時(shí)候
- 隊(duì)列滿了的時(shí)候
為了區(qū)分這兩種情況,我們可以使用兩種方法:
- 浪費(fèi)一個(gè)空間,當(dāng)tail++之后,如果tail + 1 == head,則表示隊(duì)列滿了,將 tail 修改為 0
- 定義一個(gè)size變量來表示隊(duì)列中有效數(shù)據(jù)的個(gè)數(shù),當(dāng)size == queue.length的時(shí)候,表示隊(duì)列滿了
class MyQueue {
private final String[] data = new String[1000];
private int size;
private int head = 0;
private int tail = 0;
public void put(String str) {
//當(dāng)添加數(shù)據(jù)的時(shí)候需要判斷隊(duì)列的容量是否已經(jīng)滿了
if(size == data.length) return;
data[tail++] = str;
size++;
if(tail == data.length) tail = 0;
}
public String take() {
//讀取數(shù)據(jù)的時(shí)候需要判斷隊(duì)列是否為空
if(size == 0) return null;
String ret = data[head++];
size--;
if(head == data.length) head = 0;
return ret;
}
}5.2 實(shí)現(xiàn)阻塞隊(duì)列
阻塞隊(duì)列就是在隊(duì)列為空時(shí),獲取元素的線程會等待隊(duì)列變?yōu)榉强?;?dāng)隊(duì)列滿時(shí),存儲元素的線程會等待隊(duì)列可用。并且因?yàn)樽枞?duì)列運(yùn)用的環(huán)境是多線程,需要考慮到線程安全的問題。
5.2.1 加鎖
當(dāng)需要進(jìn)行查詢和修改的操作時(shí),需要對該操作進(jìn)行加鎖。因?yàn)槲覀兊?put 和 take 基本上都在查詢和修改數(shù)據(jù),所以可以將這兩個(gè)操作直接進(jìn)行加鎖操作。
class MyBlockingQueue {
private final String[] data = new String[1000];
private int size;
private int head = 0;
private int tail = 0;
public void put(String str) {
synchronized (this) {
if(size == data.length) return;
data[tail++] = str;
size++;
if(tail == data.length) tail = 0;
}
}
public String take() {
synchronized (this) {
if(size == 0) return null;
String ret = data[head++];
size--;
if(head == data.length) head = 0;
return ret;
}
}
}5.2.2 進(jìn)行阻塞操作
當(dāng)進(jìn)行完加鎖操作之后,我們還需要實(shí)現(xiàn)阻塞的作用,當(dāng)添加數(shù)據(jù)的時(shí)候,如果隊(duì)列中容量滿了的時(shí)候就進(jìn)入阻塞等待狀態(tài),直到進(jìn)行了 take 讀取數(shù)據(jù)操作刪除數(shù)據(jù)的時(shí)候,才停止等待;當(dāng)讀取數(shù)據(jù)的時(shí)候,如果隊(duì)列為空,那么該線程就進(jìn)入阻塞等待狀態(tài),直到進(jìn)行了 put 操作。
class MyBlockingQueue {
private final String[] data = new String[1000];
private int size;
private int head = 0;
private int tail = 0;
public void put(String str) throws InterruptedException {
synchronized (this) {
if(size == data.length) {
this.wait();
}
data[tail++] = str;
size++;
if(tail == data.length) tail = 0;
//這個(gè) notify 用來喚醒 take 操作的等待
this.notify();
}
}
public String take() throws InterruptedException {
synchronized (this) {
if(size == 0) {
this.wait();
}
String ret = data[head++];
size--;
if(head == data.length) head = 0;
//這個(gè) notify 用來喚醒 put 操作的等待
this.notify();
return ret;
}
}
}5.2.3 解決因被 interrupt 喚醒 waiting 狀態(tài)的問題
當(dāng)使用了 wait 和 notify 對 put 和 take 操作進(jìn)行了阻塞等待和喚醒操作之后,我們還需要注意,難道只有 notify 才會喚醒 WAITING 狀態(tài)嗎?前面我們學(xué)習(xí)了使用 interrupt 來終止線程,但是 interrup 還會喚醒處于 WAITING 狀態(tài)的線程,也就是說這里的 WAITING 狀態(tài)的線程不僅可以被 notify 喚醒,還可以被 interrupt 喚醒。
- 當(dāng)線程是因?yàn)?put 操作隊(duì)列滿了的時(shí)候進(jìn)入阻塞等待狀態(tài)的時(shí)候,如果是因?yàn)楸?interrupt 喚醒而不是 take 操作的 notify 喚醒的時(shí)候就意味著此時(shí)隊(duì)列還是滿的,當(dāng)進(jìn)行添加操作的時(shí)候,就會將有效的數(shù)據(jù)覆蓋掉;
- 當(dāng)線程是因?yàn)?take 操作隊(duì)列為空的時(shí)候進(jìn)入阻塞等待狀態(tài)的時(shí)候,如果是因?yàn)楸?interrupt 喚醒而不是 put 操作的 notify 喚醒的時(shí)候就意味著此時(shí)隊(duì)列還是空的,如果進(jìn)行刪除操作,并沒有意義。
為了解決 WAITING 狀態(tài)被 interrupt 喚醒而造成的問題,當(dāng)線程被喚醒的時(shí)候,需要進(jìn)行判斷 size 是否還等于 0 或者 queue.length,如果還等于,就繼續(xù)進(jìn)入 WAITING 狀態(tài),但是光一次判斷是不夠的,因?yàn)檫€可能是被 interrupt 喚醒的,所以需要進(jìn)行多次判斷,可以用 while 循環(huán)來解決。
class MyBlockingQueue {
private final String[] data = new String[1000];
private int size;
private int head = 0;
private int tail = 0;
public void put(String str) throws InterruptedException {
synchronized (this) {
while(size == data.length) {
this.wait();
}
data[tail++] = str;
size++;
if(tail == data.length) tail = 0;
//這個(gè) notify 用來喚醒 take 操作的等待
this.notify();
}
}
public String take() throws InterruptedException {
synchronized (this) {
while(size == 0) {
this.wait();
}
String ret = data[head++];
size--;
if(head == data.length) head = 0;
//這個(gè) notify 用來喚醒 put 操作的等待
this.notify();
return ret;
}
}
}5.2.4 解決因指令重排序造成的問題
因?yàn)?put 和 take 操作要進(jìn)行讀和寫的操作,可能會因?yàn)橹噶钪嘏判虻膯栴}造成其他問題,這里就需要使用 volatile 解決指令重排序問題。
class MyBlockingQueue {
private final String[] data = new String[1000];
private volatile int size;
private volatile int head = 0;
private volatile int tail = 0;
public void put(String str) throws InterruptedException {
synchronized (this) {
while(size == data.length) {
this.wait();
}
data[tail++] = str;
size++;
if(tail == data.length) tail = 0;
//這個(gè) notify 用來喚醒 take 操作的等待
this.notify();
}
}
public String take() throws InterruptedException {
synchronized (this) {
while(size == 0) {
this.wait();
}
String ret = data[head++];
size--;
if(head == data.length) head = 0;
//這個(gè) notify 用來喚醒 put 操作的等待
this.notify();
return ret;
}
}
}測試實(shí)現(xiàn)的阻塞隊(duì)列
public class Demo4 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
Thread t1 = new Thread(() -> {
while(true) {
try {
System.out.println("消費(fèi)元素" + queue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
int count = 1;
while(true) {
try {
queue.put(count + "");
System.out.println("生產(chǎn)元素" + count);
count++;
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}讓生產(chǎn)速度較慢,使得讀取操作阻塞等待插入數(shù)據(jù)才執(zhí)行。

public class Demo4 {
public static void main(String[] args) {
MyBlockingQueue queue = new MyBlockingQueue();
Thread t1 = new Thread(() -> {
while(true) {
try {
System.out.println("消費(fèi)元素" + queue.take());
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
Thread t2 = new Thread(() -> {
int count = 1;
while(true) {
try {
queue.put(count + "");
System.out.println("生產(chǎn)元素" + count);
count++;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
t1.start();
t2.start();
}
}
讓生產(chǎn) put 操作進(jìn)入阻塞等待狀態(tài)。
到此這篇關(guān)于JavaEE多線程中阻塞隊(duì)列的項(xiàng)目實(shí)踐的文章就介紹到這了,更多相關(guān)JavaEE 阻塞隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java的PriorityBlockingQueue優(yōu)先級阻塞隊(duì)列代碼實(shí)例
- Java中的SynchronousQueue阻塞隊(duì)列使用代碼實(shí)例
- Java中的SynchronousQueue阻塞隊(duì)列及使用場景解析
- java中的BlockingQueue(阻塞隊(duì)列)解析
- Java中的BlockingQueue阻塞隊(duì)列原理以及實(shí)現(xiàn)詳解
- Java的非阻塞隊(duì)列ConcurrentLinkedQueue解讀
- java中阻塞隊(duì)列和非阻塞隊(duì)列的實(shí)現(xiàn)
- Java多線程實(shí)現(xiàn)阻塞隊(duì)列的示例代碼
相關(guān)文章
Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù)的方法
這篇文章主要介紹了Rabbitmq延遲隊(duì)列實(shí)現(xiàn)定時(shí)任務(wù),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05
SpringCloud實(shí)現(xiàn)基于RabbitMQ消息隊(duì)列的詳細(xì)步驟
在Spring Cloud框架中,我們可以利用RabbitMQ實(shí)現(xiàn)強(qiáng)大而可靠的消息隊(duì)列系統(tǒng),本篇將詳細(xì)介紹如何在Spring Cloud項(xiàng)目中集成RabbitMQ,并創(chuàng)建一個(gè)簡單的消息隊(duì)列,感興趣的朋友一起看看吧2024-03-03
基于Spring中的線程池和定時(shí)任務(wù)功能解析
下面小編就為大家?guī)硪黄赟pring中的線程池和定時(shí)任務(wù)功能解析。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09
Spring Boot集成spring-boot-devtools開發(fā)時(shí)實(shí)現(xiàn)熱部署的方式
這篇文章主要介紹了Spring Boot集成spring-boot-devtools開發(fā)時(shí)實(shí)現(xiàn)熱部署的方式,文中還給大家提到了spring boot 實(shí)現(xiàn)熱部署的方式及集成注意事項(xiàng),感興趣的朋友跟隨腳本之家小編一起學(xué)習(xí)吧2018-05-05
使用Filter過濾器中訪問getSession()要轉(zhuǎn)化
這篇文章主要介紹了使用Filter過濾器中訪問getSession()要轉(zhuǎn)化,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-01-01
還在用if(obj!=null)做非空判斷,帶你快速上手Optional
這篇文章主要介紹了還在用if(obj!=null)做非空判斷,帶你快速上手Optional,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05
JAVA中l(wèi)ist,set,數(shù)組之間的轉(zhuǎn)換詳解
以下是對JAVA中l(wèi)ist,set,數(shù)組之間的轉(zhuǎn)換進(jìn)行了詳細(xì)的分析介紹,需要的朋友可以過來參考下2013-09-09

