欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中線程執(zhí)行狀態(tài)檢測的四種可靠方法

 更新時間:2025年05月06日 08:35:09   作者:異常君  
在多線程開發(fā)中,開發(fā)者常需監(jiān)控子線程狀態(tài),這個問題關(guān)系到系統(tǒng)的可靠性和錯誤處理能力,需要一套完善的方案來解決,下面我們來看看四種從基礎(chǔ)到高級的解決方案吧

在多線程開發(fā)中,開發(fā)者常需監(jiān)控子線程狀態(tài)。當主線程創(chuàng)建并啟動多個工作線程后,如何判斷這些任務(wù)是否成功完成?工作線程因未處理異常終止時,主線程往往無法直接感知。這個問題關(guān)系到系統(tǒng)的可靠性和錯誤處理能力,需要一套完善的方案來解決。

線程執(zhí)行狀態(tài)檢測的挑戰(zhàn)

在 Java 多線程編程中,主線程啟動工作線程后,默認情況下無法直接獲知工作線程的執(zhí)行結(jié)果。這是由于線程的獨立性特征決定的:

那么,如何讓主線程知道工作線程是否執(zhí)行成功呢?下面介紹四種從基礎(chǔ)到高級的解決方案。

方法一:使用 Thread.join()等待線程完成

最基礎(chǔ)的方法是調(diào)用線程的 join()方法,使主線程等待工作線程執(zhí)行完畢。

public class JoinExample {
    public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                System.out.println("工作線程開始執(zhí)行");
                // 模擬任務(wù)執(zhí)行
                Thread.sleep(2000);
                System.out.println("工作線程執(zhí)行完成");
            } catch (InterruptedException e) {
                System.err.println("工作線程被中斷:" + e.getMessage());
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                System.err.println("工作線程執(zhí)行異常:" + e.getMessage());
            }
        });

        try {
            thread.start();
            // 主線程等待工作線程執(zhí)行完成,支持設(shè)置超時
            // thread.join(3000); // 等待最多3秒
            thread.join();

            // 檢查線程狀態(tài)
            if (!thread.isAlive()) {
                System.out.println("工作線程已結(jié)束執(zhí)行");
                // 但仍無法知道是成功還是失敗
            }
        } catch (InterruptedException e) {
            System.err.println("主線程等待過程中被中斷");
            Thread.currentThread().interrupt();
        }
    }
}

局限性

  • 只能知道線程是否執(zhí)行完畢,無法獲取執(zhí)行結(jié)果或異常信息
  • 支持基礎(chǔ)的超時設(shè)置(通過join(long)),但功能有限(如超時后需手動檢查線程狀態(tài),無法中途取消)
  • 對于多個線程,需要逐個 join,代碼繁瑣

方法二:通過共享變量或回調(diào)傳遞執(zhí)行狀態(tài)

通過在線程間共享對象,可以傳遞執(zhí)行狀態(tài)信息:

class TaskResult {
    private volatile boolean success = false;
    private volatile String message = "";
    private volatile Exception exception = null;
    private volatile boolean completed = false; // 增加完成標記

    // 使用synchronized保證設(shè)置結(jié)果的原子性
    public synchronized void setSuccess(String message) {
        if (!completed) { // 避免重復設(shè)置
            this.success = true;
            this.message = message;
            this.completed = true;
        }
    }

    public synchronized void setFailure(Exception e, String message) {
        if (!completed) { // 避免重復設(shè)置
            this.success = false;
            this.exception = e;
            this.message = message;
            this.completed = true;
        }
    }

    public boolean isSuccess() {
        return success;
    }

    public boolean isCompleted() {
        return completed;
    }

    public String getMessage() {
        return message;
    }

    public Exception getException() {
        return exception;
    }
}

