欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文徹底搞懂java多線程和線程池

 更新時間:2021年09月15日 11:03:16   作者:hguisu  
當(dāng)一個服務(wù)器接受到大量短小線程的請求時,使用線程池技術(shù)是非常合適的,它可以大大減少線程的創(chuàng)建和銷毀次數(shù),提高服務(wù)器的工作效率,這篇文章主要給大家介紹了如何通過一文徹底搞懂java多線程和線程池的相關(guān)資料,需要的朋友可以參考下

 什么是線程 

        傳統(tǒng)的程序設(shè)計語言同一時刻只能執(zhí)行單任務(wù)操作,效率非常低,如果網(wǎng)絡(luò)程序在接收數(shù)據(jù)時發(fā)生阻塞,只能等到程序接收數(shù)據(jù)之后才能繼續(xù)運行。隨著 Internet 的飛速發(fā)展,這種單任務(wù)運行的狀況越來越不被接受。如果網(wǎng)絡(luò)接收數(shù)據(jù)阻塞,后臺服務(wù)程序就會一直處于等待狀態(tài)而不能繼續(xù)任何操作。 這種阻塞情況經(jīng)常發(fā)生, 這時的 CPU資源完全處于閑置狀態(tài)。        

        多線程實現(xiàn)后臺服務(wù)程序可以同時處理多個任務(wù),并不發(fā)生阻塞現(xiàn)象。多線程是 Java 語言的一個很重要的特征。 多線程程序設(shè)計最大的特點就是能夠提高程序執(zhí)行效率和處理速度。Java 程序可同時并行運行多個相對獨立的線程。例如創(chuàng)建一個線程來接收數(shù)據(jù),另一個線程發(fā)送數(shù)據(jù),既使發(fā)送線程在接收數(shù)據(jù)時被阻塞,接受數(shù)據(jù)線程仍然可以運行。 線程(Thread)是控制線程(Thread of Control)的縮寫,它是具有一定順序的指令序列(即所編寫的程序代碼)、存放方法中定義局部變量的棧和一些共享數(shù)據(jù)。線程是相互獨立的,每個方法的局部變量和其他線程的局部變量是分開的,因此,任何線程都不能訪問除自身之外的其他線程的局部變量。如果兩個線程同時訪問同一個方法,那每個線程將各自得到此方法的一個拷貝。     

       Java 提供的多線程機制使一個程序可同時執(zhí)行多個任務(wù)。線程有時也被稱為小進程,它是從一個大進程里分離出來的小的獨立的線程。由于實現(xiàn)了多線程技術(shù),Java 顯得更健壯。多線程帶來的好處是更好的交互性能和實時控制性能。多線程是強大而靈巧的編程工具,但要用好它卻不是件容易的事。在多線程編程中,每個線程都通過代碼實現(xiàn)線程的行為,并將數(shù)據(jù)供給代碼操作。編碼和數(shù)據(jù)有時是相當(dāng)獨立的,可分別向線程提供。多個線程可以同時處理同一代碼和同一數(shù)據(jù),不同的線程也可以處理各自不同的編碼和數(shù)據(jù)。

一. Java實現(xiàn)線程的三種方式

先簡單看看java多線程如何實現(xiàn)的:

1.1、繼承Thread類

讓自己的類繼承 Thread 類:

public class Test extends Thread {
    public static void main(String[] args) {
        Thread t = new Test();
        t.start();
    }
 
    @Override
    public void run() {
        System.out.println("Override run() ...");
    }
}

用Thread類的方式創(chuàng)建多線程的特點:

      1、因為線程已經(jīng)繼承Thread類,所以不可以再繼承其它類。

       2、如果需要訪問當(dāng)前線程,直接使用this即可。 

1.2、實現(xiàn)Runnable接口,并覆寫run方法

實現(xiàn) Runnable 接口:

public class RunnableTest implements Runnable {
    public static void main(String[] args) throws Exception{
        Thread t = new Thread(new RunnableTest());
        t.start();
    }
    @Override
    public void run(){
        System.out.println(123);
    }
}

1.3、實現(xiàn)Callable接口,并覆寫call方法

  1. 實現(xiàn)Callable接口的類TCallable
  2. 以TCallable為參數(shù)創(chuàng)建FutureTask對象
  3. 將FutureTask作為參數(shù)創(chuàng)建Thread對象
  4. 調(diào)用線程對象的start()方法
public class TCallable implements Callable{
	public static void main() {
        FutureTask futureTask = new FutureTask(new TCallable());
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            Object o = futureTask.get();
            System.out.println(o);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
 
    @Override
    public Object call() throws Exception {
        return "Override call() ...";
    }
}

用Runnable與Callable接口的方式創(chuàng)建多線程的特點:

       1、線程類只是實現(xiàn)了Runnable接口或Callable接口,還可以繼承其它類。

       2、在這種方式下,多個線程可以共享一個target對象,所以非常適合多個線程來處理同一份資源情況。

       3、如果需要訪問當(dāng)前線程,需要使用Thread.currentThread方法。

       4、Callable接口與Runnable接口相比,只是Callable接口可以返回值而已。 

主要介紹第三種方式Callable,因為講到線程池的時候,大多時候使用Callable接口。

二. Callable接口

Callable用于產(chǎn)生結(jié)果,F(xiàn)uture用于獲取結(jié)果。

2.1 Callable接口

     Callable接口代表一段可以調(diào)用并返回結(jié)果的代碼;

      Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但是 Runnable 不會返回結(jié)果,并且無法拋出返回結(jié)果的異常。 Callable可以返回一個對象或者拋出一個異常。

