Java多線程使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型詳解
1. 什么是阻塞隊(duì)列
在數(shù)據(jù)結(jié)構(gòu)的學(xué)習(xí)中,我們知道了隊(duì)列有普通隊(duì)列、循環(huán)隊(duì)列,它們都遵循“先進(jìn)先出”的原則。
阻塞隊(duì)列也遵循這個(gè)原則,它是一種特殊的隊(duì)列(帶有阻塞功能的隊(duì)列),并且滿足以下兩點(diǎn):
- 當(dāng)隊(duì)列滿的時(shí)候,如果繼續(xù)往隊(duì)列中插入數(shù)據(jù),則發(fā)生阻塞狀態(tài),直到有數(shù)據(jù)出隊(duì)列。
- 當(dāng)隊(duì)列空的時(shí)候,如果往外取數(shù)據(jù),也發(fā)生阻塞狀態(tài),直到有數(shù)序入隊(duì)列。
Java 標(biāo)準(zhǔn)庫中的阻塞隊(duì)列為:BlockingDeque<>,是一個(gè)泛型接口。因此,我們使用的時(shí)候直接遵循標(biāo)準(zhǔn)庫的寫法即可。注意以下兩點(diǎn):
- BlockingDeque 是一個(gè)接口,因此我們實(shí)例對(duì)象時(shí)用的是 LinkedBlockingQueue類。
- put 方法用于阻塞式的入隊(duì)列, take 用于阻塞式的出隊(duì)列。
通過上述介紹,我們可以寫出一段簡(jiǎn)易的阻塞隊(duì)列代碼:
public static void main(String[] args) throws InterruptedException { //BlockingQueue<>為阻塞隊(duì)列的原型 BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(); //take(取元素)、put(插入元素)為阻塞隊(duì)列的兩個(gè)核心方法 blockingQueue.put(20);//插入元素20 Integer result = blockingQueue.take();//從隊(duì)頭取元素 System.out.println(result); }
運(yùn)行后打印:
通過上述代碼,大家已經(jīng)對(duì)阻塞隊(duì)列有了一個(gè)淺的認(rèn)識(shí),當(dāng)然你可以可以多 take 幾次來達(dá)到阻塞效果。
阻塞隊(duì)列主要用于“生產(chǎn)者消費(fèi)者模型”,是實(shí)際開發(fā)中常用到的,下面我就來介紹它的用法。
2. 生產(chǎn)者消費(fèi)者模型
什么是生產(chǎn)者消費(fèi)者模型?從字面上來看,前者是生產(chǎn)者,后者是消費(fèi)者。
因此,生產(chǎn)者與消費(fèi)者之間進(jìn)行交互需要一個(gè)中間平臺(tái),這個(gè)平臺(tái)就是阻塞隊(duì)列,如果沒有中間平臺(tái)交易就會(huì)產(chǎn)生一定風(fēng)險(xiǎn)、效率也會(huì)降低很多。
生產(chǎn)者消費(fèi)者體現(xiàn):過年大家都包餃子,假設(shè)一家有三個(gè)人員,人員1 搟餃子皮,搟完后放在砧板上,人員2 和 人員3 負(fù)責(zé)包餃子。這樣一個(gè)例子中 人員1 就是生產(chǎn)者,砧板就是平臺(tái),人員2 和 人員3 是消費(fèi)者。如果三個(gè)人員自己搟皮自己包,這樣的效率是非常低的?。ㄖ挥幸粋€(gè)搟面杖、無砧板情況下)
中間平臺(tái)優(yōu)點(diǎn)體現(xiàn):假如,有兩個(gè)服務(wù)器它們直接進(jìn)行交互。服務(wù)器1掛了,緊接著服務(wù)器2也掛了。因此,我們需要一個(gè)中間平臺(tái)(阻塞隊(duì)列),連接這兩個(gè)服務(wù)器并進(jìn)行交互。這樣無論那一個(gè)服務(wù)器掛了也不影響另一個(gè)服務(wù)器。
生產(chǎn)者消費(fèi)者模型的優(yōu)點(diǎn)有很多,但最突出了有兩點(diǎn):解耦合和削峰填谷。請(qǐng)看下方講解。
2.1 解耦合
大家都聽過高內(nèi)聚低耦合這個(gè)概念,在此我來做個(gè)解釋:
何為內(nèi)聚,舉個(gè)例子:在快遞站拿快遞,我們可以根據(jù)貨物號(hào)來快速的找到想要的物品,這就是高內(nèi)聚。
但某一天,快遞站來了個(gè)怪人,他在找快遞的過程中把每個(gè)拿起來的快遞都隨意放在其他位置。因此別人再去找自己的快遞時(shí)就不能快速的找到自己的快遞了,這就是低內(nèi)聚的一個(gè)體現(xiàn)。
在 Java 中高內(nèi)聚主要體現(xiàn)在代碼的條理性,相關(guān)聯(lián)的代碼很好的放在一起。低內(nèi)聚則是相關(guān)聯(lián)的代碼沒有放在一起,東一塊、西一塊。
何為耦合,主要體現(xiàn)一個(gè)關(guān)聯(lián)性。也是舉個(gè)例子:假設(shè)我的親人生病住院了,我會(huì)放下手中的一切去好好照顧他/她,哪怕對(duì)我現(xiàn)實(shí)生活影響很大,我也義無反顧。這樣的行為就是高耦合的。
但我的女神生病了,她發(fā)了個(gè)朋友圈。由于我和她只是“朋友圈點(diǎn)贊之交”,我只會(huì)給她點(diǎn)個(gè)贊并且評(píng)論句多喝熱水。因?yàn)樗×藢?duì)我的影響是很低的,所以可以稱為低耦合。
耦合高,在 Java 主要體現(xiàn)在多個(gè)模塊之間的關(guān)聯(lián),關(guān)聯(lián)越強(qiáng)耦合越高,關(guān)聯(lián)越弱耦合越低。
回歸正題,阻塞隊(duì)列的解耦合主要體現(xiàn)在多個(gè)線程之間進(jìn)行交互。如以下例子:
在上、下圖中,A、B、C是我們的業(yè)務(wù)服務(wù)器,會(huì)經(jīng)常更改代碼, 因此會(huì)經(jīng)常出現(xiàn) bug 就容易掛。通過消費(fèi)者模型就能很好的避免這個(gè)問題。
當(dāng)然,阻塞隊(duì)列服務(wù)器也會(huì)掛,但相對(duì)于ABC業(yè)務(wù)服務(wù)器來說掛的機(jī)率較小。
2.2 削峰填谷
三峽大壩利用的就是削峰填谷機(jī)制,有效緩解了電力系統(tǒng)在高峰期的壓力和在低峰期的浪費(fèi)現(xiàn)象。
當(dāng)電力系統(tǒng)電力值達(dá)到高峰時(shí),三峽大壩則會(huì)把部分的水存儲(chǔ)在水庫里面,只放出適合的水流量,減少并調(diào)節(jié)電力系統(tǒng)的負(fù)荷,有效緩解電力系統(tǒng)在高峰期的浪費(fèi)現(xiàn)象。
當(dāng)電力處于低峰期時(shí)也就是電力供給不足的情況,三峽大壩會(huì)把水庫里存儲(chǔ)的水給放出來,通過電站的發(fā)電量、水庫的排水等措施,緩解了電力系統(tǒng)在低峰期的電力不足。
上述例子就是削峰填谷的一個(gè)簡(jiǎn)單理解,在 Java 中阻塞隊(duì)列就能達(dá)到削峰填谷的功能。
當(dāng)服務(wù)器與服務(wù)器之間進(jìn)行交互常常是以一個(gè)很平緩的速率進(jìn)行的,但某一時(shí)刻突然達(dá)到了一個(gè)峰值。
這個(gè)時(shí)候阻塞隊(duì)列就能把峰值帶來的壓力給頂下來,讓服務(wù)器之間還是以平穩(wěn)的速率進(jìn)行交互。
如:服務(wù)器A 作為生產(chǎn)者,服務(wù)器B 作為消費(fèi)者,服務(wù)器A 最高可達(dá)到 1秒3萬 次的速率,服務(wù)器B 最高只能 1秒1萬 次這時(shí)候就會(huì)出現(xiàn)下圖這樣的問題。
上圖中 服務(wù)器A 作為生產(chǎn)者、服務(wù)器B 作為消費(fèi)者。當(dāng) 服務(wù)器A 收到的請(qǐng)求多了。回復(fù)給阻塞隊(duì)列的內(nèi)容也變多了。
但 服務(wù)器B 最多能接受 1秒1萬 次的數(shù)據(jù)。因此,阻塞隊(duì)列就會(huì)把多的請(qǐng)求存儲(chǔ)下來并按照 1秒1萬 次的速率給 服務(wù)器B 傳輸數(shù)據(jù),這樣就不會(huì)導(dǎo)致 服務(wù)器B 崩潰。
以上的三峽大壩、服務(wù)器交互的例子就是對(duì)削峰填谷進(jìn)行的一個(gè)講解,當(dāng)然比較淺顯。具體代碼的實(shí)現(xiàn),請(qǐng)看下方講解。
2.3 生產(chǎn)者消費(fèi)者案例
生產(chǎn)者消費(fèi)者主要體現(xiàn)一個(gè)線程生產(chǎn),一個(gè)線程消費(fèi)。如下代碼:
public static void main(String[] args) { BlockingDeque<Integer> blockingDeque = new LinkedBlockingDeque<>(); //消費(fèi)者 Thread thread1 = new Thread(()->{ while (true) { try { int value = blockingDeque.take(); System.out.println("消費(fèi)者: "+value); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread1.start();//啟動(dòng)線程1 //生產(chǎn)者 Thread thread2 = new Thread(()->{ int value = 1; while (true) { try { blockingDeque.put(value); System.out.println("生產(chǎn)者: "+value); Thread.sleep(1000); value++; } catch (InterruptedException e) { e.printStackTrace(); } } }); thread2.start();//啟動(dòng)線程2 }
運(yùn)行后打?。?/p>
以上代碼不難看懂,主要用到阻塞隊(duì)列的 take 和 put 方法。生產(chǎn)者 thread2 使用 put 方法生產(chǎn)元素,消費(fèi)者 thread1 使用 take 方法消費(fèi)元素。
注意,在線程內(nèi)調(diào)用 take 或put 方法,都得 try/catch InterruptedException 這個(gè)異常。我們直接Alt+Enter take 或 put方法即可。
3. 阻塞隊(duì)列生產(chǎn)者消費(fèi)者模型的實(shí)現(xiàn)
使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式過程如下:
首先我們要讓這個(gè)隊(duì)列循環(huán)下去,如何讓一個(gè)隊(duì)列循環(huán)下去,最好實(shí)現(xiàn)方法就是使用循環(huán)隊(duì)列。
設(shè)計(jì)中我們可以用 head 作為隊(duì)頭元素下標(biāo)、tail 作為隊(duì)尾元素下標(biāo)、size 作為當(dāng)前元素的個(gè)數(shù)。
head 等于 tail 的時(shí)候證明是初始狀態(tài)(隊(duì)列空),或者是隊(duì)列已滿。因此,有以下幾點(diǎn)注意事項(xiàng):
入隊(duì)列:
- 當(dāng) size 等于隊(duì)列長(zhǎng)度時(shí),證明隊(duì)列已滿,此時(shí)不能插入數(shù)據(jù)。
- 當(dāng) tail 等于隊(duì)列長(zhǎng)度時(shí),tail 置為0,從第一個(gè)位置開始插入元素。
出隊(duì)列:
- 當(dāng) size 等于 0 時(shí),證明隊(duì)列已空,此時(shí)不能出數(shù)據(jù)。
- 當(dāng) head 等于隊(duì)列長(zhǎng)度時(shí)候,head 置為 0 ,從第一個(gè)元素開始出元素。
當(dāng)然,為了達(dá)到阻塞的效果,在隊(duì)列滿狀態(tài)或空狀態(tài)的方法里面使用 wait 方法造成阻塞狀態(tài)。在插元素方法里面里面 notify 喚醒隊(duì)列空時(shí)的阻塞狀態(tài),在拿元素里面 notify 喚醒隊(duì)列滿時(shí)的阻塞狀態(tài)。
具體代碼實(shí)現(xiàn)如下:
class MyBlockingQueue { int [] array = new int[100];//定義一個(gè)數(shù)組為隊(duì)列 int head = 0;//隊(duì)頭下標(biāo) int tail = 0;//隊(duì)尾下標(biāo) int size = 0;//元素個(gè)數(shù) //模擬實(shí)現(xiàn) put 方法 synchronized public void put(int value) throws InterruptedException { if (size == array.length) { this.wait();//隊(duì)列已滿設(shè)為阻塞狀態(tài) } array[tail] = value;//把value值放在數(shù)組對(duì)應(yīng)下標(biāo)中 tail++;//隊(duì)尾下表自增 size++;//元素個(gè)數(shù)自增 if (tail == array.length) { tail = 0;//隊(duì)尾下標(biāo)重置為0 } this.notify();//喚醒隊(duì)列空的阻塞狀態(tài) } //模擬實(shí)現(xiàn) take 方法 synchronized public int take() throws InterruptedException { if (size == 0){ this.wait();//隊(duì)列已空設(shè)為阻塞狀態(tài) } int value = array[head];//隊(duì)頭元素負(fù)責(zé)個(gè)value head++;//隊(duì)頭下標(biāo)往后自增 size--;//元素個(gè)數(shù)自減 if (head == array.length) { head = 0;//隊(duì)頭下標(biāo)置為0 } this.notify();//喚醒隊(duì)列滿的阻塞狀態(tài) return value;//返回隊(duì)頭元素 } } public class ThreadDemo2 { public static void main(String[] args) { MyBlockingQueue myBlockingQueue = new MyBlockingQueue(); //生產(chǎn)者 Thread thread1 = new Thread(()-> { int i = 1; while (true) { try { System.out.println("生產(chǎn)者: "+i); myBlockingQueue.put(i); i++; Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread1.start(); //消費(fèi)者 Thread thread2 = new Thread(()-> { while (true) { try { int i = myBlockingQueue.take(); System.out.println("消費(fèi)者: "+i); } catch (InterruptedException e) { e.printStackTrace(); } } }); thread2.start(); } }
運(yùn)行后打?。?/p>
以上代碼,我使用一個(gè)數(shù)組來模擬實(shí)現(xiàn)循環(huán)隊(duì)列的這樣更容易去理解。其他細(xì)節(jié)大家可以在代碼中的注釋進(jìn)行理解。 隊(duì)列已經(jīng)循環(huán)隊(duì)列不太熟悉朋友可以回頭好好復(fù)習(xí)一下。
注意,一個(gè)隊(duì)列不可能為空狀態(tài)又為滿狀態(tài),因此在上述代碼中,notify 喚醒的都是對(duì)方的狀態(tài)。這樣一個(gè)阻塞隊(duì)列生產(chǎn)者消費(fèi)者模式就能很好的實(shí)現(xiàn)了。
另外,阻塞隊(duì)列不存在線程安全問題,因?yàn)樽枞?duì)列底層有加鎖機(jī)制。因此,大家可以安心使用。
到此這篇關(guān)于Java多線程使用阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型詳解的文章就介紹到這了,更多相關(guān)Java多線程生產(chǎn)者消費(fèi)者模型內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot搭配AOP實(shí)現(xiàn)自定義注解
這篇文章主要為大家詳細(xì)介紹了SpringBoot如何搭配AOP實(shí)現(xiàn)自定義注解,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2022-12-12詳解spring batch的使用和定時(shí)器Quart的使用
spring Batch是一個(gè)基于Spring的企業(yè)級(jí)批處理框架,它通過配合定時(shí)器Quartz來輕易實(shí)現(xiàn)大批量的數(shù)據(jù)讀取或插入,并且全程自動(dòng)化,無需人員管理2017-08-08Java設(shè)計(jì)者模式簡(jiǎn)單工廠模式解析
這篇文章主要介紹了Java設(shè)計(jì)者模式簡(jiǎn)單工廠模式解析,介紹了其簡(jiǎn)介,實(shí)例以及優(yōu)缺點(diǎn)分析,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11