public class SharedObjectExample {
    public static void main(String[] args) {
        // 創(chuàng)建共享結(jié)果對象
        TaskResult result = new TaskResult();

        Thread thread = new Thread(() -> {
            try {
                System.out.println("工作線程開始執(zhí)行");
                // 模擬任務(wù)執(zhí)行
                Thread.sleep(2000);

                // 設(shè)置執(zhí)行成功
                result.setSuccess("任務(wù)順利完成");
            } catch (InterruptedException e) {
                result.setFailure(e, "線程被中斷");
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                result.setFailure(e, "執(zhí)行異常:" + e.getMessage());
            }
        });

        try {
            thread.start();
            thread.join(); // 等待工作線程執(zhí)行完成

            // 檢查執(zhí)行結(jié)果
            if (!result.isCompleted()) {
                System.out.println("任務(wù)尚未完成");
            } else if (result.isSuccess()) {
                System.out.println("工作線程執(zhí)行成功:" + result.getMessage());
            } else {
                System.out.println("工作線程執(zhí)行失?。? + result.getMessage());
                if (result.getException() != null) {
                    System.out.println("異常信息:" + result.getException().toString());
                }
            }
        } catch (InterruptedException e) {
            System.err.println("主線程等待過程中被中斷");
            Thread.currentThread().interrupt();
        }
    }
}

優(yōu)點

  • 可以傳遞更詳細的執(zhí)行結(jié)果和異常信息
  • 適用于簡單的線程狀態(tài)監(jiān)控

缺點

  • 需要自行處理線程安全問題
  • 代碼結(jié)構(gòu)較為松散
  • 擴展多個線程時較為復雜

方法三:使用 Future 和 Callable 獲取執(zhí)行結(jié)果和異常

Java 5 引入的 Future 接口提供了更優(yōu)雅的異步任務(wù)處理方式:

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) {
        // 避免使用Executors工廠方法,直接配置ThreadPoolExecutor
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, // 核心線程數(shù)
            1, // 最大線程數(shù)
            0L, TimeUnit.MILLISECONDS, // 空閑線程保留時間
            new LinkedBlockingQueue<>(10), // 有界隊列,防止OOM
            new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
        );

        // 創(chuàng)建Callable任務(wù)
        Callable<String> task = () -> {
            System.out.println("工作線程開始執(zhí)行");
            // 模擬任務(wù)執(zhí)行
            Thread.sleep(2000);
            // 任務(wù)可以有返回值
            return "任務(wù)執(zhí)行結(jié)果";

            // 如果任務(wù)失敗,直接拋出異常
            // throw new RuntimeException("任務(wù)執(zhí)行失敗");
        };

        // 提交任務(wù)并獲取Future
        Future<String> future = executor.submit(task);

        try {
            // 等待任務(wù)完成并獲取結(jié)果,可以設(shè)置超時
            String result = future.get(3, TimeUnit.SECONDS);
            System.out.println("工作線程執(zhí)行成功,結(jié)果:" + result);
        } catch (TimeoutException e) {
            System.err.println("任務(wù)執(zhí)行超時");
            future.cancel(true); // 嘗試取消任務(wù)
        } catch (ExecutionException e) {
            System.err.println("任務(wù)執(zhí)行異常:" + e.getCause().getMessage());
            // getCause()獲取任務(wù)中拋出的原始異常
        } catch (InterruptedException e) {
            System.err.println("主線程等待過程中被中斷");
            Thread.currentThread().interrupt();
        } finally {
            // 記得關(guān)閉線程池
            executor.shutdown();
        }
    }
}

優(yōu)點

  • 可以獲取線程執(zhí)行結(jié)果或異常
  • 支持超時設(shè)置
  • 可以取消任務(wù)執(zhí)行

