欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java線程池之線程復用原理全面解析

 更新時間:2024年03月27日 10:01:01   作者:NingKangMing  
這篇文章主要介紹了Java線程池之線程復用原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

什么是線程復用

在Java中,我們正常創(chuàng)建線程執(zhí)行任務,一般都是一條線程綁定一個Runnable執(zhí)行任務。

而Runnable實際只是一個普通接口,真正要執(zhí)行,則還是利用了Thread類的run方法。

這個rurn方法由native本地方法start0進行調用。

我們看Thread類的run方法實現(xiàn)

    /* What will be run. */
    private Runnable target;
 
   /**
     * If this thread was constructed using a separate
     * <code>Runnable</code> run object, then that
     * <code>Runnable</code> object's <code>run</code> method is called;
     * otherwise, this method does nothing and returns.
     * <p>
     * Subclasses of <code>Thread</code> should override this method.
     *
     * @see     #start()
     * @see     #stop()
     * @see     #Thread(ThreadGroup, Runnable, String)
     */
    @Override
    public void run() {
        if (target != null) {
            target.run();
        }
    }

很明顯,Thread類的run方法就是使用構造Thread類傳入來的Runnable對象,執(zhí)行Runnable的run方法。

這樣可以很好的將任務和Thread類解耦,如果繼承Thread類再去重寫run方法當然也是可以,但卻耦合了,并且Java是單繼承,所以繼承Thread類這種方式通常不會使用,沒有任何好處。

現(xiàn)在問題是,一個線程只能執(zhí)行一個Runnable對象,那么這條線程它就是不能復用的,完成任務它就該Terminated了。

如果系統(tǒng)任務很多,頻繁創(chuàng)建線程帶來的開銷大,線程數(shù)量不可控導致系統(tǒng)處于一種不安全的狀況,系統(tǒng)隨時可能被大量線程搞跨,于是線程池就出現(xiàn)了。

線程池要解決的問題就是用少量線程處理更多的任務,這樣一來,線程池首先要實現(xiàn)的就是線程復用。

不能說還是一條線程只處理一個Runnable任務,而是一條線程處理無數(shù)Runnable任務。

最容易想到的方案就是將Runnable對象放到隊列中,在Thread類的run方法中不斷從隊列中拉取任務執(zhí)行,這樣一來就實現(xiàn)了線程復用。

當然,實際線程池也差不多是這么干的,下面我們詳細看一下線程池實現(xiàn)線程復用的原理。

線程池處理任務的過程

線程池原理解析中有詳述線程池創(chuàng)建線程及處理任務的過程。

這里再次簡單看一下流程圖以方便理解下面的線程復用原理解析。

線程復用原理解析

線程處理任務過程源碼解析

首先我們看看線程池是怎么使用的

import cn.hutool.core.thread.ThreadFactoryBuilder;
 
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * @author kangming.ning
 * @date 2023-02-24 16:27
 * @since 1.0
 **/
public class CustomThreadPool1 {
 
    private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNamePrefix("線程池-").build();
 
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors(),
            Runtime.getRuntime().availableProcessors() * 2,
            60L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10),
            threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
 
    public static void main(String[] args) throws InterruptedException {
        Runnable r = () -> {
            System.out.println(Thread.currentThread().getName() + " is running");
        };
        for (int i = 0; i < 35; i++) {
            Thread.sleep(1000);
            threadPoolExecutor.submit(r);
        }
    }
 
}

可見,threadPoolExecutor的sumit方法就是用來提交任務的,于是,從這個方法開始分析源碼,把源碼的關注點放在線程復用部分。

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

第一句只是用來包裝一下有返回值的任務,不必關注,重點看execute(ftask)這句。

    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

代碼量不多,信息量極大。

注釋的內(nèi)容就是在解釋線程池執(zhí)行任務的處理過程,這個看上面的流程圖即可。

任務如果為空直接拋空指針異常。

下面看第一個if語句