      Callable接口使用泛型去定義它的返回類型。Executors類提供了一些有用的方法在線程池中執(zhí)行Callable內(nèi)的任務(wù)。由于Callable任務(wù)是并行的,我們必須等待它返回的結(jié)果。

Callable接口的源碼如下: 

public interface Callable<V> {
    V call() throws Exception; // 計算結(jié)果
}

2.2 Future接口

Future用于表示異步計算的結(jié)果。

由于Callable任務(wù)是并行的,我們必須等待它返回的結(jié)果。那如何獲取返回結(jié)果?java.util.concurrent.Future對象為我們解決了這個問題。在線程池提交Callable任務(wù)后返回了一個Future對象,使用Future可以知道Callable任務(wù)的狀態(tài)和得到Callable返回的執(zhí)行結(jié)果。Future提供get()方法讓我們可以等待Callable結(jié)束并獲取它的執(zhí)行結(jié)果。

Future接口的源碼如下:從源碼我們可以知道,F(xiàn)uture可以對異步運算的任務(wù)的結(jié)果進行等待獲取、判斷是否已經(jīng)完成、取消任務(wù)等操作。

public interface Future<V> {
    boolean  cancel(boolean mayInterruptIfRunning);// 試圖取消對此任務(wù)的執(zhí)行
    boolean  isCancelled();      // 如果在任務(wù)正常完成前將其取消,則返回 true
    boolean  isDone();           // 如果任務(wù)已完成,則返回 true
    V  get() throws InterruptedException, ExecutionException; // 如有必要,等待計算完成,然后獲取其結(jié)果
    // 如有必要,最多等待為使計算完成所給定的時間之后,獲取其結(jié)果(如果結(jié)果可用)。
    V  get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

2.3 Future實現(xiàn)類是FutureTask。

如果不想分支線程阻塞主線程,又想取得分支線程的執(zhí)行結(jié)果,就用FutureTask , FutureTask實現(xiàn)了RunnableFuture接口,這個接口的定義如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

可以看到這個接口實現(xiàn)了Runnable和Future接口,接口中的具體實現(xiàn)由FutureTask來實現(xiàn)。這個類的構(gòu)造方法如下 :

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    // state記錄FutureTask的狀態(tài)
    this.state = NEW;       // ensure visibility of callable
}

FutureTask 構(gòu)造函數(shù)傳入一個 Callable 的具體實現(xiàn)類,一個 FutureTask 對象可以對調(diào)用了 Callable 和 Runnable 的對象進行包裝,由于 FutureTask 也是Runnable 接口的實現(xiàn)類,所以 FutureTask 也可以放入線程池中。 

在web容器使用這種多線程的方式,要記住記得shutdown關(guān)閉,否則可能導(dǎo)致線程沒有被關(guān)閉回收,結(jié)果線程數(shù)一直增加

當(dāng)線程數(shù)太多時,肯定會導(dǎo)致內(nèi)存溢出或者影響服務(wù)器性能等。

三. Java線程池

既然單個線程的創(chuàng)建和銷毀都很簡單,我們?yōu)槭裁匆褂镁€程池?

3.1、背景

JAVA是面向?qū)ο?,JAVA虛擬機每創(chuàng)建一個對象都要獲取內(nèi)存資源或者其他很多其他資源,同時試圖跟蹤每個對象,以便可以在對象銷毀后進行垃圾回收。因此創(chuàng)建和銷毀對象是非常費時間的。所以為了盡可能提高應(yīng)用程序資源使用效率,就是要降低就是盡可能降低創(chuàng)建和銷毀對象的次數(shù),特別是一些非常耗資源的對象創(chuàng)建和銷毀,這就是一些"池化資源"技術(shù)產(chǎn)生的原因,比如大家熟悉的數(shù)據(jù)庫連接池。

針對線程而言:

線程占用系統(tǒng)內(nèi)存:雖然線程相對于進程而言是輕量級的,創(chuàng)建線程依然需要占用系統(tǒng)的內(nèi)存資源。如果無限制的創(chuàng)建線程,對應(yīng)虛擬機垃圾回收而言也是很有壓力的,畢竟線程也是對象。

線程的創(chuàng)建和關(guān)閉是需要花費時間的:如果任務(wù)非常多,頻繁的創(chuàng)建和銷毀線程也是需要占用系統(tǒng)的時間片的。

3.2、作用

1)降低資源消耗,重復(fù)利用已經(jīng)創(chuàng)建好的線程,降低線程創(chuàng)建和銷毀造成的資源消耗
2)提高響應(yīng)速度,當(dāng)任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就可以立即執(zhí)行
3)提高線程的可管理性,線程是稀缺資源,如果無限制的創(chuàng)建,不僅消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。

3.3、應(yīng)用范圍

1)、需要大量的線程來完成任務(wù)。且完成任務(wù)的時間比較短。

    http請求這種任務(wù),使用線程池技術(shù)是很合適的。由于單個任務(wù)小,而任務(wù)數(shù)量巨大,你能夠想象一個熱門站點的點擊次數(shù)。

    如果一個線程的任務(wù)執(zhí)行時間非常長,就沒必要用線程池,比如一個telnet連接請求,線程池的優(yōu)勢就不明顯了。

2、對性能要求苛刻的應(yīng)用,比方要求server迅速響應(yīng)客戶請求。

3、接受突發(fā)性的大量請求,但不至于使server因此產(chǎn)生大量線程的應(yīng)用。突發(fā)性大量客戶請求,在沒有線程池情況下,將產(chǎn)生大量線程,盡管理論上大部分操作系統(tǒng)線程數(shù)目最大值不是問題,短時間內(nèi)產(chǎn)生大量線程可能使內(nèi)存到達極限,并出現(xiàn)"OutOfMemory"的錯誤。 

四. Java 線程池框架Executor

一個線程池包含下面四個基本組成部分

