Java并發(fā)編程之線程池實現(xiàn)原理詳解
前言
池化思想是一種空間換時間的思想,期望使用預先創(chuàng)建好的對象來減少頻繁創(chuàng)建對象的性能開銷,同時還可以對對象進行統(tǒng)一管理,減少對象使用成本。
java中有多種池化思想的應用,例如:數(shù)據(jù)庫連接池、線程池、字符串常量池等。
為什么使用線程池
頻繁的開啟線程或者停止線程,線程需要重新被cpu從就緒到運行狀態(tài)調(diào)度,需要發(fā)生cpu的上下文切換,效率非常低。
線程池作用
- 降低線程創(chuàng)建和銷毀的開銷:通過線程池重用已經(jīng)創(chuàng)建的線程,可以避免頻繁創(chuàng)建和銷毀線程所造成的內(nèi)存和CPU資源開銷。
- 提高線程執(zhí)行效率:線程池中的線程是經(jīng)過優(yōu)化的,通常會采用更少的線程、更高效的調(diào)度算法、更快的執(zhí)行速度等方式,以提高線程的執(zhí)行效率。
- 降低線程間競爭的激烈程度:線程池中的線程數(shù)量是有限的,可以減少線程間競爭的激烈程度,從而降低CPU資源的消耗。
- 提高應用程序的可伸縮性和健壯性:線程池可以控制線程的數(shù)量和執(zhí)行速度,可以更好地滿足應用程序的需求,從而提高應用程序的可伸縮性和健壯性。
ThreadPoolExecutor參數(shù)
- int corePoolSize: 核心線程數(shù)
- int maximumPoolSize: 最大線程數(shù)
- long keepAliveTime: 超出corePoolSize后創(chuàng)建的線程的存活時間
- TimeUnit unit: keepAliveTime的時間單位
- BlockingQueue workQueue: 任務隊列,存放待執(zhí)行任務
- ThreadFactory threadFactory: 創(chuàng)建線程的線程工廠
- RejectedExecutionHandler handler: 拒絕策略
當線程數(shù)小于核心線程數(shù)時,創(chuàng)建線程。
當線程數(shù)大于等于核心線程數(shù),且任務隊列未滿時,將任務放入任務隊列。
當線程數(shù)大于等于核心線程數(shù),且任務隊列已滿時:
- 1)若線程數(shù)小于最大線程數(shù),創(chuàng)建線程
- 2)若線程數(shù)等于最大線程數(shù),執(zhí)行拒絕策略
拒絕策略
當線程池中的線程數(shù)量達到最大值或者任務隊列已滿時,如果再有新的任務提交給線程池,線程池會拒絕接受新的任務。這時,線程池會采用一定的拒絕策略來處理這些被拒絕的任務。
Java中提供了四種拒絕策略:
- AbortPolicy:默認策略,直接拋出RejectedExecutionException異常,表示任務被拒絕執(zhí)行。
- CallerRunsPolicy:這個策略會使用當前客戶端線程來執(zhí)行任務。如果客戶端線程不夠,或者存在線程被阻塞,則仍然會拋出RejectedExecutionException異常。
- DiscardPolicy:這個策略會直接丟棄被拒絕的任務,不會執(zhí)行任何操作。
- DiscardOldestPolicy:這個策略會丟棄隊列中等待時間最長的任務,然后執(zhí)行當前被提交的任務。如果隊列中沒有等待時間最長的任務,則會使用CallerRunsPolicy策略來處理被拒絕的任務。
也可以自定義拒絕策略,實現(xiàn)RejectedExecutionHandler
接口即可。
建議自定義實現(xiàn)拒絕策略,將任務持久化到db,后期在手動補償。
線程池的創(chuàng)建方式
Executors為我們提供了四種新建線程池的方式:
newCachedThreadPool()
可緩存線程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
線程池是創(chuàng)建一個核心線程數(shù)為0,最大線程為Inter.MAX_VALUE的線程池,線程池數(shù)量不確定,有空閑線程則優(yōu)先使用,沒用則創(chuàng)建新的線程處理任務,處理完放入線程池。
newFixedThreadPool(): 可定長度,限制最大線程數(shù)
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
創(chuàng)建一個核心線程數(shù)跟最大線程數(shù)相同的線程池,線程池數(shù)量大小不變,如果有任務放入隊列,等待空閑線程。
newScheduledThreadPool(): 可定時線程池\
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
創(chuàng)建一個沒有最大線程數(shù)限制的可以定時執(zhí)行線程池,還有創(chuàng)建一個只有單個線程的可以定時執(zhí)行線程池(Executors.newSingleThreadScheduledExecutor())
newSingleThreadExecutor(): 單線程 線程池
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
池里只有一個線程
這四種底層都是基于ThreadPoolExecutor構造函數(shù)封裝,且都采用的無界隊列,使用時需注意防止內(nèi)存溢出。
自定義線程名稱
可以通過自定義ThreadFactory來為線程池中的線程指定名稱。
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("custom-thread-%d").build();
線程池的五種狀態(tài)
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
- RUNNING:線程池運行狀態(tài),此時線程池中的任務隊列可能有等待中的任務,但線程池會持續(xù)從隊列中取出任務執(zhí)行。
- SHUTDOWN:線程池關閉狀態(tài),此時線程池會拒絕接受新的任務,但會執(zhí)行完已經(jīng)排隊的任務,不接受新任務并不意味著已經(jīng)排隊的任務必須執(zhí)行完。
- STOP:線程池強制停止狀態(tài),此時線程池不僅會拒絕接受新的任務,而且會中斷正在執(zhí)行的任務,并終止線程池。
- TIDYING:線程池在轉(zhuǎn)換為終止狀態(tài)時的一種特殊狀態(tài),此時線程池會執(zhí)行鉤子方法terminated(),并等待所有任務執(zhí)行完成。
- TERMINATED:線程池終止狀態(tài),此時線程池中的所有任務已經(jīng)執(zhí)行完成,線程池被徹底終止。
線程數(shù)設置
- 核心線程數(shù):線程池中始終存在的線程數(shù)量。當任務被提交到線程池時,如果當前運行的線程少于核心線程數(shù),則會創(chuàng)建一個新的線程來執(zhí)行該任務,即使其他的核心線程正在空閑狀態(tài)。因此,核心線程數(shù)通常應該設置為預期的并發(fā)數(shù)。
- 最大線程數(shù):允許線程池中同時存在的最大線程數(shù)量。當線程池中的線程數(shù)量達到最大線程數(shù)時,后續(xù)提交到線程池中的任務將被暫存到任務隊列中等待處理。因此,最大線程數(shù)不應該設置過高,否則可能會導致系統(tǒng)資源緊張。
合理地設置核心線程數(shù)和最大線程數(shù)可以優(yōu)化線程池的性能和響應時間。下面是一些設置建議:
- 核心線程數(shù) = CPU核心數(shù) + 1
- 最大線程數(shù) = 核心線程數(shù) * 2
- 如果任務執(zhí)行時間較長,可以適當增加最大線程數(shù),以避免任務堆積在隊列中無法及時處理
具體的設置需要根據(jù)實際情況來考慮,如果線程池主要執(zhí)行的是I/O密集型任務,可以適當增加核心線程數(shù)和最大線程數(shù),以充分利用系統(tǒng)資源。如果線程池主要執(zhí)行的是CPU密集型任務,則需要根據(jù)系統(tǒng)的CPU核心數(shù)來設置核心線程數(shù)和最大線程數(shù),避免過度消耗CPU資源。
springboot集成線程池
package com.fandf.common.config; import com.fandf.common.utils.CustomThreadPoolTaskExecutor; import lombok.Getter; import lombok.Setter; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * @author fandongfeng */ @EnableAsync(proxyTargetClass = true) @Configuration public class DefaultAsycTaskConfig { /** * 線程池維護線程的最小數(shù)量. */ @Value("${asyc-task.corePoolSize:10}") private int corePoolSize; /** * 線程池維護線程的最大數(shù)量 */ @Value("${asyc-task.maxPoolSize:200}") private int maxPoolSize; /** * 隊列最大長度 */ @Value("${asyc-task.queueCapacity:10000}") private int queueCapacity; /** * 線程池前綴 */ @Value("${asyc-task.threadNamePrefix:FdfExecutor-}") private String threadNamePrefix; @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new CustomThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix(threadNamePrefix); /* rejection-policy:當pool已經(jīng)達到max size的時候,如何處理新任務 CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調(diào)用者所在的線程來執(zhí)行 */ executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }
使用
@Service public class MyService { @Async("taskExecutor") public void doSomething() { // 異步執(zhí)行的任務 } }
到此這篇關于Java并發(fā)編程之線程池實現(xiàn)原理詳解的文章就介紹到這了,更多相關Java線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
為什么在foreach循環(huán)中JAVA集合不能添加或刪除元素
今天給大家?guī)淼奈恼率顷P于Java的相關知識,文章圍繞著為什么在foreach循環(huán)中JAVA集合不能添加或刪除元素展開,文中有非常詳細的介紹及代碼示例,需要的朋友可以參考下2021-06-06MyBatis-Plus中實現(xiàn)自定義復雜排序邏輯的詳細步驟
這篇文章主要介紹了MyBatis-Plus中實現(xiàn)自定義復雜排序邏輯,通過使用MyBatis-Plus的QueryWrapper和SQL原始片段,我們可以靈活地實現(xiàn)復雜的數(shù)據(jù)排序邏輯,這種方法尤其適用于需要對數(shù)據(jù)進行特定規(guī)則排序的場景,需要的朋友可以參考下2024-07-07java實戰(zhàn)之飛機大戰(zhàn)小游戲(源碼加注釋)
這篇文章主要介紹了java實戰(zhàn)之飛機大戰(zhàn)小游戲(源碼加注釋),文中有非常詳細的代碼示例,對正在學習java的小伙伴們有非常好的幫助,需要的朋友可以參考下2021-04-04javacv開發(fā)詳解之調(diào)用本機攝像頭視頻
這篇文章主要介紹了javacv開發(fā)詳解之調(diào)用本機攝像頭視頻,對javacv感興趣的同學,可以參考下2021-04-04