欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java中ExecutorService和ThreadPoolExecutor運(yùn)行原理

 更新時(shí)間:2021年08月31日 11:22:06   作者:丁丁丁沖鴨丶  
本文主要介紹了Java中ExecutorService和ThreadPoolExecutor運(yùn)行原理,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下

為什么要使用線程池

服務(wù)器應(yīng)用程序中經(jīng)常出現(xiàn)的情況是:?jiǎn)蝹€(gè)任務(wù)處理的時(shí)間很短而請(qǐng)求的數(shù)目卻是巨大的。

構(gòu)建服務(wù)器應(yīng)用程序的一個(gè)過(guò)于簡(jiǎn)單的模型應(yīng)該是:每當(dāng)一個(gè)請(qǐng)求到達(dá)就創(chuàng)建一個(gè)新線程,然后在新線程中為請(qǐng)求服務(wù)。實(shí)際上,對(duì)于原型開(kāi)發(fā)這種方法工作得很好,但如果試圖部署以這種方式運(yùn)行的服務(wù)器應(yīng)用程序,那么這種方法的嚴(yán)重不足就很明顯。

每個(gè)請(qǐng)求對(duì)應(yīng)一個(gè)線程(thread-per-request)方法的不足之一是:為每個(gè)請(qǐng)求創(chuàng)建一個(gè)新線程的開(kāi)銷很大;為每個(gè)請(qǐng)求創(chuàng)建新線程的服務(wù)器在創(chuàng)建和銷毀線程上花費(fèi)的時(shí)間和消耗的系統(tǒng)資源要比花在處理實(shí)際的用戶請(qǐng)求的時(shí)間和資源更多。除了創(chuàng)建和銷毀線程的開(kāi)銷之外,活動(dòng)的線程也消耗系統(tǒng)資源(線程的生命周期!)。在一個(gè)JVM 里創(chuàng)建太多的線程可能會(huì)導(dǎo)致系統(tǒng)由于過(guò)度消耗內(nèi)存而用完內(nèi)存或“切換過(guò)度”。為了防止資源不足,服務(wù)器應(yīng)用程序需要一些辦法來(lái)限制任何給定時(shí)刻處理的請(qǐng)求數(shù)目。

線程池為線程生命周期開(kāi)銷問(wèn)題和資源不足問(wèn)題提供了解決方案。通過(guò)對(duì)多個(gè)任務(wù)重用線程,線程創(chuàng)建的開(kāi)銷被分?jǐn)偟搅硕鄠€(gè)任務(wù)上。其好處是,因?yàn)樵谡?qǐng)求到達(dá)時(shí)線程已經(jīng)存在,所以無(wú)意中也消除了線程創(chuàng)建所帶來(lái)的延遲。這樣,就可以立即為請(qǐng)求服務(wù),使應(yīng)用程序響應(yīng)更快。而且,通過(guò)適當(dāng)?shù)卣{(diào)整線程池中的線程數(shù)目,也就是當(dāng)請(qǐng)求的數(shù)目超過(guò)某個(gè)閾值時(shí),就強(qiáng)制其它任何新到的請(qǐng)求一直等待,直到獲得一個(gè)線程來(lái)處理為止,從而可以防止資源不足。

線程池的創(chuàng)建

ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-name").build();
    //創(chuàng)建線程池
ExecutorService exc = new ThreadPoolExecutor(20, 20, 30000,
            TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), namedThreadFactory);

/*
	參數(shù)的意義
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
*/

線程的提交方法

在idea中,我們可以通過(guò)alt+7(注意不是F7)在左下角查看當(dāng)前類的所有方法

在這里插入圖片描述

在圖中我們可以看到ExecutorService有execute和submit兩種方法,但是他并沒(méi)有實(shí)現(xiàn)execute方法,所以方法是灰的

接下來(lái)我們看ExecutorService的實(shí)現(xiàn)類ThreadPoolExecutor

在這里插入圖片描述

可以看到ThreadPoolExecutor 實(shí)現(xiàn)了execute這個(gè)方法,那接下來(lái)我們具體看execute和submit的方法

具體實(shí)現(xiàn)

在我第一次學(xué)習(xí)時(shí),我就是這么簡(jiǎn)單來(lái)理解的

exc.submit() //提交有返回值   傳入的為callable和runable  返回值為future
exc.execute() //提交無(wú)返回值  傳入的為runable

但是我發(fā)現(xiàn)為什么submit可以執(zhí)行callable,又執(zhí)行runable?這不是兩個(gè)不同的創(chuàng)建線程的方式嗎?我點(diǎn)進(jìn)去了ThreadPoolExecutor類,但是在其中沒(méi)有找到submit方法,于是我按了alt+7

在這里插入圖片描述

在AbstractExecutorService類中發(fā)現(xiàn)了submit方法

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
    }

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