使用 Future 處理多個任務(wù)

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class MultipleFuturesExample {
    public static void main(String[] args) {
        // 創(chuàng)建線程池 - 根據(jù)任務(wù)類型配置參數(shù)
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            3, // 核心線程數(shù) - 根據(jù)任務(wù)特性選擇
            3, // 最大線程數(shù)
            60L, TimeUnit.SECONDS, // 空閑線程保留時間
            new ArrayBlockingQueue<>(10), // 有界隊列,避免OOM
            new ThreadPoolExecutor.AbortPolicy() // 任務(wù)拒絕時拋出異常
        );

        // 創(chuàng)建多個任務(wù)
        List<Callable<String>> tasks = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            final int taskId = i;
            tasks.add(() -> {
                System.out.println("任務(wù)" + taskId + "開始執(zhí)行");
                // 模擬不同執(zhí)行時間
                Thread.sleep(1000 + taskId * 1000);

                // 模擬可能的失敗
                if (taskId == 1) {
                    throw new RuntimeException("任務(wù)" + taskId + "執(zhí)行失敗");
                }

                return "任務(wù)" + taskId + "執(zhí)行結(jié)果";
            });
        }

        try {
            // 提交所有任務(wù)并獲取Future列表
            List<Future<String>> futures = executor.invokeAll(tasks);

            // 處理每個任務(wù)的結(jié)果 - 確保一個任務(wù)的異常不會影響其他任務(wù)處理
            for (int i = 0; i < futures.size(); i++) {
                Future<String> future = futures.get(i);
                try {
                    // 檢查任務(wù)是否被取消
                    if (future.isCancelled()) {
                        System.err.println("任務(wù)" + i + "被取消");
                        continue;
                    }

                    String result = future.get();
                    System.out.println("任務(wù)" + i + "執(zhí)行成功:" + result);
                } catch (ExecutionException e) {
                    System.err.println("任務(wù)" + i + "執(zhí)行異常:" + e.getCause().getMessage());
                } catch (CancellationException e) {
                    System.err.println("任務(wù)" + i + "被取消");
                }
            }
        } catch (InterruptedException e) {
            System.err.println("主線程等待過程中被中斷");
            Thread.currentThread().interrupt();
        } finally {
            // 關(guān)閉線程池
            shutdownAndAwaitTermination(executor);
        }
    }

    // 安全關(guān)閉線程池的標準方法
    private static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.err.println("線程池無法終止");
                }
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

線程池拒絕策略的選擇依據(jù)

在配置線程池時,選擇合適的拒絕策略非常重要:

  • CallerRunsPolicy:將任務(wù)回退到調(diào)用者線程執(zhí)行。當系統(tǒng)負載過高時,能起到自動降速的作用,因為提交任務(wù)的線程會被迫自己執(zhí)行任務(wù),暫時無法提交新任務(wù)。適用于不能丟棄任務(wù)且需要自動調(diào)節(jié)提交速率的場景。

  • AbortPolicy:直接拋出 RejectedExecutionException 異常。適用于需要明確知道任務(wù)被拒絕并進行特殊處理的場景,比如重要業(yè)務(wù)任務(wù)。

  • DiscardPolicy:靜默丟棄任務(wù),不做任何處理。適用于任務(wù)可以安全丟棄且無需通知的場景,如統(tǒng)計類非關(guān)鍵任務(wù)。

  • DiscardOldestPolicy:丟棄隊列中等待最久的任務(wù),然后嘗試重新提交當前任務(wù)。適用于新任務(wù)比舊任務(wù)更重要的場景,如實時監(jiān)控數(shù)據(jù)。

// 選擇示例
// 1. 關(guān)鍵業(yè)務(wù)任務(wù) - 不能丟失,需要感知拒絕
new ThreadPoolExecutor(cores, maxThreads, keepAliveTime,
    timeUnit, queue, new ThreadPoolExecutor.AbortPolicy());

// 2. 高負載任務(wù) - 防止系統(tǒng)崩潰,允許降速
new ThreadPoolExecutor(cores, maxThreads, keepAliveTime,
    timeUnit, queue, new ThreadPoolExecutor.CallerRunsPolicy());

// 3. 非關(guān)鍵統(tǒng)計任務(wù) - 可以安全丟棄
new ThreadPoolExecutor(cores, maxThreads, keepAliveTime,
    timeUnit, queue, new ThreadPoolExecutor.DiscardPolicy());

方法四:使用 CompletableFuture 實現(xiàn)異步任務(wù)監(jiān)控

Java 8 引入的 CompletableFuture 提供了更強大的異步編程能力,特別適合復雜的任務(wù)依賴場景:

import java.util.concurrent.*;
import java.util.function.Supplier;

