欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java如何優(yōu)雅關(guān)閉異步中的ExecutorService

 更新時(shí)間:2025年02月08日 08:21:23   作者:Huooya  
在并發(fā)編程領(lǐng)域,Java的ExecutorService是線程池管理的關(guān)鍵接口,這篇文章主要為大家介紹了如何優(yōu)雅關(guān)閉異步中的ExecutorService,需要的可以了解下

1.ExecutorService的核心價(jià)值

在并發(fā)編程領(lǐng)域,Java的ExecutorService(位于java.util.concurrent包)是線程池管理的關(guān)鍵接口。作為Executor框架的核心組件,它通過解耦任務(wù)提交與執(zhí)行機(jī)制,為開發(fā)者提供了:

  • 線程生命周期管理自動(dòng)化
  • 任務(wù)隊(duì)列智能調(diào)度
  • 資源復(fù)用優(yōu)化機(jī)制
  • 異步執(zhí)行結(jié)果追蹤能力

2.關(guān)閉機(jī)制的必要性

不正確的線程池關(guān)閉會(huì)導(dǎo)致:

  • 內(nèi)存泄漏(滯留線程無法回收)
  • 應(yīng)用無法正常終止(非守護(hù)線程保持活躍)
  • 任務(wù)狀態(tài)不一致(突然中斷導(dǎo)致數(shù)據(jù)問題)
  • 系統(tǒng)資源耗盡(無限制線程創(chuàng)建)

3.shutdown()方法詳解

3.1 方法特性

void shutdown()

狀態(tài)轉(zhuǎn)換

將線程池狀態(tài)設(shè)置為SHUTDOWN,觸發(fā)以下行為:

  • 拒絕新任務(wù)提交(觸發(fā)RejectedExecutionHandler)
  • 繼續(xù)執(zhí)行已存在的任務(wù):
    • 正在執(zhí)行的任務(wù)(running tasks)
    • 等待隊(duì)列中的任務(wù)(queued tasks)

典型應(yīng)用場(chǎng)景

ExecutorService executor = Executors.newFixedThreadPool(4);
// 提交多個(gè)任務(wù)...
executor.shutdown();

try {
    if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        System.err.println("仍有任務(wù)未在時(shí)限內(nèi)完成");
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

3.2 內(nèi)部運(yùn)作機(jī)制

  • 原子性狀態(tài)更新:CAS操作修改線程池控制狀態(tài)
  • 中斷空閑線程:僅中斷等待任務(wù)的Worker線程
  • 隊(duì)列消費(fèi)保證:完全處理BlockingQueue中的剩余任務(wù)

4.shutdownNow()方法剖析

4.1 方法定義

List<Runnable> shutdownNow()

狀態(tài)轉(zhuǎn)換

將線程池狀態(tài)設(shè)置為STOP,觸發(fā):

  • 立即拒絕新任務(wù)
  • 中斷所有工作線程(無論是否在執(zhí)行任務(wù))
  • 清空任務(wù)隊(duì)列,返回未執(zhí)行任務(wù)列表

4.2 中斷處理要點(diǎn)

executor.shutdownNow();
// 典型返回值處理
List<Runnable> unprocessed = executor.shutdownNow();
if (!unprocessed.isEmpty()) {
    logger.warn("丟棄{}個(gè)未執(zhí)行任務(wù)", unprocessed.size());
}

任務(wù)中斷條件

只有當(dāng)任務(wù)代碼正確處理中斷時(shí)才能被終止:

class InterruptibleTask implements Runnable {
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            // 執(zhí)行可中斷的操作
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 重置中斷狀態(tài)
                break;
            }
        }
    }
}

5.對(duì)比分析

特性shutdown()shutdownNow()
新任務(wù)接受立即拒絕立即拒絕
運(yùn)行中任務(wù)處理等待完成嘗試中斷
隊(duì)列任務(wù)處理全部執(zhí)行清除并返回
返回值void未執(zhí)行任務(wù)列表
適用場(chǎng)景優(yōu)雅關(guān)閉緊急終止
線程中斷策略僅中斷空閑線程強(qiáng)制中斷所有線程

6.最佳實(shí)踐代碼示例

6.1 標(biāo)準(zhǔn)關(guān)閉模板

public class GracefulShutdownExample {
    // 定義超時(shí)時(shí)間和時(shí)間單位(30秒)
    private static final int TIMEOUT = 30;
    private static final TimeUnit UNIT = TimeUnit.SECONDS;