在代碼中我們可以看到,三個(gè)方法都是用這個(gè)代碼來(lái)統(tǒng)一實(shí)現(xiàn)的,

        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;

不同的是,當(dāng)使用submit而傳入的是runable接口時(shí),會(huì)多一個(gè)返回值的參數(shù),如果沒(méi)有這個(gè)參數(shù)則會(huì)在newTaskFor中多加一個(gè)null參數(shù),我們?cè)龠M(jìn)入newTaskFor方法

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
    }

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new FutureTask<T>(callable);
    }

同樣存在兩個(gè)方法,一個(gè)是為了接收runable,一個(gè)接收callable,但是這次都是使用了new FutureTask來(lái)傳入,因?yàn)镕utureTask可以運(yùn)行二者

我們知道Thread實(shí)現(xiàn)了Runable接口可以實(shí)現(xiàn)線程,但關(guān)于為什么FutureTask可以運(yùn)行Callable接口

首先,在下面代碼中可以看到FutureTask實(shí)現(xiàn)了RunnableFuture,RunnableFuture實(shí)現(xiàn)了Runnable,所以可以用過(guò)Thread來(lái)運(yùn)行start

public class FutureTask<V> implements RunnableFuture<V> 

public interface RunnableFuture<V> extends Runnable, Future<V> 


FutureTask<Integer> futureTask = new FutureTask(callableTask);
Thread thread = new Thread(futureTask);
thread.start();

而Thread中的start又是調(diào)用的接口中的run方法,但是Callable明明沒(méi)有run方法啊,這就要看FutureTask中的run方法了

在這里插入圖片描述

在這個(gè)FutureTask的run方法中,調(diào)用的是Callable的call方法,所以得以運(yùn)行,并且將返回值另外保存了,關(guān)于異步返回值的原理下次再說(shuō)。

回到new FutureTask(callable);的代碼中
我們接著點(diǎn)進(jìn)去看源碼

public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

對(duì)于第一個(gè)傳入callable的我們已經(jīng)知道了原理,就是FutureTask如何通過(guò)Thread運(yùn)行Callable的。
那么對(duì)于第二個(gè)傳入Runable的代碼又是個(gè)什么東西?

this.callable = Executors.callable(runnable, result);

好,我們?cè)冱c(diǎn)進(jìn)去看源碼,進(jìn)入了一個(gè)Executors,這個(gè)類沒(méi)有任何的實(shí)現(xiàn)與繼承,真是太好了,看方法

public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }

哦豁,又將Runable傳入了一個(gè)RunnableAdapter類,真復(fù)雜,就要看看到底多少層,看這個(gè)名字,Adapter,適配器?大概知道里面會(huì)有什么操作了,我們?cè)冱c(diǎn)進(jìn)去看源碼

static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

這是Executors中的一個(gè)內(nèi)部類,它實(shí)現(xiàn)了Callable接口,我依稀記得傳入的是一個(gè)Runable接口,原來(lái)在這個(gè)類中,將Callable的call方法重寫(xiě)了,其中調(diào)用了Runable的run方法,并且具有返回值,還記得這個(gè)result嗎,在最初的AbstractExecutorService中,對(duì)于傳入Runable的submit方法有兩個(gè),有參數(shù)則傳遞,無(wú)參數(shù)則傳入為null,

在這里插入圖片描述

總結(jié)1

至此,就了解到了為什么submit又可以傳入Runable也可以傳入Callable。
接下來(lái)稍微說(shuō)一下如何運(yùn)行。

ThreadPoolExecutor運(yùn)行原理

在這里插入圖片描述

細(xì)心的小伙伴發(fā)現(xiàn)了,說(shuō)好的submit,竟然又用execute
進(jìn)入execute方法,直接用ctrl點(diǎn)擊無(wú)效,我們打開(kāi)ThreadPoolExecutor中,按alt+7,左下角execute是亮的,方法是在這里實(shí)現(xiàn)的

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

先是判斷線程池?cái)?shù)量,在判斷傳入線程狀態(tài),滿足條件就使用addWorker,不滿足就reject拒絕
進(jìn)入addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

這里又是一堆條件驗(yàn)證,都是核心代碼,最后通過(guò)一個(gè)內(nèi)置類worker來(lái)獲取線程實(shí)例,然后加鎖繼續(xù)驗(yàn)證,條件都滿足時(shí),t.start(),終于可以運(yùn)行

Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            ---
            final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
            ---
            workerAdded = true;
            ---
            if (workerAdded) {
                    t.start();
            ---

總結(jié)2

至此,整個(gè)流程都已說(shuō)明,關(guān)于Callable和Future完成異步返回值的原理,下次再說(shuō)

到此這篇關(guān)于Java中ExecutorService和ThreadPoolExecutor運(yùn)行原理的文章就介紹到這了,更多相關(guān)Java ExecutorService ThreadPoolExecutor內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論