1、線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池。包含 創(chuàng)建線程池,銷毀線程池,加入新任務(wù);

2、工作線程(PoolWorker):線程池中線程,在沒有任務(wù)時處于等待狀態(tài)。能夠循環(huán)的運行任務(wù);

3、任務(wù)接口(Task):每一個任務(wù)必須實現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的運行。它主要規(guī)定了任務(wù)的入口。任務(wù)運行完后的收尾工作,任務(wù)的運行狀態(tài)等。

4、任務(wù)隊列(taskQueue):用于存放沒有處理的任務(wù)。提供一種緩沖機制。

下面我們主要官方提供的線程池管理工具Executor框架。

Executor框架是一個根據(jù)一組執(zhí)行策略調(diào)用,調(diào)度,執(zhí)行和控制的異步任務(wù)的框架。無限制的創(chuàng)建線程會引起應(yīng)用程序內(nèi)存溢出。所以創(chuàng)建一個線程池是個更好的的解決方案,因為可以限制線程的數(shù)量并且可以回收再利用這些線程。Executor框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,F(xiàn)uture,Callable等。

4.1、類圖:

     Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進去的任務(wù)的;

    然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

 抽象類AbstractExecutorService實現(xiàn)了ExecutorService接口,基本實現(xiàn)了ExecutorService中聲明的所有方法;

 然后ThreadPoolExecutor繼承了類AbstractExecutorService。

4.2 核心類ThreadPoolExecutor:

     java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現(xiàn)源碼。

在ThreadPoolExecutor類中提供了四個構(gòu)造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(
             int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(
             int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue,
             ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(
             int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue,
             RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(
             int corePoolSize,
             int maximumPoolSize,
             long keepAliveTime,
             TimeUnit unit,
             BlockingQueue<Runnable> workQueue,
             ThreadFactory threadFactory,
             RejectedExecutionHandler handler);
    ...
}

ThreadPoolExecutor提供了四個構(gòu)造器,事實上,通過觀察每個構(gòu)造器的源碼具體實現(xiàn),發(fā)現(xiàn)前面三個構(gòu)造器都是調(diào)用的第四個構(gòu)造器進行的初始化工作。下面解釋下一下構(gòu)造器中各個參數(shù)的含義:

public ThreadPoolExecutor(int corePoolSize,  //線程池核心線程數(shù)量
                   int maximumPoolSize, //線程池最大線程數(shù)量
                   long keepAliveTime, //線程KeepAlive時間,當(dāng)線程池數(shù)量超過核心線程數(shù)量以后,idle時間超過這個值的線程會被終止
                   TimeUnit unit,              //線程KeepAlive時間單位
                   BlockingQueue<Runnable> workQueue,    //任務(wù)隊列
                   ThreadFactory threadFactory,                  //創(chuàng)建線程的工廠對象
                   RejectedExecutionHandler handler)  

corePoolSize:  核心池的大小,這個參數(shù)跟后面講述的線程池的實現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務(wù)放到緩存隊列當(dāng)中;

maximumPoolSize:線程池最大線程數(shù),這個參數(shù)也是一個非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個線程;

keepAliveTime:表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;

unit:參數(shù)keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:

       TimeUnit.DAYS;               //天
       TimeUnit.HOURS;             //小時
       TimeUnit.MINUTES;           //分鐘
       TimeUnit.SECONDS;           //秒
       TimeUnit.MILLISECONDS;      //毫秒
       TimeUnit.MICROSECONDS;      //微妙
       TimeUnit.NANOSECONDS;       //納秒

workQueue:一個阻塞隊列,用來存儲等待執(zhí)行的任務(wù),這個參數(shù)的選擇也很重要,會對線程池的運行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:ArrayBlockingQueue;LinkedBlockingQueue;SynchronousQueue;

  1. ArrayBlockingQueue :  有界的數(shù)組隊列

   2. LinkedBlockingQueue : 可支持有界/無界的隊列,使用鏈表實現(xiàn)

   3. PriorityBlockingQueue : 優(yōu)先隊列,可以針對任務(wù)排序

   4. SynchronousQueue : 隊列長度為1的隊列,和Array有點區(qū)別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連接上來poll task。

     ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關(guān)。

threadFactory:線程工廠,主要用來創(chuàng)建線程;

handler:表示當(dāng)拒絕處理任務(wù)時的策略,有以下四種取值:

       ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 
       ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
       ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
       ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)

在ThreadPoolExecutor類中有幾個非常重要的方法:

execute()

submit()

shutdown()

shutdownNow()

execute()方法:  實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現(xiàn),這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務(wù),交由線程池去執(zhí)行。

submit()方法: 是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實現(xiàn),在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實現(xiàn),會發(fā)現(xiàn)它實際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)。

shutdown()和shutdownNow()是用來關(guān)閉線程池的。

     還有很多其他的方法:

      比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱API。

4.3 ThreadPoolExecutor邏輯結(jié)構(gòu)

   ThreadPoolExecutor線程池的邏輯結(jié)構(gòu):

第一步、初始的poolSize < corePoolSize,提交的runnable任務(wù),會直接做為new一個Thread的參數(shù),立馬執(zhí)行

第二步、當(dāng)提交的任務(wù)數(shù)超過了corePoolSize,就進入了第二步操作。會將當(dāng)前的runable提交到一個block queue中

第三步,如果block queue是個有界隊列,當(dāng)隊列滿了之后就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執(zhí)行對應(yīng)的runnable任務(wù)

第四步,如果第三步救急方案也無法處理了,就會走到第四步執(zhí)行reject操作

五.Executor線程池實例

比如我們要并發(fā)調(diào)用遠(yuǎn)程http接口:

5.1、使用Runnable接口創(chuàng)建線程,并由executor調(diào)度執(zhí)行:

 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
 