public class CompletableFutureExample {
    public static void main(String[] args) {
        // 創(chuàng)建線程池 - 適合IO密集型任務(wù)的線程池配置
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors() * 2, // IO密集型任務(wù)可使用更多線程
            Runtime.getRuntime().availableProcessors() * 2,
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "async-task-" + System.currentTimeMillis());
                    t.setDaemon(false); // 非守護線程
                    return t;
                }
            }
        );

        // 創(chuàng)建任務(wù)
        Supplier<String> task = () -> {
            try {
                System.out.println("工作線程開始執(zhí)行,ID:" + Thread.currentThread().getId());
                // 模擬任務(wù)執(zhí)行
                Thread.sleep(2000);

                // 模擬可能的異常
                if (Math.random() < 0.3) {
                    throw new RuntimeException("任務(wù)隨機失敗");
                }

                return "任務(wù)執(zhí)行結(jié)果";
            } catch (InterruptedException e) {
                // 保留完整的異常棧信息
                Thread.currentThread().interrupt();
                throw new IllegalStateException("任務(wù)被中斷", e); // 使用標準異常
            }
        };

        // 創(chuàng)建CompletableFuture并指定執(zhí)行線程池
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(task, executor)
            .thenApply(result -> {
                // 對結(jié)果進行處理 - 默認在任務(wù)線程執(zhí)行
                return "處理后的" + result;
            })
            .exceptionally(ex -> {
                // 異常處理
                System.err.println("任務(wù)異常:" + ex.getCause().getMessage());
                return "默認結(jié)果";
            });

        // 添加完成回調(diào) - 確?;卣{(diào)在主線程執(zhí)行
        ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();
        future.whenCompleteAsync((result, ex) -> {
            if (ex == null) {
                System.out.println("任務(wù)完成,結(jié)果:" + result);
            } else {
                System.err.println("任務(wù)異常:" + ex.getMessage());
            }
        }, callbackExecutor);

        try {
            // 等待任務(wù)完成
            String result = future.get(5, TimeUnit.SECONDS);
            System.out.println("最終結(jié)果:" + result);
        } catch (Exception e) {
            System.err.println("獲取結(jié)果異常:" + e.getMessage());
        } finally {
            // 關(guān)閉線程池
            shutdownAndAwaitTermination(executor);
            shutdownAndAwaitTermination(callbackExecutor);
        }
    }

    // 安全關(guān)閉線程池的標準方法
    private static void shutdownAndAwaitTermination(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    System.err.println("線程池無法終止");
                }
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

CompletableFuture 多級異常處理

CompletableFuture 的一大強項是處理復雜的任務(wù)鏈,下面是處理多級異常的示例:

public void processWithErrorHandling() {
    CompletableFuture<String> future = CompletableFuture
        .supplyAsync(() -> {
            // 步驟1:獲取原始數(shù)據(jù)
            if (Math.random() < 0.2) {
                throw new RuntimeException("獲取數(shù)據(jù)失敗");
            }
            return "原始數(shù)據(jù)";
        })
        .thenApply(data -> {
            // 步驟2:處理數(shù)據(jù)
            if (data.contains("錯誤")) {
                throw new IllegalArgumentException("數(shù)據(jù)格式錯誤");
            }
            return data + "已處理";
        })
        .thenApply(processed -> {
            // 步驟3:格式化結(jié)果
            if (Math.random() < 0.1) {
                throw new RuntimeException("格式化失敗");
            }
            return "[" + processed + "]";
        })
        .exceptionally(ex -> {
            // 捕獲任何步驟的異常
            Throwable cause = ex.getCause() != null ? ex.getCause() : ex;
            log.error("處理鏈異常: {}", cause.getMessage());

            // 根據(jù)異常類型返回不同的默認值
            if (cause instanceof IllegalArgumentException) {
                return "[格式錯誤默認值]";
            }
            return "[系統(tǒng)錯誤默認值]";
        });

    // 還可以添加完成回調(diào)處理最終結(jié)果
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            System.out.println("處理完成: " + result);
        } else {
            System.err.println("處理失敗: " + ex.getMessage());
            // 注意:此處不能返回新值
        }
    });
}

exceptionally 與 whenComplete 對比

  • exceptionally: 可以恢復異常并返回替代結(jié)果。用于類似 try-catch-return 的場景。
  • whenComplete: 無法修改結(jié)果,只能執(zhí)行最終操作。類似 finally 塊,常用于日志記錄或監(jiān)控。

