欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java優(yōu)雅的關(guān)閉線程池的方法

 更新時(shí)間:2023年06月26日 11:33:48   作者:逆流°只是風(fēng)景-bjhxcc  
本文主要介紹了Java如何優(yōu)雅的關(guān)閉線程池,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

背景

前幾天在和同事聊一個(gè)需求,說是有個(gè)數(shù)據(jù)查詢的功能,因?yàn)樯婕暗蕉鄠€(gè)第三方接口調(diào)用,想用線程池并行來做。

很正常的一個(gè)方案,但是上線后發(fā)現(xiàn),每次服務(wù)發(fā)布的時(shí)候,這個(gè)數(shù)據(jù)查詢的功能就會(huì)掛掉,后來發(fā)現(xiàn)是線程池沒有做好關(guān)閉,這里總結(jié)一下。

關(guān)鍵字:線程池、shutdown、shutdownNow、interrupt

一、線程中斷 interrupt

先補(bǔ)一補(bǔ)基礎(chǔ)的知識(shí):線程中斷。線程中斷的含義,并不是強(qiáng)制把運(yùn)行中的線程給“咔嚓”中斷,而是把線程的中斷標(biāo)志位置為true,這樣等線程之后阻塞(wait、join、sleep)的時(shí)候,就會(huì)拋出 InterruptedException,程序通過捕獲 InterruptedException 來做一定的善后處理,然后讓線程退出。

來看個(gè)例子,下面這段代碼是起一個(gè)線程,打印一百行文本,打印過程中,會(huì)把線程的中斷標(biāo)志位置為true

public static void test02() throws InterruptedException {
    Thread t = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted());
    }
    });
    t.start();
    Thread.sleep(1);
    t.interrupt();
}

看看控制臺(tái)的輸出,發(fā)現(xiàn)在打印到 57 的時(shí)候,中斷標(biāo)志位已經(jīng)成功置為true了,但是線程任然在打印,說明只是設(shè)置了中斷標(biāo)志位,而不是直接粗暴的把線程中斷。

...
process i=55,interrupted:false
process i=56,interrupted:false
process i=57,interrupted:true
process i=58,interrupted:true
process i=59,interrupted:true
...

再看看這個(gè)示例,同樣是打印一百行文本,打印過程中會(huì)判斷中斷標(biāo)志位,如果中斷就自行退出。

public static void test02() throws InterruptedException {
    Thread t = new Thread(() -> {
    for (int i = 0; i < 100; i++) {
        if (Thread.interrupted()) {
            System.out.println("線程已中斷,退出執(zhí)行");
            break;
        }
        System.out.println("process i=" + i + ",interrupted:" + Thread.currentThread().isInterrupted());
    }
    });
    t.start();
    Thread.sleep(1);
    t.interrupt();
}

控制臺(tái)輸出如下,:

process i=49,interrupted:false
process i=50,interrupted:false
process i=51,interrupted:false

線程已中斷,退出執(zhí)行

二、線程池的關(guān)閉 shutdown 方法

了解完線程中斷,再來看看線程池的關(guān)閉方法。

關(guān)閉線程池有兩個(gè)方法 shutdown() 和 shutdownNow(),具體有什么區(qū)別?我們先來看看 shutdown() 方法

 /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN); // 1. 把線程池的狀態(tài)設(shè)置為 SHUTDOWN
            interruptIdleWorkers(); // 2. 把空閑的工作線程置為中斷
            onShutdown(); // 3. 一個(gè)空實(shí)現(xiàn),暫不用關(guān)注
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

看源碼先看注釋,翻譯下:

啟動(dòng)有序關(guān)閉會(huì)執(zhí)行以前提交的任務(wù),但不接受任何新任務(wù)。如果已經(jīng)關(guān)閉,則調(diào)用不會(huì)產(chǎn)生額外的影響。此方法不等待活動(dòng)執(zhí)行的任務(wù)終止。如果需要,可使用 awaitTermination() 做到這一點(diǎn)。

2.1、第一步:advanceRunState(SHUTDOWN) 把線程池置為 SHUTDOWN

