Java如何手動(dòng)創(chuàng)建線程池
如何手動(dòng)創(chuàng)建線程池
jdk提供了一個(gè)通過(guò)ThreadPoolExecutor創(chuàng)建一個(gè)線程池的類
構(gòu)造器
使用給定的參數(shù)和默認(rèn)的飽和策略、默認(rèn)的工廠方法創(chuàng)建線程池
ThreadPoolExecutor(int corePoolSize,? int maximumPoolSize,? long keepAliveTime,? TimeUnit unit,? BlockingQueue<Runnable> workQueue)
使用給定的參數(shù)和默認(rèn)的工廠方法創(chuàng)建線程池
ThreadPoolExecutor(int corePoolSize,? int maximumPoolSize,? long keepAliveTime,? TimeUnit unit,? BlockingQueue<Runnable> workQueue,? RejectedExecutionHandler handler)
使用給定的參數(shù)和默認(rèn)的飽和策略(AbortPolicy)創(chuàng)建線程池
ThreadPoolExecutor(int corePoolSize,? int maximumPoolSize, long keepAliveTime, BlockingQueue<Runnable> workQueue,? ThreadFactory threadFactory)
使用指定的參數(shù)創(chuàng)建線程池
ThreadPoolExecutor(int corePoolSize,? int maximumPoolSize,? long keepAliveTime,? TimeUnit unit, BlockingQueue<Runnable> workQueue,? RejectedExecutionHandler handler)
參數(shù)說(shuō)明
corePoolSize
線程池的基本大小, 當(dāng)提交一個(gè)任務(wù)到線程池的時(shí)候,線程池會(huì)創(chuàng)建一個(gè)線程來(lái)執(zhí)行任務(wù),即使當(dāng)前線程池已經(jīng)存在空閑線程,仍然會(huì)創(chuàng)建一個(gè)線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用線程池的prestartAllCoreThreads()方法,線程池會(huì)提前創(chuàng)建并啟動(dòng)所有的基本線程。maximumPoolSizeSize
線程池最大數(shù)量,線程池允許創(chuàng)建的最大線程數(shù),如果隊(duì)列滿了,并且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會(huì)再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是,如果使用了無(wú)界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒(méi)什么效果。keepAliveTime
線程活動(dòng)保持時(shí)間,線程池的工作線程空閑后,保持存活的時(shí)間,所以,如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,可以調(diào)大時(shí)間,提高線程的利用率。unit
線程活動(dòng)保持時(shí)間的單位,可選擇的單位有時(shí)分秒等等。workQueue
任務(wù)隊(duì)列。用來(lái)暫時(shí)保存任務(wù)的工作隊(duì)列threadFactory
用于創(chuàng)建線程的工廠
隊(duì)列
ArrayBlockingQueue
:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按照FIFO(先進(jìn)先出)原則對(duì)元素進(jìn)行排序DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
:是一個(gè)基于鏈表結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按照FIFO排序元素,吞吐量高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool(n)使用了此隊(duì)列LinkedTransferQueue
PriorityBlockingQueue
:一個(gè)具有優(yōu)先級(jí)的無(wú)限阻塞隊(duì)列SynchronousQueue
:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。每個(gè)插入操作必須等待另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量要高于LinkedBlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool()使用了此隊(duì)列
飽和策略
當(dāng)隊(duì)列和線程池都滿了,說(shuō)明線程池處于飽和的狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個(gè)策略默認(rèn)是AbortPolicy,表示無(wú)法處理新任務(wù)時(shí)拋出異常
ThreadPoolExecutor.AbortPolicy
:直接拋出異常ThreadPoolExecutor.CallerRunsPolicy
:只用調(diào)用這所在的線程來(lái)運(yùn)行任務(wù)ThreadPoolExecutor.DiscardOldestPolicy
:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)ThreadPoolExecutor.DiscardPolicy
:不處理,丟棄掉
示例
public class ThreadPool { ? ? /** ? ? ?* 線程池的基本大小 ? ? ?*/ ? ? static int corePoolSize = 10; ? ? /** ? ? ?* 線程池最大數(shù)量 ? ? ?*/ ? ? static int maximumPoolSizeSize = 100; ? ? /** ? ? ?* 線程活動(dòng)保持時(shí)間 ? ? ?*/ ? ? static long keepAliveTime = 1; ? ? /** ? ? ?* 任務(wù)隊(duì)列 ? ? ?*/ ? ? static ArrayBlockingQueue workQueue = new ArrayBlockingQueue(10); ? ? public static void main(String[] args) { ? ? ? ? ThreadPoolExecutor executor = new ThreadPoolExecutor( ? ? ? ? ? ? ? ? corePoolSize, ? ? ? ? ? ? ? ? maximumPoolSizeSize, ? ? ? ? ? ? ? ? keepAliveTime, ? ? ? ? ? ? ? ? TimeUnit.SECONDS, ? ? ? ? ? ? ? ? workQueue, ? ? ? ? ? ? ? ? new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build()); ? ? ? ? //提交一個(gè)任務(wù) ? ? ? ? executor.execute(() -> System.out.println("ok")); ? ? } }
源碼分析
任務(wù)執(zhí)行
public void execute(Runnable command) { ? ? ? ? if (command == null) ? ? ? ? ? ? throw new NullPointerException(); ? ? ? ? /* ? ? ? ? ?* Proceed in 3 steps: ? ? ? ? ?* ? ? ? ? ?* 1. If fewer than corePoolSize threads are running, try to ? ? ? ? ?* start a new thread with the given command as its first ? ? ? ? ?* task. ?The call to addWorker atomically checks runState and ? ? ? ? ?* workerCount, and so prevents false alarms that would add ? ? ? ? ?* threads when it shouldn't, by returning false. ? ? ? ? ?* ? ? ? ? ?* 2. If a task can be successfully queued, then we still need ? ? ? ? ?* to double-check whether we should have added a thread ? ? ? ? ?* (because existing ones died since last checking) or that ? ? ? ? ?* the pool shut down since entry into this method. So we ? ? ? ? ?* recheck state and if necessary roll back the enqueuing if ? ? ? ? ?* stopped, or start a new thread if there are none. ? ? ? ? ?* ? ? ? ? ?* 3. If we cannot queue task, then we try to add a new ? ? ? ? ?* thread. ?If it fails, we know we are shut down or saturated ? ? ? ? ?* and so reject the task. ? ? ? ? ?*/ ? ? ? ? int c = ctl.get(); ? ? ? ? if (workerCountOf(c) < corePoolSize) { ? ? ? ? ? ? if (addWorker(command, true)) ? ? ? ? ? ? ? ? return; ? ? ? ? ? ? c = ctl.get(); ? ? ? ? } ? ? ? ? if (isRunning(c) && workQueue.offer(command)) { ? ? ? ? ? ? int recheck = ctl.get(); ? ? ? ? ? ? if (! isRunning(recheck) && remove(command)) ? ? ? ? ? ? ? ? reject(command); ? ? ? ? ? ? else if (workerCountOf(recheck) == 0) ? ? ? ? ? ? ? ? addWorker(null, false); ? ? ? ? } ? ? ? ? else if (!addWorker(command, false)) ? ? ? ? ? ? reject(command); ? ? }
參考文檔
https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html
線程池工具類
實(shí)現(xiàn)線程的三種方式
1.繼承 Thread 類
2.實(shí)現(xiàn)Runnable 接口
3.實(shí)現(xiàn) Callbale接口和Future接口實(shí)現(xiàn)
4.三種方式比較:
繼承Thread 類 編程簡(jiǎn)單,可擴(kuò)展性差。
實(shí)現(xiàn)接口方式 可擴(kuò)展性高,編程復(fù)雜。
使用ThreadPoolExecutor編寫(xiě)線程池工具類
1.線程創(chuàng)建方式,實(shí)例化賢臣池時(shí),創(chuàng)建核心線程,
2.當(dāng)任務(wù)大于核心線程時(shí)將進(jìn)入阻塞隊(duì)列
3.當(dāng)阻塞隊(duì)列滿時(shí),任務(wù)沒(méi)有超過(guò)最大線程時(shí)創(chuàng)建新的線程
4.當(dāng)任務(wù) > 最大線程數(shù)+阻塞隊(duì)列 時(shí),執(zhí)行拒絕策略。
public class ThreadPoolUtils { public static ThreadPoolExecutor pool=null; // 無(wú)響應(yīng)執(zhí)行 public static void execute(Runnable runnable){ getThreadPool().execute(runnable); } // 有響應(yīng)執(zhí)行 public static<T> Future<T> submit(Callable<T> callable){ return getThreadPool().submit(callable); } // 創(chuàng)造線程池 private static synchronized ThreadPoolExecutor getThreadPool(){ if(pool==null){ // 獲取處理器數(shù)量 int cpuNum = Runtime.getRuntime().availableProcessors(); // 根據(jù)cpu數(shù)量,計(jì)算出合理的線程并發(fā)數(shù) // 最佳線程數(shù)目 = ((線程等待時(shí)間+線程CPU時(shí)間)/線程CPU時(shí)間 )* CPU數(shù)目 int maximumPoolSize = cpuNum * 2 + 1; // 七個(gè)參數(shù) // 1. 核心線程數(shù) // 2. 最大線程數(shù) // 3. 空閑線程最大存活時(shí)間 // 4. 時(shí)間單位 // 5. 阻塞隊(duì)列 // 6. 創(chuàng)建線程工廠 // 7. 拒絕策略 pool=new ThreadPoolExecutor(maximumPoolSize-1, maximumPoolSize, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); } return pool; } }
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
springboot整合shardingjdbc實(shí)現(xiàn)分庫(kù)分表最簡(jiǎn)單demo
我們知道分庫(kù)分表是針對(duì)某些數(shù)據(jù)量持續(xù)大幅增長(zhǎng)的表,比如用戶表、訂單表等,而不是一刀切將全部表都做分片,這篇文章主要介紹了springboot整合shardingjdbc實(shí)現(xiàn)分庫(kù)分表最簡(jiǎn)單demo,需要的朋友可以參考下2021-06-06Springboot繼承Keycloak實(shí)現(xiàn)單點(diǎn)登錄與退出功能
這篇文章主要介紹了Springboot繼承Keycloak實(shí)現(xiàn)單點(diǎn)登陸與退出,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08Set接口深入剖析之HashSet、LinkedHashSet和TreeSet
這篇文章主要介紹了Set接口深入剖析之HashSet、LinkedHashSet和TreeSet,LinkedHashSet是HashSet的子類,實(shí)現(xiàn)了Set接口,LinkedHashSet底層是一個(gè)LinkedHashMap,底層維護(hù)了一個(gè)數(shù)組+雙向鏈表,需要的朋友可以參考下2023-09-09Springboot接入MyBatisPlus的實(shí)現(xiàn)
最近web端比較熱門的框架就是SpringBoot和Mybatis-Plus,這里簡(jiǎn)單總結(jié)集成用法,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09Java并發(fā)volatile可見(jiàn)性的驗(yàn)證實(shí)現(xiàn)
這篇文章主要介紹了Java并發(fā)volatile可見(jiàn)性的驗(yàn)證實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05