多個異常處理器可以組合使用,形成精細的異常處理鏈:

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(this::fetchData)
    .exceptionally(ex -> {
        // 處理fetchData階段異常
        log.warn("數(shù)據(jù)獲取失敗,使用緩存數(shù)據(jù)");
        return getCachedData();
    })
    .thenApply(this::processData)
    .exceptionally(ex -> {
        // 處理processData階段異常
        log.warn("數(shù)據(jù)處理失敗,使用簡化處理");
        return getSimpleProcessing();
    });

組合多個 CompletableFuture

import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.List;

public class MultipleCompletableFuturesExample {
    public static void main(String[] args) {
        // 創(chuàng)建自定義線程池 - CPU密集型任務(wù)配置
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(), // CPU密集型任務(wù)核心線程數(shù)=CPU核數(shù)
            Runtime.getRuntime().availableProcessors(),
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>()
        );

        try {
            // 創(chuàng)建多個CompletableFuture任務(wù)
            List<CompletableFuture<String>> futures = IntStream.range(0, 5)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> {
                    try {
                        System.out.println("任務(wù)" + i + "開始執(zhí)行");
                        Thread.sleep(1000 + i * 500);
                        // 模擬可能的失敗
                        if (i == 2) {
                            throw new RuntimeException("任務(wù)" + i + "執(zhí)行失敗");
                        }
                        return "任務(wù)" + i + "執(zhí)行成功";
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        // 直接拋出原始異常,CompletableFuture會自動包裝
                        throw new IllegalStateException("任務(wù)中斷", e);
                    }
                }, executor)
                .exceptionally(ex -> "任務(wù)異常:" + ex.getCause().getMessage()))
                .collect(Collectors.toList());

            // 等待所有任務(wù)完成
            CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                futures.toArray(new CompletableFuture[0])
            );

            // 設(shè)置超時
            try {
                allFutures.get(10, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                System.err.println("部分任務(wù)執(zhí)行超時");
            }

            // 使用Stream API簡化結(jié)果收集 - 替代原來的循環(huán)
            List<String> results = futures.stream()
                .map(future -> {
                    if (future.isDone() && !future.isCancelled() && !future.isCompletedExceptionally()) {
                        try {
                            return future.getNow("任務(wù)未完成");
                        } catch (Exception e) {
                            return "獲取結(jié)果異常:" + e.getMessage();
                        }
                    } else if (future.isCancelled()) {
                        return "任務(wù)已被取消";
                    } else if (future.isCompletedExceptionally()) {
                        return "任務(wù)異常完成";
                    } else {
                        return "任務(wù)未完成";
                    }
                })
                .collect(Collectors.toList());

            // 打印所有結(jié)果
            System.out.println("所有任務(wù)執(zhí)行結(jié)果:");
            for (int i = 0; i < results.size(); i++) {
                System.out.println(i + ": " + results.get(i));
            }

        } catch (Exception e) {
            System.err.println("主線程異常:" + e.getMessage());
        } finally {
            executor.shutdown();
        }
    }
}

CompletableFuture 優(yōu)點

  • 支持任務(wù)組合和鏈式調(diào)用(如流水線、并行執(zhí)行)
  • 更靈活的異常處理機制
  • 可添加任務(wù)完成回調(diào)
  • 支持多任務(wù)協(xié)調(diào)(allOf、anyOf 等)

Future 與 CompletableFuture 的關(guān)鍵區(qū)別

Future.get() vs Future.getNow()

  • get(): 阻塞方法,會等待任務(wù)完成或超時
  • getNow(defaultValue): 非阻塞方法,若任務(wù)未完成立即返回默認值

CompletableFuture 的線程模型

  • 默認使用ForkJoinPool.commonPool(),適合計算密集型任務(wù)
  • 自定義線程池的最佳實踐:
    • IO 密集型任務(wù):線程數(shù) = CPU 核心數(shù) * (1 + 平均等待時間/平均計算時間)
    • CPU 密集型任務(wù):線程數(shù) = CPU 核心數(shù) + 1
    • 總是使用有界隊列防止 OOM
    • 明確設(shè)置拒絕策略