線程池狀態(tài)流轉(zhuǎn)如下。調(diào)用 shutdown() 方法會(huì)把線程池的狀態(tài)置為 SHUTDOWN,后續(xù)再往線程池提交任務(wù)就會(huì)被拒絕(execute() 方法中做了判斷)。

2.2、第二步:interruptIdleWorkers() 把空閑的工作線程置為中斷

interruptIdleWorkers() 方法遍歷所有的工作線程,如果 tryLock() 成功,就把線程置為中斷。這里,如果 tryLock() 成功,說明對(duì)應(yīng)的 woker 是一個(gè)空閑的,沒有在執(zhí)行任務(wù)的線程,如果沒成功,說明對(duì)應(yīng)的 worker 正在執(zhí)行任務(wù)。也就是說,這里的中斷,對(duì)正在執(zhí)行中的任務(wù)并沒有影響。

 private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

2.3、 第三步:onShutdown() 一個(gè)空實(shí)現(xiàn),暫不用關(guān)注

這個(gè)沒啥,就是個(gè)留空的方法。

2.4、 小結(jié)

shutdown() 方法干兩件事:

  • 把線程池狀態(tài)置為 SHUTDOWN 狀態(tài)
  • 中斷空閑線程

我們來看個(gè)例子,加深下印象。

public static void test01() throws InterruptedException {
        // corePoolSize 是 2,maximumPoolSize 是 2
        ThreadPoolExecutor es = new ThreadPoolExecutor(2, 2,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.prestartAllCoreThreads(); // 啟動(dòng)所有 worker
        es.execute(new Task()); // Task是一個(gè)訪問某網(wǎng)站的 HTTP 請(qǐng)求,跑的慢,后面會(huì)貼出來完整代碼,這里把他當(dāng)做一個(gè)跑的慢的異步任務(wù)就行
        es.shutdown();
        es.execute(new Task()); // 在線程池 shutdown() 后 繼續(xù)添加任務(wù),這里預(yù)期是拋出異常
    }

這個(gè)例子我們主要觀察兩個(gè)現(xiàn)象。

一個(gè)是線程池會(huì)有兩個(gè)woker( prestartAllCoreThreads() 方法的調(diào)用使得已啟動(dòng)就有兩個(gè) worker),其中一個(gè)正在執(zhí)行,一個(gè)處于空閑。所以當(dāng)調(diào)用shutdown() 方法,走進(jìn) interruptIdleWorkers() 的時(shí)候,只有那個(gè)空閑的線程會(huì)調(diào)用 t.interrupt()。

第二個(gè)是調(diào)用 shutdown() 方法后,再調(diào)用 execute() 時(shí),會(huì)拋出異常,因?yàn)榫€程池的狀態(tài)已經(jīng)置為 SHUTDOWN,不再接受新的任務(wù)添加進(jìn)來。

三、線程池的關(guān)閉 shutdownNow 方式

 /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP); // 1:把線程池設(shè)置為STOP
            interruptWorkers(); // 2.中斷工作線程
            tasks = drainQueue(); // 3.把線程池中的任務(wù)都 drain 出來
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

注釋的意思是:

嘗試停止所有正在執(zhí)行的任務(wù),暫停正在等待的任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表。從該方法返回時(shí),這些任務(wù)將從任務(wù)隊(duì)列中清空(移除)。此方法不等待活動(dòng)執(zhí)行的任務(wù)終止。如果需要,可使用 awaitTermination() 做到這一點(diǎn)。除了盡最大努力嘗試停止處理主動(dòng)執(zhí)行的任務(wù)之外,沒有其他保證。此實(shí)現(xiàn)通過 Thread.Interrupt() 取消任務(wù),因此任何無法響應(yīng)中斷的任務(wù)都可能永遠(yuǎn)不會(huì)終止。

3.1、第一步:advanceRunState() 把線程池設(shè)置為STOP

和 shutdown() 方法不同的是,shutdownNow() 方法會(huì)把線程池的狀態(tài)設(shè)置為 STOP。