public class TExecutor  {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 5; i++) {
            RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(i));
            executorService.execute(runnable);
        }
        System.out.println("線程任務(wù)開始執(zhí)行");
        executorService.shutdown();
 
    }
 
    /**
     * 線程池提交Runnable任務(wù)
     */
    public void  runnableTask() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<?> futureTask = null;
        for (int i = 0; i < 5; i++) {
            RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(1));
            futureTask = executorService.submit(runnable);
            try {
                futureTask.get(10, TimeUnit.SECONDS);    //等待超時,等待給定的時間之后,獲取其結(jié)果
            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                e.printStackTrace();
            } finally{
                futureTask.cancel(true);
            }
        }
        executorService.shutdown();
    }
}
class RunnableTask implements Runnable {
    String url;
    String param;
    public RunnableTask(String url, String param) {
        this.url =  url;
        this.param =  param;
    }
    //http調(diào)用
    public String requestHttp() {
        String result = "";
        return result;
    }
 
    @Override
    public void run() {
        System.out.println("正在執(zhí)行task "+param);
        try {
            requestHttp();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("task "+param+"執(zhí)行完畢");
    }
}

5.2、使用Callable接口創(chuàng)建線程,并由executor調(diào)度執(zhí)行:

 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.TimeUnit;
public class TestPoolThread {
    private ExecutorService executorService = new ThreadPoolExecutor(2, 2,
            0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(100));
    public String main(String gid) {
        List<Callable<String>> tasks = new ArrayList<>();
        tasks.add(new CallableTask("http://xxxx.com/api/v2", "1"));
        tasks.add(new CallableTask("http://xxxx.com/api/v2", "2"));
        try {
            List<Future<String>> mFuture = executorService.invokeAll(tasks, 10, TimeUnit.SECONDS);
            String  result1 = mFuture.get(0).get();
            System.out.println( "result1:" + result1);
            String  result2 = mFuture.get(0).get();
            System.out.println( "result2:" + result2);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "";
    }
}
class CallableTask implements Callable<String> {
    String url;
    String param;
    public CallableTask(String url, String param) {
        this.url =  url;
        this.param =  param;
    }
    //http調(diào)用
    public String requestHttp() {
 
        String result = "";
        return result;
    }
    @Override
    public String call() {
        try {
            String param = "{}";
            String result = requestHttp( );
            return result;
        } catch (Exception e) {
            e.printStackTrace();
            return "";
        }
    }
}

不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態(tài)方法來創(chuàng)建線程池:

Executors.newCachedThreadPool : 創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

Executors.newCachedThreadPool :創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。

Executors.newFixedThreadPool  :創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。

Executors.newScheduledThreadPool :創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。

Executors.newSingleThreadExecutor :創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。

靜態(tài)方法的具體實現(xiàn):

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

從它們的具體實現(xiàn)來看,它們實際上也是調(diào)用了ThreadPoolExecutor,只不過參數(shù)都已配置好了。

  newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運行,當(dāng)線程空閑超過60秒,就銷毀線程。

  實際中,如果Executors提供的三個靜態(tài)方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數(shù)有點麻煩,要根據(jù)實際任務(wù)的類型和數(shù)量來進行配置。

  另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。

六.深入剖析線程池實現(xiàn)原理

  在上一節(jié)我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現(xiàn)原理,將從下面幾個方面講解:

1.線程池狀態(tài)
2.線程池中的線程初始化
3.任務(wù)的執(zhí)行
4.任務(wù)緩存隊列及排隊策略
5.任務(wù)拒絕策略
6.線程池的關(guān)閉
7.線程池容量的動態(tài)調(diào)整

6.1、線程池狀態(tài)

 在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態(tài):

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示當(dāng)前線程池的狀態(tài),它是一個volatile變量用來保證線程之間的可見性;下面的幾個static final變量表示runState可能的幾個取值。

       當(dāng)創(chuàng)建線程池后,初始時,線程池處于RUNNING狀態(tài);

  如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時線程池不能夠接受新的任務(wù),它會等待所有任務(wù)執(zhí)行完畢;

  如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時線程池不能接受新的任務(wù),并且會去嘗試終止正在執(zhí)行的任務(wù);

  當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。

6.2、線程池中的線程初始化

默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會創(chuàng)建線程。

在實際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個方法辦到:

  • prestartCoreThread():初始化一個核心線程;
  • prestartAllCoreThreads():初始化所有核心線程

下面是這2個方法的實現(xiàn):

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意傳進去的參數(shù)是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意傳進去的參數(shù)是null,等待任務(wù)隊列中有任務(wù)
        ++n;
    return n;
}

6.3、任務(wù)的執(zhí)行

任務(wù)提交基本邏輯如下:

如果當(dāng)前線程池中的線程poolSize數(shù)目小于corePoolSize,則每來一個任務(wù),就會創(chuàng)建一個線程去執(zhí)行這個任務(wù);
如果當(dāng)前線程池中的線程poolSize數(shù)目>=corePoolSize,則每來一個任務(wù),會嘗試將其添加到任務(wù)緩存隊列當(dāng)中,若添加成功,則該任務(wù)會等待空閑線程將其取出去執(zhí)行;若添加失?。ㄒ话銇碚f是任務(wù)緩存隊列已滿),則會嘗試創(chuàng)建新的線程去執(zhí)行這個任務(wù);
如果當(dāng)前線程池中的線程poolSize數(shù)目達到maximumPoolSize,則會采取任務(wù)拒絕策略進行處理;如果線程池中的線程poolSize數(shù)量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;
如果允許為核心池中的線程設(shè)置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。

    下面我們分析一下從任務(wù)提交給線程池到任務(wù)執(zhí)行完畢整個過程。

     我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue;              //任務(wù)緩存隊列,用來存放等待執(zhí)行的任務(wù)
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態(tài)鎖,對線程池狀態(tài)(比如線程池大小
                                                              //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工作集
 
private volatile long  keepAliveTime;    //線程存活時間   
private volatile boolean allowCoreThreadTimeOut;   //是否允許為核心線程設(shè)置存活時間
private volatile int   corePoolSize;     //核心池的大?。淳€程池中的線程數(shù)目大于這個參數(shù)時,提交的任務(wù)會被放進任務(wù)緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數(shù)
 
private volatile int   poolSize;       //線程池中當(dāng)前的線程數(shù)
 
private volatile RejectedExecutionHandler handler; //任務(wù)拒絕策略
 
private volatile ThreadFactory threadFactory;   //線程工廠,用來創(chuàng)建線程
 
private int largestPoolSize;   //用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù),跟線程池的容量沒有任何關(guān)系
 
private long completedTaskCount;   //用來記錄已經(jīng)執(zhí)行完畢的任務(wù)個數(shù)

     每個變量的作用都已經(jīng)標(biāo)明出來了。

     下面我們我們看看任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過程。

在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute()方法,雖然通過submit也可以提交任務(wù),但是實際上submit方法里面最終調(diào)用的還是execute()方法,所以我們只需要研究execute()方法的實現(xiàn)原理即可:

在源碼添加相關(guān)注解來解釋邏輯了:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //如果線程池中當(dāng)前線程數(shù)不小于核心池大小,創(chuàng)建線程來執(zhí)行任務(wù)
        if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
            if (runState == RUNNING && workQueue.offer(command)) {
                if (runState != RUNNING || poolSize == 0)
                    ensureQueuedTaskHandled(command);
            }
            else if (!addIfUnderMaximumPoolSize(command))
                reject(command); // is shutdown or saturated
        }
    }
 
    /**
     * 簡單拆解execute:
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //如果線程池中當(dāng)前線程數(shù)不小于核心池大小,
        if (poolSize >= corePoolSize ) {
            //創(chuàng)建新線程來執(zhí)行任務(wù)
            boolean isCreate = addIfUnderCorePoolSize(command);
            if (!isCreate) { //創(chuàng)建線程失?。杭磒oolSize>=corePoolSize或者runState不等于RUNNING)
                if (runState == RUNNING && workQueue.offer(command)) {//如果當(dāng)前線程池處于RUNNING狀態(tài),則將任務(wù)放入任務(wù)緩存隊列
                    if (runState != RUNNING || poolSize == 0)
                        ensureQueuedTaskHandled(command);//保證 添加到任務(wù)緩存隊列中的任務(wù)得到處理
                }
                //創(chuàng)建新線程來執(zhí)行任務(wù)
                boolean isUnderMaximumPoolSize = addIfUnderMaximumPoolSize(command);
                if (!isUnderMaximumPoolSize)
                    reject(command); // is shutdown or saturated
                 }
            }
            //創(chuàng)建線程來執(zhí)行任務(wù)
        }
    }
    */

我們看看2個關(guān)鍵方法的實現(xiàn):addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

1、addIfUnderCorePoolSize:從名字可以看出意思就是當(dāng)線程池的線程低于corePoolSize大小時創(chuàng)建線程執(zhí)行任務(wù)。

2、addIfUnderMaximumPoolSize方法的實現(xiàn)和addIfUnderCorePoolSize方法的實現(xiàn)思想非常相似,唯一的區(qū)別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數(shù)達到了核心池大小并且往任務(wù)隊列中添加任務(wù)失敗的情況下執(zhí)行

 /**
     * 當(dāng)線程池的線程低于corePoolSize大小時創(chuàng)建線程執(zhí)行任務(wù)
     * @param firstTask
     * @return
     */
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        //首先獲取到鎖
        final ReentrantLock mainLock = this.mainLock;
        //通過加鎖保證創(chuàng)建的線程是唯一
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)
                t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務(wù)
        } finally {
            mainLock.unlock();
        }
        if (t == null)//創(chuàng)建線程失敗
            return false;
        t.start();//啟動線程
        return true;
    }
    /**
     * 和addIfUnderCorePoolSize方法的實現(xiàn)思想非常相似
     * @param firstTask
     * @return
     */
    private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < maximumPoolSize && runState == RUNNING)
                t = addThread(firstTask);
        } finally {
            mainLock.unlock();
        }
        if (t == null)
            return false;
        t.start();
        return true;
    }

創(chuàng)建新線程:首先用提交的任務(wù)創(chuàng)建了一個Worker對象,然后調(diào)用線程工廠threadFactory創(chuàng)建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接著通過workers.add(w)將Worker對象添加到工作集當(dāng)中

/**
     * 創(chuàng)建新線程
     * @param firstTask
     * @return
     */
    private Thread addThread(Runnable firstTask) {
        Worker w = new Worker(firstTask);
        //創(chuàng)建一個線程,執(zhí)行任務(wù),實際就是Thread t = new Thread(w);
        Thread t = threadFactory.newThread(w);
        if (t != null) {
            w.thread = t;            //將創(chuàng)建的線程的引用賦值為w的成員變量
            workers.add(w);
            int nt = ++poolSize;     //當(dāng)前線程數(shù)加1
            if (nt > largestPoolSize)
                largestPoolSize = nt;
        }
        return t;
    }

 Worker執(zhí)行任務(wù): Worker實現(xiàn)了Runnable接口,最核心的方法是run()方法:

private final class Worker implements Runnable {
        private final ReentrantLock runLock = new ReentrantLock();
        private Runnable firstTask;
        volatile long completedTasks;
        Thread thread;
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
        }
        boolean isActive() {
            return runLock.isLocked();
        }
        void interruptIfIdle() {
            final ReentrantLock runLock = this.runLock;
            if (runLock.tryLock()) {
                try {
                    if (thread != Thread.currentThread())
                        thread.interrupt();
                } finally {
                    runLock.unlock();
                }
            }
        }
        void interruptNow() {
            thread.interrupt();
        }
 
        private void runTask(Runnable task) {
            final ReentrantLock runLock = this.runLock;
            runLock.lock();
            try {
                if (runState < STOP &&
                        Thread.interrupted() &&
                        runState >= STOP)
                    boolean ran = false;
                beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現(xiàn),用戶可以根據(jù)
                //自己需要重載這個方法和后面的afterExecute方法來進行一些統(tǒng)計信息,比如某個任務(wù)的執(zhí)行時間等
                try {
                    task.run();
                    ran = true;
                    afterExecute(task, null);
                    ++completedTasks;
                } catch (RuntimeException ex) {
                    if (!ran)
                        afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                runLock.unlock();
            }
        }
 
 
    /**
     * Worker實現(xiàn)了Runnable接口
     */
        public void run() {
            try {
                Runnable task = firstTask;
                firstTask = null;
                //從ThreadPoolExecutor任務(wù)緩存隊列里面去取
                while (task != null || (task = getTask()) != null) {
                    runTask(task);
                    task = null;
                }
            } finally {
                workerDone(this);   //當(dāng)任務(wù)隊列中沒有任務(wù)時,進行清理工作
            }
        }
    }
}

6.4、任務(wù)緩存隊列及排隊策略

  workQueue任務(wù)緩存隊列是用來存放等待執(zhí)行的任務(wù)。

  workQueue的類型為BlockingQueue<Runnable>,通??梢匀∠旅嫒N類型:

  1)ArrayBlockingQueue:基于數(shù)組的先進先出隊列,此隊列創(chuàng)建時必須指定大??;

  2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創(chuàng)建時沒有指定此隊列大小,則默認(rèn)為Integer.MAX_VALUE;

  3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務(wù),而是將直接新建一個線程來執(zhí)行新來的任務(wù)。

6.5、任務(wù)拒絕策略

  當(dāng)線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略,通常有以下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)

6.6、線程池的關(guān)閉

  ThreadPoolExecutor提供了兩個方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會立即終止線程池,而是要等所有任務(wù)緩存隊列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊列,返回尚未執(zhí)行的任務(wù)

6.7、線程池容量的動態(tài)調(diào)整

  ThreadPoolExecutor提供了動態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

setCorePoolSize:設(shè)置核心池大小setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小

  當(dāng)上述參數(shù)從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。

六、線程池的配置策略

要想合理的配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來進行分析:

  1. 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
  2. 任務(wù)的優(yōu)先級:高,中和低。
  3. 任務(wù)的執(zhí)行時間:長,中和短。
  4. 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。

任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。

CPU密集型任務(wù):配置盡可能少的線程數(shù)量,如配置Ncpu+1個線程的線程池。cpu密集型任務(wù)需要大量的運算,而且沒有阻塞,需要CPU一直全速運行,CPU密集任務(wù)只有在真正的多核CPU上才可能得到加速??梢允褂肦untime.availableProcessors方法獲取可用處理器的個數(shù)。

一般計算公式:CPU核數(shù) + 1個線程的線程池

 ExecutorService THREAD_POOL = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors(),
        Runtime.getRuntime().availableProcessors() * 2,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(Runtime.getRuntime().availableProcessors() * 20),
        new ThreadPoolExecutor.CallerRunsPolicy());

《并發(fā)編程實戰(zhàn)》一書中對于IO密集型任務(wù)建議線程池大小設(shè)為Ncpu+1Ncpu+1,原因是當(dāng)計算密集型線程偶爾由于頁缺失故障或其他原因而暫停時,這個“額外的”線程也能確保這段時間內(nèi)的CPU始終周期不會被浪費。

對于計算密集型任務(wù),不要創(chuàng)建過多的線程,由于線程有執(zhí)行棧等內(nèi)存消耗,創(chuàng)建過多的線程不會加快計算速度,反而會消耗更多的內(nèi)存空間;另一方面線程過多,頻繁切換線程上下文也會影響線程池的性能。

IO密集型任務(wù):IO操作包括讀寫磁盤文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)請求等阻塞操作,執(zhí)行這些操作,線程將處于等待狀態(tài)。

io密集型由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu。即該任務(wù)需要大量的IO,即大量的阻塞,這種類型分以下兩種情況設(shè)置

1、有等待io任務(wù):如果IO密集型任務(wù)線程并非一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如CPU核數(shù) * 2

2、需要一直運行的任務(wù):參考公式:CPU核數(shù) /(1 - 阻塞系數(shù) )                 阻塞系數(shù)在0.8~0.9之間

比如:8核CPU:8/(1 - 0.9) = 80個線程數(shù)

用sleep方式模擬IO阻塞:

public class IOThreadPoolTest {
 
    // 使用無限線程數(shù)的CacheThreadPool線程池
    static ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
 
    static List<Callable<Object>> tasks;
 
    // 仍然是5000個任務(wù)
    static int taskNum = 5000;
 
    static {
        tasks = new ArrayList<>(taskNum);
        for (int i = 0; i < taskNum; i++) {
            tasks.add(Executors.callable(new IOTask()));
        }
    }
 
    public static void main(String[] args) throws InterruptedException {
        cachedThreadPool.invokeAll(tasks);// warm up all thread
        testExecutor(cachedThreadPool, tasks);
// 看看執(zhí)行過程中創(chuàng)建了多少個線程
        int largestPoolSize = cachedThreadPool.getLargestPoolSize();
        System.out.println("largestPoolSize:" + largestPoolSize);
        cachedThreadPool.shutdown();
    }
 