線程池泄漏風險

  • 未正確關(guān)閉線程池會導致應(yīng)用無法正常退出
  • 生產(chǎn)環(huán)境必須使用try-finally模式確保線程池關(guān)閉

使用 UncaughtExceptionHandler 捕獲線程未處理異常

對于直接使用 Thread 的場景,可以設(shè)置 UncaughtExceptionHandler 來捕獲線程中未被捕獲的異常:

public class ExceptionHandlerExample {
    public static void main(String[] args) {
        // 設(shè)置默認的未捕獲異常處理器
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) -> {
            System.err.println("線程" + thread.getName() + "發(fā)生未捕獲異常:" + throwable.getMessage());
        });

        // 創(chuàng)建并啟動可能拋出異常的線程
        Thread thread = new Thread(() -> {
            System.out.println("工作線程開始執(zhí)行");
            // 模擬未捕獲的異常
            throw new RuntimeException("發(fā)生了一個未處理的異常");
        });

        // 也可以為單個線程設(shè)置異常處理器
        thread.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("線程" + t.getName() + "的專屬異常處理器捕獲到異常:" + e.getMessage());
        });

        thread.start();

        try {
            // 讓主線程等待一會,確保能看到異常處理結(jié)果
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

注意:UncaughtExceptionHandler 只能捕獲線程中未被 try-catch 處理的異常,主要用于記錄異常信息,且無法區(qū)分任務(wù)是成功還是主動失敗(如返回錯誤碼)。特別說明,只有未檢查異常(RuntimeException 及其子類)才會觸發(fā)該處理器,受檢異常必須在代碼中顯式處理。

線程池配置最佳實踐

根據(jù)任務(wù)類型選擇合適的線程池配置至關(guān)重要:

// CPU密集型任務(wù)線程池 - 線程數(shù)接近CPU核心數(shù)
ExecutorService cpuIntensivePool = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors(),
    Runtime.getRuntime().availableProcessors(),
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<>(1000), // 合理大小的隊列
    new ThreadPoolExecutor.CallerRunsPolicy() // 避免任務(wù)丟失
);

// IO密集型任務(wù)線程池 - 更多的線程數(shù)應(yīng)對IO等待
ExecutorService ioIntensivePool = new ThreadPoolExecutor(
    Runtime.getRuntime().availableProcessors() * 2,
    Runtime.getRuntime().availableProcessors() * 4,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000),
    new ThreadPoolExecutor.AbortPolicy() // 隊列滿時拒絕新任務(wù)
);

避開 Executors 工廠方法的原因

  • newFixedThreadPoolnewSingleThreadExecutor 使用無界隊列(LinkedBlockingQueue),可能導致 OOM
  • newCachedThreadPool 允許創(chuàng)建無限線程,可能導致 OOM
  • newScheduledThreadPool 同樣允許無限任務(wù)堆積

實際場景示例

以下是一個在實際項目中常見的 Web 后臺任務(wù)監(jiān)控示例:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;

public class WebBackgroundTasksExample {