3.2、 第二步:interruptWorkers() 中斷工作線程

interruptWorkers() 如下,可以看到,和 shutdown() 方法不同的是,所有的工作線程都調(diào)用了 interrupt() 方法

  /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

3.3、第三步:drainQueue() 把線程池中的任務(wù)都 drain 出來

drainQueue() 方法如下,把阻塞隊(duì)列里面等待的任務(wù)都拿出來,并返回。關(guān)閉線程池的時(shí)候,可以基于這個(gè)特性,把返回的任務(wù)都打印出來,做個(gè)記錄。

  /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

3.4、小結(jié)

shutdownNow() 方法干三件事:

  • 把線程池狀態(tài)置為 STOP 狀態(tài)
  • 中斷工作線程
  • 把線程池中的任務(wù)都 drain 出來并返回

我們來看個(gè)例子,代碼合剛才的一樣,只是關(guān)閉線程用的是shutdownNow()

public static void test01() throws InterruptedException {
        // corePoolSize 是 1,maximumPoolSize 是 1,無限容量
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.prestartAllCoreThreads(); // 啟動(dòng)所有 worker
        es.execute(new Task()); // Task是一個(gè)訪問某網(wǎng)站的 HTTP 請(qǐng)求,跑的慢,后面會(huì)貼出來完整代碼,這里把他當(dāng)做一個(gè)跑的慢的異步任務(wù)就行
        es.execute(new Task());
        List<Runnable> result = es.shutdownNow();
        System.out.println(result);
        es.execute(new Task()); // 在線程池 shutdownNow() 后 繼續(xù)添加任務(wù),這里預(yù)期是拋出異常
    }

這個(gè)例子我們主要觀察三個(gè)現(xiàn)象。一個(gè)是線程池有兩個(gè)woker,所以當(dāng)調(diào)用shutdownNow() 方法,走進(jìn) interruptWorkers() 的時(shí)候,所有的 woker 都會(huì)調(diào)用 t.interrupt()。

第二個(gè)是 shutdownNow() 方法會(huì)返回還沒來得及執(zhí)行的task,并打印出來。第三個(gè)是調(diào)用 shutdownNow() 方法后,再調(diào)用 execute() 時(shí),會(huì)拋出異常,因?yàn)榫€程池的狀態(tài)已經(jīng)置為 STOP,不再接受新的任務(wù)添加

四、實(shí)戰(zhàn),與 JVM 鉤子配合

實(shí)際工作中,我們一般是使用 shutdown() 方法,因?yàn)樗容^“溫和”,會(huì)等待我們把線程池中的任務(wù)都執(zhí)行完,這里也已 shutdown() 方法為例。

我們回到最開頭聊到的那個(gè) case,機(jī)器重新發(fā)布,但是線程池中還有沒執(zhí)行完任務(wù),機(jī)器一關(guān),這些任務(wù)全部被kill,怎么辦呢?有什么機(jī)制能夠阻塞一下,等待這個(gè)任務(wù)執(zhí)行完再關(guān)閉嗎?

有的,用 JVM 的鉤子!

實(shí)例代碼如下,一個(gè)線程池,提交了三個(gè)任務(wù)去執(zhí)行,執(zhí)行完得半分鐘。然后增加一個(gè)JVM的鉤子,這個(gè)鉤子可以簡(jiǎn)單理解為監(jiān)聽器,注冊(cè)后,JVM在關(guān)閉的時(shí)候就會(huì)調(diào)用這個(gè)方法,調(diào)用完才會(huì)正式關(guān)閉JVM。

