欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

如何解決executors線程池創(chuàng)建的線程不釋放的問題

 更新時間:2023年08月10日 14:18:25   作者:TimiWang  
這篇文章主要介紹了如何解決executors線程池創(chuàng)建的線程不釋放的問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

executors線程池創(chuàng)建的線程不釋放問題

我們通過executors.newFixThreadPool 創(chuàng)建指定大小的線程池,在所有線程都結(jié)束后,線程池并未釋放線程。

我們之后再新建的線程池的線程將一直累加。

解決這個問題

只需要設(shè)置如下:

? ? ? ? ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*2);
? ? ? ? ? ? ?executor.setKeepAliveTime(10, TimeUnit.SECONDS);
? ? ? ? ? ? ?executor.allowCoreThreadTimeOut(true);

這樣設(shè)置后,隊列的任務(wù)全部結(jié)束后將,釋放所有的線程。

線程池中的線程為什么不會釋放而是循環(huán)等待任務(wù)呢

線程池

之前一直有這個疑問:我們平時使用線程都是各種new Thread(),然后直接在run()方法里面執(zhí)行我們要做的各種操作,使用完后需要做什么管理嗎?線程池為什么能維持住核心線程不釋放,一直接收任務(wù)進行處理呢?

線程

線程無他,主要有兩個方法,我們先看看start()方法介紹:

    /**
     * Causes this thread to begin execution; the Java Virtual Machine
     * calls the <code>run</code> method of this thread.
     * <p>
     * The result is that two threads are running concurrently: the
     * current thread (which returns from the call to the
     * <code>start</code> method) and the other thread (which executes its
     * <code>run</code> method).
     * <p>
     * It is never legal to start a thread more than once.
     * In particular, a thread may not be restarted once it has completed
     * execution.
     *
     * @exception  IllegalThreadStateException  if the thread was already
     *               started.
     * @see        #run()
     * @see        #stop()
     */
    public synchronized void start() {
        if (threadStatus != 0)
            throw new IllegalThreadStateException();
        /* Notify the group that this thread is about to be started
         * so that it can be added to the group's list of threads
         * and the group's unstarted count can be decremented. */
        group.add(this);
        started = false;
        try {
            nativeCreate(this, stackSize, daemon);
            started = true;
        } finally {
            try {
                if (!started) {
                    group.threadStartFailed(this);
                }
            } catch (Throwable ignore) {
                /* do nothing. If start0 threw a Throwable then
                  it will be passed up the call stack */
            }
        }
    }
  • 從這個方法解釋上看,start()這個方法,最終會交給VM 去執(zhí)行run()方法,所以一般情況下,我們在隨便一個線程上執(zhí)行start(),里面的run()操作都會交給VM 去執(zhí)行。
  • 而且還說明,重復(fù)啟用線程是不合法的,當一個線程完成的時候,may not be restarted once。
  • 那么這種情況下,線程池是怎么做的?他為什么就能夠重復(fù)執(zhí)行各種任務(wù)呢?

帶著各種疑問,我們?nèi)タ纯淳€程池自己是怎么實現(xiàn)的。

線程池

線程池常用的創(chuàng)建方法有那么幾種:

  • 1. newFixedThreadPool()
  • 2. newSingleThreadExecutor()
  • 3. newCachedThreadPool()
  • 4. newScheduledThreadPool()

這4個方法創(chuàng)建的線程池實例具體就不一一介紹,無非是創(chuàng)建線程的多少,以及回收等問題,因為其實這4個方法最后都會調(diào)用統(tǒng)一的構(gòu)造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

具體來說只是這幾個值的不同決定了4個線程池的作用:

1. corePoolSize 代表核心線程池的個數(shù),當線程池當前的個數(shù)大于核心線程池的時候,線程池會回收多出來的線程

2. maximumPoolSize 代表最大的線程池個數(shù),當線程池需要執(zhí)行的任務(wù)大于核心線程池的時候,會創(chuàng)建更多的線程,但是最大不能超過這個數(shù)

3. keepAliveTime 代表空余的線程存活的時間,當多余的線程完成任務(wù)的時候,需要多長時間進行回收,時間單位是unit 去控制

4. workQueue 非常重要,這個工作隊列會存放所有待執(zhí)行的Runnable對象

@param workQueue the queue to use for holding tasks before they areexecuted.  This queue will hold only the {@code Runnable} tasks submitted by the {@code execute} method.

我們平時在使用線程池的時候,都是直接 實例.execute(Runnable),一起跟進去,看看這個方法具體做了什么

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        //結(jié)合上文的注釋,我們得知,第一次,先判斷當前的核心線程數(shù),
        //如果小于初始化的值,馬上創(chuàng)建;然后第二個if,將這個任務(wù)插入到工作線程,雙重判斷任務(wù),
        //假定如果前面不能直接加入到線程池Worker集合里,則加入到workQueue隊列等待執(zhí)行。
        //里面的if else判斷語句則是檢查當前線程池的狀態(tài)。如果線程池本身的狀態(tài)是要關(guān)閉并清理了,
        //我們則不能提交線程進去了。這里我們就要reject他們。
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

所以其實主要起作用的還是addWorker()方法,我們繼續(xù)跟蹤進去:

private boolean addWorker(Runnable firstTask, boolean core) {
        ···多余代碼
        try {
            w = new Worker(firstTask);  1.重點
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();   2. 重點
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

我們看重點部分,其實最重要的是firstTask這個Runnable,我們一直跟蹤這個對象就可以了,這個對象會new Worker(),那么這個wroker()就是一個包裝類,里面帶著我們實際需要執(zhí)行的任務(wù),后面進行一系列的判斷就會執(zhí)行t.start(); 這個t 就是包裝類worker類里面的Thread,所以整個邏輯又轉(zhuǎn)化進去Worker內(nèi)部。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker. */
        public void run() {
            runWorker(this);
        }
        ...省略代碼
    }

這個Worker包裝類,重要的屬性兩個,thread 就是剛才上面那個方法執(zhí)行的start()對象,這個thread又是把這個worker對象本身作為一個Runnable對象構(gòu)建出來的,那么當我們調(diào)用thread.start()方法時候,實際調(diào)用的就是Worker類的run()方法?,F(xiàn)在又要追蹤進去,看這個runWorker(this),做的是什么鬼東西

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

這個方法還是比較好懂的:

1. 一個大循環(huán),判斷條件是task != null || (task = getTask()) != null,task自然就是我們要執(zhí)行的任務(wù)了,當task空而且getTask()取不到任務(wù)的時候,這個while()就會結(jié)束,循環(huán)體里面進行的就是task.run();

2.這里我們其實可以打個心眼,那基本八九不離十了,肯定是這個循環(huán)一直沒有退出,所以才能維持著這一個線程不斷運行,當有外部任務(wù)進來的時候,循環(huán)體就能getTask()并且執(zhí)行。

3.下面最后放getTask()里面的代碼,驗證猜想

   private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

真相大白了,里面進行的也是一個死循環(huán),主要看 Runnable r = timed ?

workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();

工作隊列workQueue會一直去拿任務(wù),屬于核心線程的會一直卡在 workQueue.take()方法,直到拿到Runnable 然后返回,非核心線程會 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超時還沒有拿到,下一次循環(huán)判斷compareAndDecrementWorkerCount就會返回null,Worker對象的run()方法循環(huán)體的判斷為null,任務(wù)結(jié)束,然后線程被系統(tǒng)回收

注意:

一句話可以概述了,線程池就是用一堆包裝住Thread的Wroker類的集合,在里面有條件的進行著死循環(huán),從而可以不斷接受任務(wù)來進行。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • java配置dbcp連接池(數(shù)據(jù)庫連接池)示例分享

    java配置dbcp連接池(數(shù)據(jù)庫連接池)示例分享

    java配置dbcp連接池示例分享,大家參考使用吧
    2013-12-12
  • Spring Boot啟動流程斷點過程解析

    Spring Boot啟動流程斷點過程解析

    這篇文章主要介紹了Spring Boot啟動流程斷點過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-11-11
  • springMVC中@RequestParam和@RequestPart的區(qū)別

    springMVC中@RequestParam和@RequestPart的區(qū)別

    本文主要介紹了springMVC中@RequestParam和@RequestPart的區(qū)別,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-06-06
  • Java基于Lock的生產(chǎn)者消費者模型示例

    Java基于Lock的生產(chǎn)者消費者模型示例

    這篇文章主要介紹了Java基于Lock的生產(chǎn)者消費者模型,結(jié)合實例形式分析了java基于鎖機制的生產(chǎn)者消費者模型相關(guān)實現(xiàn)與使用技巧,需要的朋友可以參考下
    2018-08-08
  • SpringBoot連接MySql數(shù)據(jù)庫的原理及代碼示例

    SpringBoot連接MySql數(shù)據(jù)庫的原理及代碼示例

    SpringBoot是一款流行的Java開發(fā)框架,它可以輕松地連接各種類型的數(shù)據(jù)庫,包括關(guān)系型數(shù)據(jù)庫和非關(guān)系型數(shù)據(jù)庫,本文將介紹SpringBoot是如何連接數(shù)據(jù)庫的,包括其原理和代碼示例,需要的朋友可以參考下
    2023-07-07
  • Java排序之冒泡排序的實現(xiàn)與優(yōu)化

    Java排序之冒泡排序的實現(xiàn)與優(yōu)化

    冒泡排序是一種簡單的交換排序。之所以叫做冒泡排序,因為我們可以把每個元素當成一個小氣泡,根據(jù)氣泡大小,一步一步移動到隊伍的一端,最后形成一定對的順序。本文將利用Java實現(xiàn)冒泡排序,并進行一定的優(yōu)化,希望對大家有所幫助
    2022-11-11
  • springboot實現(xiàn)敏感字段加密存儲解密顯示功能

    springboot實現(xiàn)敏感字段加密存儲解密顯示功能

    這篇文章主要介紹了springboot實現(xiàn)敏感字段加密存儲,解密顯示,通過mybatis,自定義注解+AOP切面,Base64加解密方式實現(xiàn)功能,本文通過代碼實現(xiàn)給大家介紹的非常詳細,需要的朋友可以參考下
    2022-02-02
  • 最新評論