if (workerCountOf(c) < corePoolSize) {
       if (addWorker(command, true))
           return;
        c = ctl.get();
}

如果worker數(shù)量少于核心線程數(shù),則通過addWorker(command, true)方法添加一個worker。這里要注意,線程池把每一條線程都封裝成了Worker的實例。

addWorker方法的作用是在線程池中創(chuàng)建一個線程并執(zhí)行第一個參數(shù)傳入的任務,它的第二個參數(shù)是個boolean值,如果傳入 true 則代表增加線程時判斷當前線程是否少于 corePoolSize,小于則增加新線程,大于等于則不增加;如果傳入false則使用maximumPoolSize來判斷是否增加新線程。

接下來看下面第二個if的代碼

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

執(zhí)行到這里說明核心線程數(shù)已滿或者說addWorker失敗了。此時先檢查線程池是否為運行狀態(tài),是的話直接把任務放隊列,這跟上面的流程圖是一致的,核心線程數(shù)滿則放隊列。

當然當任務提交成功后還是會重新檢查線程池的狀態(tài),如果線程池沒在跑則會移除任務并且執(zhí)行拒絕策略。

再看里面的else if分支

if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);

進入else if分支說明線程池是在運行的,這里是檢查一下是否有線程可供使用,雖說上面已經(jīng)檢查過目前的線程數(shù)已大于核心線程數(shù),但不排除核心線程數(shù)設置為0 這種情況,這樣一來,任務添加后缺沒線程去執(zhí)行,這種情況是不允許的。

再往下看最后一句else if代碼

 else if (!addWorker(command, false))
            reject(command);

能執(zhí)行到這里,說明要么是線程池不在運行中,要么就是核心線程和隊列都滿了,此時需要開啟線程池的后備力量,嘗試添加非核心線程直到線程數(shù)達到最大線程數(shù)限制,注意到addWorker方法第二個參數(shù)傳了false,正是添加線程時使用最大線程數(shù)限制來判斷是否添加新線程。

假設添加失敗意味著最大線程數(shù)也達到了最大值并且沒空閑線程去執(zhí)行當前任務,此時執(zhí)行reject拒絕策略。

線程復用源碼解析

通過上面的解析我們可以看到,添加線程以執(zhí)行任務的核心方法是addWorker,大概看一下Worker的代碼

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution.  This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run.  We implement a simple
     * non-reentrant mutual exclusion lock rather than use
     * ReentrantLock because we do not want worker tasks to be able to
     * reacquire the lock when they invoke pool control methods like
     * setCorePoolSize.  Additionally, to suppress interrupts until
     * the thread actually starts running tasks, we initialize lock
     * state to a negative value, and clear it upon start (in
     * runWorker).
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
 
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
 
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
 
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
 
        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.
 
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
 
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
 
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
 
        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
 
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

可見,Worker對Thread進行了封裝,它本身也是一個Runnable對象,內(nèi)部的Thread對象則是真正用來執(zhí)行任務的線程對象。

因此添加Worker實則就是在線程池中添加運行任務的線程,可以看出在Worker的構造函數(shù)中新建了一條線程并且把引用賦值給了thread對象。

而在上面的addWorker方法中start了這條線程,而這條線程的Runnable對象正是Worker對象自身。

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

既然addWorker方法執(zhí)行了線程的start方法,因此Worker類里面的run方法將被系統(tǒng)調度

 /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

里面只有一個runWorker方法,并且把Worker對象傳了進去,明顯,runWorker是實現(xiàn)線程復用的方法 。

 
    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

代碼不多,注釋很多,核心意思就是這是一個死循環(huán),不斷從隊列獲取任務進行執(zhí)行。

通過上面代碼可以清晰的看出,一開始將firstTask賦值給task Runnable對象,然后下面有個while死循環(huán),不斷的從隊列獲取task進行執(zhí)行,里面的核心邏輯就是task.run(),Runnable對象的run方法由這條Worker線程像調用普通方法一樣的調用,這個就是線程復用的原理。

