Java基于阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型示例詳解
一、阻塞式隊(duì)列
什么是阻塞式隊(duì)列(有兩點(diǎn)):
- 第一點(diǎn):當(dāng)隊(duì)列滿的時(shí)候,如果此時(shí)入隊(duì)列的話就會出現(xiàn)阻塞,直到其它線程從隊(duì)列中取走元素為止。
- 第二點(diǎn):當(dāng)隊(duì)列為空的時(shí)候,如果繼續(xù)出隊(duì)列,此時(shí)就會出現(xiàn)阻塞,一直阻塞到其它線程往隊(duì)列中添加元素為止。
二、生產(chǎn)者消費(fèi)者模型
什么是生產(chǎn)者消費(fèi)者模型
生產(chǎn)者消費(fèi)者模型是常見的多線程編程模型,可以用來解決生產(chǎn)者和消費(fèi)者之間的數(shù)據(jù)交互問題。
阻塞隊(duì)列的最主要的一個(gè)目的之一就是實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型(基于阻塞隊(duì)列實(shí)現(xiàn)),生產(chǎn)者消費(fèi)主模型是處理多線程問題的一種方式。
生產(chǎn)消費(fèi)者模型的優(yōu)勢
生產(chǎn)者消費(fèi)主模型的優(yōu)勢:針對分布式系統(tǒng)有兩個(gè)優(yōu)勢,一個(gè)是解耦合(耦合我們可以理解為依賴程度)、另一個(gè)是削峰填谷。
- 解耦合:生產(chǎn)者和消費(fèi)主之間通過緩沖區(qū)進(jìn)行解耦合,而不會對彼此產(chǎn)生直接的依賴,我們通過引入生產(chǎn)者消費(fèi)者模型(即阻塞隊(duì)列)就可以達(dá)到解耦合的效果,但是付出的代價(jià)就是效率有所降低。
- 削峰填谷:服務(wù)器接收到的來自用戶端的請求數(shù)量可能會因?yàn)橐恍┩话l(fā)時(shí)間而暴增,此時(shí)服務(wù)器面臨的壓力就非常大了。我們要知道一臺服務(wù)器承擔(dān)的上限是一樣的,不同的服務(wù)器所能承擔(dān)的上限又是不同的。(機(jī)器的硬件資源(CPU、內(nèi)存、硬盤、網(wǎng)絡(luò)帶寬等等)是有限的,而服務(wù)器每處理一個(gè)請求都需要消耗一定的資源,請求足夠多直到機(jī)器的硬件資源招架不住的時(shí)候服務(wù)器也就掛了)通過引入生產(chǎn)消費(fèi)者模型(即阻塞隊(duì)列)就可以起到一個(gè)緩沖的作用,其中阻塞隊(duì)列就承擔(dān)了服務(wù)器的一部分壓力,然后當(dāng)峰值消退的時(shí)候,服務(wù)器接收到的請求就相對較少了,此時(shí)服務(wù)器由于阻塞隊(duì)列的原因依然可以按照既定的順序處理請求。
阻塞隊(duì)列只是一個(gè)數(shù)據(jù)結(jié)構(gòu),如果我們把這個(gè)數(shù)據(jù)結(jié)構(gòu)單獨(dú)實(shí)現(xiàn)稱了一個(gè)服務(wù)器程序,并且使用單獨(dú)的主機(jī)或者主機(jī)群來進(jìn)行部署的話,此時(shí)阻塞式隊(duì)列就進(jìn)化成了消息隊(duì)列。而在Java標(biāo)準(zhǔn)庫中已經(jīng)實(shí)現(xiàn)了阻塞隊(duì)列,并且實(shí)現(xiàn)了三種阻塞隊(duì)列的實(shí)現(xiàn)方式:
三、生產(chǎn)者消費(fèi)者舉例代碼
生產(chǎn)消費(fèi)者模型代碼如下(基于阻塞式隊(duì)列):
import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; // 生產(chǎn)消費(fèi)者模型——阻塞隊(duì)列 public class Demo20 { public static void main(String[] args) { // 創(chuàng)建一個(gè)阻塞隊(duì)列來作為交易場所 BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); Thread t1 = new Thread(() -> { int count = 0; while(true) { try { queue.put(count); System.out.println("生產(chǎn)元素:" + count); count++; Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { while(true) { try { Integer n = queue.take(); System.out.println("消費(fèi)元素:" + n); } catch (InterruptedException e) { e.printStackTrace(); } } } }); t1.start(); t2.start(); } }
代碼運(yùn)行結(jié)果如下:
四、基于阻塞式隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
現(xiàn)在,我們自己來基于循環(huán)隊(duì)列來實(shí)現(xiàn)阻塞式隊(duì)列。注意我們這里實(shí)現(xiàn)的阻塞隊(duì)列是基于數(shù)組、基于循環(huán)隊(duì)列的阻塞隊(duì)列。
我們在實(shí)現(xiàn)阻塞隊(duì)列的時(shí)候有以下幾點(diǎn)需要注意:
- 線程安全問題:需要給put方法和take()方法進(jìn)行加鎖操作。
- 經(jīng)過加鎖之后還需要考慮到內(nèi)存可見性問題,這里就涉及到volatile關(guān)鍵字的使用。
- 阻塞狀態(tài)以及阻塞狀態(tài)的解除時(shí)機(jī)要把握好(即wait()方法和notify()方法的使用)。
- wait()方法不一定是被notify()方法喚醒的,還有可能是被interrupt()方法喚醒的:如果interrupt方法是按照try catch的形式來進(jìn)行編寫的,一旦interrupt方法喚醒wait方法,接著執(zhí)行完catch之后,代碼并不會結(jié)束而是繼續(xù)往后執(zhí)行,此時(shí)就會出現(xiàn)覆蓋元素的問題。(解決方法,使用while循環(huán)不斷等待和檢查條件。如果不使用 while 循環(huán)在狀態(tài)被滿足之前不斷地等待和檢查條件,就有可能在 wait 方法返回之后仍然不能安全地進(jìn)行操作,這可能導(dǎo)致程序出現(xiàn)異常和錯(cuò)誤。)強(qiáng)烈建議使用wait方法的時(shí)候搭配while循環(huán)來判定條件
代碼如下:
class MyBlockQueue { // 使用string類型的數(shù)組來保存元素,我們假設(shè)這里只存string private String[] items = new String[1000]; //head表示指向隊(duì)列的頭部 volatile private int head = 0; volatile private int tail = 0; volatile private int size = 0; // size表示元素個(gè)數(shù) private Object locker = new Object(); public void put(String elem) throws InterruptedException { synchronized(locker) { while(size >= items.length) { //隊(duì)列已滿 locker.wait(); //return; } items[tail] = elem; tail++; if(tail >= items.length) { tail = 0; } //tail++和下面的if判斷可以替換成tail = (tail + 1) % (items.length) //但是站在CPU的角度來看,其實(shí)還是簡單的if判斷比較快 size++; locker.notify(); // 用來喚醒隊(duì)列為空的阻塞情況 } } //出隊(duì)列 public String take() throws InterruptedException { synchronized(locker) { while(size == 0) { locker.wait(); } String elem = items[head]; head++; if(head >= items.length) { head = 0; } size--; //使用notify來喚醒隊(duì)列阻塞滿的情況 locker.notify(); return elem; } } } public class Demo21 { public static void main(String[] args) { // 創(chuàng)建兩個(gè)線程分別表示消費(fèi)者和生產(chǎn)者 MyBlockQueue queue = new MyBlockQueue(); Thread t1 = new Thread(() -> { int count = 0; while(true) { try { queue.put(count + ""); System.out.println("生產(chǎn)元素: " + count); count++; } catch (InterruptedException e) { e.printStackTrace(); } } }); Thread t2 = new Thread(() -> { while(true) { try { String count = queue.take(); System.out.println("消費(fèi)元素: " + count); Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } } }); t1.start(); t2.start(); } }
以上就是Java基于阻塞隊(duì)列實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Java阻塞隊(duì)列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Mybatis 實(shí)現(xiàn)打印sql語句的代碼
這篇文章主要介紹了Mybatis 實(shí)現(xiàn)打印sql語句的代碼,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07Java多態(tài)(動(dòng)力節(jié)點(diǎn)Java學(xué)院整理)
多態(tài)是指允許不同類的對象對同一消息做出響應(yīng)。即同一消息可以根據(jù)發(fā)送對象的不同而采用多種不同的行為方式。接下來通過本文給大家介紹java多態(tài)相關(guān)知識,感興趣的朋友一起學(xué)習(xí)吧2017-04-04深入了解Spring中最常用的11個(gè)擴(kuò)展點(diǎn)
我們一說到spring,可能第一個(gè)想到的是?IOC(控制反轉(zhuǎn))?和?AOP(面向切面編程)。除此之外,我們在使用spring的過程中,有沒有發(fā)現(xiàn)它的擴(kuò)展能力非常強(qiáng)。今天就來跟大家一起聊聊,在Spring中最常用的11個(gè)擴(kuò)展點(diǎn)2022-09-09Java自帶定時(shí)任務(wù)ScheduledThreadPoolExecutor實(shí)現(xiàn)定時(shí)器和延時(shí)加載功能
今天小編就為大家分享一篇關(guān)于Java自帶定時(shí)任務(wù)ScheduledThreadPoolExecutor實(shí)現(xiàn)定時(shí)器和延時(shí)加載功能,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-12-12SpringMvc靜態(tài)資源訪問實(shí)現(xiàn)方法代碼實(shí)例
這篇文章主要介紹了SpringMvc靜態(tài)資源訪問實(shí)現(xiàn)方法代碼實(shí)例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08SpringBoot應(yīng)用部署于外置Tomcat容器的方法
這篇文章主要介紹了SpringBoot應(yīng)用部署于外置Tomcat容器的方法,本文分步驟給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-06-06