Java如何優(yōu)雅關(guān)閉異步中的ExecutorService
1.ExecutorService的核心價(jià)值
在并發(fā)編程領(lǐng)域,Java的ExecutorService(位于java.util.concurrent包)是線程池管理的關(guān)鍵接口。作為Executor框架的核心組件,它通過解耦任務(wù)提交與執(zhí)行機(jī)制,為開發(fā)者提供了:
- 線程生命周期管理自動(dòng)化
- 任務(wù)隊(duì)列智能調(diào)度
- 資源復(fù)用優(yōu)化機(jī)制
- 異步執(zhí)行結(jié)果追蹤能力
2.關(guān)閉機(jī)制的必要性
不正確的線程池關(guān)閉會(huì)導(dǎo)致:
- 內(nèi)存泄漏(滯留線程無法回收)
- 應(yīng)用無法正常終止(非守護(hù)線程保持活躍)
- 任務(wù)狀態(tài)不一致(突然中斷導(dǎo)致數(shù)據(jù)問題)
- 系統(tǒng)資源耗盡(無限制線程創(chuàng)建)
3.shutdown()方法詳解
3.1 方法特性
void shutdown()
狀態(tài)轉(zhuǎn)換
將線程池狀態(tài)設(shè)置為SHUTDOWN,觸發(fā)以下行為:
- 拒絕新任務(wù)提交(觸發(fā)RejectedExecutionHandler)
- 繼續(xù)執(zhí)行已存在的任務(wù):
- 正在執(zhí)行的任務(wù)(running tasks)
- 等待隊(duì)列中的任務(wù)(queued tasks)
典型應(yīng)用場(chǎng)景
ExecutorService executor = Executors.newFixedThreadPool(4); // 提交多個(gè)任務(wù)... executor.shutdown(); try { if (!executor.awaitTermination(60, TimeUnit.SECONDS)) { System.err.println("仍有任務(wù)未在時(shí)限內(nèi)完成"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
3.2 內(nèi)部運(yùn)作機(jī)制
- 原子性狀態(tài)更新:CAS操作修改線程池控制狀態(tài)
- 中斷空閑線程:僅中斷等待任務(wù)的Worker線程
- 隊(duì)列消費(fèi)保證:完全處理BlockingQueue中的剩余任務(wù)
4.shutdownNow()方法剖析
4.1 方法定義
List<Runnable> shutdownNow()
狀態(tài)轉(zhuǎn)換
將線程池狀態(tài)設(shè)置為STOP,觸發(fā):
- 立即拒絕新任務(wù)
- 中斷所有工作線程(無論是否在執(zhí)行任務(wù))
- 清空任務(wù)隊(duì)列,返回未執(zhí)行任務(wù)列表
4.2 中斷處理要點(diǎn)
executor.shutdownNow(); // 典型返回值處理 List<Runnable> unprocessed = executor.shutdownNow(); if (!unprocessed.isEmpty()) { logger.warn("丟棄{}個(gè)未執(zhí)行任務(wù)", unprocessed.size()); }
任務(wù)中斷條件
只有當(dāng)任務(wù)代碼正確處理中斷時(shí)才能被終止:
class InterruptibleTask implements Runnable { public void run() { while (!Thread.currentThread().isInterrupted()) { // 執(zhí)行可中斷的操作 try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // 重置中斷狀態(tài) break; } } } }
5.對(duì)比分析
特性 | shutdown() | shutdownNow() |
---|---|---|
新任務(wù)接受 | 立即拒絕 | 立即拒絕 |
運(yùn)行中任務(wù)處理 | 等待完成 | 嘗試中斷 |
隊(duì)列任務(wù)處理 | 全部執(zhí)行 | 清除并返回 |
返回值 | void | 未執(zhí)行任務(wù)列表 |
適用場(chǎng)景 | 優(yōu)雅關(guān)閉 | 緊急終止 |
線程中斷策略 | 僅中斷空閑線程 | 強(qiáng)制中斷所有線程 |
6.最佳實(shí)踐代碼示例
6.1 標(biāo)準(zhǔn)關(guān)閉模板
public class GracefulShutdownExample { // 定義超時(shí)時(shí)間和時(shí)間單位(30秒) private static final int TIMEOUT = 30; private static final TimeUnit UNIT = TimeUnit.SECONDS; // 執(zhí)行任務(wù)的方法,接收一個(gè)任務(wù)列表并將其提交給線程池執(zhí)行 public void executeTasks(List<Runnable> tasks) { // 創(chuàng)建一個(gè)固定大小的線程池,大小為系統(tǒng)可用處理器核心數(shù) ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); try { // 將任務(wù)列表中的每個(gè)任務(wù)提交到線程池 tasks.forEach(executor::submit); } finally { // 在所有任務(wù)提交完后,禁用線程池接收新任務(wù),開始優(yōu)雅關(guān)閉線程池 executor.shutdown(); // 禁止再提交新任務(wù) try { // 等待線程池中的任務(wù)在指定超時(shí)內(nèi)完成,如果超時(shí)未完成,則強(qiáng)制關(guān)閉線程池 if (!executor.awaitTermination(TIMEOUT, UNIT)) { // 如果未能在超時(shí)內(nèi)完成,則調(diào)用 shutdownNow() 強(qiáng)制終止所有活動(dòng)任務(wù) List<Runnable> unfinished = executor.shutdownNow(); // 處理未完成的任務(wù),例如記錄日志或重新提交 handleUnfinishedTasks(unfinished); } } catch (InterruptedException e) { // 如果在等待終止時(shí)被中斷,恢復(fù)中斷狀態(tài)并強(qiáng)制關(guān)閉線程池 Thread.currentThread().interrupt(); executor.shutdownNow(); } } } // 處理未完成任務(wù)的方法,這里我們打印未完成任務(wù)的數(shù)量 private void handleUnfinishedTasks(List<Runnable> tasks) { // 如果有未完成的任務(wù),打印任務(wù)數(shù)量并執(zhí)行額外的處理 if (!tasks.isEmpty()) { System.out.println("未完成任務(wù)數(shù): " + tasks.size()); // 可在此處記錄日志、重新排隊(duì)未完成的任務(wù)等 } } }
構(gòu)造線程池: Executors.newFixedThreadPool()
創(chuàng)建一個(gè)固定大小的線程池,大小為系統(tǒng)可用的處理器核心數(shù),這樣可以更高效地利用 CPU 資源。
提交任務(wù): 使用 tasks.forEach(executor::submit)
提交每個(gè)任務(wù)到線程池中執(zhí)行。
優(yōu)雅關(guān)閉線程池:
executor.shutdown()
禁用線程池接收新任務(wù),但仍會(huì)執(zhí)行已經(jīng)提交的任務(wù)。awaitTermination()
方法用于等待所有任務(wù)執(zhí)行完成。如果超時(shí)后任務(wù)未完成,則調(diào)用shutdownNow()
強(qiáng)制關(guān)閉線程池,停止所有正在運(yùn)行的任務(wù),并返回未完成的任務(wù)。
處理中斷: 如果在等待終止過程中發(fā)生 InterruptedException
,線程會(huì)恢復(fù)中斷狀態(tài),并且強(qiáng)制關(guān)閉線程池。
處理未完成任務(wù): handleUnfinishedTasks()
方法會(huì)處理未完成的任務(wù),比如記錄日志或者重新排隊(duì)未完成的任務(wù)。
6.2 帶回調(diào)的增強(qiáng)實(shí)現(xiàn)
public class EnhancedExecutorManager { // 定義線程池對(duì)象 private final ExecutorService executor; // 定義超時(shí)時(shí)間及單位 private final long timeout; private final TimeUnit unit; // 構(gòu)造函數(shù),初始化線程池并設(shè)置超時(shí)時(shí)間和單位 public EnhancedExecutorManager(int corePoolSize, long timeout, TimeUnit unit) { // 創(chuàng)建一個(gè)核心池大小為 corePoolSize,最大池大小為 corePoolSize * 2,最大空閑時(shí)間 60秒的線程池 this.executor = new ThreadPoolExecutor( corePoolSize, // 核心線程池大小 corePoolSize * 2, // 最大線程池大小 60L, TimeUnit.SECONDS, // 空閑線程的存活時(shí)間 new LinkedBlockingQueue<>(1000), // 使用容量為 1000 的隊(duì)列來緩存任務(wù) new CustomThreadFactory(), // 自定義線程工廠 new ThreadPoolExecutor.CallerRunsPolicy() // 當(dāng)任務(wù)無法提交時(shí),調(diào)用者線程執(zhí)行該任務(wù) ); this.timeout = timeout; // 設(shè)置超時(shí)時(shí)間 this.unit = unit; // 設(shè)置超時(shí)時(shí)間單位 } // 優(yōu)雅關(guān)閉線程池的方法 public void shutdown() { executor.shutdown(); // 首先嘗試正常關(guān)閉線程池,不再接收新的任務(wù) try { // 如果線程池未能在指定的超時(shí)時(shí)間內(nèi)終止,則強(qiáng)制關(guān)閉 if (!executor.awaitTermination(timeout, unit)) { System.out.println("強(qiáng)制終止線程池..."); // 強(qiáng)制停止所有正在執(zhí)行的任務(wù)并返回丟棄的任務(wù)列表 List<Runnable> droppedTasks = executor.shutdownNow(); System.out.println("丟棄任務(wù)數(shù): " + droppedTasks.size()); } } catch (InterruptedException e) { // 如果在等待過程中線程池關(guān)閉操作被中斷,立即強(qiáng)制關(guān)閉并恢復(fù)中斷狀態(tài) executor.shutdownNow(); Thread.currentThread().interrupt(); } } // 自定義線程工廠類,用于創(chuàng)建線程 private static class CustomThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); // 線程池編號(hào),用于生成線程名 private final ThreadGroup group; // 線程組 private final AtomicInteger threadNumber = new AtomicInteger(1); // 線程編號(hào) private final String namePrefix; // 線程名稱前綴 CustomThreadFactory() { // 獲取當(dāng)前系統(tǒng)的安全管理器,如果沒有,則使用當(dāng)前線程的線程組 SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); // 設(shè)置線程池的名稱前綴 namePrefix = "pool-" + poolNumber.getAndIncrement() + // 線程池編號(hào)遞增 "-thread-"; } // 創(chuàng)建新線程的方法 public Thread newThread(Runnable r) { // 創(chuàng)建新的線程,線程組、名稱及優(yōu)先級(jí)均已設(shè)置 Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); // 默認(rèn)優(yōu)先級(jí)和daemon設(shè)置 // 如果線程是守護(hù)線程,則將其設(shè)置為非守護(hù)線程 if (t.isDaemon()) t.setDaemon(false); // 設(shè)置線程優(yōu)先級(jí)為默認(rèn) if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; // 返回新創(chuàng)建的線程 } } }
線程池初始化:
EnhancedExecutorManager
的構(gòu)造方法使用ThreadPoolExecutor
創(chuàng)建一個(gè)線程池,線程池大小通過corePoolSize
參數(shù)傳遞。線程池的最大線程數(shù)是核心線程數(shù)的兩倍。LinkedBlockingQueue
用作任務(wù)隊(duì)列,大小為 1000。若任務(wù)量超過隊(duì)列容量,則使用CallerRunsPolicy
策略,即由提交任務(wù)的線程執(zhí)行該任務(wù)。- 使用自定義的
CustomThreadFactory
來創(chuàng)建線程。
優(yōu)雅關(guān)閉線程池:
shutdown()
方法首先調(diào)用executor.shutdown()
來拒絕接受新的任務(wù),然后等待線程池在指定的超時(shí)時(shí)間內(nèi)關(guān)閉。- 如果線程池在超時(shí)時(shí)間內(nèi)未能正常關(guān)閉,則調(diào)用
shutdownNow()
強(qiáng)制關(guān)閉并丟棄未執(zhí)行的任務(wù),同時(shí)輸出丟棄任務(wù)的數(shù)量。 - 如果在等待關(guān)閉過程中發(fā)生
InterruptedException
,會(huì)強(qiáng)制關(guān)閉線程池,并恢復(fù)中斷狀態(tài)。
自定義線程工廠:
CustomThreadFactory
通過實(shí)現(xiàn)ThreadFactory
接口來定義創(chuàng)建線程的行為,主要包括線程組、線程名稱、守護(hù)線程狀態(tài)和線程優(yōu)先級(jí)的配置。- 每個(gè)線程的名稱遵循
pool-編號(hào)-thread-編號(hào)
的格式。線程池的編號(hào)是遞增的,每個(gè)線程有自己的編號(hào)。
7.關(guān)鍵注意事項(xiàng)
- 守護(hù)線程問題:默認(rèn)創(chuàng)建的是非守護(hù)線程,需顯式關(guān)閉
- 中斷策略一致性:任務(wù)必須實(shí)現(xiàn)正確的中斷處理邏輯
- 拒絕策略配合:合理配置RejectedExecutionHandler
- 資源釋放順序:數(shù)據(jù)庫連接等資源應(yīng)先于線程池關(guān)閉
- 監(jiān)控機(jī)制:建議集成線程池監(jiān)控(如JMX)
8.高級(jí)應(yīng)用場(chǎng)景
分級(jí)關(guān)閉策略
public class TieredShutdownManager { // 定義三個(gè)優(yōu)先級(jí)的線程池列表:高優(yōu)先級(jí)、中優(yōu)先級(jí)、低優(yōu)先級(jí) private final List<ExecutorService> highPriority; private final List<ExecutorService> normalPriority; private final List<ExecutorService> lowPriority; // 公共方法用于優(yōu)雅關(guān)閉所有線程池 public void gracefulShutdown() { // 依次關(guān)閉高、中、低優(yōu)先級(jí)的線程池 shutdownTier(highPriority, 10, TimeUnit.SECONDS); shutdownTier(normalPriority, 30, TimeUnit.SECONDS); shutdownTier(lowPriority, 60, TimeUnit.SECONDS); } // 私有方法,用于優(yōu)雅關(guān)閉指定優(yōu)先級(jí)的線程池 private void shutdownTier(List<ExecutorService> tier, long timeout, TimeUnit unit) { // 對(duì)指定的線程池列表執(zhí)行關(guān)閉操作 tier.forEach(ExecutorService::shutdown); // 對(duì)每個(gè)線程池執(zhí)行等待終止的操作,指定超時(shí)時(shí)間 tier.forEach(executor -> { try { // 如果線程池未在超時(shí)時(shí)間內(nèi)終止,則調(diào)用 shutdownNow 強(qiáng)制關(guān)閉 if (!executor.awaitTermination(timeout, unit)) { executor.shutdownNow(); } } catch (InterruptedException e) { // 如果在等待終止過程中線程被中斷,恢復(fù)中斷狀態(tài)并強(qiáng)制關(guān)閉線程池 Thread.currentThread().interrupt(); executor.shutdownNow(); } }); } }
gracefulShutdown
方法按照優(yōu)先級(jí)順序依次關(guān)閉高、中、低優(yōu)先級(jí)的線程池。
shutdownTier
方法首先嘗試正常關(guān)閉每個(gè)線程池(調(diào)用 shutdown
),然后通過 awaitTermination
方法等待線程池在指定的時(shí)間內(nèi)結(jié)束,如果未成功結(jié)束,則調(diào)用 shutdownNow
強(qiáng)制關(guān)閉。
在關(guān)閉過程中,如果發(fā)生中斷,則會(huì)捕獲 InterruptedException
異常,并且中斷當(dāng)前線程,同時(shí)強(qiáng)制關(guān)閉線程池。
9.性能優(yōu)化建議
根據(jù)任務(wù)類型選擇隊(duì)列策略:
- CPU密集型:有界隊(duì)列(ArrayBlockingQueue)
- IO密集型:無界隊(duì)列(LinkedBlockingQueue)
監(jiān)控關(guān)鍵指標(biāo):
ThreadPoolExecutor executor = (ThreadPoolExecutor) service; System.out.println("活躍線程數(shù): " + executor.getActiveCount()); System.out.println("完成任務(wù)數(shù): " + executor.getCompletedTaskCount()); System.out.println("隊(duì)列大小: " + executor.getQueue().size());
動(dòng)態(tài)調(diào)整參數(shù):
executor.setCorePoolSize(newSize); executor.setMaximumPoolSize(newMaxSize);
10.總結(jié)建議
根據(jù)Oracle官方文檔建議,在大多數(shù)生產(chǎn)場(chǎng)景中推薦以下關(guān)閉流程:
- 優(yōu)先調(diào)用shutdown()
- 設(shè)置合理的awaitTermination超時(shí)
- 必要時(shí)調(diào)用shutdownNow()
- 始終處理返回的未完成任務(wù)
- 記錄完整的關(guān)閉日志
正確選擇關(guān)閉策略需要綜合考量:
- 任務(wù)重要性等級(jí)
- 系統(tǒng)資源限制
- 業(yè)務(wù)連續(xù)性需求
- 數(shù)據(jù)一致性要求
到此這篇關(guān)于Java如何優(yōu)雅關(guān)閉異步中的ExecutorService的文章就介紹到這了,更多相關(guān)Java關(guān)閉ExecutorService內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java科學(xué)計(jì)數(shù)法轉(zhuǎn)換成數(shù)字的幾種方法
我們?cè)谔幚泶髷?shù)值的時(shí)候,常常會(huì)遇到使用科學(xué)計(jì)數(shù)法表示的數(shù)字,科學(xué)計(jì)數(shù)法是一種表示大數(shù)值或小數(shù)值的方式,下面這篇文章主要給大家介紹了關(guān)于java科學(xué)計(jì)數(shù)法轉(zhuǎn)換成數(shù)字的幾種方法,需要的朋友可以參考下2024-03-03SpringBoot熱部署Springloaded實(shí)現(xiàn)過程解析
這篇文章主要介紹了SpringBoot熱部署Springloaded實(shí)現(xiàn)過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03關(guān)于springboot集成swagger3時(shí)spring-plugin-core報(bào)錯(cuò)的問題
這篇文章主要介紹了關(guān)于springboot集成swagger3時(shí)spring-plugin-core報(bào)錯(cuò)的問題,本文給大家分享解決方法,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-09-09java使用Nagao算法實(shí)現(xiàn)新詞發(fā)現(xiàn)、熱門詞的挖掘
這篇文章主要介紹了java使用Nagao算法實(shí)現(xiàn)新詞發(fā)現(xiàn)、熱門詞的挖掘的思路和詳細(xì)代碼,需要的朋友可以參考下2015-07-07解決SpringBoot中MultipartResolver和ServletFileUpload的沖突問題
這篇文章主要介紹了解決SpringBoot中MultipartResolver和ServletFileUpload的沖突問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10IDEA卡在”正在解析Maven依賴項(xiàng)“的解決方法
在創(chuàng)建新的SpringBoot項(xiàng)目時(shí),始終卡在"正在解析Maven依賴項(xiàng)…",本文小編給大家介紹了幾種相關(guān)的解決方案,具有一定的參考價(jià)值,需要的朋友可以參考下2023-11-11