將Runnable對象放隊列,然后在一個主循環(huán)里面不斷從隊列里獲取任務進行執(zhí)行。

最后看一下getTask方法

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
 
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
 
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
 
            int wc = workerCountOf(c);
 
            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
 
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

可見,里面就是從隊列里面獲取一個Runnable對象進行返回而已。

總結

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關文章

  • springboot controller 增加指定前綴的兩種實現(xiàn)方法

    springboot controller 增加指定前綴的兩種實現(xiàn)方法

    這篇文章主要介紹了springboot controller 增加指定前綴的兩種實現(xiàn)方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • java模擬實現(xiàn)銀行ATM機操作

    java模擬實現(xiàn)銀行ATM機操作

    這篇文章主要為大家詳細介紹了java模擬實現(xiàn)銀行ATM機操作,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • Springboot實現(xiàn)TLS雙向認證的方法

    Springboot實現(xiàn)TLS雙向認證的方法

    本文介紹了使用keytool生成和管理自簽名CA證書、服務器證書和客戶端證書的方法,適合Java生態(tài)系統(tǒng),通過配置信任庫和服務器/客戶端配置,實現(xiàn)了Spring Boot中的TLS雙向認證,感興趣的朋友一起看看吧
    2025-02-02
  • Springboot中的Validation參數(shù)校驗詳解

    Springboot中的Validation參數(shù)校驗詳解

    這篇文章主要介紹了Springboot中的Validation參數(shù)校驗詳解,Springboot參數(shù)校驗是一種常用的驗證機制,在傳遞參數(shù)時進行校驗,以確保參數(shù)的有效性和正確性,該機制可以幫助開發(fā)者在代碼實現(xiàn)前就避免一些常見的錯誤,需要的朋友可以參考下
    2023-10-10
  • 分布式開發(fā)醫(yī)療掛號系統(tǒng)數(shù)據(jù)字典模塊前后端實現(xiàn)

    分布式開發(fā)醫(yī)療掛號系統(tǒng)數(shù)據(jù)字典模塊前后端實現(xiàn)

    這篇文章主要為大家介紹了分布式開發(fā)醫(yī)療掛號系統(tǒng)數(shù)據(jù)字典模塊前后端實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-04-04
  • 淺談SpringMVC之視圖解析器(ViewResolver)

    淺談SpringMVC之視圖解析器(ViewResolver)

    本篇文章主要介紹了淺談SpringMVC之視圖解析器(ViewResolver),具有一定的參考價值,有興趣的可以了解一下
    2017-08-08
  • 這一次搞懂Spring事務是如何傳播的

    這一次搞懂Spring事務是如何傳播的

    這篇文章主要介紹了這一次搞懂Spring事務是如何傳播的,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-08-08
  • Java中List的使用方法簡單介紹

    Java中List的使用方法簡單介紹

    這篇文章主要針對Java中List的使用方法為大家介紹了進行簡單介紹,List是個集合接口,只要是集合類接口都會有個“迭代子”( Iterator ),利用這個迭代子,就可以對list內(nèi)存的一組對象進行操作,感興趣的小伙伴們可以參考一下
    2016-07-07
  • 使用SpringBoot創(chuàng)建一個RESTful API的詳細步驟

    使用SpringBoot創(chuàng)建一個RESTful API的詳細步驟

    使用 Java 的 Spring Boot 創(chuàng)建 RESTful API 可以滿足多種開發(fā)場景,它提供了快速開發(fā)、易于配置、可擴展、可維護的優(yōu)點,尤其適合現(xiàn)代軟件開發(fā)的需求,幫助你快速構建出高性能的后端服務,需要的朋友可以參考下
    2025-01-01
  • Java中synchronized關鍵字修飾方法同步的用法詳解

    Java中synchronized關鍵字修飾方法同步的用法詳解

    synchronized可以用來同步靜態(tài)和非靜態(tài)方法,下面就具體來看一下Java中synchronized關鍵字修飾方法同步的用法詳解:
    2016-06-06

最新評論