    private static void testExecutor(ExecutorService executor, List<Callable<Object>> tasks)
            throws InterruptedException {
        long start = System.currentTimeMillis();
        executor.invokeAll(tasks);
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }
 
    static class IOTask implements Runnable {
 
        @Override
        public void run() {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
}

這里使用無線程數(shù)限制的CachedThreadPool線程池,也就是說這里的5000個任務(wù)會被5000個線程同時處理,由于所有的線程都只是阻塞而不消耗CPU資源,所以5000個任務(wù)在不到2秒的時間內(nèi)就執(zhí)行完了。

很明顯使用CachedThreadPool能有效提高IO密集型任務(wù)的吞吐量,而且由于CachedThreadPool中的線程會在空閑60秒自動回收,所以不會消耗過多的資源。

但是打開任務(wù)管理器你會發(fā)現(xiàn)執(zhí)行任務(wù)的同時內(nèi)存會飆升到接近400M,因為每個線程都消耗了一部分內(nèi)存,在5000個線程創(chuàng)建之后,內(nèi)存消耗達到了峰值。

所以使用CacheThreadPool的時候應(yīng)該避免提交大量長時間阻塞的任務(wù),以防止內(nèi)存溢出;另一種替代方案是,使用固定大小的線程池,并給一個較大的線程數(shù)(不會內(nèi)存溢出),同時為了在空閑時節(jié)省內(nèi)存資源,調(diào)用allowCoreThreadTimeOut允許核心線程超時。

線程執(zhí)行棧的大小可以通過-Xss*size*或-XX:ThreadStackSize參數(shù)調(diào)整

混合型的任務(wù):如果可以拆分,則將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)。

混合型任務(wù)要根據(jù)任務(wù)等待阻塞時間與CPU計算時間的比重來決定線程數(shù)量:

threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)

threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)

比如一個任務(wù)包含一次數(shù)據(jù)庫讀寫(0.1ms),并在內(nèi)存中對讀取的數(shù)據(jù)進行分組過濾等操作(5μs),那么線程數(shù)應(yīng)該為80左右。

線程數(shù)與阻塞比例的關(guān)系圖大致如下:

當(dāng)阻塞比例為0,也就是純計算任務(wù),線程數(shù)等于核心數(shù)(這里是4);阻塞比例越大,線程池的線程數(shù)應(yīng)該更多。

《Java并發(fā)編程實戰(zhàn)》中最原始的公式是這樣的:

>Nthreads=Ncpu∗Ucpu∗(1+WC)>
>Nthreads=Ncpu∗Ucpu∗(1+WC)>

NcpuNcpu代表CPU的個數(shù),UcpuUcpu代表CPU利用率的期望值(0<Ucpu<10<Ucpu<1),WCWC仍然是等待時間與計算時間的比例。
我上面提供的公式相當(dāng)于目標(biāo)CPU利用率為100%。

通常系統(tǒng)中不止一個線程池,所以實際配置線程數(shù)應(yīng)該將目標(biāo)CPU利用率計算進去。

優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。

執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。

依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時間越長CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。

建議使用有界隊列,有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點,比如幾千。有一次我們組使用的后臺任務(wù)線程池的隊列和線程池全滿了,不斷的拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因為后臺任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞住,任務(wù)積壓在線程池里。如果當(dāng)時我們設(shè)置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題。當(dāng)然我們的系統(tǒng)所有的任務(wù)是用的單獨的服務(wù)器部署的,而我們使用不同規(guī)模的線程池跑不同類型的任務(wù),但是出現(xiàn)這樣問題時也會影響到其他任務(wù)。

總結(jié):線程池的大小取決于任務(wù)的類型以及系統(tǒng)的特性,避免“過大”和“過小”兩種極端。線程池過大,大量的線程將在相對更少的CPU和有限的內(nèi)存資源上競爭,這不僅影響并發(fā)性能,還會因過高的內(nèi)存消耗導(dǎo)致OOM;線程池過小,將導(dǎo)致處理器得不到充分利用,降低吞吐率。

要想正確的設(shè)置線程池大小,需要了解部署的系統(tǒng)中有多少個CPU,多大的內(nèi)存,提交的任務(wù)是計算密集型、IO密集型還是兩者兼有。

七、線程池創(chuàng)建注意

   阿里巴巴編碼規(guī)范里面提到:

   線程池最好不要使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險。 說明:Executors各個方法的弊端:

1)newFixedThreadPool和newSingleThreadExecutor:

  主要問題是堆積的請求處理隊列可能會耗費非常大的內(nèi)存,甚至OOM。

2)newCachedThreadPool和newScheduledThreadPool:

  主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。

如果使用阿里巴巴的java編程規(guī)范插件,就會有這個提示:

ExecutorService newFixedThreadPool(int nThreads):固定大小線程池。

源碼可以看到,corePoolSize和maximumPoolSize的大小是一樣的(如果使用無界queue的話maximumPoolSize參數(shù)是沒有意義的),keepAliveTime和unit的設(shè)值表名什么?-就是該實現(xiàn)不想keep alive!最后的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的。

public static ExecutorService newFixedThreadPool(int nThreads) {  
        return new ThreadPoolExecutor(nThreads, nThreads,  
                                      0L, TimeUnit.MILLISECONDS,  
                                      new LinkedBlockingQueue<Runnable>());  
    }  

ExecutorService newSingleThreadExecutor():單線程。

可以看到,與fixedThreadPool很像,只不過fixedThreadPool中的入?yún)⒅苯油嘶癁?

public static ExecutorService newSingleThreadExecutor() {  
        return new FinalizableDelegatedExecutorService  
            (new ThreadPoolExecutor(1, 1,  
                                    0L, TimeUnit.MILLISECONDS,  
                                    new LinkedBlockingQueue<Runnable>()));  
    }  

