一文徹底搞懂java多線程和線程池
什么是線程
傳統(tǒng)的程序設(shè)計(jì)語言同一時(shí)刻只能執(zhí)行單任務(wù)操作,效率非常低,如果網(wǎng)絡(luò)程序在接收數(shù)據(jù)時(shí)發(fā)生阻塞,只能等到程序接收數(shù)據(jù)之后才能繼續(xù)運(yùn)行。隨著 Internet 的飛速發(fā)展,這種單任務(wù)運(yùn)行的狀況越來越不被接受。如果網(wǎng)絡(luò)接收數(shù)據(jù)阻塞,后臺(tái)服務(wù)程序就會(huì)一直處于等待狀態(tài)而不能繼續(xù)任何操作。 這種阻塞情況經(jīng)常發(fā)生, 這時(shí)的 CPU資源完全處于閑置狀態(tài)。
多線程實(shí)現(xiàn)后臺(tái)服務(wù)程序可以同時(shí)處理多個(gè)任務(wù),并不發(fā)生阻塞現(xiàn)象。多線程是 Java 語言的一個(gè)很重要的特征。 多線程程序設(shè)計(jì)最大的特點(diǎn)就是能夠提高程序執(zhí)行效率和處理速度。Java 程序可同時(shí)并行運(yùn)行多個(gè)相對(duì)獨(dú)立的線程。例如創(chuàng)建一個(gè)線程來接收數(shù)據(jù),另一個(gè)線程發(fā)送數(shù)據(jù),既使發(fā)送線程在接收數(shù)據(jù)時(shí)被阻塞,接受數(shù)據(jù)線程仍然可以運(yùn)行。 線程(Thread)是控制線程(Thread of Control)的縮寫,它是具有一定順序的指令序列(即所編寫的程序代碼)、存放方法中定義局部變量的棧和一些共享數(shù)據(jù)。線程是相互獨(dú)立的,每個(gè)方法的局部變量和其他線程的局部變量是分開的,因此,任何線程都不能訪問除自身之外的其他線程的局部變量。如果兩個(gè)線程同時(shí)訪問同一個(gè)方法,那每個(gè)線程將各自得到此方法的一個(gè)拷貝。
Java 提供的多線程機(jī)制使一個(gè)程序可同時(shí)執(zhí)行多個(gè)任務(wù)。線程有時(shí)也被稱為小進(jìn)程,它是從一個(gè)大進(jìn)程里分離出來的小的獨(dú)立的線程。由于實(shí)現(xiàn)了多線程技術(shù),Java 顯得更健壯。多線程帶來的好處是更好的交互性能和實(shí)時(shí)控制性能。多線程是強(qiáng)大而靈巧的編程工具,但要用好它卻不是件容易的事。在多線程編程中,每個(gè)線程都通過代碼實(shí)現(xiàn)線程的行為,并將數(shù)據(jù)供給代碼操作。編碼和數(shù)據(jù)有時(shí)是相當(dāng)獨(dú)立的,可分別向線程提供。多個(gè)線程可以同時(shí)處理同一代碼和同一數(shù)據(jù),不同的線程也可以處理各自不同的編碼和數(shù)據(jù)。
一. Java實(shí)現(xiàn)線程的三種方式
先簡(jiǎn)單看看java多線程如何實(shí)現(xiàn)的:
1.1、繼承Thread類
讓自己的類繼承 Thread 類:
public class Test extends Thread { public static void main(String[] args) { Thread t = new Test(); t.start(); } @Override public void run() { System.out.println("Override run() ..."); } }
用Thread類的方式創(chuàng)建多線程的特點(diǎn):
1、因?yàn)榫€程已經(jīng)繼承Thread類,所以不可以再繼承其它類。
2、如果需要訪問當(dāng)前線程,直接使用this即可。
1.2、實(shí)現(xiàn)Runnable接口,并覆寫run方法
實(shí)現(xiàn) Runnable 接口:
public class RunnableTest implements Runnable { public static void main(String[] args) throws Exception{ Thread t = new Thread(new RunnableTest()); t.start(); } @Override public void run(){ System.out.println(123); } }
1.3、實(shí)現(xiàn)Callable接口,并覆寫call方法
- 實(shí)現(xiàn)Callable接口的類TCallable
- 以TCallable為參數(shù)創(chuàng)建FutureTask對(duì)象
- 將FutureTask作為參數(shù)創(chuàng)建Thread對(duì)象
- 調(diào)用線程對(duì)象的start()方法
public class TCallable implements Callable{ public static void main() { FutureTask futureTask = new FutureTask(new TCallable()); Thread thread = new Thread(futureTask); thread.start(); try { Object o = futureTask.get(); System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public Object call() throws Exception { return "Override call() ..."; } }
用Runnable與Callable接口的方式創(chuàng)建多線程的特點(diǎn):
1、線程類只是實(shí)現(xiàn)了Runnable接口或Callable接口,還可以繼承其它類。
2、在這種方式下,多個(gè)線程可以共享一個(gè)target對(duì)象,所以非常適合多個(gè)線程來處理同一份資源情況。
3、如果需要訪問當(dāng)前線程,需要使用Thread.currentThread方法。
4、Callable接口與Runnable接口相比,只是Callable接口可以返回值而已。
主要介紹第三種方式Callable,因?yàn)橹v到線程池的時(shí)候,大多時(shí)候使用Callable接口。
二. Callable接口
Callable用于產(chǎn)生結(jié)果,F(xiàn)uture用于獲取結(jié)果。
2.1 Callable接口
Callable接口代表一段可以調(diào)用并返回結(jié)果的代碼;
Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但是 Runnable 不會(huì)返回結(jié)果,并且無法拋出返回結(jié)果的異常。 但Callable可以返回一個(gè)對(duì)象或者拋出一個(gè)異常。
Callable接口使用泛型去定義它的返回類型。Executors類提供了一些有用的方法在線程池中執(zhí)行Callable內(nèi)的任務(wù)。由于Callable任務(wù)是并行的,我們必須等待它返回的結(jié)果。
Callable接口的源碼如下:
public interface Callable<V> { V call() throws Exception; // 計(jì)算結(jié)果 }
2.2 Future接口
Future用于表示異步計(jì)算的結(jié)果。
由于Callable任務(wù)是并行的,我們必須等待它返回的結(jié)果。那如何獲取返回結(jié)果?java.util.concurrent.Future對(duì)象為我們解決了這個(gè)問題。在線程池提交Callable任務(wù)后返回了一個(gè)Future對(duì)象,使用Future可以知道Callable任務(wù)的狀態(tài)和得到Callable返回的執(zhí)行結(jié)果。Future提供get()方法讓我們可以等待Callable結(jié)束并獲取它的執(zhí)行結(jié)果。
Future接口的源碼如下:從源碼我們可以知道,F(xiàn)uture可以對(duì)異步運(yùn)算的任務(wù)的結(jié)果進(jìn)行等待獲取、判斷是否已經(jīng)完成、取消任務(wù)等操作。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning);// 試圖取消對(duì)此任務(wù)的執(zhí)行 boolean isCancelled(); // 如果在任務(wù)正常完成前將其取消,則返回 true boolean isDone(); // 如果任務(wù)已完成,則返回 true V get() throws InterruptedException, ExecutionException; // 如有必要,等待計(jì)算完成,然后獲取其結(jié)果 // 如有必要,最多等待為使計(jì)算完成所給定的時(shí)間之后,獲取其結(jié)果(如果結(jié)果可用)。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2.3 Future實(shí)現(xiàn)類是FutureTask。
如果不想分支線程阻塞主線程,又想取得分支線程的執(zhí)行結(jié)果,就用FutureTask , FutureTask實(shí)現(xiàn)了RunnableFuture接口,這個(gè)接口的定義如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
可以看到這個(gè)接口實(shí)現(xiàn)了Runnable和Future接口,接口中的具體實(shí)現(xiàn)由FutureTask來實(shí)現(xiàn)。這個(gè)類的構(gòu)造方法如下 :
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // state記錄FutureTask的狀態(tài) this.state = NEW; // ensure visibility of callable }
FutureTask 構(gòu)造函數(shù)傳入一個(gè) Callable 的具體實(shí)現(xiàn)類,一個(gè) FutureTask 對(duì)象可以對(duì)調(diào)用了 Callable 和 Runnable 的對(duì)象進(jìn)行包裝,由于 FutureTask 也是Runnable 接口的實(shí)現(xiàn)類,所以 FutureTask 也可以放入線程池中。
在web容器使用這種多線程的方式,要記住記得shutdown關(guān)閉,否則可能導(dǎo)致線程沒有被關(guān)閉回收,結(jié)果線程數(shù)一直增加
當(dāng)線程數(shù)太多時(shí),肯定會(huì)導(dǎo)致內(nèi)存溢出或者影響服務(wù)器性能等。
三. Java線程池
既然單個(gè)線程的創(chuàng)建和銷毀都很簡(jiǎn)單,我們?yōu)槭裁匆褂镁€程池?
3.1、背景
JAVA是面向?qū)ο螅琂AVA虛擬機(jī)每創(chuàng)建一個(gè)對(duì)象都要獲取內(nèi)存資源或者其他很多其他資源,同時(shí)試圖跟蹤每個(gè)對(duì)象,以便可以在對(duì)象銷毀后進(jìn)行垃圾回收。因此創(chuàng)建和銷毀對(duì)象是非常費(fèi)時(shí)間的。所以為了盡可能提高應(yīng)用程序資源使用效率,就是要降低就是盡可能降低創(chuàng)建和銷毀對(duì)象的次數(shù),特別是一些非常耗資源的對(duì)象創(chuàng)建和銷毀,這就是一些"池化資源"技術(shù)產(chǎn)生的原因,比如大家熟悉的數(shù)據(jù)庫(kù)連接池。
針對(duì)線程而言:
線程占用系統(tǒng)內(nèi)存:雖然線程相對(duì)于進(jìn)程而言是輕量級(jí)的,創(chuàng)建線程依然需要占用系統(tǒng)的內(nèi)存資源。如果無限制的創(chuàng)建線程,對(duì)應(yīng)虛擬機(jī)垃圾回收而言也是很有壓力的,畢竟線程也是對(duì)象。
線程的創(chuàng)建和關(guān)閉是需要花費(fèi)時(shí)間的:如果任務(wù)非常多,頻繁的創(chuàng)建和銷毀線程也是需要占用系統(tǒng)的時(shí)間片的。
3.2、作用
1)降低資源消耗,重復(fù)利用已經(jīng)創(chuàng)建好的線程,降低線程創(chuàng)建和銷毀造成的資源消耗
2)提高響應(yīng)速度,當(dāng)任務(wù)到達(dá)時(shí),任務(wù)可以不需要等到線程創(chuàng)建就可以立即執(zhí)行
3)提高線程的可管理性,線程是稀缺資源,如果無限制的創(chuàng)建,不僅消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
3.3、應(yīng)用范圍
1)、需要大量的線程來完成任務(wù)。且完成任務(wù)的時(shí)間比較短。
http請(qǐng)求這種任務(wù),使用線程池技術(shù)是很合適的。由于單個(gè)任務(wù)小,而任務(wù)數(shù)量巨大,你能夠想象一個(gè)熱門站點(diǎn)的點(diǎn)擊次數(shù)。
如果一個(gè)線程的任務(wù)執(zhí)行時(shí)間非常長(zhǎng),就沒必要用線程池,比如一個(gè)telnet連接請(qǐng)求,線程池的優(yōu)勢(shì)就不明顯了。
2、對(duì)性能要求苛刻的應(yīng)用,比方要求server迅速響應(yīng)客戶請(qǐng)求。
3、接受突發(fā)性的大量請(qǐng)求,但不至于使server因此產(chǎn)生大量線程的應(yīng)用。突發(fā)性大量客戶請(qǐng)求,在沒有線程池情況下,將產(chǎn)生大量線程,盡管理論上大部分操作系統(tǒng)線程數(shù)目最大值不是問題,短時(shí)間內(nèi)產(chǎn)生大量線程可能使內(nèi)存到達(dá)極限,并出現(xiàn)"OutOfMemory"的錯(cuò)誤。
四. Java 線程池框架Executor
一個(gè)線程池包含下面四個(gè)基本組成部分:
1、線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池。包含 創(chuàng)建線程池,銷毀線程池,加入新任務(wù);
2、工作線程(PoolWorker):線程池中線程,在沒有任務(wù)時(shí)處于等待狀態(tài)。能夠循環(huán)的運(yùn)行任務(wù);
3、任務(wù)接口(Task):每一個(gè)任務(wù)必須實(shí)現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的運(yùn)行。它主要規(guī)定了任務(wù)的入口。任務(wù)運(yùn)行完后的收尾工作,任務(wù)的運(yùn)行狀態(tài)等。
4、任務(wù)隊(duì)列(taskQueue):用于存放沒有處理的任務(wù)。提供一種緩沖機(jī)制。
下面我們主要官方提供的線程池管理工具Executor框架。
Executor框架是一個(gè)根據(jù)一組執(zhí)行策略調(diào)用,調(diào)度,執(zhí)行和控制的異步任務(wù)的框架。無限制的創(chuàng)建線程會(huì)引起應(yīng)用程序內(nèi)存溢出。所以創(chuàng)建一個(gè)線程池是個(gè)更好的的解決方案,因?yàn)榭梢韵拗凭€程的數(shù)量并且可以回收再利用這些線程。Executor框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,F(xiàn)uture,Callable等。
4.1、類圖:
Executor是一個(gè)頂層接口,在它里面只聲明了一個(gè)方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進(jìn)去的任務(wù)的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實(shí)現(xiàn)了ExecutorService接口,基本實(shí)現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
4.2 核心類ThreadPoolExecutor:
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個(gè)類,因此如果要透徹地了解Java中的線程池,必須先了解這個(gè)類。下面我們來看一下ThreadPoolExecutor類的具體實(shí)現(xiàn)源碼。
在ThreadPoolExecutor類中提供了四個(gè)構(gòu)造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); ... }
ThreadPoolExecutor提供了四個(gè)構(gòu)造器,事實(shí)上,通過觀察每個(gè)構(gòu)造器的源碼具體實(shí)現(xiàn),發(fā)現(xiàn)前面三個(gè)構(gòu)造器都是調(diào)用的第四個(gè)構(gòu)造器進(jìn)行的初始化工作。下面解釋下一下構(gòu)造器中各個(gè)參數(shù)的含義:
public ThreadPoolExecutor(int corePoolSize, //線程池核心線程數(shù)量 int maximumPoolSize, //線程池最大線程數(shù)量 long keepAliveTime, //線程KeepAlive時(shí)間,當(dāng)線程池?cái)?shù)量超過核心線程數(shù)量以后,idle時(shí)間超過這個(gè)值的線程會(huì)被終止 TimeUnit unit, //線程KeepAlive時(shí)間單位 BlockingQueue<Runnable> workQueue, //任務(wù)隊(duì)列 ThreadFactory threadFactory, //創(chuàng)建線程的工廠對(duì)象 RejectedExecutionHandler handler)
corePoolSize: 核心池的大小,這個(gè)參數(shù)跟后面講述的線程池的實(shí)現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個(gè)方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個(gè)線程或者一個(gè)線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達(dá)到corePoolSize后,就會(huì)把到達(dá)的任務(wù)放到緩存隊(duì)列當(dāng)中;
maximumPoolSize:線程池最大線程數(shù),這個(gè)參數(shù)也是一個(gè)非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個(gè)線程;
keepAliveTime:表示線程沒有任務(wù)執(zhí)行時(shí)最多保持多久時(shí)間會(huì)終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),keepAliveTime才會(huì)起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時(shí),如果一個(gè)線程空閑的時(shí)間達(dá)到keepAliveTime,則會(huì)終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時(shí),keepAliveTime參數(shù)也會(huì)起作用,直到線程池中的線程數(shù)為0;
unit:參數(shù)keepAliveTime的時(shí)間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時(shí)
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
workQueue:一個(gè)阻塞隊(duì)列,用來存儲(chǔ)等待執(zhí)行的任務(wù),這個(gè)參數(shù)的選擇也很重要,會(huì)對(duì)線程池的運(yùn)行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊(duì)列有以下幾種選擇:ArrayBlockingQueue;LinkedBlockingQueue;SynchronousQueue;
1. ArrayBlockingQueue : 有界的數(shù)組隊(duì)列
2. LinkedBlockingQueue : 可支持有界/無界的隊(duì)列,使用鏈表實(shí)現(xiàn)
3. PriorityBlockingQueue : 優(yōu)先隊(duì)列,可以針對(duì)任務(wù)排序
4. SynchronousQueue : 隊(duì)列長(zhǎng)度為1的隊(duì)列,和Array有點(diǎn)區(qū)別就是:client thread提交到block queue會(huì)是一個(gè)阻塞過程,直到有一個(gè)worker thread連接上來poll task。
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊(duì)策略與BlockingQueue有關(guān)。
threadFactory:線程工廠,主要用來創(chuàng)建線程;
handler:表示當(dāng)拒絕處理任務(wù)時(shí)的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
在ThreadPoolExecutor類中有幾個(gè)非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法: 實(shí)際上是Executor中聲明的方法,在ThreadPoolExecutor進(jìn)行了具體的實(shí)現(xiàn),這個(gè)方法是ThreadPoolExecutor的核心方法,通過這個(gè)方法可以向線程池提交一個(gè)任務(wù),交由線程池去執(zhí)行。
submit()方法: 是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實(shí)現(xiàn),在ThreadPoolExecutor中并沒有對(duì)其進(jìn)行重寫,這個(gè)方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)它實(shí)際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)。
shutdown()和shutdownNow()是用來關(guān)閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱API。
4.3 ThreadPoolExecutor邏輯結(jié)構(gòu)
ThreadPoolExecutor線程池的邏輯結(jié)構(gòu):
第一步、初始的poolSize < corePoolSize,提交的runnable任務(wù),會(huì)直接做為new一個(gè)Thread的參數(shù),立馬執(zhí)行
第二步、當(dāng)提交的任務(wù)數(shù)超過了corePoolSize,就進(jìn)入了第二步操作。會(huì)將當(dāng)前的runable提交到一個(gè)block queue中
第三步,如果block queue是個(gè)有界隊(duì)列,當(dāng)隊(duì)列滿了之后就進(jìn)入了第三步。如果poolSize < maximumPoolsize時(shí),會(huì)嘗試new 一個(gè)Thread的進(jìn)行救急處理,立馬執(zhí)行對(duì)應(yīng)的runnable任務(wù)
第四步,如果第三步救急方案也無法處理了,就會(huì)走到第四步執(zhí)行reject操作
五.Executor線程池實(shí)例
比如我們要并發(fā)調(diào)用遠(yuǎn)程http接口:
5.1、使用Runnable接口創(chuàng)建線程,并由executor調(diào)度執(zhí)行:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class TExecutor { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(i)); executorService.execute(runnable); } System.out.println("線程任務(wù)開始執(zhí)行"); executorService.shutdown(); } /** * 線程池提交Runnable任務(wù) */ public void runnableTask() { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<?> futureTask = null; for (int i = 0; i < 5; i++) { RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(1)); futureTask = executorService.submit(runnable); try { futureTask.get(10, TimeUnit.SECONDS); //等待超時(shí),等待給定的時(shí)間之后,獲取其結(jié)果 } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } finally{ futureTask.cancel(true); } } executorService.shutdown(); } } class RunnableTask implements Runnable { String url; String param; public RunnableTask(String url, String param) { this.url = url; this.param = param; } //http調(diào)用 public String requestHttp() { String result = ""; return result; } @Override public void run() { System.out.println("正在執(zhí)行task "+param); try { requestHttp(); } catch (Exception e) { e.printStackTrace(); } System.out.println("task "+param+"執(zhí)行完畢"); } }
5.2、使用Callable接口創(chuàng)建線程,并由executor調(diào)度執(zhí)行:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.TimeUnit; public class TestPoolThread { private ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100)); public String main(String gid) { List<Callable<String>> tasks = new ArrayList<>(); tasks.add(new CallableTask("http://xxxx.com/api/v2", "1")); tasks.add(new CallableTask("http://xxxx.com/api/v2", "2")); try { List<Future<String>> mFuture = executorService.invokeAll(tasks, 10, TimeUnit.SECONDS); String result1 = mFuture.get(0).get(); System.out.println( "result1:" + result1); String result2 = mFuture.get(0).get(); System.out.println( "result2:" + result2); } catch (Exception e) { e.printStackTrace(); } return ""; } } class CallableTask implements Callable<String> { String url; String param; public CallableTask(String url, String param) { this.url = url; this.param = param; } //http調(diào)用 public String requestHttp() { String result = ""; return result; } @Override public String call() { try { String param = "{}"; String result = requestHttp( ); return result; } catch (Exception e) { e.printStackTrace(); return ""; } } }
不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個(gè)靜態(tài)方法來創(chuàng)建線程池:
Executors.newCachedThreadPool : 創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
Executors.newCachedThreadPool :創(chuàng)建一個(gè)可緩存線程池,如果線程池長(zhǎng)度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
Executors.newFixedThreadPool :創(chuàng)建一個(gè)定長(zhǎng)線程池,可控制線程最大并發(fā)數(shù),超出的線程會(huì)在隊(duì)列中等待。
Executors.newScheduledThreadPool :創(chuàng)建一個(gè)定長(zhǎng)線程池,支持定時(shí)及周期性任務(wù)執(zhí)行。
Executors.newSingleThreadExecutor :創(chuàng)建一個(gè)單線程化的線程池,它只會(huì)用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級(jí))執(zhí)行。
靜態(tài)方法的具體實(shí)現(xiàn):
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從它們的具體實(shí)現(xiàn)來看,它們實(shí)際上也是調(diào)用了ThreadPoolExecutor,只不過參數(shù)都已配置好了。
newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運(yùn)行,當(dāng)線程空閑超過60秒,就銷毀線程。
實(shí)際中,如果Executors提供的三個(gè)靜態(tài)方法能滿足要求,就盡量使用它提供的三個(gè)方法,因?yàn)樽约喝ナ謩?dòng)配置ThreadPoolExecutor的參數(shù)有點(diǎn)麻煩,要根據(jù)實(shí)際任務(wù)的類型和數(shù)量來進(jìn)行配置。
另外,如果ThreadPoolExecutor達(dá)不到要求,可以自己繼承ThreadPoolExecutor類進(jìn)行重寫。
六.深入剖析線程池實(shí)現(xiàn)原理
在上一節(jié)我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實(shí)現(xiàn)原理,將從下面幾個(gè)方面講解:
1.線程池狀態(tài)
2.線程池中的線程初始化
3.任務(wù)的執(zhí)行
4.任務(wù)緩存隊(duì)列及排隊(duì)策略
5.任務(wù)拒絕策略
6.線程池的關(guān)閉
7.線程池容量的動(dòng)態(tài)調(diào)整
6.1、線程池狀態(tài)
在ThreadPoolExecutor中定義了一個(gè)volatile變量,另外定義了幾個(gè)static final變量表示線程池的各個(gè)狀態(tài):
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState表示當(dāng)前線程池的狀態(tài),它是一個(gè)volatile變量用來保證線程之間的可見性;下面的幾個(gè)static final變量表示runState可能的幾個(gè)取值。
當(dāng)創(chuàng)建線程池后,初始時(shí),線程池處于RUNNING狀態(tài);
如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時(shí)線程池不能夠接受新的任務(wù),它會(huì)等待所有任務(wù)執(zhí)行完畢;
如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時(shí)線程池不能接受新的任務(wù),并且會(huì)去嘗試終止正在執(zhí)行的任務(wù);
當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊(duì)列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。
6.2、線程池中的線程初始化
默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會(huì)創(chuàng)建線程。
在實(shí)際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個(gè)方法辦到:
- prestartCoreThread():初始化一個(gè)核心線程;
- prestartAllCoreThreads():初始化所有核心線程
下面是這2個(gè)方法的實(shí)現(xiàn):
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進(jìn)去的參數(shù)是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進(jìn)去的參數(shù)是null,等待任務(wù)隊(duì)列中有任務(wù) ++n; return n; }
6.3、任務(wù)的執(zhí)行
任務(wù)提交基本邏輯如下:
如果當(dāng)前線程池中的線程poolSize數(shù)目小于corePoolSize,則每來一個(gè)任務(wù),就會(huì)創(chuàng)建一個(gè)線程去執(zhí)行這個(gè)任務(wù);
如果當(dāng)前線程池中的線程poolSize數(shù)目>=corePoolSize,則每來一個(gè)任務(wù),會(huì)嘗試將其添加到任務(wù)緩存隊(duì)列當(dāng)中,若添加成功,則該任務(wù)會(huì)等待空閑線程將其取出去執(zhí)行;若添加失敗(一般來說是任務(wù)緩存隊(duì)列已滿),則會(huì)嘗試創(chuàng)建新的線程去執(zhí)行這個(gè)任務(wù);
如果當(dāng)前線程池中的線程poolSize數(shù)目達(dá)到maximumPoolSize,則會(huì)采取任務(wù)拒絕策略進(jìn)行處理;如果線程池中的線程poolSize數(shù)量大于 corePoolSize時(shí),如果某線程空閑時(shí)間超過keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;
如果允許為核心池中的線程設(shè)置存活時(shí)間,那么核心池中的線程空閑時(shí)間超過keepAliveTime,線程也會(huì)被終止。
下面我們分析一下從任務(wù)提交給線程池到任務(wù)執(zhí)行完畢整個(gè)過程。
我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務(wù)緩存隊(duì)列,用來存放等待執(zhí)行的任務(wù) private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態(tài)鎖,對(duì)線程池狀態(tài)(比如線程池大小 //、runState等)的改變都要使用這個(gè)鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集 private volatile long keepAliveTime; //線程存活時(shí)間 private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設(shè)置存活時(shí)間 private volatile int corePoolSize; //核心池的大?。淳€程池中的線程數(shù)目大于這個(gè)參數(shù)時(shí),提交的任務(wù)會(huì)被放進(jìn)任務(wù)緩存隊(duì)列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數(shù) private volatile int poolSize; //線程池中當(dāng)前的線程數(shù) private volatile RejectedExecutionHandler handler; //任務(wù)拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來創(chuàng)建線程 private int largestPoolSize; //用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù),跟線程池的容量沒有任何關(guān)系 private long completedTaskCount; //用來記錄已經(jīng)執(zhí)行完畢的任務(wù)個(gè)數(shù)
每個(gè)變量的作用都已經(jīng)標(biāo)明出來了。
下面我們我們看看任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過程。
在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute()方法,雖然通過submit也可以提交任務(wù),但是實(shí)際上submit方法里面最終調(diào)用的還是execute()方法,所以我們只需要研究execute()方法的實(shí)現(xiàn)原理即可:
在源碼添加相關(guān)注解來解釋邏輯了:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //如果線程池中當(dāng)前線程數(shù)不小于核心池大小,創(chuàng)建線程來執(zhí)行任務(wù) if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } } /** * 簡(jiǎn)單拆解execute: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //如果線程池中當(dāng)前線程數(shù)不小于核心池大小, if (poolSize >= corePoolSize ) { //創(chuàng)建新線程來執(zhí)行任務(wù) boolean isCreate = addIfUnderCorePoolSize(command); if (!isCreate) { //創(chuàng)建線程失?。杭磒oolSize>=corePoolSize或者runState不等于RUNNING) if (runState == RUNNING && workQueue.offer(command)) {//如果當(dāng)前線程池處于RUNNING狀態(tài),則將任務(wù)放入任務(wù)緩存隊(duì)列 if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command);//保證 添加到任務(wù)緩存隊(duì)列中的任務(wù)得到處理 } //創(chuàng)建新線程來執(zhí)行任務(wù) boolean isUnderMaximumPoolSize = addIfUnderMaximumPoolSize(command); if (!isUnderMaximumPoolSize) reject(command); // is shutdown or saturated } } //創(chuàng)建線程來執(zhí)行任務(wù) } } */
我們看看2個(gè)關(guān)鍵方法的實(shí)現(xiàn):addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
1、addIfUnderCorePoolSize:從名字可以看出意思就是當(dāng)線程池的線程低于corePoolSize大小時(shí)創(chuàng)建線程執(zhí)行任務(wù)。
2、addIfUnderMaximumPoolSize方法的實(shí)現(xiàn)和addIfUnderCorePoolSize方法的實(shí)現(xiàn)思想非常相似,唯一的區(qū)別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數(shù)達(dá)到了核心池大小并且往任務(wù)隊(duì)列中添加任務(wù)失敗的情況下執(zhí)行
/** * 當(dāng)線程池的線程低于corePoolSize大小時(shí)創(chuàng)建線程執(zhí)行任務(wù) * @param firstTask * @return */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; //首先獲取到鎖 final ReentrantLock mainLock = this.mainLock; //通過加鎖保證創(chuàng)建的線程是唯一 mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務(wù) } finally { mainLock.unlock(); } if (t == null)//創(chuàng)建線程失敗 return false; t.start();//啟動(dòng)線程 return true; } /** * 和addIfUnderCorePoolSize方法的實(shí)現(xiàn)思想非常相似 * @param firstTask * @return */ private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
創(chuàng)建新線程:首先用提交的任務(wù)創(chuàng)建了一個(gè)Worker對(duì)象,然后調(diào)用線程工廠threadFactory創(chuàng)建了一個(gè)新的線程t,然后將線程t的引用賦值給了Worker對(duì)象的成員變量thread,接著通過workers.add(w)將Worker對(duì)象添加到工作集當(dāng)中
/** * 創(chuàng)建新線程 * @param firstTask * @return */ private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); //創(chuàng)建一個(gè)線程,執(zhí)行任務(wù),實(shí)際就是Thread t = new Thread(w); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; //將創(chuàng)建的線程的引用賦值為w的成員變量 workers.add(w); int nt = ++poolSize; //當(dāng)前線程數(shù)加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
Worker執(zhí)行任務(wù): Worker實(shí)現(xiàn)了Runnable接口,最核心的方法是run()方法:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個(gè)方法,沒有具體實(shí)現(xiàn),用戶可以根據(jù) //自己需要重載這個(gè)方法和后面的afterExecute方法來進(jìn)行一些統(tǒng)計(jì)信息,比如某個(gè)任務(wù)的執(zhí)行時(shí)間等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * Worker實(shí)現(xiàn)了Runnable接口 */ public void run() { try { Runnable task = firstTask; firstTask = null; //從ThreadPoolExecutor任務(wù)緩存隊(duì)列里面去取 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //當(dāng)任務(wù)隊(duì)列中沒有任務(wù)時(shí),進(jìn)行清理工作 } } } }
6.4、任務(wù)緩存隊(duì)列及排隊(duì)策略
workQueue任務(wù)緩存隊(duì)列是用來存放等待執(zhí)行的任務(wù)。
workQueue的類型為BlockingQueue<Runnable>,通常可以取下面三種類型:
1)ArrayBlockingQueue:基于數(shù)組的先進(jìn)先出隊(duì)列,此隊(duì)列創(chuàng)建時(shí)必須指定大小;
2)LinkedBlockingQueue:基于鏈表的先進(jìn)先出隊(duì)列,如果創(chuàng)建時(shí)沒有指定此隊(duì)列大小,則默認(rèn)為Integer.MAX_VALUE;
3)synchronousQueue:這個(gè)隊(duì)列比較特殊,它不會(huì)保存提交的任務(wù),而是將直接新建一個(gè)線程來執(zhí)行新來的任務(wù)。
6.5、任務(wù)拒絕策略
當(dāng)線程池的任務(wù)緩存隊(duì)列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會(huì)采取任務(wù)拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊(duì)列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
6.6、線程池的關(guān)閉
ThreadPoolExecutor提供了兩個(gè)方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會(huì)立即終止線程池,而是要等所有任務(wù)緩存隊(duì)列中的任務(wù)都執(zhí)行完后才終止,但再也不會(huì)接受新的任務(wù)shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊(duì)列,返回尚未執(zhí)行的任務(wù)
6.7、線程池容量的動(dòng)態(tài)調(diào)整
ThreadPoolExecutor提供了動(dòng)態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:設(shè)置核心池大小setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
當(dāng)上述參數(shù)從小變大時(shí),ThreadPoolExecutor進(jìn)行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。
六、線程池的配置策略
要想合理的配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個(gè)角度來進(jìn)行分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級(jí):高,中和低。
- 任務(wù)的執(zhí)行時(shí)間:長(zhǎng),中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫(kù)連接。
任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。
CPU密集型任務(wù):配置盡可能少的線程數(shù)量,如配置Ncpu+1個(gè)線程的線程池。cpu密集型任務(wù)需要大量的運(yùn)算,而且沒有阻塞,需要CPU一直全速運(yùn)行,CPU密集任務(wù)只有在真正的多核CPU上才可能得到加速??梢允褂肦untime.availableProcessors方法獲取可用處理器的個(gè)數(shù)。
一般計(jì)算公式:CPU核數(shù) + 1個(gè)線程的線程池
ExecutorService THREAD_POOL = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(Runtime.getRuntime().availableProcessors() * 20), new ThreadPoolExecutor.CallerRunsPolicy());
《并發(fā)編程實(shí)戰(zhàn)》一書中對(duì)于IO密集型任務(wù)建議線程池大小設(shè)為Ncpu+1Ncpu+1,原因是當(dāng)計(jì)算密集型線程偶爾由于頁(yè)缺失故障或其他原因而暫停時(shí),這個(gè)“額外的”線程也能確保這段時(shí)間內(nèi)的CPU始終周期不會(huì)被浪費(fèi)。
對(duì)于計(jì)算密集型任務(wù),不要?jiǎng)?chuàng)建過多的線程,由于線程有執(zhí)行棧等內(nèi)存消耗,創(chuàng)建過多的線程不會(huì)加快計(jì)算速度,反而會(huì)消耗更多的內(nèi)存空間;另一方面線程過多,頻繁切換線程上下文也會(huì)影響線程池的性能。
IO密集型任務(wù):IO操作包括讀寫磁盤文件、讀寫數(shù)據(jù)庫(kù)、網(wǎng)絡(luò)請(qǐng)求等阻塞操作,執(zhí)行這些操作,線程將處于等待狀態(tài)。
io密集型由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu。即該任務(wù)需要大量的IO,即大量的阻塞,這種類型分以下兩種情況設(shè)置
1、有等待io任務(wù):如果IO密集型任務(wù)線程并非一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如CPU核數(shù) * 2
2、需要一直運(yùn)行的任務(wù):參考公式:CPU核數(shù) /(1 - 阻塞系數(shù) ) 阻塞系數(shù)在0.8~0.9之間
比如:8核CPU:8/(1 - 0.9) = 80個(gè)線程數(shù)
用sleep方式模擬IO阻塞:
public class IOThreadPoolTest { // 使用無限線程數(shù)的CacheThreadPool線程池 static ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool(); static List<Callable<Object>> tasks; // 仍然是5000個(gè)任務(wù) static int taskNum = 5000; static { tasks = new ArrayList<>(taskNum); for (int i = 0; i < taskNum; i++) { tasks.add(Executors.callable(new IOTask())); } } public static void main(String[] args) throws InterruptedException { cachedThreadPool.invokeAll(tasks);// warm up all thread testExecutor(cachedThreadPool, tasks); // 看看執(zhí)行過程中創(chuàng)建了多少個(gè)線程 int largestPoolSize = cachedThreadPool.getLargestPoolSize(); System.out.println("largestPoolSize:" + largestPoolSize); cachedThreadPool.shutdown(); } private static void testExecutor(ExecutorService executor, List<Callable<Object>> tasks) throws InterruptedException { long start = System.currentTimeMillis(); executor.invokeAll(tasks); long end = System.currentTimeMillis(); System.out.println(end - start); } static class IOTask implements Runnable { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }
這里使用無線程數(shù)限制的CachedThreadPool線程池,也就是說這里的5000個(gè)任務(wù)會(huì)被5000個(gè)線程同時(shí)處理,由于所有的線程都只是阻塞而不消耗CPU資源,所以5000個(gè)任務(wù)在不到2秒的時(shí)間內(nèi)就執(zhí)行完了。
很明顯使用CachedThreadPool能有效提高IO密集型任務(wù)的吞吐量,而且由于CachedThreadPool中的線程會(huì)在空閑60秒自動(dòng)回收,所以不會(huì)消耗過多的資源。
但是打開任務(wù)管理器你會(huì)發(fā)現(xiàn)執(zhí)行任務(wù)的同時(shí)內(nèi)存會(huì)飆升到接近400M,因?yàn)槊總€(gè)線程都消耗了一部分內(nèi)存,在5000個(gè)線程創(chuàng)建之后,內(nèi)存消耗達(dá)到了峰值。
所以使用CacheThreadPool的時(shí)候應(yīng)該避免提交大量長(zhǎng)時(shí)間阻塞的任務(wù),以防止內(nèi)存溢出;另一種替代方案是,使用固定大小的線程池,并給一個(gè)較大的線程數(shù)(不會(huì)內(nèi)存溢出),同時(shí)為了在空閑時(shí)節(jié)省內(nèi)存資源,調(diào)用allowCoreThreadTimeOut允許核心線程超時(shí)。
線程執(zhí)行棧的大小可以通過-Xss*size*或-XX:ThreadStackSize參數(shù)調(diào)整
混合型的任務(wù):如果可以拆分,則將其拆分成一個(gè)CPU密集型任務(wù)和一個(gè)IO密集型任務(wù),只要這兩個(gè)任務(wù)執(zhí)行的時(shí)間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個(gè)任務(wù)執(zhí)行時(shí)間相差太大,則沒必要進(jìn)行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個(gè)數(shù)。
混合型任務(wù)要根據(jù)任務(wù)等待阻塞時(shí)間與CPU計(jì)算時(shí)間的比重來決定線程數(shù)量:
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
比如一個(gè)任務(wù)包含一次數(shù)據(jù)庫(kù)讀寫(0.1ms),并在內(nèi)存中對(duì)讀取的數(shù)據(jù)進(jìn)行分組過濾等操作(5μs),那么線程數(shù)應(yīng)該為80左右。
線程數(shù)與阻塞比例的關(guān)系圖大致如下:
當(dāng)阻塞比例為0,也就是純計(jì)算任務(wù),線程數(shù)等于核心數(shù)(這里是4);阻塞比例越大,線程池的線程數(shù)應(yīng)該更多。
《Java并發(fā)編程實(shí)戰(zhàn)》中最原始的公式是這樣的:
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
NcpuNcpu代表CPU的個(gè)數(shù),UcpuUcpu代表CPU利用率的期望值(0<Ucpu<10<Ucpu<1),WCWC仍然是等待時(shí)間與計(jì)算時(shí)間的比例。
我上面提供的公式相當(dāng)于目標(biāo)CPU利用率為100%。
通常系統(tǒng)中不止一個(gè)線程池,所以實(shí)際配置線程數(shù)應(yīng)該將目標(biāo)CPU利用率計(jì)算進(jìn)去。
優(yōu)先級(jí)不同的任務(wù)可以使用優(yōu)先級(jí)隊(duì)列PriorityBlockingQueue來處理。它可以讓優(yōu)先級(jí)高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級(jí)高的任務(wù)提交到隊(duì)列里,那么優(yōu)先級(jí)低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。
執(zhí)行時(shí)間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級(jí)隊(duì)列,讓執(zhí)行時(shí)間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫(kù)連接池的任務(wù),因?yàn)榫€程提交SQL后需要等待數(shù)據(jù)庫(kù)返回結(jié)果,如果等待的時(shí)間越長(zhǎng)CPU空閑時(shí)間就越長(zhǎng),那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。
建議使用有界隊(duì)列,有界隊(duì)列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點(diǎn),比如幾千。有一次我們組使用的后臺(tái)任務(wù)線程池的隊(duì)列和線程池全滿了,不斷的拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫(kù)出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因?yàn)楹笈_(tái)任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫(kù)查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞住,任務(wù)積壓在線程池里。如果當(dāng)時(shí)我們?cè)O(shè)置成無界隊(duì)列,線程池的隊(duì)列就會(huì)越來越多,有可能會(huì)撐滿內(nèi)存,導(dǎo)致整個(gè)系統(tǒng)不可用,而不只是后臺(tái)任務(wù)出現(xiàn)問題。當(dāng)然我們的系統(tǒng)所有的任務(wù)是用的單獨(dú)的服務(wù)器部署的,而我們使用不同規(guī)模的線程池跑不同類型的任務(wù),但是出現(xiàn)這樣問題時(shí)也會(huì)影響到其他任務(wù)。
總結(jié):線程池的大小取決于任務(wù)的類型以及系統(tǒng)的特性,避免“過大”和“過小”兩種極端。線程池過大,大量的線程將在相對(duì)更少的CPU和有限的內(nèi)存資源上競(jìng)爭(zhēng),這不僅影響并發(fā)性能,還會(huì)因過高的內(nèi)存消耗導(dǎo)致OOM;線程池過小,將導(dǎo)致處理器得不到充分利用,降低吞吐率。
要想正確的設(shè)置線程池大小,需要了解部署的系統(tǒng)中有多少個(gè)CPU,多大的內(nèi)存,提交的任務(wù)是計(jì)算密集型、IO密集型還是兩者兼有。
七、線程池創(chuàng)建注意
阿里巴巴編碼規(guī)范里面提到:
線程池最好不要使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運(yùn)行規(guī)則,規(guī)避資源耗盡的風(fēng)險(xiǎn)。 說明:Executors各個(gè)方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要問題是堆積的請(qǐng)求處理隊(duì)列可能會(huì)耗費(fèi)非常大的內(nèi)存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會(huì)創(chuàng)建數(shù)量非常多的線程,甚至OOM。
如果使用阿里巴巴的java編程規(guī)范插件,就會(huì)有這個(gè)提示:
ExecutorService newFixedThreadPool(int nThreads):固定大小線程池。
源碼可以看到,corePoolSize和maximumPoolSize的大小是一樣的(如果使用無界queue的話maximumPoolSize參數(shù)是沒有意義的),keepAliveTime和unit的設(shè)值表名什么?-就是該實(shí)現(xiàn)不想keep alive!最后的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個(gè)特點(diǎn),他是無界的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService newSingleThreadExecutor():?jiǎn)尉€程。
可以看到,與fixedThreadPool很像,只不過fixedThreadPool中的入?yún)⒅苯油嘶癁?
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
ExecutorService newCachedThreadPool():無界線程池,可以進(jìn)行自動(dòng)線程回收。
這個(gè)實(shí)現(xiàn)就有意思了。首先是無界的線程池,所以我們可以發(fā)現(xiàn)maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue??赡軐?duì)于該BlockingQueue有些陌生,簡(jiǎn)單說:該QUEUE中,每個(gè)插入操作必須等待另一個(gè)
線程的對(duì)應(yīng)移除操作。比如,我先添加一個(gè)元素,接下來如果繼續(xù)想嘗試添加則會(huì)阻塞,直到另一個(gè)線程取走一個(gè)元素,反之亦然。(想到什么?就是緩沖區(qū)為1的生產(chǎn)者消費(fèi)者模式^_^)
注意到介紹中的自動(dòng)回收線程的特性嗎,為什么呢?先不說,但注意到該實(shí)現(xiàn)中corePoolSize和maximumPoolSize的大小不同。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
總結(jié):
- ThreadPoolExecutor的使用還是很有技巧的。
- 使用無界queue可能會(huì)耗盡系統(tǒng)資源。
- 使用有界queue可能不能很好的滿足性能,需要調(diào)節(jié)線程數(shù)和queue大小
- 線程數(shù)自然也有開銷,所以需要根據(jù)不同應(yīng)用進(jìn)行調(diào)節(jié)。
通常來說對(duì)于靜態(tài)任務(wù)可以歸為:
- 數(shù)量大,但是執(zhí)行時(shí)間很短
- 數(shù)量小,但是執(zhí)行時(shí)間較長(zhǎng)
- 數(shù)量又大執(zhí)行時(shí)間又長(zhǎng)
- 除了以上特點(diǎn)外,任務(wù)間還有些內(nèi)在關(guān)系
總結(jié)2:
- 對(duì)于需要保證所有提交的任務(wù)都要被執(zhí)行的情況,使用FixedThreadPool
- 如果限定只能使用一個(gè)線程進(jìn)行任務(wù)處理,使用SingleThreadExecutor
- 如果希望提交的任務(wù)盡快分配線程執(zhí)行,使用CachedThreadPool
- 如果業(yè)務(wù)上允許任務(wù)執(zhí)行失敗,或者任務(wù)執(zhí)行過程可能出現(xiàn)執(zhí)行時(shí)間過長(zhǎng)進(jìn)而影響其他業(yè)務(wù)的應(yīng)用場(chǎng)景,可以通過使用限定線程數(shù)量的線程池以及限定長(zhǎng)度的隊(duì)列進(jìn)行容錯(cuò)處理。
到此這篇關(guān)于java多線程和線程池的文章就介紹到這了,更多相關(guān)java多線程和線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談線性表的原理及簡(jiǎn)單實(shí)現(xiàn)方法
下面小編就為大家?guī)硪黄獪\談線性表的原理及簡(jiǎn)單實(shí)現(xiàn)方法。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-06-06SpringBoot部署到Linux讀取resources下的文件及遇到的坑
本文主要給大家介紹SpringBoot部署到Linux讀取resources下的文件,在平時(shí)業(yè)務(wù)開發(fā)過程中,很多朋友在獲取到文件內(nèi)容亂碼或者文件讀取不到的問題,今天給大家分享小編遇到的坑及處理方案,感興趣的朋友跟隨小編一起看看吧2021-06-06教你如何使用Java8實(shí)現(xiàn)菜單樹形數(shù)據(jù)
今天給大家?guī)淼氖顷P(guān)于JAVA的相關(guān)知識(shí),文中圍繞著如何使用Java8實(shí)現(xiàn)菜單樹形數(shù)據(jù)展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06Java常用正則表達(dá)式驗(yàn)證工具類RegexUtils.java
相信大家對(duì)正則表達(dá)式一定都有所了解和研究,這篇文章主要為大家分享了Java 表單注冊(cè)常用正則表達(dá)式驗(yàn)證工具類,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2016-11-11詳解Java中IO字節(jié)流基本操作(復(fù)制文件)并測(cè)試性能
這篇文章主要介紹了Java中IO字節(jié)流基本操作(復(fù)制文件)并測(cè)試性能,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04Java實(shí)現(xiàn)自定義枚舉值校驗(yàn)器的示例代碼
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)自定義枚舉值校驗(yàn)器的相關(guān)資料,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的可以參考一下2023-02-025分鐘讓你快速掌握java8 stream常用開發(fā)技巧
這篇文章主要給大家介紹了關(guān)于java8 stream常用開發(fā)技巧的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12