    // 執(zhí)行任務(wù)的方法,接收一個(gè)任務(wù)列表并將其提交給線程池執(zhí)行
    public void executeTasks(List<Runnable> tasks) {
        // 創(chuàng)建一個(gè)固定大小的線程池,大小為系統(tǒng)可用處理器核心數(shù)
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        
        try {
            // 將任務(wù)列表中的每個(gè)任務(wù)提交到線程池
            tasks.forEach(executor::submit);
        } finally {
            // 在所有任務(wù)提交完后,禁用線程池接收新任務(wù),開始優(yōu)雅關(guān)閉線程池
            executor.shutdown(); // 禁止再提交新任務(wù)
            try {
                // 等待線程池中的任務(wù)在指定超時(shí)內(nèi)完成,如果超時(shí)未完成,則強(qiáng)制關(guān)閉線程池
                if (!executor.awaitTermination(TIMEOUT, UNIT)) {
                    // 如果未能在超時(shí)內(nèi)完成,則調(diào)用 shutdownNow() 強(qiáng)制終止所有活動(dòng)任務(wù)
                    List<Runnable> unfinished = executor.shutdownNow();
                    // 處理未完成的任務(wù),例如記錄日志或重新提交
                    handleUnfinishedTasks(unfinished);
                }
            } catch (InterruptedException e) {
                // 如果在等待終止時(shí)被中斷,恢復(fù)中斷狀態(tài)并強(qiáng)制關(guān)閉線程池
                Thread.currentThread().interrupt();
                executor.shutdownNow();
            }
        }
    }
    
    // 處理未完成任務(wù)的方法,這里我們打印未完成任務(wù)的數(shù)量
    private void handleUnfinishedTasks(List<Runnable> tasks) {
        // 如果有未完成的任務(wù),打印任務(wù)數(shù)量并執(zhí)行額外的處理
        if (!tasks.isEmpty()) {
            System.out.println("未完成任務(wù)數(shù): " + tasks.size());
            // 可在此處記錄日志、重新排隊(duì)未完成的任務(wù)等
        }
    }

}

構(gòu)造線程池: Executors.newFixedThreadPool() 創(chuàng)建一個(gè)固定大小的線程池,大小為系統(tǒng)可用的處理器核心數(shù),這樣可以更高效地利用 CPU 資源。

提交任務(wù): 使用 tasks.forEach(executor::submit) 提交每個(gè)任務(wù)到線程池中執(zhí)行。

優(yōu)雅關(guān)閉線程池:

  • executor.shutdown() 禁用線程池接收新任務(wù),但仍會(huì)執(zhí)行已經(jīng)提交的任務(wù)。
  • awaitTermination() 方法用于等待所有任務(wù)執(zhí)行完成。如果超時(shí)后任務(wù)未完成,則調(diào)用 shutdownNow() 強(qiáng)制關(guān)閉線程池,停止所有正在運(yùn)行的任務(wù),并返回未完成的任務(wù)。

處理中斷: 如果在等待終止過程中發(fā)生 InterruptedException,線程會(huì)恢復(fù)中斷狀態(tài),并且強(qiáng)制關(guān)閉線程池。

處理未完成任務(wù): handleUnfinishedTasks() 方法會(huì)處理未完成的任務(wù),比如記錄日志或者重新排隊(duì)未完成的任務(wù)。

6.2 帶回調(diào)的增強(qiáng)實(shí)現(xiàn)

public class EnhancedExecutorManager {
    // 定義線程池對(duì)象
    private final ExecutorService executor;
    // 定義超時(shí)時(shí)間及單位
    private final long timeout;
    private final TimeUnit unit;

    // 構(gòu)造函數(shù),初始化線程池并設(shè)置超時(shí)時(shí)間和單位
    public EnhancedExecutorManager(int corePoolSize, long timeout, TimeUnit unit) {
        // 創(chuàng)建一個(gè)核心池大小為 corePoolSize,最大池大小為 corePoolSize * 2,最大空閑時(shí)間 60秒的線程池
        this.executor = new ThreadPoolExecutor(
            corePoolSize,                             // 核心線程池大小
            corePoolSize * 2,                         // 最大線程池大小
            60L, TimeUnit.SECONDS,                    // 空閑線程的存活時(shí)間
            new LinkedBlockingQueue<>(1000),          // 使用容量為 1000 的隊(duì)列來緩存任務(wù)
            new CustomThreadFactory(),                // 自定義線程工廠
            new ThreadPoolExecutor.CallerRunsPolicy() // 當(dāng)任務(wù)無法提交時(shí),調(diào)用者線程執(zhí)行該任務(wù)
        );
        this.timeout = timeout;                     // 設(shè)置超時(shí)時(shí)間
        this.unit = unit;                           // 設(shè)置超時(shí)時(shí)間單位
    }
    
