Java 阻塞隊列和線程池原理分析
【1】阻塞隊列
一、什么是阻塞隊列?
① 支持阻塞的插入方法:意思是當(dāng)隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
② 支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變?yōu)榉强铡?/strong>
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序整體處理數(shù)據(jù)的速度。
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。
為了解決這種生產(chǎn)消費能力不均衡的問題,便有了生產(chǎn)者和消費者模式。生產(chǎn)者和消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通信,而是通過阻塞隊列來進(jìn)行通信,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
阻塞隊列常用于生產(chǎn)者和消費者的場景,生產(chǎn)者是向隊列里添加元素的線程,消費者是從隊列里取元素的線程。阻塞隊列就是生產(chǎn)者用來存放元素、消費者用來獲取元素的容器。
在Android開發(fā)中阻塞隊列也是常見的 —— Handler機制中的MessageQueue就是優(yōu)先級阻塞隊列
二、阻塞隊列有什么用?
解耦 在生產(chǎn)者和消費者之間解除了耦合
平衡兩者性能差異 平衡了生產(chǎn)者消費者之間的性能差異
三、阻塞隊列的簡單實用
①常見的阻塞隊列主要有那些?
常見的阻塞隊列主要有一下7中:
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue
SynchronousQueue
LinkedTransferQueue
LinkedBlockingDeque
②阻塞隊列常見的幾種處理方式(并非所有方式都阻塞)
方法\處理方式 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
檢查方法 | element() | peek() | 不可用 | 不可用 |
- 其中只有put和take方法時阻塞的
- 拋出異常:當(dāng)隊列滿時,如果再往隊列里插入元素,會拋出IllegalStateException(“Queuefull”)異常。當(dāng)隊列空時,從隊列里獲取元素會拋出NoSuchElementException異常。
- -返回特殊值:當(dāng)往隊列插入元素時,會返回元素是否插入成功,成功返回true。如果是移除方法,則是從隊列里取出一個元素,如果沒有則返回null。
- 一直阻塞:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里put元素,隊列會一直阻塞生產(chǎn)者線程,直到隊列可用或者響應(yīng)中斷退出。當(dāng)隊列空時,如果消費者線程從隊列里take元素,隊列會阻塞住消費者線程,直到隊列不為空。
- 超時退出:當(dāng)阻塞隊列滿時,如果生產(chǎn)者線程往隊列里插入元素,隊列會阻塞生產(chǎn)者線程一段時間,如果超過了指定的時間,生產(chǎn)者線程就會退出。
③阻塞隊列簡單使用
- 三個線程添加數(shù)據(jù)
- 三個線程消費數(shù)據(jù)
public class MyBlockingQueue { static ArrayBlockingQueue<String> abq = new ArrayBlockingQueue(3); public static void main(String[] args) { // 生產(chǎn)者線程 for (int i = 0; i < 3; i++) { new Thread(() -> producer(), "producerThread" + i).start(); } // 消費者線程 for (int i = 0; i < 3; i++) { new Thread(() -> consumer(), "consumerThread" + i).start(); } } private static void consumer() { while (true) { try { String msg = abq.take(); System.out.println(Thread.currentThread().getName() + " ->receive msg:" + msg); } catch (InterruptedException e) { e.printStackTrace(); } } } private static void producer() { for (int i = 0; i < 100; i++) { try { abq.put("[" + i + "]"); System.out.println(Thread.currentThread().getName() + " ->send msg:" + i); } catch (InterruptedException e) { e.printStackTrace(); } } } }
執(zhí)行結(jié)果:
producerThread1 ->send msg:0
producerThread2 ->send msg:0
producerThread0 ->send msg:0
consumerThread1 ->receive msg:[0]
producerThread1 ->send msg:1
consumerThread2 ->receive msg:[0]
producerThread1 ->send msg:2
producerThread2 ->send msg:1
consumerThread1 ->receive msg:[0]
consumerThread0 ->receive msg:[1]
...
【2】Java 線程池
一、我們?yōu)槭裁葱枰狫ava 線程池?使用它的好處是什么?
①降低資源消耗。
通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
②提高響應(yīng)速度。
通常我們在Java程序中執(zhí)行一個任務(wù)的到結(jié)果分為以下步驟:
1.創(chuàng)建線程 ——> 2.執(zhí)行任務(wù) ——> 3.銷毀線程
當(dāng)任務(wù)到達(dá)時,任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。假設(shè)一個服務(wù)器完成一項任務(wù)所需時間為:T1 創(chuàng)建線程時間,T2 在線程中執(zhí)行任務(wù)的時間,T3 銷毀線程時間。 如果:T1 + T3 遠(yuǎn)大于 T2,則可以采用線程池,以提高服務(wù)器性能。線程池技術(shù)正是關(guān)注如何縮短或調(diào)整T1,T3時間的技術(shù),從而提高服務(wù)器程序性能的。它把T1,T3分別安排在服務(wù)器程序的啟動和結(jié)束的時間段或者一些空閑的時間段,這樣在服務(wù)器程序處理客戶請求時,不會有T1,T3的開銷了。
③提高線程的可管理性。
線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
二、Java中主要提供了哪幾種線程的線程池?
Java中主要提供了一下4中線程池:
1、newCachedThreadPool:用來創(chuàng)建一個可以無限擴大的線程池,適用于負(fù)載較輕的場景,執(zhí)行短期異步任務(wù)。(可以使得任務(wù)快速得到執(zhí)行,因為任務(wù)時間執(zhí)行短,可以很快結(jié)束,也不會造成cpu過度切換)
2、newFixedThreadPool:創(chuàng)建一個固定大小的線程池,因為采用無界的阻塞隊列,所以實際線程數(shù)量永遠(yuǎn)不會變化,適用于負(fù)載較重的場景,對當(dāng)前線程數(shù)量進(jìn)行限制。(保證線程數(shù)可控,不會造成線程過多,導(dǎo)致系統(tǒng)負(fù)載更為嚴(yán)重)
3、newSingleThreadExecutor:創(chuàng)建一個單線程的線程池,適用于需要保證順序執(zhí)行各個任務(wù)。
4、newScheduledThreadPool:適用于執(zhí)行延時或者周期性任務(wù)。
三、線程類的繼承關(guān)系
ThreadPoolExecutor
的類關(guān)系Executor
是一個接口,它是Executor框架的基礎(chǔ),它將任務(wù)的提交與任務(wù)的執(zhí)行分離開來。ExecutorService
接口繼承了Executor,在其上做了一些shutdown()、submit()的擴展,可以說是真正的線程池接口;AbstractExecutorService
抽象類實現(xiàn)了ExecutorService接口中的大部分方法;ThreadPoolExecutor
是線程池的核心實現(xiàn)類,用來執(zhí)行被提交的任務(wù)。ScheduledExecutorService
接口繼承了ExecutorService接口,提供了帶"周期執(zhí)行"功能ExecutorService;ScheduledThreadPoolExecutor
是一個實現(xiàn)類,可以在給定的延遲后運行命令,或者定期執(zhí)行命令。ScheduledThreadPoolExecutor比Timer更靈活,功能更強大。
Executor——>ExecutorService——>AbstractExecutorService——>ThreadPoolExecutor
二中常用的幾種線程池都是源自ThreadPoolExecutor,所以我們來分析一下這個類
四、ThreadPoolExecutor參數(shù)的含義 corePoolSize
corePoolSize
線程池中的核心線程數(shù),當(dāng)提交一個任務(wù)時,線程池創(chuàng)建一個新線程執(zhí)行任務(wù),直到當(dāng)前線程數(shù)等于corePoolSize;如果當(dāng)前線程數(shù)為corePoolSize,繼續(xù)提交的任務(wù)被保存到阻塞隊列中,等待被執(zhí)行;
如果執(zhí)行了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動所有核心線程。
maximumPoolSize
線程池中允許的最大線程數(shù)。如果當(dāng)前阻塞隊列滿了,且繼續(xù)提交任務(wù),則創(chuàng)建新的線程執(zhí)行任務(wù),前提是當(dāng)前線程數(shù)小于maximumPoolSize
keepAliveTime
線程空閑時的存活時間,即當(dāng)線程沒有任務(wù)執(zhí)行時,繼續(xù)存活的時間。默認(rèn)情況下,該參數(shù)只在線程數(shù)大于corePoolSize時才有用
TimeUnit
keepAliveTime的時間單位
workQueue
workQueue必須是BlockingQueue阻塞隊列。當(dāng)線程池中的線程數(shù)超過它的corePoolSize的時候,線程會進(jìn)入阻塞隊列進(jìn)行阻塞等待。通過workQueue,線程池實現(xiàn)了阻塞功能。
一般來說,我們應(yīng)該盡量使用有界隊列,因為使用無界隊列作為工作隊列會對線程池帶來如下影響。
1)當(dāng)線程池中的線程數(shù)達(dá)到corePoolSize后,新任務(wù)將在無界隊列中等待,因此線程池中的線程數(shù)不會超過corePoolSize。
2)由于1,使用無界隊列時maximumPoolSize將是一個無效參數(shù)。
3)由于1和2,使用無界隊列時keepAliveTime將是一個無效參數(shù)。
4)更重要的,使用無界queue可能會耗盡系統(tǒng)資源,有界隊列則有助于防止資源耗盡,同時即使使用有界隊列,也要盡量控制隊列的大小在一個合適的范圍。
threadFactory
創(chuàng)建線程的工廠,通過自定義的線程工廠可以給每個新建的線程設(shè)置一個具有識別度的線程名,當(dāng)然還可以更加自由的對線程做更多的設(shè)置,比如設(shè)置所有的線程為守護線程。
Executors靜態(tài)工廠里默認(rèn)的threadFactory,線程的命名規(guī)則是“pool-數(shù)字-thread-數(shù)字”。
RejectedExecutionHandler
線程池的飽和策略,當(dāng)阻塞隊列滿了,且沒有空閑的工作線程,如果繼續(xù)提交任務(wù),必須采取一種策略處理該任務(wù),線程池提供了4種策略:
(1)AbortPolicy:直接拋出異常,默認(rèn)策略;
(2)CallerRunsPolicy:用調(diào)用者所在的線程來執(zhí)行任務(wù);
(3)DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務(wù),并執(zhí)行當(dāng)前任務(wù);
(4)DiscardPolicy:直接丟棄任務(wù);
當(dāng)然也可以根據(jù)應(yīng)用場景實現(xiàn)RejectedExecutionHandler接口,自定義飽和策略,如記錄日志或持久化存儲不能處理的任務(wù)。
五、線程池工作流程(機制)
1. 如果當(dāng)前運行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。
2. 如果運行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。
3. 如果無法將任務(wù)加入BlockingQueue(隊列已滿),則創(chuàng)建新的線程來處理任務(wù)。
4. 如果創(chuàng)建新線程將使當(dāng)前運行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。
六、關(guān)于兩種提交方法的比較
execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功。
submit()方法用于提交需要返回值的任務(wù)。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當(dāng)前線程一段時間后立即返回,這時候有可能任務(wù)沒有執(zhí)行完。
相關(guān)文章
Java設(shè)計模式之Template?Pattern模板模式詳解
這篇文章主要介紹了Java設(shè)計模式之Template?Pattern模板模式詳解,模板模式(Template?Pattern)行為型模式之一,抽象父類定義一個操作中的算法的骨架,而將一些步驟延遲到子類中,需要的朋友可以參考下2023-10-10SSH框架網(wǎng)上商城項目第23戰(zhàn)之在線支付功能實現(xiàn)
這篇文章主要為大家詳細(xì)介紹了SSH框架網(wǎng)上商城項目第23戰(zhàn)之在線支付功能實現(xiàn),感興趣的小伙伴們可以參考一下2016-06-06JavaWeb 網(wǎng)上書店 注冊和登陸功能案例詳解
這篇文章主要介紹了JavaWeb 網(wǎng)上書店 注冊和登陸功能,結(jié)合具體案例形式詳細(xì)分析了JavaWeb 網(wǎng)上書店 注冊和登陸功能具體實現(xiàn)步驟、操作技巧與注意事項,需要的朋友可以參考下2019-08-08使用RequestBodyAdvice實現(xiàn)對Http請求非法字符過濾
這篇文章主要介紹了使用RequestBodyAdvice實現(xiàn)對Http請求非法字符過濾的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-06-06Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器
Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器...2006-12-12java 輸入一個數(shù)字,反轉(zhuǎn)輸出這個數(shù)字的值(實現(xiàn)方法)
下面小編就為大家?guī)硪黄猨ava 輸入一個數(shù)字,反轉(zhuǎn)輸出這個數(shù)字的值(實現(xiàn)方法)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-10-10Java Arrays.sort()如何實現(xiàn)對int類型數(shù)組倒序排序
這篇文章主要介紹了Java Arrays.sort()如何實現(xiàn)對int類型數(shù)組倒序排序問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08聊聊在獲取方法參數(shù)名方面,Spring真的就比Mybatis強?
在獲取方法參數(shù)名方面,Spring真的就比Mybatis強嗎?今天就帶大家聊聊這個話題,如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12