Java多線程Queue、BlockingQueue和使用BlockingQueue實(shí)現(xiàn)生產(chǎn)消費(fèi)者模型方法解析
Queue是什么
隊(duì)列,是一種數(shù)據(jù)結(jié)構(gòu)。除了優(yōu)先級(jí)隊(duì)列和LIFO隊(duì)列外,隊(duì)列都是以FIFO(先進(jìn)先出)的方式對(duì)各個(gè)元素進(jìn)行排序的。無(wú)論使用哪種排序方式,隊(duì)列的頭都是調(diào)用remove()或poll()移除元素的。在FIFO隊(duì)列中,所有新元素都插入隊(duì)列的末尾。
Queue中的方法
Queue中的方法不難理解,6個(gè),每2對(duì)是一個(gè)也就是總共3對(duì)??匆幌翵DKAPI就知道了:
注意一點(diǎn)就好,Queue通常不允許插入Null,盡管某些實(shí)現(xiàn)(比如LinkedList)是允許的,但是也不建議。
BlockingQueue
1、BlockingQueue概述
BlockingQueue也是java.util.concurrent下的主要用來(lái)控制線程同步的工具。
BlockingQueue有四個(gè)具體的實(shí)現(xiàn)類,根據(jù)不同需求,選擇不同的實(shí)現(xiàn)類
1、ArrayBlockingQueue:一個(gè)由數(shù)組支持的有界阻塞隊(duì)列,規(guī)定大小的BlockingQueue,其構(gòu)造函數(shù)必須帶一個(gè)int參數(shù)來(lái)指明其大小.其所含的對(duì)象是以FIFO(先入先出)順序排序的。
2、LinkedBlockingQueue:大小不定的BlockingQueue,若其構(gòu)造函數(shù)帶一個(gè)規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的BlockingQueue的大小由Integer.MAX_VALUE來(lái)決定.其所含的對(duì)象是以FIFO(先入先出)順序排序的。
3、PriorityBlockingQueue:類似于LinkedBlockQueue,但其所含對(duì)象的排序不是FIFO,而是依據(jù)對(duì)象的自然排序順序或者是構(gòu)造函數(shù)的Comparator決定的順序。
4、SynchronousQueue:特殊的BlockingQueue,對(duì)其的操作必須是放和取交替完成的。
LinkedBlockingQueue可以指定容量,也可以不指定,不指定的話,默認(rèn)最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在隊(duì)列滿的時(shí)候會(huì)阻塞直到有隊(duì)列成員被消費(fèi),take方法在隊(duì)列空的時(shí)候會(huì)阻塞,直到有隊(duì)列成員被放進(jìn)來(lái)。
講BlockingQueue,因?yàn)锽lockingQueue是Queue中的一個(gè)重點(diǎn),并且通過(guò)BlockingQueue我們?cè)俅渭由顚?duì)于生產(chǎn)者/消費(fèi)者模型的理解。其他的Queue都不難,通過(guò)查看JDKAPI和簡(jiǎn)單閱讀源碼完全可以理解他們的作用。
BlockingQueue,顧名思義,阻塞隊(duì)列。BlockingQueue是在java.util.concurrent下的,因此不難理解,BlockingQueue是為了解決多線程中數(shù)據(jù)高效安全傳輸而提出的。
多線程中,很多場(chǎng)景都可以使用隊(duì)列實(shí)現(xiàn),比如經(jīng)典的生產(chǎn)者/消費(fèi)者模型,通過(guò)隊(duì)列可以便利地實(shí)現(xiàn)兩者之間數(shù)據(jù)的共享,定義一個(gè)生產(chǎn)者線程,定義一個(gè)消費(fèi)者線程,通過(guò)隊(duì)列共享數(shù)據(jù)就可以了。
當(dāng)然現(xiàn)實(shí)不可能都是理想的,比如消費(fèi)者消費(fèi)速度比生產(chǎn)者生產(chǎn)的速度要快,那么消費(fèi)者消費(fèi)到一定程度上的時(shí)候,必須要暫停等待一下了(使消費(fèi)者線程處于WAITING狀態(tài))。BlockingQueue的提出,就是為了解決這個(gè)問(wèn)題的,他不用程序員去控制這些細(xì)節(jié),同時(shí)還要兼顧效率和線程安全。
阻塞隊(duì)列所謂的"阻塞",指的是某些情況下線程會(huì)掛起(即阻塞),一旦條件滿足,被掛起的線程又會(huì)自動(dòng)喚醒。使用BlockingQueue,不需要關(guān)心什么時(shí)候需要阻塞線程,什么時(shí)候需要喚醒線程,這些內(nèi)容BlockingQueue都已經(jīng)做好了
2、BlockingQueue中的方法
BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已經(jīng)列了??匆幌翨lockingQueue中特有的方法:
(1)voidput(Ee)throwsInterruptedException
把e添加進(jìn)BlockingQueue中,如果BlockingQueue中沒(méi)有空間,則調(diào)用線程被阻塞,進(jìn)入等待狀態(tài),直到BlockingQueue中有空間再繼續(xù)
(2)voidtake()throwsInterruptedException
取走BlockingQueue里面排在首位的對(duì)象,如果BlockingQueue為空,則調(diào)用線程被阻塞,進(jìn)入等待狀態(tài),直到BlockingQueue有新的數(shù)據(jù)被加入
(3)intdrainTo(Collection<?superE>c,intmaxElements)
一次性取走BlockingQueue中的數(shù)據(jù)到c中,可以指定取的個(gè)數(shù)。通過(guò)該方法可以提升獲取數(shù)據(jù)效率,不需要多次分批加鎖或釋放鎖
3、ArrayBlockingQueue
基于數(shù)組的阻塞隊(duì)列,必須指定隊(duì)列大小。比較簡(jiǎn)單。ArrayBlockingQueue中只有一個(gè)ReentrantLock對(duì)象,這意味著生產(chǎn)者和消費(fèi)者無(wú)法并行運(yùn)行(見(jiàn)下面的代碼)。另外,創(chuàng)建ArrayBlockingQueue時(shí),可以指定ReentrantLock是否為公平鎖,默認(rèn)采用非公平鎖。
/** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
4、LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列,和ArrayBlockingQueue差不多。不過(guò)LinkedBlockingQueue如果不指定隊(duì)列容量大小,會(huì)默認(rèn)一個(gè)類似無(wú)限大小的容量,之所以說(shuō)是類似是因?yàn)檫@個(gè)無(wú)限大小是Integer.MAX_VALUE,這么說(shuō)就好理解ArrayBlockingQueue為什么必須要制定大小了,如果ArrayBlockingQueue不指定大小的話就用Integer.MAX_VALUE,那將造成大量的空間浪費(fèi),但是基于鏈表實(shí)現(xiàn)就不一樣的,一個(gè)一個(gè)節(jié)點(diǎn)連起來(lái)而已。另外,LinkedBlockingQueue生產(chǎn)者和消費(fèi)者都有自己的鎖(見(jiàn)下面的代碼),這意味著生產(chǎn)者和消費(fèi)者可以"同時(shí)"運(yùn)行。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
5、SynchronousQueue
比較特殊,一種沒(méi)有緩沖的等待隊(duì)列。什么叫做沒(méi)有緩沖區(qū),ArrayBlocking中有:
/** The queued items */ private final E[] items;
數(shù)組用以存儲(chǔ)隊(duì)列。LinkedBlockingQueue中有:
/** * Linked list node class */ static class Node<E> { /** The item, volatile to ensure barrier separating write and read */ volatile E item; Node<E> next; Node(E x) { item = x; } }
將隊(duì)列以鏈表形式連接。
生產(chǎn)者/消費(fèi)者操作數(shù)據(jù)實(shí)際上都是通過(guò)這兩個(gè)"中介"來(lái)操作數(shù)據(jù)的,但是SynchronousQueue則是生產(chǎn)者直接把數(shù)據(jù)給消費(fèi)者(消費(fèi)者直接從生產(chǎn)者這里拿數(shù)據(jù)),好像又回到了沒(méi)有生產(chǎn)者/消費(fèi)者模型的老辦法了。換句話說(shuō),每一個(gè)插入操作必須等待一個(gè)線程對(duì)應(yīng)的移除操作。SynchronousQueue又有兩種模式:
1、公平模式
采用公平鎖,并配合一個(gè)FIFO隊(duì)列(Queue)來(lái)管理多余的生產(chǎn)者和消費(fèi)者
2、非公平模式
采用非公平鎖,并配合一個(gè)LIFO棧(Stack)來(lái)管理多余的生產(chǎn)者和消費(fèi)者,這也是SynchronousQueue默認(rèn)的模式
利用BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型
上一篇我們寫(xiě)的生產(chǎn)者消費(fèi)者模型有局限,局限體現(xiàn)在:
緩沖區(qū)內(nèi)只能存放一個(gè)數(shù)據(jù),實(shí)際生產(chǎn)者/消費(fèi)者模型中的緩沖區(qū)內(nèi)可以存放大量生產(chǎn)者生產(chǎn)出來(lái)的數(shù)據(jù)
生產(chǎn)者和消費(fèi)者處理數(shù)據(jù)的速度幾乎一樣
OK,我們就用BlockingQueue來(lái)簡(jiǎn)單寫(xiě)一個(gè)例子,并且讓生產(chǎn)者、消費(fèi)者處理數(shù)據(jù)速度不同。子類選擇的是ArrayBlockingQueue,大小定為10:
public static void main(String[] args) { final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10); Runnable producerRunnable = new Runnable() { int i = 0; public void run() { while (true) { try { System.out.println("我生產(chǎn)了一個(gè)" + i++); bq.put(i + ""); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Runnable customerRunnable = new Runnable() { public void run() { while (true) { try { System.out.println("我消費(fèi)了一個(gè)" + bq.take()); Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }; Thread producerThread = new Thread(producerRunnable); Thread customerThread = new Thread(customerRunnable); producerThread.start(); customerThread.start(); }
代碼的做法是讓生產(chǎn)者生產(chǎn)速度快于消費(fèi)者消費(fèi)速度的,看一下運(yùn)行結(jié)果:
我生產(chǎn)了一個(gè)0 我消費(fèi)了一個(gè)1 我生產(chǎn)了一個(gè)1 我生產(chǎn)了一個(gè)2 我消費(fèi)了一個(gè)2 我生產(chǎn)了一個(gè)3 我生產(chǎn)了一個(gè)4 我生產(chǎn)了一個(gè)5 我消費(fèi)了一個(gè)3 我生產(chǎn)了一個(gè)6 我生產(chǎn)了一個(gè)7 我生產(chǎn)了一個(gè)8 我消費(fèi)了一個(gè)4 我生產(chǎn)了一個(gè)9 我生產(chǎn)了一個(gè)10 我生產(chǎn)了一個(gè)11 我消費(fèi)了一個(gè)5 我生產(chǎn)了一個(gè)12 我生產(chǎn)了一個(gè)13 我生產(chǎn)了一個(gè)14 我消費(fèi)了一個(gè)6 我生產(chǎn)了一個(gè)15 我生產(chǎn)了一個(gè)16 我消費(fèi)了一個(gè)7 我生產(chǎn)了一個(gè)17 我消費(fèi)了一個(gè)8 我生產(chǎn)了一個(gè)18
分兩部分來(lái)看輸出結(jié)果:
1、第1行~第23行。這塊BlockingQueue未滿,所以生產(chǎn)者隨便生產(chǎn),消費(fèi)者隨便消費(fèi),基本上都是生產(chǎn)3個(gè)消費(fèi)1個(gè),消費(fèi)者消費(fèi)速度慢
2、第24行~第27行,從前面我們可以看出,生產(chǎn)到16,消費(fèi)到6,說(shuō)明到了ArrayBlockingQueue的極限10了,這時(shí)候沒(méi)辦法,生產(chǎn)者生產(chǎn)一個(gè)ArrayBlockingQueue就滿了,所以不能繼續(xù)生產(chǎn)了,只有等到消費(fèi)者消費(fèi)完才可以繼續(xù)生產(chǎn)。所以之后的打印內(nèi)容一定是一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者
這就是前面一章開(kāi)頭說(shuō)的"通過(guò)平衡生產(chǎn)者和消費(fèi)者的處理能力來(lái)提高整體處理數(shù)據(jù)的速度",這給例子應(yīng)該體現(xiàn)得很明顯。另外,也不要擔(dān)心非單一生產(chǎn)者/消費(fèi)者場(chǎng)景下的系統(tǒng)假死問(wèn)題,緩沖區(qū)空、緩沖區(qū)滿的場(chǎng)景BlockingQueue都是定義了不同的Condition,所以不會(huì)喚醒自己的同類。
總結(jié)
以上就是本文關(guān)于Java多線程Queue、BlockingQueue和使用BlockingQueue實(shí)現(xiàn)生產(chǎn)消費(fèi)者模型方法解析的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以參閱本站:
Java利用future及時(shí)獲取多線程運(yùn)行結(jié)果
如有不足之處,歡迎留言指出。
- Java多線程 生產(chǎn)者消費(fèi)者模型實(shí)例詳解
- Java多線程 BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模型詳解
- Java基于Lock的生產(chǎn)者消費(fèi)者模型示例
- Java多線程中不同條件下編寫(xiě)生產(chǎn)消費(fèi)者模型方法介紹
- Java并發(fā)編程中的生產(chǎn)者與消費(fèi)者模型簡(jiǎn)述
- Java多線程并發(fā)生產(chǎn)者消費(fèi)者設(shè)計(jì)模式實(shí)例解析
- JAVA多線程實(shí)現(xiàn)生產(chǎn)者消費(fèi)者的實(shí)例詳解
- Java實(shí)現(xiàn)簡(jiǎn)易生產(chǎn)者消費(fèi)者模型過(guò)程解析
相關(guān)文章
Spring Boot 2.X 快速集成單元測(cè)試解析
這篇文章主要介紹了Spring Boot 2.X 快速集成單元測(cè)試解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08spring MVC中傳遞對(duì)象參數(shù)示例詳解
這篇文章主要給大家介紹了在spring MVC中傳遞對(duì)象參數(shù)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看吧。2017-06-06Spark學(xué)習(xí)筆記之Spark SQL的具體使用
這篇文章主要介紹了Spark學(xué)習(xí)筆記之Spark SQL的具體使用,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-06-06java:無(wú)法訪問(wèn)org.springframework.boot.SpringApplication的解決方法
這篇文章主要給大家介紹了關(guān)于java:無(wú)法訪問(wèn)org.springframework.boot.SpringApplication的解決方法,文中通過(guò)實(shí)例代碼將解決的辦法介紹的非常詳細(xì),需要的朋友可以參考下2023-01-01Java環(huán)境變量的設(shè)置方法(圖文教程)
想要成功配置Java的環(huán)境變量,那肯定就要安裝JDK,才能開(kāi)始配置的。2013-05-05JavaMail實(shí)現(xiàn)郵件發(fā)送機(jī)制
這篇文章主要為大家詳細(xì)介紹了JavaMail實(shí)現(xiàn)郵件發(fā)送機(jī)制,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-08-08如何基于Autowired對(duì)構(gòu)造函數(shù)進(jìn)行注釋
這篇文章主要介紹了如何基于Autowired對(duì)構(gòu)造函數(shù)進(jìn)行注釋,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10詳解Java?ReentrantReadWriteLock讀寫(xiě)鎖的原理與實(shí)現(xiàn)
ReentrantReadWriteLock讀寫(xiě)鎖是使用AQS的集大成者,用了獨(dú)占模式和共享模式。本文和大家一起理解下ReentrantReadWriteLock讀寫(xiě)鎖的實(shí)現(xiàn)原理,需要的可以了解一下2022-10-10淺談SpringBoot實(shí)現(xiàn)自動(dòng)裝配的方法原理
SpringBoot的自動(dòng)裝配是它的一大特點(diǎn),可以大大提高開(kāi)發(fā)效率,減少重復(fù)性代碼的編寫(xiě)。本文將詳細(xì)講解SpringBoot如何實(shí)現(xiàn)自動(dòng)裝配,需要的朋友可以參考下2023-05-05