    // 優(yōu)雅關(guān)閉線程池的方法
    public void shutdown() {
        executor.shutdown(); // 首先嘗試正常關(guān)閉線程池,不再接收新的任務(wù)
        
        try {
            // 如果線程池未能在指定的超時(shí)時(shí)間內(nèi)終止,則強(qiáng)制關(guān)閉
            if (!executor.awaitTermination(timeout, unit)) {
                System.out.println("強(qiáng)制終止線程池...");
                // 強(qiáng)制停止所有正在執(zhí)行的任務(wù)并返回丟棄的任務(wù)列表
                List<Runnable> droppedTasks = executor.shutdownNow();
                System.out.println("丟棄任務(wù)數(shù): " + droppedTasks.size());
            }
        } catch (InterruptedException e) {
            // 如果在等待過程中線程池關(guān)閉操作被中斷,立即強(qiáng)制關(guān)閉并恢復(fù)中斷狀態(tài)
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
    
    // 自定義線程工廠類,用于創(chuàng)建線程
    private static class CustomThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1); // 線程池編號(hào),用于生成線程名
        private final ThreadGroup group; // 線程組
        private final AtomicInteger threadNumber = new AtomicInteger(1); // 線程編號(hào)
        private final String namePrefix; // 線程名稱前綴
    
        CustomThreadFactory() {
            // 獲取當(dāng)前系統(tǒng)的安全管理器,如果沒有,則使用當(dāng)前線程的線程組
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            // 設(shè)置線程池的名稱前綴
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() + // 線程池編號(hào)遞增
                         "-thread-";
        }
    
        // 創(chuàng)建新線程的方法
        public Thread newThread(Runnable r) {
            // 創(chuàng)建新的線程,線程組、名稱及優(yōu)先級(jí)均已設(shè)置
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0); // 默認(rèn)優(yōu)先級(jí)和daemon設(shè)置
            // 如果線程是守護(hù)線程,則將其設(shè)置為非守護(hù)線程
            if (t.isDaemon())
                t.setDaemon(false);
            // 設(shè)置線程優(yōu)先級(jí)為默認(rèn)
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t; // 返回新創(chuàng)建的線程
        }
    }
}

線程池初始化:

  • EnhancedExecutorManager 的構(gòu)造方法使用 ThreadPoolExecutor 創(chuàng)建一個(gè)線程池,線程池大小通過 corePoolSize 參數(shù)傳遞。線程池的最大線程數(shù)是核心線程數(shù)的兩倍。
  • LinkedBlockingQueue 用作任務(wù)隊(duì)列,大小為 1000。若任務(wù)量超過隊(duì)列容量,則使用 CallerRunsPolicy 策略,即由提交任務(wù)的線程執(zhí)行該任務(wù)。
  • 使用自定義的 CustomThreadFactory 來創(chuàng)建線程。

優(yōu)雅關(guān)閉線程池:

  • shutdown() 方法首先調(diào)用 executor.shutdown() 來拒絕接受新的任務(wù),然后等待線程池在指定的超時(shí)時(shí)間內(nèi)關(guān)閉。
  • 如果線程池在超時(shí)時(shí)間內(nèi)未能正常關(guān)閉,則調(diào)用 shutdownNow() 強(qiáng)制關(guān)閉并丟棄未執(zhí)行的任務(wù),同時(shí)輸出丟棄任務(wù)的數(shù)量。
  • 如果在等待關(guān)閉過程中發(fā)生 InterruptedException,會(huì)強(qiáng)制關(guān)閉線程池,并恢復(fù)中斷狀態(tài)。

自定義線程工廠:

  • CustomThreadFactory 通過實(shí)現(xiàn) ThreadFactory 接口來定義創(chuàng)建線程的行為,主要包括線程組、線程名稱、守護(hù)線程狀態(tài)和線程優(yōu)先級(jí)的配置。
  • 每個(gè)線程的名稱遵循 pool-編號(hào)-thread-編號(hào) 的格式。線程池的編號(hào)是遞增的,每個(gè)線程有自己的編號(hào)。

7.關(guān)鍵注意事項(xiàng)

  • 守護(hù)線程問題:默認(rèn)創(chuàng)建的是非守護(hù)線程,需顯式關(guān)閉
  • 中斷策略一致性:任務(wù)必須實(shí)現(xiàn)正確的中斷處理邏輯
  • 拒絕策略配合:合理配置RejectedExecutionHandler
  • 資源釋放順序:數(shù)據(jù)庫連接等資源應(yīng)先于線程池關(guān)閉
  • 監(jiān)控機(jī)制:建議集成線程池監(jiān)控(如JMX)

