java阻塞隊(duì)列BlockingQueue詳細(xì)解讀
前言
在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數(shù)據(jù)的問(wèn)題。通過(guò)這些高效并且線程安全的隊(duì)列類(lèi),為我們快速搭建高質(zhì)量的多線程程序帶來(lái)極大的便利。本文詳細(xì)介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見(jiàn)使用場(chǎng)景。
BlockingQueue: j.u.c包下的提供了線程安全的隊(duì)列訪問(wèn)的接口,并發(fā)包下很多高級(jí)同步類(lèi)的實(shí)現(xiàn)都是基于阻塞隊(duì)列列實(shí)現(xiàn)的
1、當(dāng)阻塞列進(jìn)行插入數(shù)據(jù)時(shí),如果隊(duì)列已滿,線程將會(huì)阻塞等待直到隊(duì)列非滿
2、從阻塞隊(duì)列讀數(shù)據(jù)時(shí),如果隊(duì)列為空,線程將會(huì)阻塞等待直到隊(duì)列里面是非空的時(shí)候
- ArrayBlockingQueue: 基于數(shù)組實(shí)現(xiàn)的?個(gè)阻塞隊(duì)列,需要指定容量??, FIFO先進(jìn)先出順序
- LinkedBlockingQueue: 基于鏈表實(shí)現(xiàn)的?個(gè)阻塞隊(duì)列,如果不指定容量??,默認(rèn)Integer.MAX_VALUE, FIFO先進(jìn)先出順序
- PriorityBlockingQueue: ?個(gè)?持優(yōu)先級(jí)的?界阻塞隊(duì)列,默認(rèn)情況下元素采??然順序升序排序,也可以?定義排序?qū)崿F(xiàn) java.lang.Comparable接?
- DelayQueue: 延遲隊(duì)列,在指定時(shí)間才能獲取隊(duì)列元素的功能,隊(duì)列頭元素是最接近過(guò)期的元素, ??的對(duì)象必須實(shí)現(xiàn) java.util.concurrent.Delayed 接?并實(shí)現(xiàn)CompareTo和getDelay?法
認(rèn)識(shí)BlockingQueue
阻塞隊(duì)列,顧名思義,首先它是一個(gè)隊(duì)列,而一個(gè)隊(duì)列在數(shù)據(jù)結(jié)構(gòu)中所起的作用大致如下圖所示:
從上圖我們可以很清楚看到,通過(guò)一個(gè)共享的隊(duì)列,可以使得數(shù)據(jù)由隊(duì)列的一端輸入,從另外一端輸出;
常用的隊(duì)列主要有以下兩種:(當(dāng)然通過(guò)不同的實(shí)現(xiàn)方式,還可以延伸出很多不同類(lèi)型的隊(duì)列,DelayQueue就是其中的一種)
先進(jìn)先出(FIFO):先插入的隊(duì)列的元素也最先出隊(duì)列,類(lèi)似于排隊(duì)的功能。從某種程度上來(lái)說(shuō)這種隊(duì)列也體現(xiàn)了一種公平性。
后進(jìn)先出(LIFO):后插入隊(duì)列的元素最先出隊(duì)列,這種隊(duì)列優(yōu)先處理最近發(fā)生的事件。
多線程環(huán)境中,通過(guò)隊(duì)列可以很容易實(shí)現(xiàn)數(shù)據(jù)共享,比如經(jīng)典的“生產(chǎn)者”和“消費(fèi)者”模型中,通過(guò)隊(duì)列可以很便利地實(shí)現(xiàn)兩者之間的數(shù)據(jù)共享。假設(shè)我們有若干生產(chǎn)者線程,另外又有若干個(gè)消費(fèi)者線程。如果生產(chǎn)者線程需要把準(zhǔn)備好的數(shù)據(jù)共享給消費(fèi)者線程,利用隊(duì)列的方式來(lái)傳遞數(shù)據(jù),就可以很方便地解決他們之間的數(shù)據(jù)共享問(wèn)題。但如果生產(chǎn)者和消費(fèi)者在某個(gè)時(shí)間段內(nèi),萬(wàn)一發(fā)生數(shù)據(jù)處理速度不匹配的情況呢?理想情況下,如果生產(chǎn)者產(chǎn)出數(shù)據(jù)的速度大于消費(fèi)者消費(fèi)的速度,并且當(dāng)生產(chǎn)出來(lái)的數(shù)據(jù)累積到一定程度的時(shí)候,那么生產(chǎn)者必須暫停等待一下(阻塞生產(chǎn)者線程),以便等待消費(fèi)者線程把累積的數(shù)據(jù)處理完畢,反之亦然。然而,在concurrent包發(fā)布以前,在多線程環(huán)境下,我們每個(gè)程序員都必須去自己控制這些細(xì)節(jié),尤其還要兼顧效率和線程安全,而這會(huì)給我們的程序帶來(lái)不小的復(fù)雜度。好在此時(shí),強(qiáng)大的concurrent包橫空出世了,而他也給我們帶來(lái)了強(qiáng)大的BlockingQueue。(在多線程領(lǐng)域:所謂阻塞,在某些情況下會(huì)掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會(huì)自動(dòng)被喚醒),下面兩幅圖演示了BlockingQueue的兩個(gè)常見(jiàn)阻塞場(chǎng)景:
如上圖所示:當(dāng)隊(duì)列中沒(méi)有數(shù)據(jù)的情況下,消費(fèi)者端的所有線程都會(huì)被自動(dòng)阻塞(掛起),直到有數(shù)據(jù)放入隊(duì)列。
如上圖所示:當(dāng)隊(duì)列中填滿數(shù)據(jù)的情況下,生產(chǎn)者端的所有線程都會(huì)被自動(dòng)阻塞(掛起),直到隊(duì)列中有空的位置,線程被自動(dòng)喚醒。
這也是我們?cè)诙嗑€程環(huán)境下,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者,我們?cè)僖膊恍枰P(guān)心什么時(shí)候需要阻塞線程,什么時(shí)候需要喚醒線程,因?yàn)檫@一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來(lái)見(jiàn)識(shí)下它的常用方法:
BlockingQueue的核心方法
放入數(shù)據(jù)
(1)offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,則返回true,否則返回false.(本方法不阻塞當(dāng)前執(zhí)行方法的線程);
(2)offer(E o, long timeout, TimeUnit unit):可以設(shè)定等待的時(shí)間,如果在指定的時(shí)間內(nèi),還不能往隊(duì)列中加入BlockingQueue,則返回失敗。
(3)put(anObject):把a(bǔ)nObject加到BlockingQueue里,如果BlockQueue沒(méi)有空間,則調(diào)用此方法的線程被阻斷直到BlockingQueue里面有空間再繼續(xù).
獲取數(shù)據(jù)
(1)poll(time):取走BlockingQueue里排在首位的對(duì)象,若不能立即取出,則可以等time參數(shù)規(guī)定的時(shí)間,取不到時(shí)返回null;
(2)poll(long timeout, TimeUnit unit):從BlockingQueue取出一個(gè)隊(duì)首的對(duì)象,如果在指定時(shí)間內(nèi),隊(duì)列一旦有數(shù)據(jù)可取,則立即返回隊(duì)列中的數(shù)據(jù)。否則知道時(shí)間超時(shí)還沒(méi)有數(shù)據(jù)可取,返回失敗。
(3)take():取走BlockingQueue里排在首位的對(duì)象,若BlockingQueue為空,阻斷進(jìn)入等待狀態(tài)直到BlockingQueue有新的數(shù)據(jù)被加入;
(4)drainTo():一次性從BlockingQueue獲取所有可用的數(shù)據(jù)對(duì)象(還可以指定獲取數(shù)據(jù)的個(gè)數(shù)),通過(guò)該方法,可以提升獲取數(shù)據(jù)效率;不需要多次分批加鎖或釋放鎖。
常見(jiàn)BlockingQueue
在了解了BlockingQueue的基本功能后,讓我們來(lái)看看BlockingQueue家庭大致有哪些成員?
ArrayBlockingQueue
基于數(shù)組的阻塞隊(duì)列實(shí)現(xiàn),在ArrayBlockingQueue內(nèi)部,維護(hù)了一個(gè)定長(zhǎng)數(shù)組,以便緩存隊(duì)列中的數(shù)據(jù)對(duì)象,這是一個(gè)常用的阻塞隊(duì)列,除了一個(gè)定長(zhǎng)數(shù)組外,ArrayBlockingQueue內(nèi)部還保存著兩個(gè)整形變量,分別標(biāo)識(shí)著隊(duì)列的頭部和尾部在數(shù)組中的位置。
ArrayBlockingQueue在生產(chǎn)者放入數(shù)據(jù)和消費(fèi)者獲取數(shù)據(jù),都是共用同一個(gè)鎖對(duì)象,由此也意味著兩者無(wú)法真正并行運(yùn)行,這點(diǎn)尤其不同于LinkedBlockingQueue;按照實(shí)現(xiàn)原理來(lái)分析,ArrayBlockingQueue完全可以采用分離鎖,從而實(shí)現(xiàn)生產(chǎn)者和消費(fèi)者操作的完全并行運(yùn)行。Doug Lea之所以沒(méi)這樣去做,也許是因?yàn)锳rrayBlockingQueue的數(shù)據(jù)寫(xiě)入和獲取操作已經(jīng)足夠輕巧,以至于引入獨(dú)立的鎖機(jī)制,除了給代碼帶來(lái)額外的復(fù)雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個(gè)明顯的不同之處在于,前者在插入或刪除元素時(shí)不會(huì)產(chǎn)生或銷(xiāo)毀任何額外的對(duì)象實(shí)例,而后者則會(huì)生成一個(gè)額外的Node對(duì)象。這在長(zhǎng)時(shí)間內(nèi)需要高效并發(fā)地處理大批量數(shù)據(jù)的系統(tǒng)中,其對(duì)于GC的影響還是存在一定的區(qū)別。而在創(chuàng)建ArrayBlockingQueue時(shí),我們還可以控制對(duì)象的內(nèi)部鎖是否采用公平鎖,默認(rèn)采用非公平鎖。
LinkedBlockingQueue
基于鏈表的阻塞隊(duì)列,同ArrayListBlockingQueue類(lèi)似,其內(nèi)部也維持著一個(gè)數(shù)據(jù)緩沖隊(duì)列(該隊(duì)列由一個(gè)鏈表構(gòu)成),當(dāng)生產(chǎn)者往隊(duì)列中放入一個(gè)數(shù)據(jù)時(shí),隊(duì)列會(huì)從生產(chǎn)者手中獲取數(shù)據(jù),并緩存在隊(duì)列內(nèi)部,而生產(chǎn)者立即返回;只有當(dāng)隊(duì)列緩沖區(qū)達(dá)到最大值緩存容量時(shí)(LinkedBlockingQueue可以通過(guò)構(gòu)造函數(shù)指定該值),才會(huì)阻塞生產(chǎn)者隊(duì)列,直到消費(fèi)者從隊(duì)列中消費(fèi)掉一份數(shù)據(jù),生產(chǎn)者線程會(huì)被喚醒,反之對(duì)于消費(fèi)者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發(fā)數(shù)據(jù),還因?yàn)槠鋵?duì)于生產(chǎn)者端和消費(fèi)者端分別采用了獨(dú)立的鎖來(lái)控制數(shù)據(jù)同步,這也意味著在高并發(fā)的情況下生產(chǎn)者和消費(fèi)者可以并行地操作隊(duì)列中的數(shù)據(jù),以此來(lái)提高整個(gè)隊(duì)列的并發(fā)性能。
作為開(kāi)發(fā)者,我們需要注意的是,如果構(gòu)造一個(gè)LinkedBlockingQueue對(duì)象,而沒(méi)有指定其容量大小,LinkedBlockingQueue會(huì)默認(rèn)一個(gè)類(lèi)似無(wú)限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產(chǎn)者的速度一旦大于消費(fèi)者的速度,也許還沒(méi)有等到隊(duì)列滿阻塞產(chǎn)生,系統(tǒng)內(nèi)存就有可能已被消耗殆盡了。
ArrayBlockingQueue和LinkedBlockingQueue是兩個(gè)最普通也是最常用的阻塞隊(duì)列,一般情況下,在處理多線程間的生產(chǎn)者消費(fèi)者問(wèn)題,使用這兩個(gè)類(lèi)足以。
下面的代碼演示了如何使用BlockingQueue:
生產(chǎn)者線程
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @Author: ruan * Date: 2021/9/25 17:36 * @Description: 生產(chǎn)者線程 */ public class Producer implements Runnable { /** * 標(biāo)識(shí)位---是否運(yùn)行中 */ private volatile boolean isRunning = true; /** * 創(chuàng)建阻塞隊(duì)列 */ private BlockingQueue<String> queue; /** * 用于記錄庫(kù)存量 */ private static AtomicInteger count = new AtomicInteger(0); public Producer(BlockingQueue<String> queue) { this.queue = queue; } /** * 構(gòu)造函數(shù) */ @Override public void run() { String data = null; Random random = new Random(); System.out.println("生產(chǎn)者線程啟動(dòng)...."); while (isRunning){ try { System.out.println("正在生產(chǎn)數(shù)據(jù)..."); Thread.sleep(random.nextInt(1000)); data = "data" + count.incrementAndGet(); System.out.println("將數(shù)據(jù):" + data + "放入隊(duì)列..."); if (!queue.offer(data,2, TimeUnit.SECONDS)){ System.out.println("數(shù)據(jù)放入失敗"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("生產(chǎn)者線程關(guān)閉"); } } } public void stop(){ isRunning = false; } }
消費(fèi)者線程
import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; /** * @Author: ruan * Date: 2021/9/25 17:48 * @Description: 消費(fèi)者線程 */ public class Consumer implements Runnable { /** * 創(chuàng)建阻塞隊(duì)列 */ private BlockingQueue<String> queue; /** * 構(gòu)造函數(shù) */ public Consumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { System.out.println("消費(fèi)者線程啟動(dòng)..."); Random random = new Random(); boolean isRunning = true; while (isRunning){ System.out.println("正在從隊(duì)列中獲取數(shù)據(jù)"); try { String data = queue.poll(2, TimeUnit.SECONDS); if (null != data){ System.out.println("拿到數(shù)據(jù):" + data); System.out.println("正在消費(fèi)數(shù)據(jù):" + data); Thread.sleep(random.nextInt(1000)); }else { //如果2s內(nèi)無(wú)數(shù)據(jù),認(rèn)為生產(chǎn)者線程退出,消費(fèi)者線程也退出 isRunning = false; } } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.println("消費(fèi)者線程退出"); } } } }
主程序
import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; /** * @Author: ruan * Date: 2021/9/25 17:56 * @Description: */ public class Main { public static void main(String[] args) throws InterruptedException { /** * 創(chuàng)建阻塞隊(duì)列 */ BlockingQueue<String> queue = new LinkedBlockingDeque<>(10); /** * 創(chuàng)建生產(chǎn)者和消費(fèi)者 */ Producer producer1 = new Producer(queue); Producer producer2 = new Producer(queue); Producer producer3 = new Producer(queue); Consumer consumer = new Consumer(queue); /** * 創(chuàng)建線程 */ ExecutorService service = Executors.newCachedThreadPool(); service.execute(producer1); service.execute(producer2); service.execute(producer3); service.execute(consumer); Thread.sleep(10000); //停止生產(chǎn) producer1.stop(); producer2.stop(); producer3.stop(); Thread.sleep(2000); service.shutdown(); } }
DelayQueue
DelayQueue中的元素只有當(dāng)其指定的延遲時(shí)間到了,才能夠從隊(duì)列中獲取到該元素。DelayQueue是一個(gè)沒(méi)有大小限制的隊(duì)列,因此往隊(duì)列中插入數(shù)據(jù)的操作(生產(chǎn)者)永遠(yuǎn)不會(huì)被阻塞,而只有獲取數(shù)據(jù)的操作(消費(fèi)者)才會(huì)被阻塞。
使用場(chǎng)景:
DelayQueue使用場(chǎng)景較少,但都相當(dāng)巧妙,常見(jiàn)的例子比如使用一個(gè)DelayQueue來(lái)管理一個(gè)超時(shí)未響應(yīng)的連接隊(duì)列。
PriorityBlockingQueue
基于優(yōu)先級(jí)的阻塞隊(duì)列(優(yōu)先級(jí)的判斷通過(guò)構(gòu)造函數(shù)傳入的Compator對(duì)象來(lái)決定),但需要注意的是PriorityBlockingQueue并不會(huì)阻塞數(shù)據(jù)生產(chǎn)者,而只會(huì)在沒(méi)有可消費(fèi)的數(shù)據(jù)時(shí),阻塞數(shù)據(jù)的消費(fèi)者。因此使用的時(shí)候要特別注意,生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度絕對(duì)不能快于消費(fèi)者消費(fèi)數(shù)據(jù)的速度,否則時(shí)間一長(zhǎng),會(huì)最終耗盡所有的可用堆內(nèi)存空間。在實(shí)現(xiàn)PriorityBlockingQueue時(shí),內(nèi)部控制線程同步的鎖采用的是公平鎖。
SynchronousQueue
一種無(wú)緩沖的等待隊(duì)列,類(lèi)似于無(wú)中介的直接交易,有點(diǎn)像原始社會(huì)中的生產(chǎn)者和消費(fèi)者,生產(chǎn)者拿著產(chǎn)品去集市銷(xiāo)售給產(chǎn)品的最終消費(fèi)者,而消費(fèi)者必須親自去集市找到所要商品的直接生產(chǎn)者,如果一方?jīng)]有找到合適的目標(biāo),那么對(duì)不起,大家都在集市等待。相對(duì)于有緩沖的BlockingQueue來(lái)說(shuō),少了一個(gè)中間經(jīng)銷(xiāo)商的環(huán)節(jié)(緩沖區(qū)),如果有經(jīng)銷(xiāo)商,生產(chǎn)者直接把產(chǎn)品批發(fā)給經(jīng)銷(xiāo)商,而無(wú)需在意經(jīng)銷(xiāo)商最終會(huì)將這些產(chǎn)品賣(mài)給那些消費(fèi)者,由于經(jīng)銷(xiāo)商可以庫(kù)存一部分商品,因此相對(duì)于直接交易模式,總體來(lái)說(shuō)采用中間經(jīng)銷(xiāo)商的模式會(huì)吞吐量高一些(可以批量買(mǎi)賣(mài));但另一方面,又因?yàn)榻?jīng)銷(xiāo)商的引入,使得產(chǎn)品從生產(chǎn)者到消費(fèi)者中間增加了額外的交易環(huán)節(jié),單個(gè)產(chǎn)品的及時(shí)響應(yīng)性能可能會(huì)降低。
聲明一個(gè)SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區(qū)別:
如果采用公平模式:SynchronousQueue會(huì)采用公平鎖,并配合一個(gè)FIFO隊(duì)列來(lái)阻塞多余的生產(chǎn)者和消費(fèi)者,從而體系整體的公平策略;
但如果是非公平模式(SynchronousQueue默認(rèn)):SynchronousQueue采用非公平鎖,同時(shí)配合一個(gè)LIFO隊(duì)列來(lái)管理多余的生產(chǎn)者和消費(fèi)者,而后一種模式,如果生產(chǎn)者和消費(fèi)者的處理速度有差距,則很容易出現(xiàn)饑渴的情況,即可能有某些生產(chǎn)者或者是消費(fèi)者的數(shù)據(jù)永遠(yuǎn)都得不到處理。
小結(jié)
BlockingQueue不光實(shí)現(xiàn)了一個(gè)完整隊(duì)列所具有的基本功能,同時(shí)在多線程環(huán)境下,他還自動(dòng)管理了多線間的自動(dòng)等待于喚醒功能,從而使得程序員可以忽略這些細(xì)節(jié),關(guān)注更高級(jí)的功能。
到此這篇關(guān)于java阻塞隊(duì)列BlockingQueue詳細(xì)解讀的文章就介紹到這了,更多相關(guān)java阻塞隊(duì)列BlockingQueue內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Springboot接收Get參數(shù)實(shí)踐過(guò)程
本文主要介紹了在Spring Boot中如何接收不同類(lèi)型的請(qǐng)求參數(shù),包括在路徑中直接傳遞參數(shù)、跟在問(wèn)號(hào)后面?zhèn)鬟f參數(shù)、使用Map接收參數(shù)、接收數(shù)組以及使用對(duì)象接收參數(shù)等方法2024-12-12Java后端SSM框架圖片上傳功能實(shí)現(xiàn)方法解析
這篇文章主要介紹了Java后端SSM框架圖片上傳功能實(shí)現(xiàn)方法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06Spring MVC之@RequestMapping注解詳解
本篇文章主要介紹了Spring MVC之@RequestMapping 詳解,RequestMapping是一個(gè)用來(lái)處理請(qǐng)求地址映射的注解,可用于類(lèi)或方法上。有興趣的可以了解一下。2017-01-01java+selenium實(shí)現(xiàn)自動(dòng)化打開(kāi)頁(yè)面的方法
今天小編就為大家分享一篇java+selenium實(shí)現(xiàn)自動(dòng)化打開(kāi)頁(yè)面的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-05-05Springboot添加jvm監(jiān)控實(shí)現(xiàn)數(shù)據(jù)可視化
這篇文章主要介紹了Springboot添加jvm監(jiān)控實(shí)現(xiàn)數(shù)據(jù)可視化,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-04-04Java Socket編程實(shí)例(四)- NIO TCP實(shí)踐
這篇文章主要講解Java Socket編程中NIO TCP的實(shí)例,希望能給大家做一個(gè)參考。2016-06-06Java之定時(shí)器Timer和定時(shí)任務(wù)TimerTask應(yīng)用以及原理解讀
文章介紹了Java JDK自帶的定時(shí)器Timer和定時(shí)任務(wù)TimerTask的使用和原理,Timer和TimerTask成對(duì)出現(xiàn),Timer是定時(shí)器,TimerTask是定時(shí)任務(wù),TimerTask實(shí)現(xiàn)Runnable接口的run方法,Timer的屬性TimerThreadthread繼承Thread2024-12-12Java+opencv3.2.0實(shí)現(xiàn)hough直線檢測(cè)
這篇文章主要為大家詳細(xì)介紹了Java+opencv3.2.0之hough直線檢測(cè),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02