    // 模擬數(shù)據(jù)處理服務(wù)
    static class DataProcessingService {
        public List<String> processLargeData(List<String> data) {
            // 創(chuàng)建線程池 - 根據(jù)任務(wù)特性選擇合適配置
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                Math.min(data.size(), Runtime.getRuntime().availableProcessors() * 2),
                Math.min(data.size(), Runtime.getRuntime().availableProcessors() * 2),
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(Math.max(10, data.size())), // 有界隊列
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r, "data-processor-" + System.currentTimeMillis());
                        t.setUncaughtExceptionHandler((thread, ex) ->
                            System.err.println("線程" + thread.getName() + "未捕獲異常: " + ex.getMessage())
                        );
                        return t;
                    }
                },
                new ThreadPoolExecutor.CallerRunsPolicy() // 拒絕策略
            );

            try {
                // 為每條數(shù)據(jù)創(chuàng)建處理任務(wù)
                List<CompletableFuture<String>> futures = data.stream()
                    .map(item -> CompletableFuture.supplyAsync(() -> {
                        try {
                            // 模擬復雜處理
                            if (item.contains("error")) {
                                throw new RuntimeException("處理數(shù)據(jù)項 '" + item + "' 時出錯");
                            }
                            Thread.sleep(500); // 模擬處理時間
                            return item.toUpperCase(); // 處理結(jié)果
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new IllegalStateException("處理被中斷", e);
                        }
                    }, executor)
                    .exceptionally(ex -> {
                        // 記錄錯誤,但允許繼續(xù)處理其他數(shù)據(jù)
                        System.err.println("數(shù)據(jù)處理異常: " + ex.getMessage());
                        return "ERROR_" + item;
                    }))
                    .collect(Collectors.toList());

                // 等待所有任務(wù)完成,但最多等待10秒
                CompletableFuture<Void> allFutures = CompletableFuture.allOf(
                    futures.toArray(new CompletableFuture[0])
                );

                try {
                    allFutures.get(10, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    System.err.println("處理超時,將返回已完成部分的結(jié)果");
                }

                // 使用Stream API收集結(jié)果
                return IntStream.range(0, data.size())
                    .mapToObj(i -> {
                        CompletableFuture<String> future = futures.get(i);
                        String item = data.get(i);

                        if (future.isDone() && !future.isCompletedExceptionally() && !future.isCancelled()) {
                            try {
                                return future.getNow("處理失敗");
                            } catch (Exception e) {
                                return "處理出錯: " + e.getMessage();
                            }
                        } else if (future.isCancelled()) {
                            return item + "_已取消";
                        } else if (future.isCompletedExceptionally()) {
                            return item + "_異常完成";
                        } else {
                            return item + "_處理未完成";
                        }
                    })
                    .collect(Collectors.toList());

            } finally {
                // 確保關(guān)閉線程池
                shutdownAndAwaitTermination(executor);
            }
        }

        // 安全關(guān)閉線程池的標準方法
        private void shutdownAndAwaitTermination(ExecutorService pool) {
            pool.shutdown();
            try {
                if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                    pool.shutdownNow();
                    if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                        System.err.println("線程池無法終止");
                    }
                }
            } catch (InterruptedException e) {
                pool.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        // 準備測試數(shù)據(jù)
        List<String> testData = new ArrayList<>();
        testData.add("item1");
        testData.add("error_item");
        testData.add("item3");
        testData.add("item4");
        testData.add("another_error");

        // 處理數(shù)據(jù)
        DataProcessingService service = new DataProcessingService();
        List<String> results = service.processLargeData(testData);

        // 輸出結(jié)果
        System.out.println("處理結(jié)果:");
        for (int i = 0; i < results.size(); i++) {
            System.out.println(testData.get(i) + " -> " + results.get(i));
        }
    }
}

總結(jié)

方法支持返回值異常處理方式任務(wù)組合能力異步回調(diào)支持線程池集成度支持超時支持任務(wù)取消適用場景
Thread.join()無(需共享變量)僅順序等待是(基礎(chǔ))簡單等待工作線程完成
共享變量/對象是(手動)手動設(shè)置異常狀態(tài)復雜否(除非配合 join)簡單的狀態(tài)傳遞
Future/Callableget()拋出異常有限(需批量處理)高(線程池)是(cancel())需要獲取執(zhí)行結(jié)果和異常
CompletableFutureexceptionally 鏈式強(allOf/anyOf)是(回調(diào))復雜的任務(wù)鏈和依賴關(guān)系
UncaughtExceptionHandler僅捕獲未檢查異常有限全局異常監(jiān)控

在實際開發(fā)中,根據(jù)業(yè)務(wù)場景的復雜度和需求選擇合適的方法。對于簡單任務(wù),Thread.join()或共享變量就足夠了;對于需要返回值和異常處理的場景,F(xiàn)uture/Callable 是不錯的選擇;而對于復雜任務(wù)鏈和依賴關(guān)系,CompletableFuture 則是最佳方案。無論選擇哪種方式,都要確保正確處理異常、設(shè)置合理超時,并妥善管理線程資源。