8.高級(jí)應(yīng)用場(chǎng)景

分級(jí)關(guān)閉策略

public class TieredShutdownManager { 
    // 定義三個(gè)優(yōu)先級(jí)的線程池列表:高優(yōu)先級(jí)、中優(yōu)先級(jí)、低優(yōu)先級(jí)
    private final List<ExecutorService> highPriority; 
    private final List<ExecutorService> normalPriority; 
    private final List<ExecutorService> lowPriority; 
  
    // 公共方法用于優(yōu)雅關(guān)閉所有線程池
    public void gracefulShutdown() { 
        // 依次關(guān)閉高、中、低優(yōu)先級(jí)的線程池
        shutdownTier(highPriority, 10, TimeUnit.SECONDS); 
        shutdownTier(normalPriority, 30, TimeUnit.SECONDS); 
        shutdownTier(lowPriority, 60, TimeUnit.SECONDS); 
    } 
  
    // 私有方法,用于優(yōu)雅關(guān)閉指定優(yōu)先級(jí)的線程池
    private void shutdownTier(List<ExecutorService> tier, long timeout, TimeUnit unit) { 
        // 對(duì)指定的線程池列表執(zhí)行關(guān)閉操作
        tier.forEach(ExecutorService::shutdown); 
  
        // 對(duì)每個(gè)線程池執(zhí)行等待終止的操作,指定超時(shí)時(shí)間
        tier.forEach(executor -> { 
            try { 
                // 如果線程池未在超時(shí)時(shí)間內(nèi)終止,則調(diào)用 shutdownNow 強(qiáng)制關(guān)閉
                if (!executor.awaitTermination(timeout, unit)) { 
                    executor.shutdownNow(); 
                } 
            } catch (InterruptedException e) { 
                // 如果在等待終止過程中線程被中斷,恢復(fù)中斷狀態(tài)并強(qiáng)制關(guān)閉線程池
                Thread.currentThread().interrupt(); 
                executor.shutdownNow(); 
            } 
        }); 
    } 
}

gracefulShutdown 方法按照優(yōu)先級(jí)順序依次關(guān)閉高、中、低優(yōu)先級(jí)的線程池。

shutdownTier 方法首先嘗試正常關(guān)閉每個(gè)線程池(調(diào)用 shutdown),然后通過 awaitTermination 方法等待線程池在指定的時(shí)間內(nèi)結(jié)束,如果未成功結(jié)束,則調(diào)用 shutdownNow 強(qiáng)制關(guān)閉。

在關(guān)閉過程中,如果發(fā)生中斷,則會(huì)捕獲 InterruptedException 異常,并且中斷當(dāng)前線程,同時(shí)強(qiáng)制關(guān)閉線程池。

9.性能優(yōu)化建議

根據(jù)任務(wù)類型選擇隊(duì)列策略:

  • CPU密集型:有界隊(duì)列(ArrayBlockingQueue)
  • IO密集型:無界隊(duì)列(LinkedBlockingQueue)

監(jiān)控關(guān)鍵指標(biāo):

ThreadPoolExecutor executor = (ThreadPoolExecutor) service;
System.out.println("活躍線程數(shù): " + executor.getActiveCount());
System.out.println("完成任務(wù)數(shù): " + executor.getCompletedTaskCount());
System.out.println("隊(duì)列大小: " + executor.getQueue().size());

動(dòng)態(tài)調(diào)整參數(shù):

executor.setCorePoolSize(newSize);
executor.setMaximumPoolSize(newMaxSize);

10.總結(jié)建議

根據(jù)Oracle官方文檔建議,在大多數(shù)生產(chǎn)場(chǎng)景中推薦以下關(guān)閉流程:

  • 優(yōu)先調(diào)用shutdown()
  • 設(shè)置合理的awaitTermination超時(shí)
  • 必要時(shí)調(diào)用shutdownNow()
  • 始終處理返回的未完成任務(wù)
  • 記錄完整的關(guān)閉日志

正確選擇關(guān)閉策略需要綜合考量:

  • 任務(wù)重要性等級(jí)
  • 系統(tǒng)資源限制
  • 業(yè)務(wù)連續(xù)性需求
  • 數(shù)據(jù)一致性要求

到此這篇關(guān)于Java如何優(yōu)雅關(guān)閉異步中的ExecutorService的文章就介紹到這了,更多相關(guān)Java關(guān)閉ExecutorService內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論