Java線程池實(shí)現(xiàn)原理詳解
1.池化背景
在面向?qū)ο缶幊讨?,?chuàng)建和銷毀對象是很費(fèi)時(shí)間的,因?yàn)閯?chuàng)建一個(gè)對象要獲取內(nèi)存資源或者其它更多資源。在Java中更是如此,虛擬機(jī)將試圖跟蹤每一個(gè)對象,以便能夠在對象銷毀后進(jìn)行垃圾回收。所以提高服務(wù)程序效率的一個(gè)手段就是盡可能減少創(chuàng)建和銷毀對象的次數(shù),特別是一些很耗資源的對象創(chuàng)建和銷毀。如何利用已有對象來服務(wù)就是一個(gè)需要解決的關(guān)鍵問題,其實(shí)這就是一些"池化資源"技術(shù)產(chǎn)生的原因 。
2.java線程池的優(yōu)勢
(1):降低資源消耗。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
(2):提高響應(yīng)速度。當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就能立即執(zhí)行。
(3):提高線程的可管理性。線程是稀缺資源,如果無限制地創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。但是,要做到合理利用線程池,必須對其實(shí)現(xiàn)原理了如指掌
3.線程池實(shí)現(xiàn)原理
當(dāng)向線程池提交一個(gè)任務(wù)之后,線程池是如何處理這個(gè)任務(wù)的呢?本節(jié)來看一下線程池的主要處理流程,處理流程圖如圖3-1所示:
從圖中可以看出,當(dāng)提交一個(gè)新任務(wù)到線程池時(shí),線程池的處理流程如下。 1)線程池判斷核心線程池里的線程是否都在執(zhí)行任務(wù)。如果不是,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)。如果核心線程池里的線程都在執(zhí)行任務(wù),則進(jìn)入下個(gè)流程。 2)線程池判斷工作隊(duì)列是否已經(jīng)滿。如果工作隊(duì)列沒有滿,則將新提交的任務(wù)存儲在這個(gè)工作隊(duì)列里。如果工作隊(duì)列滿了,則進(jìn)入下個(gè)流程。 3)線程池判斷線程池的線程是否都處于工作狀態(tài)。如果沒有,則創(chuàng)建一個(gè)新的工作線程來執(zhí)行任務(wù)。如果已經(jīng)滿了,則交給飽和策略來處理這個(gè)任務(wù)。 ThreadPoolExecutor執(zhí)行execute()方法的示意圖,如圖3-2所示:
ThreadPoolExecutor執(zhí)行execute方法分下面4種情況。 1)如果當(dāng)前運(yùn)行的線程少于corePoolSize,則創(chuàng)建新線程來執(zhí)行任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。 2)如果運(yùn)行的線程等于或多于corePoolSize,則將任務(wù)加入BlockingQueue。 3)如果無法將任務(wù)加入BlockingQueue(隊(duì)列已滿),則創(chuàng)建新的線程來處理任務(wù)(注意,執(zhí)行這一步驟需要獲取全局鎖)。 4)如果創(chuàng)建新線程將使當(dāng)前運(yùn)行的線程超出maximumPoolSize,任務(wù)將被拒絕,并調(diào)用RejectedExecutionHandler.rejectedExecution()方法。 ThreadPoolExecutor采取上述步驟的總體設(shè)計(jì)思路,是為了在執(zhí)行execute()方法時(shí),盡可能地避免獲取全局鎖(那將會是一個(gè)嚴(yán)重的可伸縮瓶頸)。在ThreadPoolExecutor完成預(yù)熱之后(當(dāng)前運(yùn)行的線程數(shù)大于等于corePoolSize),幾乎所有的execute()方法調(diào)用都是執(zhí)行步驟2,而步驟2不需要獲取全局鎖。 源碼分析:上面的流程分析讓我們很直觀地了解了線程池的工作原理,讓我們再通過源代碼來看看是如何實(shí)現(xiàn)的,線程池執(zhí)行任務(wù)的方法如下。
public void execute(Runnable command) { ? ?if (command == null) ? ? ? ?throw new NullPointerException(); ? ?//獲取clt,clt記錄著線程池狀態(tài)和運(yùn)行線程數(shù)。 ? ?int c = ctl.get(); ? ?//運(yùn)行線程數(shù)小于核心線程數(shù)時(shí),創(chuàng)建線程放入線程池中,并且運(yùn)行當(dāng)前任務(wù)。 ? ?if (workerCountOf(c) < corePoolSize) { ? ? ? ?if (addWorker(command, true)) ? ? ? ? ? ?return; ? ? ? ?//創(chuàng)建線程失敗,重新獲取clt。 ? ? ? ?c = ctl.get(); ? } ? ?//線程池是運(yùn)行狀態(tài)并且運(yùn)行線程大于核心線程數(shù)時(shí),把任務(wù)放入隊(duì)列中。 ? ?if (isRunning(c) && workQueue.offer(command)) { ? ? ? ?int recheck = ctl.get(); ? ? ? ?//重新檢查線程池不是運(yùn)行狀態(tài)時(shí), ? ? ? ?//把任務(wù)移除隊(duì)列,并通過拒絕策略對該任務(wù)進(jìn)行處理。 ? ? ? ?if (! isRunning(recheck) && remove(command)) ? ? ? ? ? ?reject(command); ? ? ? ?//當(dāng)前運(yùn)行線程數(shù)為0時(shí),創(chuàng)建線程加入線程池中。 ? ? ? ?else if (workerCountOf(recheck) == 0) ? ? ? ? ? ?addWorker(null, false); ? } ? ?//運(yùn)行線程大于核心線程數(shù)時(shí)并且隊(duì)列已滿時(shí), ? ?//創(chuàng)建線程放入線程池中,并且運(yùn)行當(dāng)前任務(wù)。 ? ?else if (!addWorker(command, false)) ? ? ? ?//運(yùn)行線程大于最大線程數(shù)時(shí),失敗則拒絕該任務(wù) ? ? ? ?reject(command); } ?
在execute()方法中多次調(diào)用addWorker方法。其源碼如下
private boolean addWorker(Runnable firstTask, boolean core) { ? ?retry: ? ?for (;;) { ? ? ? ?//獲取clt,clt記錄著線程池狀態(tài)和運(yùn)行線程數(shù)。 ? ? ? ?int c = ctl.get(); ? ? ? ?//獲取線程池的運(yùn)行狀態(tài)。 ? ? ? ?int rs = runStateOf(c); ? ? ? ? ?//線程池處于關(guān)閉狀態(tài),或者當(dāng)前任務(wù)為null ? ? ? ?//或者隊(duì)列不為空,則直接返回失敗。 ? ? ? ?if (rs >= SHUTDOWN && ? ? ? ? ? ?! (rs == SHUTDOWN && ? ? ? ? ? ? ? firstTask == null && ? ? ? ? ? ? ? ! workQueue.isEmpty())) ? ? ? ? ? ?return false; ? ? ? ? ?for (;;) { ? ? ? ? ? ?//獲取線程池中的線程數(shù) ? ? ? ? ? ?int wc = workerCountOf(c); ? ? ? ? ? ?//線程數(shù)超過CAPACITY,則返回false; ? ? ? ? ? ?//這里的core是addWorker方法的第二個(gè)參數(shù), ? ? ? ? ? ?//如果為true則根據(jù)核心線程數(shù)進(jìn)行比較, ? ? ? ? ? ?//如果為false則根據(jù)最大線程數(shù)進(jìn)行比較。 ? ? ? ? ? ?if (wc >= CAPACITY || ? ? ? ? ? ? ? ?wc >= (core ? corePoolSize : maximumPoolSize)) ? ? ? ? ? ? ? ?return false; ? ? ? ? ? ?//嘗試增加線程數(shù),如果成功,則跳出第一個(gè)for循環(huán) ? ? ? ? ? ?if (compareAndIncrementWorkerCount(c)) ? ? ? ? ? ? ? ?break retry; ? ? ? ? ? ?//如果增加線程數(shù)失敗,則重新獲取ctl ? ? ? ? ? ?c = ctl.get(); ? ? ? ? ? ?//如果當(dāng)前的運(yùn)行狀態(tài)不等于rs,說明狀態(tài)已被改變, ? ? ? ? ? ?//返回第一個(gè)for循環(huán)繼續(xù)執(zhí)行 ? ? ? ? ? ?if (runStateOf(c) != rs) ? ? ? ? ? ? ? ?continue retry; ? ? ? } ? } ? ? ?boolean workerStarted = false; ? ?boolean workerAdded = false; ? ?Worker w = null; ? ?try { ? ? ? ?//根據(jù)當(dāng)前任務(wù)來創(chuàng)建Worker對象 ? ? ? ?w = new Worker(firstTask); ? ? ? ?final Thread t = w.thread; ? ? ? ?if (t != null) { ? ? ? ? ? ?final ReentrantLock mainLock = this.mainLock; ? ? ? ? ? ?mainLock.lock(); ? ? ? ? ? ?try { ? ? ? ? ? ? ? ?//獲得鎖以后,重新檢查線程池狀態(tài) ? ? ? ? ? ? ? ?int rs = runStateOf(ctl.get()); ? ? ? ? ? ? ? ? ?if (rs < SHUTDOWN || ? ? ? ? ? ? ? ? ? (rs == SHUTDOWN && firstTask == null)) { ? ? ? ? ? ? ? ? ? ?if (t.isAlive()) ? ? ? ? ? ? ? ? ? ? ? ?throw new IllegalThreadStateException(); ? ? ? ? ? ? ? ? ? ?//把剛剛創(chuàng)建的線程加入到線程池中 ? ? ? ? ? ? ? ? ? ?workers.add(w); ? ? ? ? ? ? ? ? ? ?int s = workers.size(); ? ? ? ? ? ? ? ? ? ?//記錄線程池中出現(xiàn)過的最大線程數(shù)量 ? ? ? ? ? ? ? ? ? ?if (s > largestPoolSize) ? ? ? ? ? ? ? ? ? ? ? ?largestPoolSize = s; ? ? ? ? ? ? ? ? ? ?workerAdded = true; ? ? ? ? ? ? ? } ? ? ? ? ? } finally { ? ? ? ? ? ? ? ?mainLock.unlock(); ? ? ? ? ? } ? ? ? ? ? ?if (workerAdded) { ? ? ? ? ? ? ? ?//啟動(dòng)線程,開始運(yùn)行任務(wù) ? ? ? ? ? ? ? ?t.start(); ? ? ? ? ? ? ? ?workerStarted = true; ? ? ? ? ? } ? ? ? } ? } finally { ? ? ? ?if (! workerStarted) ? ? ? ? ? ?addWorkerFailed(w); ? } ? ?return workerStarted; } ?
4.線程池的創(chuàng)建
? ?public ThreadPoolExecutor(int corePoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?int maximumPoolSize, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?long keepAliveTime, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?TimeUnit unit, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?BlockingQueue<Runnable> workQueue, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?ThreadFactory threadFactory, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?RejectedExecutionHandler handler) { ? ? ? ?if (corePoolSize < 0 || ? ? ? ? ? ?maximumPoolSize <= 0 || ? ? ? ? ? ?maximumPoolSize < corePoolSize || ? ? ? ? ? ?keepAliveTime < 0) ? ? ? ? ? ?throw new IllegalArgumentException(); ? ? ? ?if (workQueue == null || threadFactory == null || handler == null) ? ? ? ? ? ?throw new NullPointerException(); ? ? ? ?this.acc = System.getSecurityManager() == null ? ? ? ? ? ? ? ? ?null : ? ? ? ? ? ? ? ?AccessController.getContext(); ? ? ? ?this.corePoolSize = corePoolSize; ? ? ? ?this.maximumPoolSize = maximumPoolSize; ? ? ? ?this.workQueue = workQueue; ? ? ? ?this.keepAliveTime = unit.toNanos(keepAliveTime); ? ? ? ?this.threadFactory = threadFactory; ? ? ? ?this.handler = handler; ? }
創(chuàng)建一個(gè)線程池時(shí)需要輸入幾個(gè)參數(shù),如下。 1)corePoolSize(線程池的基本大小)(必需) :當(dāng)提交一個(gè)任務(wù)到線程池時(shí),線程池會創(chuàng)建一個(gè)線程來執(zhí)行任務(wù),即使其他空閑的基本線程能夠執(zhí)行新任務(wù)也會創(chuàng)建線程,等到需要執(zhí)行的任務(wù)數(shù)大于線程池基本大小時(shí)就不再創(chuàng)建。如果調(diào)用了線程池的prestartAllCoreThreads()方法,線程池會提前創(chuàng)建并啟動(dòng)所有基本線程。
2)maximumPoolSize(線程池最大數(shù)量)(必需) :線程池允許創(chuàng)建的最大線程數(shù)。如果隊(duì)列滿了,并 且已創(chuàng)建的線程數(shù)小于最大線程數(shù),則線程池會再創(chuàng)建新的線程執(zhí)行任務(wù)。值得注意的是,如 果使用了無界的任務(wù)隊(duì)列這個(gè)參數(shù)就沒什么效果。
3)keepAliveTime(線程活動(dòng)保持時(shí)間)(必需) :線程池的工作線程空閑后,保持存活的時(shí)間。所以, 如果任務(wù)很多,并且每個(gè)任務(wù)執(zhí)行的時(shí)間比較短,可以調(diào)大時(shí)間,提高線程的利用率。
4)TimeUnit(線程活動(dòng)保持時(shí)間的單位)(必需) :可選的單位有天(DAYS)、小時(shí)(HOURS)、分鐘 (MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和納秒 (NANOSECONDS,千分之一微秒)。
5)workQueue(任務(wù)隊(duì)列)(必需) :用于保存等待執(zhí)行的任務(wù)的阻塞隊(duì)列。可以選擇以下幾個(gè)阻塞隊(duì)列。
- ArrayBlockingQueue:是一個(gè)基于數(shù)組結(jié)構(gòu)的有界阻塞隊(duì)列,此隊(duì)列按FIFO(先進(jìn)先出)原 則對元素進(jìn)行排序。
- LinkedBlockingQueue:一個(gè)基于鏈表結(jié)構(gòu)的阻塞隊(duì)列,此隊(duì)列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。靜態(tài)工廠方法Executors.newFixedThreadPool()使用了這個(gè)隊(duì)
- SynchronousQueue:一個(gè)不存儲元素的阻塞隊(duì)列。每個(gè)插入操作必須等到另一個(gè)線程調(diào)用移除操作,否則插入操作一直處于阻塞狀態(tài),吞吐量通常要高于Linked-BlockingQueue,靜態(tài)工廠方法Executors.newCachedThreadPool使用了這個(gè)隊(duì)列。
- PriorityBlockingQueue:一個(gè)具有優(yōu)先級的無限阻塞隊(duì)列。
6)ThreadFactory (可選) :用于設(shè)置創(chuàng)建線程的工廠,可以通過線程工廠給每個(gè)創(chuàng)建出來的線程設(shè)置更有意義的名字。使用開源框架guava提供的ThreadFactoryBuilder可以快速給線程池里的線程設(shè)置有意義的名字,代碼如下: new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
- RejectedExecutionHandler(飽和策略):當(dāng)隊(duì)列和線程池都滿了,說明線程池處于飽和狀態(tài),那么必須采取一種策略處理提交的新任務(wù)。這個(gè)策略默認(rèn)情況下是AbortPolicy,表示無法處理新任務(wù)時(shí)拋出異常。在JDK 1.5中Java線程池框架提供了以下4種策略。
- AbortPolicy:直接拋出異常。
- CallerRunsPolicy:只用調(diào)用者所在線程來運(yùn)行任務(wù)。
- DiscardOldestPolicy:丟棄隊(duì)列里最近的一個(gè)任務(wù),并執(zhí)行當(dāng)前任務(wù)。
- DiscardPolicy:不處理,丟棄掉。
當(dāng)然,也可以根據(jù)應(yīng)用場景需要來實(shí)現(xiàn)RejectedExecutionHandler接口自定義策略。如記錄日志或持久化存儲不能處理的任
示例代碼:
private ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() ? ? ? ? ? .setNameFormat("atlas-pool-%d").build(); ? private ExecutorService fixedThreadPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?Runtime.getRuntime().availableProcessors() * 40, ? ? ? ? ? ? ? ? ? 0L, ? ? ? ? ? ? ? ? ? TimeUnit.MILLISECONDS, ? ? ? ? ? ?new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20), ? ? ? ? ? ? ? ? ? namedThreadFactory); ? ? //多檢查使用 public List<AtlasElementDataDTO> getData(@RequestBody AtlasElementDataVO atlasElementDataVO, HttpServletRequest request) throws InterruptedException { ? ? ? ?CountDownLatch countDownLatch = new CountDownLatch(atlasElementDataVO.getAtlasElementDataParamList().size()); ? ? ? ?UserForm currentUser = RequestUserHolder.getCurrentUser(); ? ? ? ?String url = IasStringUtils.getFileNameByUrl(request.getHeader("Referer")); ? ? ? ?List<AtlasElementDataDTO> list = new ArrayList<>(); ? ? ? ?log.info("data query start total start" + IasDateUtils.dateTimeToStringTime(new Date())); ? ? ? ?atlasElementDataVO.getAtlasElementDataParamList().forEach(atlasElementDataParamDTO -> { ? ? ? ? ? ?fixedThreadPool.submit(() -> { ? ? ? ? ? ? ? ?try { ? ? ? ? ? ? ? ? ? ?log.info("data query start" + IasDateUtils.dateTimeToStringTime(new Date())); ? ? ? ? ? ? ? ? ? ?AtlasElementDataDTO atlasElementDataDTO = new AtlasElementDataDTO(); ? ? ? ? ? ? ? ? ? ?atlasElementDataDTO.setId(atlasElementDataParamDTO.getId()); ? ? ? ? ? ? ? ? ? ?atlasElementDataDTO.setRecord(atlasService.getData(atlasElementDataParamDTO.getId(), atlasElementDataParamDTO.getParam(), currentUser != null ? currentUser.getUserId() : null, url)); ? ? ? ? ? ? ? ? ? ?atlasElementDataDTO.set_xAxis(atlasElementDataParamDTO.get_xAxis()); ? ? ? ? ? ? ? ? ? ?list.add(atlasElementDataDTO); ? ? ? ? ? ? ? ? ? ?log.info("data query end" + IasDateUtils.dateTimeToStringTime(new Date())); ? ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? ? ?throw new BusinessException(ErrorCodeEnum.QUERY_ERROR.getCode(), ErrorCodeEnum.QUERY_ERROR.getMessage()); ? ? ? ? ? ? ? }finally { ? ? ? ? ? ? ? ? ? ?countDownLatch.countDown(); ? ? ? ? ? ? ? } ? ? ? ? ? }); ? ? ? }); ? ? ? ?countDownLatch.await(); ? ? ? ?log.info("data query start total end :" + IasDateUtils.dateTimeToStringTime(new Date())); ? ? ? ?return list; ? }
5.線程池執(zhí)行任務(wù)
可以使用兩個(gè)方法向線程池提交任務(wù),分別為execute()和submit()方法。execute()方法用于提交不需要返回值的任務(wù),所以無法判斷任務(wù)是否被線程池執(zhí)行成功。通過以下代碼可知execute()方法輸入的任務(wù)是一個(gè)Runnable類的實(shí)例。
threadsPool.execute(new Runnable() { ? ? ? ? ? ? ? ? ? ?@Override ? ? ? ? ? ? ? ? ? ?public void run() { ? ? ? ? ? ? ? ? ? ?// TODO Auto-generated method stub ? ? ? ? ? ? ? ? ? } });
submit()方法用于提交需要返回值的任務(wù)。線程池會返回一個(gè)future類型的對象,通過這個(gè)future對象可以判斷任務(wù)是否執(zhí)行成功,并且可以通過future的get()方法來獲取返回值,get()方法會阻塞當(dāng)前線程直到任務(wù)完成,而使用get(long timeout,TimeUnit unit)方法則會阻塞當(dāng)前線程一段時(shí)間后立即返回,這時(shí)候有可能任務(wù)沒有執(zhí)行完。
Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理無法執(zhí)行任務(wù)異常 } finally { // 關(guān)閉線程池 executor.shutdown(); }
6.關(guān)閉線程池
可以通過調(diào)用線程池的shutdown或shutdownNow方法來關(guān)閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個(gè)調(diào)用線程的interrupt方法來中斷線程,所以無法響應(yīng)中斷的任務(wù)可能永遠(yuǎn)無法終止。但是它們存在一定的區(qū)別,shutdownNow首先將線程池的狀態(tài)設(shè)置成STOP,然后嘗試停止所有的正在執(zhí)行或暫停任務(wù)的線程,并返回等待執(zhí)行任務(wù)的列表,而shutdown只是將線程池的狀態(tài)設(shè)置成SHUTDOWN狀態(tài),然后中斷所有沒有正在執(zhí)行任務(wù)的線程。只要調(diào)用了這兩個(gè)關(guān)閉方法中的任意一個(gè),isShutdown方法就會返回true。當(dāng)所有的任務(wù)都已關(guān)閉后,才表示線程池關(guān)閉成功,這時(shí)調(diào)用isTerminaed方法會返回true。至于應(yīng)該調(diào)用哪一種方法來關(guān)閉線程池,應(yīng)該由提交到線程池的任務(wù)特性決定,通常調(diào)用shutdown方法來關(guān)閉線程池,如果任務(wù)不一定要執(zhí)行完,則可以調(diào)用shutdownNow方法。
7.合理配置線程池
要想合理地配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來分析。 任務(wù)的性質(zhì):CPU密集型任務(wù)、IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級:高、中和低。
- 任務(wù)的執(zhí)行時(shí)間:長、中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。CPU密集型任務(wù)應(yīng)配置盡可能小的線程,如配置Ncpu+1個(gè)線程的線程池。由于IO密集型任務(wù)線程并不是一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如2*Ncpu?;旌闲偷娜蝿?wù),如果可以拆分,將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐量將高于串行執(zhí)行的吞吐量。如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒必要進(jìn)行分解??梢酝ㄟ^Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)。優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊(duì)列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先執(zhí)行。 注意 如果一直有優(yōu)先級高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。 執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者可以使用優(yōu)先級隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行。 依賴數(shù)據(jù)庫連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,等待的時(shí)間越長,則CPU空閑時(shí)間就越長,那么線程數(shù)應(yīng)該設(shè)置得越大,這樣才能更好地利用CPU。 建議使用有界隊(duì)列。有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn)兒,比如幾千。有一次,我們系統(tǒng)里后臺任務(wù)線程池的隊(duì)列和線程池全滿了,不斷拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因?yàn)楹笈_任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞,任務(wù)積壓在線程池里。如果當(dāng)時(shí)我們設(shè)置成無界隊(duì)列,那么線程池的隊(duì)列就會越來越多, 有可能會撐滿內(nèi)存,導(dǎo)致整個(gè)系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題。當(dāng)然,我們的系統(tǒng)所有的任務(wù)是用單獨(dú)的服務(wù)器部署的,我們使用不同規(guī)模的線程池完成不同類型的任務(wù),但是出現(xiàn)這樣問題時(shí)也會影響到其他任務(wù)。
一般來說池中總線程數(shù)是核心池線程數(shù)量兩倍,只要確保當(dāng)核心池有線程停止時(shí),核心池外能有線程進(jìn)入核心池即可。 線程中的任務(wù)最終是交給CPU的線程去處理的,而CPU可同時(shí)處理線程數(shù)量大部分是CPU核數(shù)的兩倍,運(yùn)行環(huán)境中CPU的核數(shù)我們可以通過Runtime.getRuntime().availableProcessors()這個(gè)方法而獲取。理論上來說核心池線程數(shù)量應(yīng)該為Runtime.getRuntime().availableProcessors()*2,那么結(jié)果是否符合我們的預(yù)期呢,事實(shí)上大部分的任務(wù)都是I/O密集型的,即大部分任務(wù)消耗集中在的輸入輸出。而CPU密集型任務(wù)主要消耗CPU資源進(jìn)行計(jì)算,當(dāng)任務(wù)為CPU密集型時(shí),核心池線程數(shù)設(shè)置為CPU核數(shù)+1即可)
8.線程池監(jiān)控
如果在系統(tǒng)中大量使用線程池,則有必要對線程池進(jìn)行監(jiān)控,方便在出現(xiàn)問題時(shí),可以根據(jù)線程池的使用狀況快速定位問題??梢酝ㄟ^線程池提供的參數(shù)進(jìn)行監(jiān)控,在監(jiān)控線程池的時(shí)候可以使用以下屬性。 1)taskCount:線程池需要執(zhí)行的任務(wù)數(shù)量。 2)completedTaskCount:線程池在運(yùn)行過程中已完成的任務(wù)數(shù)量,小于或等于taskCount。 3)largestPoolSize:線程池里曾經(jīng)創(chuàng)建過的最大線程數(shù)量。通過這個(gè)數(shù)據(jù)可以知道線程池是否曾經(jīng)滿過。如該數(shù)值等于線程池的最大大小,則表示線程池曾經(jīng)滿過。 4)getPoolSize:線程池的線程數(shù)量。如果線程池不銷毀的話,線程池里的線程不會自動(dòng)銷毀,所以這個(gè)大小只增不減。 5)getActiveCount:獲取活動(dòng)的線程數(shù)。 通過擴(kuò)展線程池進(jìn)行監(jiān)控??梢酝ㄟ^繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,也可以在任務(wù)執(zhí)行前、執(zhí)行后和線程池關(guān)閉前執(zhí)行一些代碼來進(jìn)行監(jiān)控。例如,監(jiān)控任務(wù)的平均執(zhí)行時(shí)間、最大執(zhí)行時(shí)間和最小執(zhí)行時(shí)間等。這幾個(gè)方法在線程池里是空方法。
以上就是Java線程池實(shí)現(xiàn)原理詳解的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot整合jasypt實(shí)現(xiàn)數(shù)據(jù)加密的步驟
聽說過jasypt嗎?它可是一個(gè)超級流行的Java庫哦,提供了簡單又高效的加密和解密接口,整合jasypt后,我們的SpringBoot應(yīng)用就能輕松處理敏感數(shù)據(jù)的加密和解密,而不必為復(fù)雜的加密算法頭疼啦,下面給大家介紹SpringBoot整合jasypt實(shí)現(xiàn)數(shù)據(jù)加密的步驟,感興趣的朋友一起看看吧2025-04-04HashMap實(shí)現(xiàn)保存兩個(gè)key相同的數(shù)據(jù)
這篇文章主要介紹了HashMap實(shí)現(xiàn)保存兩個(gè)key相同的數(shù)據(jù)操作,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06詳解Spring-Boot集成Spring session并存入redis
這篇文章主要介紹了詳解Spring-Boot集成Spring session并存入redis,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-05-05SpringBoot數(shù)據(jù)庫查詢超時(shí)配置詳解
這篇文章主要介紹了SpringBoot數(shù)據(jù)庫查詢超時(shí)配置,超時(shí)配置可以避免長時(shí)間占用數(shù)據(jù)庫連接,提高系統(tǒng)的響應(yīng)速度和吞吐量,還可以快速的反饋可以提升用戶體驗(yàn),避免用戶因長時(shí)間等待而感到挫敗,文中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下2024-11-11IntelliJ IDEA導(dǎo)入Gradle項(xiàng)目的方法
這篇文章主要介紹了IntelliJ IDEA導(dǎo)入Gradle項(xiàng)目的方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03TraceIdPatternLogbackLayout日志攔截源碼解析
這篇文章主要為大家介紹了TraceIdPatternLogbackLayout日志攔截源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11