以上就是Java中線程執(zhí)行狀態(tài)檢測的四種可靠方法的詳細內(nèi)容,更多關(guān)于Java線程執(zhí)行狀態(tài)檢測的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • SpringBoot集成Flink-CDC實現(xiàn)對數(shù)據(jù)庫數(shù)據(jù)的監(jiān)聽問題

    SpringBoot集成Flink-CDC實現(xiàn)對數(shù)據(jù)庫數(shù)據(jù)的監(jiān)聽問題

    Flink CDC(Flink Change Data Capture)是一種基于數(shù)據(jù)庫日志的CDC技術(shù),它實現(xiàn)了一個全增量一體化的數(shù)據(jù)集成框架,這篇文章主要介紹了SpringBoot集成Flink-CDC,實現(xiàn)對數(shù)據(jù)庫數(shù)據(jù)的監(jiān)聽,需要的朋友可以參考下
    2024-07-07
  • Scala中優(yōu)雅的處理Null問題

    Scala中優(yōu)雅的處理Null問題

    Spark 采用混合方式,大部分情況下使用 Option,但個別時候出于性能原因才使用了null。一個很好的習慣是當有方法返回值可能為null的時候,使用Option來代替,本文給大家介紹Scala處理Null的知識詳解,一起看看吧
    2021-08-08
  • 你知道Java的這些騷操作嗎?

    你知道Java的這些騷操作嗎?

    今天在看python相關(guān)的東西,看到各種騷操作,回頭想了下Java有沒有什么騷操作,整理下面幾種,一起看一下吧,需要的朋友可以參考下
    2021-05-05
  • 解析SpringSecurity自定義登錄驗證成功與失敗的結(jié)果處理問題

    解析SpringSecurity自定義登錄驗證成功與失敗的結(jié)果處理問題

    這篇文章主要介紹了SpringSecurity系列之自定義登錄驗證成功與失敗的結(jié)果處理問題,本文通過實例給大家講解的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下
    2019-11-11
  • Mybatis-Plus使用@TableField實現(xiàn)自動填充日期的代碼示例

    Mybatis-Plus使用@TableField實現(xiàn)自動填充日期的代碼示例

    數(shù)據(jù)庫中經(jīng)常有create_time,update_time兩個字段,在代碼中設(shè)置時間有點太麻煩了?mybatis-plus可以幫我們自動填充,本文主要介紹了Mybatis-Plus使用@TableField實現(xiàn)自動填充日期的代碼示例,感興趣的可以了解一下
    2022-04-04
  • 淺談Java線程池的7大核心參數(shù)

    淺談Java線程池的7大核心參數(shù)

    本篇文章基于正在看這篇文章的你已經(jīng)具備了基本的Java并發(fā)的相關(guān)知識.如果對于Java并發(fā)編程一無所知的話,請先看看Java并發(fā)編程的一些前導基礎(chǔ)知識,文中有非常詳細的圖文示例及代碼,,需要的朋友可以參考下
    2021-05-05
  • Java多線程父線程向子線程傳值問題及解決

    Java多線程父線程向子線程傳值問題及解決

    文章總結(jié)了5種解決父子之間數(shù)據(jù)傳遞困擾的解決方案,包括ThreadLocal+TaskDecorator、UserUtils、CustomTaskDecorator、ExecutorConfig、RequestContextHolder+TaskDecorator、MDC+TaskDecorator和InheritableThreadLocal
    2025-02-02
  • IDEA打包的兩種方式及注意事項說明

    IDEA打包的兩種方式及注意事項說明

    這篇文章主要介紹了IDEA打包的兩種方式及注意事項說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-04-04
  • SpringMVC利用dropzone組件實現(xiàn)圖片上傳

    SpringMVC利用dropzone組件實現(xiàn)圖片上傳

    這篇文章主要介紹了SpringMVC利用dropzone組件實現(xiàn)圖片上傳,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-02-02
  • IDEA2020 Plugins不能用的解決辦法及Plugins 搜索不了插件的問題

    IDEA2020 Plugins不能用的解決辦法及Plugins 搜索不了插件的問題

    這篇文章主要介紹了IDEA2020 Plugins不能用的解決辦法,文中給大家介紹了Intellij IDEA 2020.1 的Plugins 搜索不了插件,連接超時的問題,本文給大家介紹的非常詳細,需要的朋友可以參考下
    2020-06-06

最新評論