ExecutorService newCachedThreadPool():無界線程池,可以進行自動線程回收。

這個實現(xiàn)就有意思了。首先是無界的線程池,所以我們可以發(fā)現(xiàn)maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue??赡軐τ谠揃lockingQueue有些陌生,簡單說:該QUEUE中,每個插入操作必須等待另一個

線程的對應(yīng)移除操作。比如,我先添加一個元素,接下來如果繼續(xù)想嘗試添加則會阻塞,直到另一個線程取走一個元素,反之亦然。(想到什么?就是緩沖區(qū)為1的生產(chǎn)者消費者模式^_^)

注意到介紹中的自動回收線程的特性嗎,為什么呢?先不說,但注意到該實現(xiàn)中corePoolSize和maximumPoolSize的大小不同。

public static ExecutorService newCachedThreadPool() {  
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  
                                      60L, TimeUnit.SECONDS,  
                                      new SynchronousQueue<Runnable>());  
    }  

總結(jié):

  1. ThreadPoolExecutor的使用還是很有技巧的。
  2. 使用無界queue可能會耗盡系統(tǒng)資源。
  3. 使用有界queue可能不能很好的滿足性能,需要調(diào)節(jié)線程數(shù)和queue大小
  4. 線程數(shù)自然也有開銷,所以需要根據(jù)不同應(yīng)用進行調(diào)節(jié)。

通常來說對于靜態(tài)任務(wù)可以歸為:

  1. 數(shù)量大,但是執(zhí)行時間很短
  2. 數(shù)量小,但是執(zhí)行時間較長
  3. 數(shù)量又大執(zhí)行時間又長
  4. 除了以上特點外,任務(wù)間還有些內(nèi)在關(guān)系

總結(jié)2:

  • 對于需要保證所有提交的任務(wù)都要被執(zhí)行的情況,使用FixedThreadPool
  • 如果限定只能使用一個線程進行任務(wù)處理,使用SingleThreadExecutor
  • 如果希望提交的任務(wù)盡快分配線程執(zhí)行,使用CachedThreadPool
  • 如果業(yè)務(wù)上允許任務(wù)執(zhí)行失敗,或者任務(wù)執(zhí)行過程可能出現(xiàn)執(zhí)行時間過長進而影響其他業(yè)務(wù)的應(yīng)用場景,可以通過使用限定線程數(shù)量的線程池以及限定長度的隊列進行容錯處理。

到此這篇關(guān)于java多線程和線程池的文章就介紹到這了,更多相關(guān)java多線程和線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 淺談線性表的原理及簡單實現(xiàn)方法

    淺談線性表的原理及簡單實現(xiàn)方法

    下面小編就為大家?guī)硪黄獪\談線性表的原理及簡單實現(xiàn)方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-06-06
  • Springboot整合mybatis的步驟

    Springboot整合mybatis的步驟

    這篇文章主要介紹了Springboot整合mybatis的步驟,幫助大家更好的理解和學(xué)習(xí)使用
    2021-04-04
  • Java二叉樹的四種遍歷方式詳解

    Java二叉樹的四種遍歷方式詳解

    這篇文章主要介紹了Java二叉樹的四種遍歷,二叉樹的遍歷可以分為前序、中序、后序、層次遍歷,需要的朋友可以參考下
    2021-11-11
  • SpringBoot部署到Linux讀取resources下的文件及遇到的坑

    SpringBoot部署到Linux讀取resources下的文件及遇到的坑

    本文主要給大家介紹SpringBoot部署到Linux讀取resources下的文件,在平時業(yè)務(wù)開發(fā)過程中,很多朋友在獲取到文件內(nèi)容亂碼或者文件讀取不到的問題,今天給大家分享小編遇到的坑及處理方案,感興趣的朋友跟隨小編一起看看吧
    2021-06-06
  • 教你如何使用Java8實現(xiàn)菜單樹形數(shù)據(jù)

    教你如何使用Java8實現(xiàn)菜單樹形數(shù)據(jù)

    今天給大家?guī)淼氖顷P(guān)于JAVA的相關(guān)知識,文中圍繞著如何使用Java8實現(xiàn)菜單樹形數(shù)據(jù)展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下
    2021-06-06
  • Java實現(xiàn)飛機小游戲

    Java實現(xiàn)飛機小游戲

    這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)飛機小游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-06-06
  • Java常用正則表達式驗證工具類RegexUtils.java

    Java常用正則表達式驗證工具類RegexUtils.java

    相信大家對正則表達式一定都有所了解和研究,這篇文章主要為大家分享了Java 表單注冊常用正則表達式驗證工具類,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2016-11-11
  • 詳解Java中IO字節(jié)流基本操作(復(fù)制文件)并測試性能

    詳解Java中IO字節(jié)流基本操作(復(fù)制文件)并測試性能

    這篇文章主要介紹了Java中IO字節(jié)流基本操作(復(fù)制文件)并測試性能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-04-04
  • Java實現(xiàn)自定義枚舉值校驗器的示例代碼

    Java實現(xiàn)自定義枚舉值校驗器的示例代碼

    這篇文章主要為大家詳細(xì)介紹了Java實現(xiàn)自定義枚舉值校驗器的相關(guān)資料,文中的示例代碼講解詳細(xì),具有一定的借鑒價值,需要的可以參考一下
    2023-02-02
  • 5分鐘讓你快速掌握java8 stream常用開發(fā)技巧

    5分鐘讓你快速掌握java8 stream常用開發(fā)技巧

    這篇文章主要給大家介紹了關(guān)于java8 stream常用開發(fā)技巧的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-12-12

最新評論