public static void test01() throws InterruptedException {
        ThreadPoolExecutor es = new ThreadPoolExecutor(1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        es.execute(new Task());
        es.execute(new Task());
        es.execute(new Task());
        Thread shutdownHook = new Thread(() -> {
            es.shutdown();
            try {
                es.awaitTermination(3, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("等待超時(shí),直接關(guān)閉");
            }
        });
        Runtime.getRuntime().addShutdownHook(shutdownHook);
    }

在機(jī)器上執(zhí)行,會(huì)發(fā)現(xiàn),我使用 ctrl + c (注意不是ctrl + z )關(guān)閉進(jìn)程,會(huì)發(fā)現(xiàn)進(jìn)程并沒有直接關(guān)閉,線程池任然執(zhí)行,一直等到線程池的任務(wù)執(zhí)行完,進(jìn)程才會(huì)正式退出。

怎么樣,是不是很神奇。本文中涉及的 Task 的源碼如下。這個(gè)任務(wù)是對(duì) stackoverflow 網(wǎng)站發(fā)起 10 次請(qǐng)求,用來模擬跑的比較慢的任務(wù),當(dāng)然這不是重點(diǎn),可以忽略,有興趣動(dòng)手試一下本文代碼的同學(xué)可以參考下。

 public static class Task implements Runnable {
        @Override
        public void run() {
            System.out.println("task start");
            for (int i = 0; i < 10; i++) {
                httpGet();
                System.out.println("task execute " + i);
            }
            System.out.println("task finish");
        }
        private void httpGet() {
            String url = "https://stackoverflow.com/";
            String result = "";
            BufferedReader in = null;
            try {
                String urlName = url;
                URL realUrl = new URL(urlName);
                // 打開和URL之間的連接
                URLConnection conn = realUrl.openConnection();
                // 設(shè)置通用的請(qǐng)求屬性
                conn.setRequestProperty("accept", "*/*");
                conn.setRequestProperty("connection", "Keep-Alive");
                conn.setRequestProperty("user-agent",
                        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1)");
                // 建立實(shí)際的連接
                conn.connect();
                // 獲取所有響應(yīng)頭字段
                Map<String, List<String>> map = conn.getHeaderFields();
//                 遍歷所有的響應(yīng)頭字段
//                for (String key : map.keySet()) {
//                    System.out.println(key + "--->" + map.get(key));
//                }
                // 定義BufferedReader輸入流來讀取URL的響應(yīng)
                in = new BufferedReader(
                        new InputStreamReader(conn.getInputStream()));
                String line;
                while ((line = in.readLine()) != null) {
                    result += "/n" + line;
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            // 使用finally塊來關(guān)閉輸入流
            finally {
                try {
                    if (in != null) {
                        in.close();
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
//            System.out.print(result);
        }
    }

五、總結(jié)

想要優(yōu)雅的關(guān)閉線程池,首先要理解線程中斷的含義。

其次,關(guān)閉線程池有兩種方式:shutdown() 和 shutdownNow(),二者最大的區(qū)別是 shutdown() 只是把空閑的 woker 置為中斷,不影響正在運(yùn)行的woker,并且會(huì)繼續(xù)把待執(zhí)行的任務(wù)給處理完。shutdonwNow() 則是把所有的 woker 都置為中斷,待執(zhí)行的任務(wù)全部抽出并返回,日常工作中更多是使用 shutdown()。

最后,單純的使用 shutdown() 也不靠譜,還得使用 awaitTermination() 和 JVM 的鉤子,才算優(yōu)雅的關(guān)閉線程池。

到此這篇關(guān)于Java優(yōu)雅的關(guān)閉線程池的方法的文章就介紹到這了,更多相關(guān)Java 關(guān)閉線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java負(fù)載均衡算法實(shí)現(xiàn)之輪詢和加權(quán)輪詢

    Java負(fù)載均衡算法實(shí)現(xiàn)之輪詢和加權(quán)輪詢

    網(wǎng)上找了不少負(fù)載均衡算法的資源,都不夠全面,后來自己結(jié)合了網(wǎng)上的一些算法實(shí)現(xiàn),下面這篇文章主要給大家介紹了關(guān)于Java負(fù)載均衡算法實(shí)現(xiàn)之輪詢和加權(quán)輪詢的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2022-04-04
  • Java容器源碼LinkedList原理解析

    Java容器源碼LinkedList原理解析

    這篇文章主要介紹了Java容器源碼LinkedList原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2020-11-11
  • 最新評(píng)論