關(guān)于Java如何用好線程池的方法分享(建議收藏)
1. 線程的使用場(chǎng)景
異步任務(wù)
簡(jiǎn)單來(lái)說(shuō)就是某些不需要同步返回業(yè)務(wù)處理結(jié)果的場(chǎng)景,比如:短信、郵件等通知類業(yè)務(wù),評(píng)論、點(diǎn)贊等互動(dòng)性業(yè)務(wù)。
并行計(jì)算
就像MapReduce一樣,充分利用多線程的并行計(jì)算能力,將大任務(wù)拆分為多個(gè)子任務(wù),最后再將所有子任務(wù)計(jì)算后的結(jié)果進(jìn)行匯總,F(xiàn)orkJoinPool就是JDK中典型的并行計(jì)算框架。
串行任務(wù)
很簡(jiǎn)單,假設(shè)某個(gè)方法需要經(jīng)過(guò),A、B、C三個(gè)步驟,A步驟耗時(shí)1秒,B步驟耗時(shí)2秒,C步驟耗時(shí)3秒,那么如果是串行處理,則該方法最終需要耗時(shí)6秒,但如果A、B、C三個(gè)步驟互相之間是沒(méi)有依賴的,那么就可以利用多線程的方式,同時(shí)處理三個(gè)步驟,這樣該方法只需要等待耗時(shí)最長(zhǎng)的步驟結(jié)束即可。
2. 線程池創(chuàng)建
不要直接使用Executors創(chuàng)建線程池,應(yīng)通過(guò)ThreadPoolExecutor的方式,主動(dòng)明確線程池的參數(shù),避免產(chǎn)生意外。
每個(gè)參數(shù)都要顯示設(shè)置,例如像下面這樣:
private static final ExecutorService executor = new ThreadPoolExecutor( 2, 4, 1L, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("common-pool-%d").build(), new ThreadPoolExecutor.CallerRunsPolicy());
3. 參數(shù)的配置建議
CorePoolSize(核心線程數(shù))
一般在配置核心線程數(shù)的時(shí)候,是需要結(jié)合線程池將要處理任務(wù)的特性來(lái)決定的,而任務(wù)的性質(zhì)一般可以劃分為:CPU密集型、I/O密集型。
比較通用的配置方式如下
CPU密集型:一般建議線程的核心數(shù)與CPU核心數(shù)保持一致。 I/O密集型:一般可以設(shè)置2倍的CPU核心數(shù)的線程數(shù),因?yàn)榇祟惾蝿?wù)CPU比較空閑,可以多分配點(diǎn)線程充分利用CPU資源來(lái)提高效率。
通過(guò)Runtime.getRuntime().availableProcessors()可以獲取核心線程數(shù)。
另外還有一個(gè)公式可以借鑒
線程核心數(shù) = cpu核心數(shù) / (1-阻塞系數(shù))
阻塞系數(shù) = 阻塞時(shí)間/(阻塞時(shí)間+使用CPU的時(shí)間)
實(shí)際上大多數(shù)線上業(yè)務(wù)所消耗的時(shí)間主要就是I/O等待,因此一般線程數(shù)都可以設(shè)置的多一點(diǎn),比如tomcat中默認(rèn)的線程數(shù)就是200,所以最佳的核心線程數(shù)是需要根據(jù)特定場(chǎng)景,然后通過(guò)實(shí)際上線上允許結(jié)果分析后,再不斷的進(jìn)行調(diào)整。
MaximumPoolSize
maximumPoolSize的設(shè)置也是看實(shí)際應(yīng)用場(chǎng)景,如果設(shè)置的和corePoolSize一樣,那就完全依靠阻塞隊(duì)列和拒絕策略來(lái)控制任務(wù)的處理情況,如果設(shè)置的比corePoolSize稍微大一點(diǎn),那可能對(duì)于一些突然流量的場(chǎng)景更使用。
KeepAliveTime
由maximumPoolSize創(chuàng)建出來(lái)的線程,在經(jīng)過(guò)keepAliveTime時(shí)間后進(jìn)行銷毀,依舊突發(fā)流量持續(xù)的時(shí)間來(lái)決定。
WorkQueue
那么阻塞隊(duì)列應(yīng)該設(shè)置多大呢?我們知道當(dāng)線程池中所有的線程都在工作時(shí),如果再有任務(wù)進(jìn)來(lái),就會(huì)被放到阻塞隊(duì)列中等待,如果阻塞隊(duì)列設(shè)置的太小,可能很快隊(duì)列就滿了,導(dǎo)致任務(wù)被丟棄或者異常(由拒絕策略決定),如果隊(duì)列設(shè)置的太大,又可能會(huì)帶來(lái)內(nèi)存資源的緊張,甚至OOM,以及任務(wù)延遲時(shí)間過(guò)長(zhǎng)。
所以阻塞隊(duì)列的大小,又是要結(jié)合實(shí)際場(chǎng)景來(lái)設(shè)置的。
一般會(huì)根據(jù)處理任務(wù)的速度與任務(wù)產(chǎn)生的速度進(jìn)行計(jì)算得到一個(gè)大概的數(shù)值。
假設(shè)現(xiàn)在有1個(gè)線程,每秒鐘可以處理10個(gè)任務(wù),正常情況下每秒鐘產(chǎn)生的任務(wù)數(shù)小于10,那么此時(shí)隊(duì)列長(zhǎng)度為10就足以。 但是如果高峰時(shí)期,每秒產(chǎn)生的任務(wù)數(shù)會(huì)達(dá)到20,會(huì)持續(xù)10秒,且任務(wù)又不希望丟棄,那么此時(shí)隊(duì)列的長(zhǎng)度就需要設(shè)置到100。
監(jiān)控workQueue中等待任務(wù)的數(shù)量是非常重要的,只有了解實(shí)際的情況,才能做出正確的決定。
ThreadFactory
通過(guò)threadFactory我們可以自定義線程組的名字,設(shè)置合理的名稱將有利于你線上進(jìn)行問(wèn)題排查。
Handler
最后拒絕策略,這也是要結(jié)合實(shí)際的業(yè)務(wù)場(chǎng)景來(lái)決定采用什么樣的拒絕方式,例如像過(guò)程類的數(shù)據(jù),可以直接采用DiscardOldestPolicy策略。
常見的拒絕策略
AbortPolicy
JDK自帶線程池中默認(rèn)的拒絕策略,直接拒絕任務(wù)并拋出異常。
CallerRunsPolicy
由當(dāng)前調(diào)用調(diào)用者繼續(xù)執(zhí)行當(dāng)前任務(wù)。
DiscardPolicy
直接丟棄當(dāng)前任務(wù)
DiscardOldestPolicy
丟棄阻塞隊(duì)列中最早丟進(jìn)去的任務(wù)
其他的拒絕策略
NewThreadRunsPolicy
這是Netty中的拒絕策略,和CallerRunsPolicy有點(diǎn)像,任務(wù)不會(huì)丟棄,不同的是Netty中是新建了一個(gè)線程繼續(xù)執(zhí)行當(dāng)前任務(wù)。
AbortPolicyWithReport
dubbo中的拒絕策略,也是拋出異常,不同的時(shí)對(duì)于日志內(nèi)容的輸出更加豐富,也是為了我們更好的排查問(wèn)題。
EsAbortPolicy
針對(duì)某種特定場(chǎng)景時(shí),做出不同的處理方式,比如在elasticsearch中只有當(dāng)isForceExecution為true(isForceExecution是用來(lái)判定任務(wù)是執(zhí)行還是拒絕的條件),且阻塞隊(duì)列是SizeBlockingQueue類型時(shí),才會(huì)放入當(dāng)前隊(duì)列中,否則拋出異常。
4. 線程池的任務(wù)處理流程
阻塞隊(duì)列的設(shè)計(jì)起到了良好的緩沖作用,當(dāng)面對(duì)突發(fā)流量到來(lái)時(shí),先將任務(wù)丟到隊(duì)列中,再慢慢來(lái)消費(fèi),其原理和MQ是類似的,一旦隊(duì)列也被打滿了,則說(shuō)明消費(fèi)能力與你的期望對(duì)比,已經(jīng)嚴(yán)重不足了,此時(shí)maximumPoolSize參數(shù)的設(shè)計(jì),又給了你一次處理的機(jī)會(huì),你可以選擇再開啟一部分線程來(lái)應(yīng)對(duì)突發(fā)狀況,當(dāng)危機(jī)接觸后,再主動(dòng)幫你回收這部分線程,或者選擇使用拒絕策略。
一個(gè)簡(jiǎn)單的任務(wù)處理,考慮各種實(shí)際運(yùn)行中可能遇到的情況,對(duì)于線程池的使用者來(lái)說(shuō),也應(yīng)了解線程池的任務(wù)處理流程,再結(jié)合自身的業(yè)務(wù)場(chǎng)景充分考慮其中的參數(shù)設(shè)置。
5. 線程的狀態(tài)
Java中對(duì)線程的定義有如下幾種狀態(tài):RUNNABLE, BLOCKED, WAITING, TIMED\_WAITING, NEW, TERMINATED
RUNNABLE
可運(yùn)行的狀態(tài),包含了運(yùn)行中和準(zhǔn)備就緒兩種狀態(tài),也就是說(shuō)RUNNABLE狀態(tài)下線程并不一定已經(jīng)運(yùn)行了,可能還在等待CPU資源。
BLOCKED
處于阻塞狀態(tài)下的線程,并且這個(gè)阻塞是因?yàn)檫M(jìn)入了同步代碼塊或者方法,需要等待鎖的釋放。
一旦線上出現(xiàn)blocked狀態(tài)的線程,是需要排查原因的。
WAITING
處于等待狀態(tài)下的線程,例如一個(gè)線程調(diào)用了Object.wait()方法,那么這個(gè)線程就會(huì)等待另一個(gè)線程調(diào)用Object.notify()或者Object.notifyAll()。 或者調(diào)用thread.join()的線程等待指定線程的終止。
常見的方法有:Object.wait()、Thread.join()、LockSupport.park()
TIMED_WAITING
與WAITING的區(qū)別就在于TIMED_WAITING是明確帶有具體等待時(shí)間的,常見的方法有:Thread.sleep(long)、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos(long)、LockSupport.parkUntil(long)
NEW
創(chuàng)建了一個(gè)線程,但還沒(méi)調(diào)用start()方法。
TERMINATED
終止?fàn)顟B(tài),表示線程已經(jīng)執(zhí)行完畢。
6. 線程池的監(jiān)控
線程池自身提供的統(tǒng)計(jì)數(shù)據(jù)
public class ThreadPoolMonitor { private final static Logger log = LoggerFactory.getLogger(ThreadPoolMonitor.class); private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("my_thread_pool_%d").build()); public static void main(String[] args) { log.info("Pool Size: " + threadPool.getPoolSize()); log.info("Active Thread Count: " + threadPool.getActiveCount()); log.info("Task Queue Size: " + threadPool.getQueue().size()); log.info("Completed Task Count: " + threadPool.getCompletedTaskCount()); } }
通過(guò)micrometer API完成統(tǒng)計(jì),這樣就可以接入Prometheus了
package com.springboot.micrometer.monitor; import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.micrometer.core.instrument.Metrics; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; @Component public class ThreadPoolMonitor { private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 8, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("my_thread_pool_%d").build(), new ThreadPoolExecutor.DiscardOldestPolicy()); /** * 活躍線程數(shù) */ private AtomicLong activeThreadCount = new AtomicLong(0); /** * 隊(duì)列任務(wù)數(shù) */ private AtomicLong taskQueueSize = new AtomicLong(0); /** * 完成任務(wù)數(shù) */ private AtomicLong completedTaskCount = new AtomicLong(0); /** * 線程池中當(dāng)前線程的數(shù)量 */ private AtomicLong poolSize = new AtomicLong(0); @PostConstruct private void init() { /** * 通過(guò)micrometer API完成統(tǒng)計(jì) * * gauge最典型的使用場(chǎng)景就是統(tǒng)計(jì):list、Map、線程池、連接池等集合類型的數(shù)據(jù) */ Metrics.gauge("my_thread_pool_active_thread_count", activeThreadCount); Metrics.gauge("my_thread_pool_task_queue_size", taskQueueSize); Metrics.gauge("my_thread_pool_completed_task_count", completedTaskCount); Metrics.gauge("my_thread_pool_size", poolSize); // 模擬線程池的使用 new Thread(this::runTask).start(); } private void runTask() { // 每5秒監(jiān)控一次線程池的使用情況 monitorThreadPoolState(); // 模擬任務(wù)執(zhí)行 IntStream.rangeClosed(0, 500).forEach(i -> { // 每500毫秒,執(zhí)行一個(gè)任務(wù) try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } // 每個(gè)處理一個(gè)任務(wù)耗時(shí)5秒 threadPool.submit(() -> { try { TimeUnit.MILLISECONDS.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } }); }); } private void monitorThreadPoolState() { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { activeThreadCount.set(threadPool.getActiveCount()); taskQueueSize.set(threadPool.getQueue().size()); poolSize.set(threadPool.getPoolSize()); completedTaskCount.set(threadPool.getCompletedTaskCount()); }, 0, 5, TimeUnit.SECONDS); } }
以上就是關(guān)于Java如何用好線程池的方法分享(建議收藏)的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Spring中自帶的@Schedule實(shí)現(xiàn)自動(dòng)任務(wù)的過(guò)程解析
這篇文章主要介紹了關(guān)于Spring中自帶的@Schedule實(shí)現(xiàn)自動(dòng)任務(wù),本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06httpclient 請(qǐng)求http數(shù)據(jù),json轉(zhuǎn)map的實(shí)例
下面小編就為大家?guī)?lái)一篇httpclient 請(qǐng)求http數(shù)據(jù),json轉(zhuǎn)map的實(shí)例。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2016-12-12Java mutable對(duì)象和immutable對(duì)象的區(qū)別說(shuō)明
這篇文章主要介紹了Java mutable對(duì)象和immutable對(duì)象的區(qū)別,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06Java利用for循環(huán)輸出空心三角形、空心菱形和空心矩形的代碼
今天小編就為大家分享一篇關(guān)于Java利用for循環(huán)輸出空心三角形、空心菱形和空心矩形的代碼,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2018-12-12springboot中使用ElasticSearch的詳細(xì)教程
這篇文章主要介紹了ElasticSearch在springboot中使用的詳細(xì)教程,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-05-05SpringTask實(shí)現(xiàn)定時(shí)任務(wù)方法講解
通過(guò)重寫Schedu lingConfigurer方法實(shí)現(xiàn)對(duì)定時(shí)任務(wù)的操作,單次執(zhí)行、停止、啟動(dòng)三個(gè)主要的基本功能,動(dòng)態(tài)的從數(shù)據(jù)庫(kù)中獲取配置的定時(shí)任務(wù)cron信息,通過(guò)反射的方式靈活定位到具體的類與方法中2023-02-02