JavaEE多線程中阻塞隊列的項目實踐
1. 前言
阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:
- 在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?/li>
- 當隊列滿時,存儲元素的線程會等待隊列可用
阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產(chǎn)者存放元素的容器,而消費者也只從容器里拿元素。
2. 什么是生產(chǎn)者-消費者模型
生產(chǎn)者消費者模型是一種多線程并發(fā)協(xié)作的模型,由兩類線程和一個緩沖區(qū)組成:生產(chǎn)者線程生產(chǎn)數(shù)據(jù)并把數(shù)據(jù)放在緩沖區(qū),消費者線程從緩沖區(qū)取數(shù)據(jù)并消費。生產(chǎn)者和消費者在同一時間段內(nèi)共用同一個存儲空間,生產(chǎn)者往存儲空間中添加產(chǎn)品,消費者從存儲空間中取走產(chǎn)品。當存儲空間為空時,消費者阻塞;當存儲空間滿時,生產(chǎn)者阻塞。
3. 生產(chǎn)者-消費者模型的意義
- 解耦合
- 削峰填谷
3.1 解耦合
兩個模塊的聯(lián)系越緊密,耦合度就越高,耦合度越高就意味著兩個模塊的影響程度很大,當一個模塊出現(xiàn)問題的時候,另一個模塊也會受到影響而導致出現(xiàn)問題,特別是對分布式系統(tǒng)來說:解耦合是非常重要的。
假設(shè)上面是一個簡單的分布式系統(tǒng),服務(wù)器 A 和服務(wù)器 B 之間直接進行交互(服務(wù)器 A 向服務(wù)器 B 發(fā)送請求并接收服務(wù)器 B 返回的信息,服務(wù)器 B 向服務(wù)器 A 發(fā)送請求,以及接收服務(wù)器 A 返回的信息),服務(wù)器 A 和服務(wù)器 B 之間的耦合度比較高,當兩個服務(wù)器之間的一個發(fā)生故障的時候就會導致兩個服務(wù)器都無法使用。
不僅如此,當我們想要再添加一個服務(wù)器 C 與服務(wù)器 A 之間進行交互的時候,不僅需要對服務(wù)器 C 做出修改,還需要對服務(wù)器 A 作出修改。
相比上面的情況,如果我們使用生產(chǎn)者-消費者模型的話就可以解決上面的耦合度過高的問題。
服務(wù)器 A 接收到客戶端發(fā)來的請求不是直接發(fā)送給服務(wù)器 B ,而是將接收到的請求加入到阻塞隊列中,然后服務(wù)器 B 從阻塞隊列中獲取到請求,這樣就避免了兩個服務(wù)器之間進行直接的交互,降低了耦合性;不僅如此,當我們需要額外添加一個服務(wù)器 C 的時候,就不需要對服務(wù)器 A 做出修改,而是直接從阻塞隊列獲取請求信息。
3.2 削峰填谷
當客戶端向服務(wù)器 A 短時間發(fā)出大量請求信息的話,那么當服務(wù)器 A 接收到客戶端發(fā)來的請求的時候,就會立即將收到的所有信息都發(fā)送給服務(wù)器 B ,但是由于雖然服務(wù)器 A 能夠接收的請求量可以很多,但是服務(wù)器 B 卻不能一次接收這么多請求,就會導致服務(wù)器 B 會掛掉。
如果使用生產(chǎn)者-消費者莫模型的話,當客戶端向服務(wù)器 A 短時間發(fā)送大量請求的話,服務(wù)器 A 不會將請求發(fā)送給 B ,而是發(fā)送給阻塞隊列中,當阻塞隊列滿了的時候,服務(wù)器 A 就會停止向阻塞隊列中發(fā)送請求,陷入阻塞狀態(tài),等服務(wù)器 B 向阻塞隊列中受到請求使得阻塞隊列容量減少的時候,服務(wù)器 A 才會繼續(xù)向阻塞隊列中發(fā)送收到的請求,這樣就避免了服務(wù)器 B 短時間內(nèi)受到大量的請求而掛掉的情況;如果阻塞隊列中收到的請求信息被讀取完的時候,服務(wù)器 B 就會停止從阻塞隊列中讀取請求,進入阻塞狀態(tài),直到服務(wù)器 A 向阻塞隊列中發(fā)送請求。
4. 如何使用Java標準庫提供的阻塞隊列
當知道了生產(chǎn)者-消費者模型的意義之后,我們就來看看如何使用阻塞隊列。在Java標準庫中提供了阻塞隊列 BlockingQueue 可以直接使用。
因為 BlockingQueu 是一個接口,無法實例化,所以需要創(chuàng)建出實現(xiàn)了 BlockingQueue 接口的類,而 ArrayBlockingQueue 和 LinkedBlockingQueue 實現(xiàn)了這個接口。
我們還可以觀察到,BlockingQueue 還繼承了 Queue ,也就是說我們也可以使用 Queue 中的方法,比如 offer 和 poll 等,但是在阻塞隊列中不使用這兩個方法,因為這兩個方法不具有阻塞特性,而是使用 put 和 take 方法。
public class Demo1 { public static void main(String[] args) throws InterruptedException { BlockingQueue<String> queue = new LinkedBlockingQueue<>(); queue.put("123"); queue.put("234"); queue.put("345"); queue.put("456"); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); System.out.println(queue.take()); } }
這里向阻塞隊列中加入了四個數(shù)據(jù),但是讀取的時候讀取了五次,所以看到線程進入了阻塞狀態(tài)。
5. 自己實現(xiàn)一個阻塞隊列
阻塞隊列是建立在隊列的基礎(chǔ)上的,所以要想實現(xiàn)一個阻塞隊列,首先需要實現(xiàn)出來一個隊列,那么就先來看看如何實現(xiàn)出一個循環(huán)隊列。
5.1 實現(xiàn)出循環(huán)隊列
隊列比較容易實現(xiàn),但是循環(huán)隊列該如何實現(xiàn)呢?當數(shù)據(jù)到達數(shù)組的最后的時候,將數(shù)組的下標修改為0,這樣就可以達到循環(huán)的目的。
當 tail == head 的時候有兩種情況:
- 隊列中沒有數(shù)據(jù)的時候
- 隊列滿了的時候
為了區(qū)分這兩種情況,我們可以使用兩種方法:
- 浪費一個空間,當tail++之后,如果tail + 1 == head,則表示隊列滿了,將 tail 修改為 0
- 定義一個size變量來表示隊列中有效數(shù)據(jù)的個數(shù),當size == queue.length的時候,表示隊列滿了
class MyQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) { //當添加數(shù)據(jù)的時候需要判斷隊列的容量是否已經(jīng)滿了 if(size == data.length) return; data[tail++] = str; size++; if(tail == data.length) tail = 0; } public String take() { //讀取數(shù)據(jù)的時候需要判斷隊列是否為空 if(size == 0) return null; String ret = data[head++]; size--; if(head == data.length) head = 0; return ret; } }
5.2 實現(xiàn)阻塞隊列
阻塞隊列就是在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强?;當隊列滿時,存儲元素的線程會等待隊列可用。并且因為阻塞隊列運用的環(huán)境是多線程,需要考慮到線程安全的問題。
5.2.1 加鎖
當需要進行查詢和修改的操作時,需要對該操作進行加鎖。因為我們的 put 和 take 基本上都在查詢和修改數(shù)據(jù),所以可以將這兩個操作直接進行加鎖操作。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) { synchronized (this) { if(size == data.length) return; data[tail++] = str; size++; if(tail == data.length) tail = 0; } } public String take() { synchronized (this) { if(size == 0) return null; String ret = data[head++]; size--; if(head == data.length) head = 0; return ret; } } }
5.2.2 進行阻塞操作
當進行完加鎖操作之后,我們還需要實現(xiàn)阻塞的作用,當添加數(shù)據(jù)的時候,如果隊列中容量滿了的時候就進入阻塞等待狀態(tài),直到進行了 take 讀取數(shù)據(jù)操作刪除數(shù)據(jù)的時候,才停止等待;當讀取數(shù)據(jù)的時候,如果隊列為空,那么該線程就進入阻塞等待狀態(tài),直到進行了 put 操作。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { if(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //這個 notify 用來喚醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { if(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //這個 notify 用來喚醒 put 操作的等待 this.notify(); return ret; } } }
5.2.3 解決因被 interrupt 喚醒 waiting 狀態(tài)的問題
當使用了 wait 和 notify 對 put 和 take 操作進行了阻塞等待和喚醒操作之后,我們還需要注意,難道只有 notify 才會喚醒 WAITING 狀態(tài)嗎?前面我們學習了使用 interrupt 來終止線程,但是 interrup 還會喚醒處于 WAITING 狀態(tài)的線程,也就是說這里的 WAITING 狀態(tài)的線程不僅可以被 notify 喚醒,還可以被 interrupt 喚醒。
- 當線程是因為 put 操作隊列滿了的時候進入阻塞等待狀態(tài)的時候,如果是因為被 interrupt 喚醒而不是 take 操作的 notify 喚醒的時候就意味著此時隊列還是滿的,當進行添加操作的時候,就會將有效的數(shù)據(jù)覆蓋掉;
- 當線程是因為 take 操作隊列為空的時候進入阻塞等待狀態(tài)的時候,如果是因為被 interrupt 喚醒而不是 put 操作的 notify 喚醒的時候就意味著此時隊列還是空的,如果進行刪除操作,并沒有意義。
為了解決 WAITING 狀態(tài)被 interrupt 喚醒而造成的問題,當線程被喚醒的時候,需要進行判斷 size 是否還等于 0 或者 queue.length,如果還等于,就繼續(xù)進入 WAITING 狀態(tài),但是光一次判斷是不夠的,因為還可能是被 interrupt 喚醒的,所以需要進行多次判斷,可以用 while 循環(huán)來解決。
class MyBlockingQueue { private final String[] data = new String[1000]; private int size; private int head = 0; private int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { while(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //這個 notify 用來喚醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //這個 notify 用來喚醒 put 操作的等待 this.notify(); return ret; } } }
5.2.4 解決因指令重排序造成的問題
因為 put 和 take 操作要進行讀和寫的操作,可能會因為指令重排序的問題造成其他問題,這里就需要使用 volatile 解決指令重排序問題。
class MyBlockingQueue { private final String[] data = new String[1000]; private volatile int size; private volatile int head = 0; private volatile int tail = 0; public void put(String str) throws InterruptedException { synchronized (this) { while(size == data.length) { this.wait(); } data[tail++] = str; size++; if(tail == data.length) tail = 0; //這個 notify 用來喚醒 take 操作的等待 this.notify(); } } public String take() throws InterruptedException { synchronized (this) { while(size == 0) { this.wait(); } String ret = data[head++]; size--; if(head == data.length) head = 0; //這個 notify 用來喚醒 put 操作的等待 this.notify(); return ret; } } }
測試實現(xiàn)的阻塞隊列
public class Demo4 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); Thread t1 = new Thread(() -> { while(true) { try { System.out.println("消費元素" + queue.take()); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { int count = 1; while(true) { try { queue.put(count + ""); System.out.println("生產(chǎn)元素" + count); count++; Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
讓生產(chǎn)速度較慢,使得讀取操作阻塞等待插入數(shù)據(jù)才執(zhí)行。
public class Demo4 { public static void main(String[] args) { MyBlockingQueue queue = new MyBlockingQueue(); Thread t1 = new Thread(() -> { while(true) { try { System.out.println("消費元素" + queue.take()); Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); Thread t2 = new Thread(() -> { int count = 1; while(true) { try { queue.put(count + ""); System.out.println("生產(chǎn)元素" + count); count++; } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
讓生產(chǎn) put 操作進入阻塞等待狀態(tài)。
到此這篇關(guān)于JavaEE多線程中阻塞隊列的項目實踐的文章就介紹到這了,更多相關(guān)JavaEE 阻塞隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Rabbitmq延遲隊列實現(xiàn)定時任務(wù)的方法
這篇文章主要介紹了Rabbitmq延遲隊列實現(xiàn)定時任務(wù),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05SpringCloud實現(xiàn)基于RabbitMQ消息隊列的詳細步驟
在Spring Cloud框架中,我們可以利用RabbitMQ實現(xiàn)強大而可靠的消息隊列系統(tǒng),本篇將詳細介紹如何在Spring Cloud項目中集成RabbitMQ,并創(chuàng)建一個簡單的消息隊列,感興趣的朋友一起看看吧2024-03-03Spring Boot集成spring-boot-devtools開發(fā)時實現(xiàn)熱部署的方式
這篇文章主要介紹了Spring Boot集成spring-boot-devtools開發(fā)時實現(xiàn)熱部署的方式,文中還給大家提到了spring boot 實現(xiàn)熱部署的方式及集成注意事項,感興趣的朋友跟隨腳本之家小編一起學習吧2018-05-05使用Filter過濾器中訪問getSession()要轉(zhuǎn)化
這篇文章主要介紹了使用Filter過濾器中訪問getSession()要轉(zhuǎn)化,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01還在用if(obj!=null)做非空判斷,帶你快速上手Optional
這篇文章主要介紹了還在用if(obj!=null)做非空判斷,帶你快速上手Optional,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-05-05JAVA中l(wèi)ist,set,數(shù)組之間的轉(zhuǎn)換詳解
以下是對JAVA中l(wèi)ist,set,數(shù)組之間的轉(zhuǎn)換進行了詳細的分析介紹,需要的朋友可以過來參考下2013-09-09