一文徹底搞懂java多線程和線程池
什么是線程
傳統(tǒng)的程序設(shè)計語言同一時刻只能執(zhí)行單任務(wù)操作,效率非常低,如果網(wǎng)絡(luò)程序在接收數(shù)據(jù)時發(fā)生阻塞,只能等到程序接收數(shù)據(jù)之后才能繼續(xù)運行。隨著 Internet 的飛速發(fā)展,這種單任務(wù)運行的狀況越來越不被接受。如果網(wǎng)絡(luò)接收數(shù)據(jù)阻塞,后臺服務(wù)程序就會一直處于等待狀態(tài)而不能繼續(xù)任何操作。 這種阻塞情況經(jīng)常發(fā)生, 這時的 CPU資源完全處于閑置狀態(tài)。
多線程實現(xiàn)后臺服務(wù)程序可以同時處理多個任務(wù),并不發(fā)生阻塞現(xiàn)象。多線程是 Java 語言的一個很重要的特征。 多線程程序設(shè)計最大的特點就是能夠提高程序執(zhí)行效率和處理速度。Java 程序可同時并行運行多個相對獨立的線程。例如創(chuàng)建一個線程來接收數(shù)據(jù),另一個線程發(fā)送數(shù)據(jù),既使發(fā)送線程在接收數(shù)據(jù)時被阻塞,接受數(shù)據(jù)線程仍然可以運行。 線程(Thread)是控制線程(Thread of Control)的縮寫,它是具有一定順序的指令序列(即所編寫的程序代碼)、存放方法中定義局部變量的棧和一些共享數(shù)據(jù)。線程是相互獨立的,每個方法的局部變量和其他線程的局部變量是分開的,因此,任何線程都不能訪問除自身之外的其他線程的局部變量。如果兩個線程同時訪問同一個方法,那每個線程將各自得到此方法的一個拷貝。
Java 提供的多線程機制使一個程序可同時執(zhí)行多個任務(wù)。線程有時也被稱為小進程,它是從一個大進程里分離出來的小的獨立的線程。由于實現(xiàn)了多線程技術(shù),Java 顯得更健壯。多線程帶來的好處是更好的交互性能和實時控制性能。多線程是強大而靈巧的編程工具,但要用好它卻不是件容易的事。在多線程編程中,每個線程都通過代碼實現(xiàn)線程的行為,并將數(shù)據(jù)供給代碼操作。編碼和數(shù)據(jù)有時是相當(dāng)獨立的,可分別向線程提供。多個線程可以同時處理同一代碼和同一數(shù)據(jù),不同的線程也可以處理各自不同的編碼和數(shù)據(jù)。
一. Java實現(xiàn)線程的三種方式
先簡單看看java多線程如何實現(xiàn)的:
1.1、繼承Thread類
讓自己的類繼承 Thread 類:
public class Test extends Thread { public static void main(String[] args) { Thread t = new Test(); t.start(); } @Override public void run() { System.out.println("Override run() ..."); } }
用Thread類的方式創(chuàng)建多線程的特點:
1、因為線程已經(jīng)繼承Thread類,所以不可以再繼承其它類。
2、如果需要訪問當(dāng)前線程,直接使用this即可。
1.2、實現(xiàn)Runnable接口,并覆寫run方法
實現(xiàn) Runnable 接口:
public class RunnableTest implements Runnable { public static void main(String[] args) throws Exception{ Thread t = new Thread(new RunnableTest()); t.start(); } @Override public void run(){ System.out.println(123); } }
1.3、實現(xiàn)Callable接口,并覆寫call方法
- 實現(xiàn)Callable接口的類TCallable
- 以TCallable為參數(shù)創(chuàng)建FutureTask對象
- 將FutureTask作為參數(shù)創(chuàng)建Thread對象
- 調(diào)用線程對象的start()方法
public class TCallable implements Callable{ public static void main() { FutureTask futureTask = new FutureTask(new TCallable()); Thread thread = new Thread(futureTask); thread.start(); try { Object o = futureTask.get(); System.out.println(o); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } @Override public Object call() throws Exception { return "Override call() ..."; } }
用Runnable與Callable接口的方式創(chuàng)建多線程的特點:
1、線程類只是實現(xiàn)了Runnable接口或Callable接口,還可以繼承其它類。
2、在這種方式下,多個線程可以共享一個target對象,所以非常適合多個線程來處理同一份資源情況。
3、如果需要訪問當(dāng)前線程,需要使用Thread.currentThread方法。
4、Callable接口與Runnable接口相比,只是Callable接口可以返回值而已。
主要介紹第三種方式Callable,因為講到線程池的時候,大多時候使用Callable接口。
二. Callable接口
Callable用于產(chǎn)生結(jié)果,F(xiàn)uture用于獲取結(jié)果。
2.1 Callable接口
Callable接口代表一段可以調(diào)用并返回結(jié)果的代碼;
Java 5在concurrency包中引入了java.util.concurrent.Callable 接口,它和Runnable接口很相似,但是 Runnable 不會返回結(jié)果,并且無法拋出返回結(jié)果的異常。 但Callable可以返回一個對象或者拋出一個異常。
Callable接口使用泛型去定義它的返回類型。Executors類提供了一些有用的方法在線程池中執(zhí)行Callable內(nèi)的任務(wù)。由于Callable任務(wù)是并行的,我們必須等待它返回的結(jié)果。
Callable接口的源碼如下:
public interface Callable<V> { V call() throws Exception; // 計算結(jié)果 }
2.2 Future接口
Future用于表示異步計算的結(jié)果。
由于Callable任務(wù)是并行的,我們必須等待它返回的結(jié)果。那如何獲取返回結(jié)果?java.util.concurrent.Future對象為我們解決了這個問題。在線程池提交Callable任務(wù)后返回了一個Future對象,使用Future可以知道Callable任務(wù)的狀態(tài)和得到Callable返回的執(zhí)行結(jié)果。Future提供get()方法讓我們可以等待Callable結(jié)束并獲取它的執(zhí)行結(jié)果。
Future接口的源碼如下:從源碼我們可以知道,F(xiàn)uture可以對異步運算的任務(wù)的結(jié)果進行等待獲取、判斷是否已經(jīng)完成、取消任務(wù)等操作。
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning);// 試圖取消對此任務(wù)的執(zhí)行 boolean isCancelled(); // 如果在任務(wù)正常完成前將其取消,則返回 true boolean isDone(); // 如果任務(wù)已完成,則返回 true V get() throws InterruptedException, ExecutionException; // 如有必要,等待計算完成,然后獲取其結(jié)果 // 如有必要,最多等待為使計算完成所給定的時間之后,獲取其結(jié)果(如果結(jié)果可用)。 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
2.3 Future實現(xiàn)類是FutureTask。
如果不想分支線程阻塞主線程,又想取得分支線程的執(zhí)行結(jié)果,就用FutureTask , FutureTask實現(xiàn)了RunnableFuture接口,這個接口的定義如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
可以看到這個接口實現(xiàn)了Runnable和Future接口,接口中的具體實現(xiàn)由FutureTask來實現(xiàn)。這個類的構(gòu)造方法如下 :
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // state記錄FutureTask的狀態(tài) this.state = NEW; // ensure visibility of callable }
FutureTask 構(gòu)造函數(shù)傳入一個 Callable 的具體實現(xiàn)類,一個 FutureTask 對象可以對調(diào)用了 Callable 和 Runnable 的對象進行包裝,由于 FutureTask 也是Runnable 接口的實現(xiàn)類,所以 FutureTask 也可以放入線程池中。
在web容器使用這種多線程的方式,要記住記得shutdown關(guān)閉,否則可能導(dǎo)致線程沒有被關(guān)閉回收,結(jié)果線程數(shù)一直增加
當(dāng)線程數(shù)太多時,肯定會導(dǎo)致內(nèi)存溢出或者影響服務(wù)器性能等。
三. Java線程池
既然單個線程的創(chuàng)建和銷毀都很簡單,我們?yōu)槭裁匆褂镁€程池?
3.1、背景
JAVA是面向?qū)ο?,JAVA虛擬機每創(chuàng)建一個對象都要獲取內(nèi)存資源或者其他很多其他資源,同時試圖跟蹤每個對象,以便可以在對象銷毀后進行垃圾回收。因此創(chuàng)建和銷毀對象是非常費時間的。所以為了盡可能提高應(yīng)用程序資源使用效率,就是要降低就是盡可能降低創(chuàng)建和銷毀對象的次數(shù),特別是一些非常耗資源的對象創(chuàng)建和銷毀,這就是一些"池化資源"技術(shù)產(chǎn)生的原因,比如大家熟悉的數(shù)據(jù)庫連接池。
針對線程而言:
線程占用系統(tǒng)內(nèi)存:雖然線程相對于進程而言是輕量級的,創(chuàng)建線程依然需要占用系統(tǒng)的內(nèi)存資源。如果無限制的創(chuàng)建線程,對應(yīng)虛擬機垃圾回收而言也是很有壓力的,畢竟線程也是對象。
線程的創(chuàng)建和關(guān)閉是需要花費時間的:如果任務(wù)非常多,頻繁的創(chuàng)建和銷毀線程也是需要占用系統(tǒng)的時間片的。
3.2、作用
1)降低資源消耗,重復(fù)利用已經(jīng)創(chuàng)建好的線程,降低線程創(chuàng)建和銷毀造成的資源消耗
2)提高響應(yīng)速度,當(dāng)任務(wù)到達時,任務(wù)可以不需要等到線程創(chuàng)建就可以立即執(zhí)行
3)提高線程的可管理性,線程是稀缺資源,如果無限制的創(chuàng)建,不僅消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配、調(diào)優(yōu)和監(jiān)控。
3.3、應(yīng)用范圍
1)、需要大量的線程來完成任務(wù)。且完成任務(wù)的時間比較短。
http請求這種任務(wù),使用線程池技術(shù)是很合適的。由于單個任務(wù)小,而任務(wù)數(shù)量巨大,你能夠想象一個熱門站點的點擊次數(shù)。
如果一個線程的任務(wù)執(zhí)行時間非常長,就沒必要用線程池,比如一個telnet連接請求,線程池的優(yōu)勢就不明顯了。
2、對性能要求苛刻的應(yīng)用,比方要求server迅速響應(yīng)客戶請求。
3、接受突發(fā)性的大量請求,但不至于使server因此產(chǎn)生大量線程的應(yīng)用。突發(fā)性大量客戶請求,在沒有線程池情況下,將產(chǎn)生大量線程,盡管理論上大部分操作系統(tǒng)線程數(shù)目最大值不是問題,短時間內(nèi)產(chǎn)生大量線程可能使內(nèi)存到達極限,并出現(xiàn)"OutOfMemory"的錯誤。
四. Java 線程池框架Executor
一個線程池包含下面四個基本組成部分:
1、線程池管理器(ThreadPool):用于創(chuàng)建并管理線程池。包含 創(chuàng)建線程池,銷毀線程池,加入新任務(wù);
2、工作線程(PoolWorker):線程池中線程,在沒有任務(wù)時處于等待狀態(tài)。能夠循環(huán)的運行任務(wù);
3、任務(wù)接口(Task):每一個任務(wù)必須實現(xiàn)的接口,以供工作線程調(diào)度任務(wù)的運行。它主要規(guī)定了任務(wù)的入口。任務(wù)運行完后的收尾工作,任務(wù)的運行狀態(tài)等。
4、任務(wù)隊列(taskQueue):用于存放沒有處理的任務(wù)。提供一種緩沖機制。
下面我們主要官方提供的線程池管理工具Executor框架。
Executor框架是一個根據(jù)一組執(zhí)行策略調(diào)用,調(diào)度,執(zhí)行和控制的異步任務(wù)的框架。無限制的創(chuàng)建線程會引起應(yīng)用程序內(nèi)存溢出。所以創(chuàng)建一個線程池是個更好的的解決方案,因為可以限制線程的數(shù)量并且可以回收再利用這些線程。Executor框架包括:線程池,Executor,Executors,ExecutorService,CompletionService,F(xiàn)uture,Callable等。
4.1、類圖:
Executor是一個頂層接口,在它里面只聲明了一個方法execute(Runnable),返回值為void,參數(shù)為Runnable類型,從字面意思可以理解,就是用來執(zhí)行傳進去的任務(wù)的;
然后ExecutorService接口繼承了Executor接口,并聲明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象類AbstractExecutorService實現(xiàn)了ExecutorService接口,基本實現(xiàn)了ExecutorService中聲明的所有方法;
然后ThreadPoolExecutor繼承了類AbstractExecutorService。
4.2 核心類ThreadPoolExecutor:
java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,因此如果要透徹地了解Java中的線程池,必須先了解這個類。下面我們來看一下ThreadPoolExecutor類的具體實現(xiàn)源碼。
在ThreadPoolExecutor類中提供了四個構(gòu)造方法:
public class ThreadPoolExecutor extends AbstractExecutorService { ..... public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler); public ThreadPoolExecutor( int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler); ... }
ThreadPoolExecutor提供了四個構(gòu)造器,事實上,通過觀察每個構(gòu)造器的源碼具體實現(xiàn),發(fā)現(xiàn)前面三個構(gòu)造器都是調(diào)用的第四個構(gòu)造器進行的初始化工作。下面解釋下一下構(gòu)造器中各個參數(shù)的含義:
public ThreadPoolExecutor(int corePoolSize, //線程池核心線程數(shù)量 int maximumPoolSize, //線程池最大線程數(shù)量 long keepAliveTime, //線程KeepAlive時間,當(dāng)線程池數(shù)量超過核心線程數(shù)量以后,idle時間超過這個值的線程會被終止 TimeUnit unit, //線程KeepAlive時間單位 BlockingQueue<Runnable> workQueue, //任務(wù)隊列 ThreadFactory threadFactory, //創(chuàng)建線程的工廠對象 RejectedExecutionHandler handler)
corePoolSize: 核心池的大小,這個參數(shù)跟后面講述的線程池的實現(xiàn)原理有非常大的關(guān)系。在創(chuàng)建了線程池后,默認(rèn)情況下,線程池中并沒有任何線程,而是等待有任務(wù)到來才創(chuàng)建線程去執(zhí)行任務(wù),除非調(diào)用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就可以看出,是預(yù)創(chuàng)建線程的意思,即在沒有任務(wù)到來之前就創(chuàng)建corePoolSize個線程或者一個線程。默認(rèn)情況下,在創(chuàng)建了線程池后,線程池中的線程數(shù)為0,當(dāng)有任務(wù)來之后,就會創(chuàng)建一個線程去執(zhí)行任務(wù),當(dāng)線程池中的線程數(shù)目達到corePoolSize后,就會把到達的任務(wù)放到緩存隊列當(dāng)中;
maximumPoolSize:線程池最大線程數(shù),這個參數(shù)也是一個非常重要的參數(shù),它表示在線程池中最多能創(chuàng)建多少個線程;
keepAliveTime:表示線程沒有任務(wù)執(zhí)行時最多保持多久時間會終止。默認(rèn)情況下,只有當(dāng)線程池中的線程數(shù)大于corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數(shù)不大于corePoolSize,即當(dāng)線程池中的線程數(shù)大于corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數(shù)不超過corePoolSize。但是如果調(diào)用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數(shù)不大于corePoolSize時,keepAliveTime參數(shù)也會起作用,直到線程池中的線程數(shù)為0;
unit:參數(shù)keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態(tài)屬性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小時
TimeUnit.MINUTES; //分鐘
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //納秒
workQueue:一個阻塞隊列,用來存儲等待執(zhí)行的任務(wù),這個參數(shù)的選擇也很重要,會對線程池的運行過程產(chǎn)生重大影響,一般來說,這里的阻塞隊列有以下幾種選擇:ArrayBlockingQueue;LinkedBlockingQueue;SynchronousQueue;
1. ArrayBlockingQueue : 有界的數(shù)組隊列
2. LinkedBlockingQueue : 可支持有界/無界的隊列,使用鏈表實現(xiàn)
3. PriorityBlockingQueue : 優(yōu)先隊列,可以針對任務(wù)排序
4. SynchronousQueue : 隊列長度為1的隊列,和Array有點區(qū)別就是:client thread提交到block queue會是一個阻塞過程,直到有一個worker thread連接上來poll task。
ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關(guān)。
threadFactory:線程工廠,主要用來創(chuàng)建線程;
handler:表示當(dāng)拒絕處理任務(wù)時的策略,有以下四種取值:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
在ThreadPoolExecutor類中有幾個非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法: 實際上是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現(xiàn),這個方法是ThreadPoolExecutor的核心方法,通過這個方法可以向線程池提交一個任務(wù),交由線程池去執(zhí)行。
submit()方法: 是在ExecutorService中聲明的方法,在AbstractExecutorService就已經(jīng)有了具體的實現(xiàn),在ThreadPoolExecutor中并沒有對其進行重寫,這個方法也是用來向線程池提交任務(wù)的,但是它和execute()方法不同,它能夠返回任務(wù)執(zhí)行的結(jié)果,去看submit()方法的實現(xiàn),會發(fā)現(xiàn)它實際上還是調(diào)用的execute()方法,只不過它利用了Future來獲取任務(wù)執(zhí)行結(jié)果(Future相關(guān)內(nèi)容將在下一篇講述)。
shutdown()和shutdownNow()是用來關(guān)閉線程池的。
還有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關(guān)屬性的方法,有興趣的朋友可以自行查閱API。
4.3 ThreadPoolExecutor邏輯結(jié)構(gòu)
ThreadPoolExecutor線程池的邏輯結(jié)構(gòu):
第一步、初始的poolSize < corePoolSize,提交的runnable任務(wù),會直接做為new一個Thread的參數(shù),立馬執(zhí)行
第二步、當(dāng)提交的任務(wù)數(shù)超過了corePoolSize,就進入了第二步操作。會將當(dāng)前的runable提交到一個block queue中
第三步,如果block queue是個有界隊列,當(dāng)隊列滿了之后就進入了第三步。如果poolSize < maximumPoolsize時,會嘗試new 一個Thread的進行救急處理,立馬執(zhí)行對應(yīng)的runnable任務(wù)
第四步,如果第三步救急方案也無法處理了,就會走到第四步執(zhí)行reject操作
五.Executor線程池實例
比如我們要并發(fā)調(diào)用遠(yuǎn)程http接口:
5.1、使用Runnable接口創(chuàng)建線程,并由executor調(diào)度執(zhí)行:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; public class TExecutor { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 5; i++) { RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(i)); executorService.execute(runnable); } System.out.println("線程任務(wù)開始執(zhí)行"); executorService.shutdown(); } /** * 線程池提交Runnable任務(wù) */ public void runnableTask() { ExecutorService executorService = Executors.newSingleThreadExecutor(); Future<?> futureTask = null; for (int i = 0; i < 5; i++) { RunnableTask runnable = new RunnableTask("http://xxxx.com/api/v2", Integer.toString(1)); futureTask = executorService.submit(runnable); try { futureTask.get(10, TimeUnit.SECONDS); //等待超時,等待給定的時間之后,獲取其結(jié)果 } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } finally{ futureTask.cancel(true); } } executorService.shutdown(); } } class RunnableTask implements Runnable { String url; String param; public RunnableTask(String url, String param) { this.url = url; this.param = param; } //http調(diào)用 public String requestHttp() { String result = ""; return result; } @Override public void run() { System.out.println("正在執(zhí)行task "+param); try { requestHttp(); } catch (Exception e) { e.printStackTrace(); } System.out.println("task "+param+"執(zhí)行完畢"); } }
5.2、使用Callable接口創(chuàng)建線程,并由executor調(diào)度執(zhí)行:
import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.TimeUnit; public class TestPoolThread { private ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100)); public String main(String gid) { List<Callable<String>> tasks = new ArrayList<>(); tasks.add(new CallableTask("http://xxxx.com/api/v2", "1")); tasks.add(new CallableTask("http://xxxx.com/api/v2", "2")); try { List<Future<String>> mFuture = executorService.invokeAll(tasks, 10, TimeUnit.SECONDS); String result1 = mFuture.get(0).get(); System.out.println( "result1:" + result1); String result2 = mFuture.get(0).get(); System.out.println( "result2:" + result2); } catch (Exception e) { e.printStackTrace(); } return ""; } } class CallableTask implements Callable<String> { String url; String param; public CallableTask(String url, String param) { this.url = url; this.param = param; } //http調(diào)用 public String requestHttp() { String result = ""; return result; } @Override public String call() { try { String param = "{}"; String result = requestHttp( ); return result; } catch (Exception e) { e.printStackTrace(); return ""; } } }
不過在java doc中,并不提倡我們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態(tài)方法來創(chuàng)建線程池:
Executors.newCachedThreadPool : 創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
Executors.newCachedThreadPool :創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
Executors.newFixedThreadPool :創(chuàng)建一個定長線程池,可控制線程最大并發(fā)數(shù),超出的線程會在隊列中等待。
Executors.newScheduledThreadPool :創(chuàng)建一個定長線程池,支持定時及周期性任務(wù)執(zhí)行。
Executors.newSingleThreadExecutor :創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務(wù),保證所有任務(wù)按照指定順序(FIFO, LIFO, 優(yōu)先級)執(zhí)行。
靜態(tài)方法的具體實現(xiàn):
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
從它們的具體實現(xiàn)來看,它們實際上也是調(diào)用了ThreadPoolExecutor,只不過參數(shù)都已配置好了。
newFixedThreadPool創(chuàng)建的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設(shè)置為1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設(shè)置為0,將maximumPoolSize設(shè)置為Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務(wù)就創(chuàng)建線程運行,當(dāng)線程空閑超過60秒,就銷毀線程。
實際中,如果Executors提供的三個靜態(tài)方法能滿足要求,就盡量使用它提供的三個方法,因為自己去手動配置ThreadPoolExecutor的參數(shù)有點麻煩,要根據(jù)實際任務(wù)的類型和數(shù)量來進行配置。
另外,如果ThreadPoolExecutor達不到要求,可以自己繼承ThreadPoolExecutor類進行重寫。
六.深入剖析線程池實現(xiàn)原理
在上一節(jié)我們從宏觀上介紹了ThreadPoolExecutor,下面我們來深入解析一下線程池的具體實現(xiàn)原理,將從下面幾個方面講解:
1.線程池狀態(tài)
2.線程池中的線程初始化
3.任務(wù)的執(zhí)行
4.任務(wù)緩存隊列及排隊策略
5.任務(wù)拒絕策略
6.線程池的關(guān)閉
7.線程池容量的動態(tài)調(diào)整
6.1、線程池狀態(tài)
在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態(tài):
volatile int runState; static final int RUNNING = 0; static final int SHUTDOWN = 1; static final int STOP = 2; static final int TERMINATED = 3;
runState表示當(dāng)前線程池的狀態(tài),它是一個volatile變量用來保證線程之間的可見性;下面的幾個static final變量表示runState可能的幾個取值。
當(dāng)創(chuàng)建線程池后,初始時,線程池處于RUNNING狀態(tài);
如果調(diào)用了shutdown()方法,則線程池處于SHUTDOWN狀態(tài),此時線程池不能夠接受新的任務(wù),它會等待所有任務(wù)執(zhí)行完畢;
如果調(diào)用了shutdownNow()方法,則線程池處于STOP狀態(tài),此時線程池不能接受新的任務(wù),并且會去嘗試終止正在執(zhí)行的任務(wù);
當(dāng)線程池處于SHUTDOWN或STOP狀態(tài),并且所有工作線程已經(jīng)銷毀,任務(wù)緩存隊列已經(jīng)清空或執(zhí)行結(jié)束后,線程池被設(shè)置為TERMINATED狀態(tài)。
6.2、線程池中的線程初始化
默認(rèn)情況下,創(chuàng)建線程池之后,線程池中是沒有線程的,需要提交任務(wù)之后才會創(chuàng)建線程。
在實際中如果需要線程池創(chuàng)建之后立即創(chuàng)建線程,可以通過以下兩個方法辦到:
- prestartCoreThread():初始化一個核心線程;
- prestartAllCoreThreads():初始化所有核心線程
下面是這2個方法的實現(xiàn):
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進去的參數(shù)是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進去的參數(shù)是null,等待任務(wù)隊列中有任務(wù) ++n; return n; }
6.3、任務(wù)的執(zhí)行
任務(wù)提交基本邏輯如下:
如果當(dāng)前線程池中的線程poolSize數(shù)目小于corePoolSize,則每來一個任務(wù),就會創(chuàng)建一個線程去執(zhí)行這個任務(wù);
如果當(dāng)前線程池中的線程poolSize數(shù)目>=corePoolSize,則每來一個任務(wù),會嘗試將其添加到任務(wù)緩存隊列當(dāng)中,若添加成功,則該任務(wù)會等待空閑線程將其取出去執(zhí)行;若添加失?。ㄒ话銇碚f是任務(wù)緩存隊列已滿),則會嘗試創(chuàng)建新的線程去執(zhí)行這個任務(wù);
如果當(dāng)前線程池中的線程poolSize數(shù)目達到maximumPoolSize,則會采取任務(wù)拒絕策略進行處理;如果線程池中的線程poolSize數(shù)量大于 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止,直至線程池中的線程數(shù)目不大于corePoolSize;
如果允許為核心池中的線程設(shè)置存活時間,那么核心池中的線程空閑時間超過keepAliveTime,線程也會被終止。
下面我們分析一下從任務(wù)提交給線程池到任務(wù)執(zhí)行完畢整個過程。
我們先來看一下ThreadPoolExecutor類中其他的一些比較重要成員變量:
private final BlockingQueue<Runnable> workQueue; //任務(wù)緩存隊列,用來存放等待執(zhí)行的任務(wù) private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態(tài)鎖,對線程池狀態(tài)(比如線程池大小 //、runState等)的改變都要使用這個鎖 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工作集 private volatile long keepAliveTime; //線程存活時間 private volatile boolean allowCoreThreadTimeOut; //是否允許為核心線程設(shè)置存活時間 private volatile int corePoolSize; //核心池的大?。淳€程池中的線程數(shù)目大于這個參數(shù)時,提交的任務(wù)會被放進任務(wù)緩存隊列) private volatile int maximumPoolSize; //線程池最大能容忍的線程數(shù) private volatile int poolSize; //線程池中當(dāng)前的線程數(shù) private volatile RejectedExecutionHandler handler; //任務(wù)拒絕策略 private volatile ThreadFactory threadFactory; //線程工廠,用來創(chuàng)建線程 private int largestPoolSize; //用來記錄線程池中曾經(jīng)出現(xiàn)過的最大線程數(shù),跟線程池的容量沒有任何關(guān)系 private long completedTaskCount; //用來記錄已經(jīng)執(zhí)行完畢的任務(wù)個數(shù)
每個變量的作用都已經(jīng)標(biāo)明出來了。
下面我們我們看看任務(wù)從提交到最終執(zhí)行完畢經(jīng)歷了哪些過程。
在ThreadPoolExecutor類中,最核心的任務(wù)提交方法是execute()方法,雖然通過submit也可以提交任務(wù),但是實際上submit方法里面最終調(diào)用的還是execute()方法,所以我們只需要研究execute()方法的實現(xiàn)原理即可:
在源碼添加相關(guān)注解來解釋邏輯了:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //如果線程池中當(dāng)前線程數(shù)不小于核心池大小,創(chuàng)建線程來執(zhí)行任務(wù) if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } } /** * 簡單拆解execute: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); //如果線程池中當(dāng)前線程數(shù)不小于核心池大小, if (poolSize >= corePoolSize ) { //創(chuàng)建新線程來執(zhí)行任務(wù) boolean isCreate = addIfUnderCorePoolSize(command); if (!isCreate) { //創(chuàng)建線程失?。杭磒oolSize>=corePoolSize或者runState不等于RUNNING) if (runState == RUNNING && workQueue.offer(command)) {//如果當(dāng)前線程池處于RUNNING狀態(tài),則將任務(wù)放入任務(wù)緩存隊列 if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command);//保證 添加到任務(wù)緩存隊列中的任務(wù)得到處理 } //創(chuàng)建新線程來執(zhí)行任務(wù) boolean isUnderMaximumPoolSize = addIfUnderMaximumPoolSize(command); if (!isUnderMaximumPoolSize) reject(command); // is shutdown or saturated } } //創(chuàng)建線程來執(zhí)行任務(wù) } } */
我們看看2個關(guān)鍵方法的實現(xiàn):addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:
1、addIfUnderCorePoolSize:從名字可以看出意思就是當(dāng)線程池的線程低于corePoolSize大小時創(chuàng)建線程執(zhí)行任務(wù)。
2、addIfUnderMaximumPoolSize方法的實現(xiàn)和addIfUnderCorePoolSize方法的實現(xiàn)思想非常相似,唯一的區(qū)別在于addIfUnderMaximumPoolSize方法是在線程池中的線程數(shù)達到了核心池大小并且往任務(wù)隊列中添加任務(wù)失敗的情況下執(zhí)行
/** * 當(dāng)線程池的線程低于corePoolSize大小時創(chuàng)建線程執(zhí)行任務(wù) * @param firstTask * @return */ private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; //首先獲取到鎖 final ReentrantLock mainLock = this.mainLock; //通過加鎖保證創(chuàng)建的線程是唯一 mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); //創(chuàng)建線程去執(zhí)行firstTask任務(wù) } finally { mainLock.unlock(); } if (t == null)//創(chuàng)建線程失敗 return false; t.start();//啟動線程 return true; } /** * 和addIfUnderCorePoolSize方法的實現(xiàn)思想非常相似 * @param firstTask * @return */ private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
創(chuàng)建新線程:首先用提交的任務(wù)創(chuàng)建了一個Worker對象,然后調(diào)用線程工廠threadFactory創(chuàng)建了一個新的線程t,然后將線程t的引用賦值給了Worker對象的成員變量thread,接著通過workers.add(w)將Worker對象添加到工作集當(dāng)中
/** * 創(chuàng)建新線程 * @param firstTask * @return */ private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); //創(chuàng)建一個線程,執(zhí)行任務(wù),實際就是Thread t = new Thread(w); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; //將創(chuàng)建的線程的引用賦值為w的成員變量 workers.add(w); int nt = ++poolSize; //當(dāng)前線程數(shù)加1 if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
Worker執(zhí)行任務(wù): Worker實現(xiàn)了Runnable接口,最核心的方法是run()方法:
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現(xiàn),用戶可以根據(jù) //自己需要重載這個方法和后面的afterExecute方法來進行一些統(tǒng)計信息,比如某個任務(wù)的執(zhí)行時間等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } /** * Worker實現(xiàn)了Runnable接口 */ public void run() { try { Runnable task = firstTask; firstTask = null; //從ThreadPoolExecutor任務(wù)緩存隊列里面去取 while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //當(dāng)任務(wù)隊列中沒有任務(wù)時,進行清理工作 } } } }
6.4、任務(wù)緩存隊列及排隊策略
workQueue任務(wù)緩存隊列是用來存放等待執(zhí)行的任務(wù)。
workQueue的類型為BlockingQueue<Runnable>,通??梢匀∠旅嫒N類型:
1)ArrayBlockingQueue:基于數(shù)組的先進先出隊列,此隊列創(chuàng)建時必須指定大??;
2)LinkedBlockingQueue:基于鏈表的先進先出隊列,如果創(chuàng)建時沒有指定此隊列大小,則默認(rèn)為Integer.MAX_VALUE;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務(wù),而是將直接新建一個線程來執(zhí)行新來的任務(wù)。
6.5、任務(wù)拒絕策略
當(dāng)線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略,通常有以下四種策略:
ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) ThreadPoolExecutor.CallerRunsPolicy:由調(diào)用線程處理該任務(wù)
6.6、線程池的關(guān)閉
ThreadPoolExecutor提供了兩個方法,用于線程池的關(guān)閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會立即終止線程池,而是要等所有任務(wù)緩存隊列中的任務(wù)都執(zhí)行完后才終止,但再也不會接受新的任務(wù)shutdownNow():立即終止線程池,并嘗試打斷正在執(zhí)行的任務(wù),并且清空任務(wù)緩存隊列,返回尚未執(zhí)行的任務(wù)
6.7、線程池容量的動態(tài)調(diào)整
ThreadPoolExecutor提供了動態(tài)調(diào)整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
setCorePoolSize:設(shè)置核心池大小setMaximumPoolSize:設(shè)置線程池最大能創(chuàng)建的線程數(shù)目大小
當(dāng)上述參數(shù)從小變大時,ThreadPoolExecutor進行線程賦值,還可能立即創(chuàng)建新的線程來執(zhí)行任務(wù)。
六、線程池的配置策略
要想合理的配置線程池,就必須首先分析任務(wù)特性,可以從以下幾個角度來進行分析:
- 任務(wù)的性質(zhì):CPU密集型任務(wù),IO密集型任務(wù)和混合型任務(wù)。
- 任務(wù)的優(yōu)先級:高,中和低。
- 任務(wù)的執(zhí)行時間:長,中和短。
- 任務(wù)的依賴性:是否依賴其他系統(tǒng)資源,如數(shù)據(jù)庫連接。
任務(wù)性質(zhì)不同的任務(wù)可以用不同規(guī)模的線程池分開處理。
CPU密集型任務(wù):配置盡可能少的線程數(shù)量,如配置Ncpu+1個線程的線程池。cpu密集型任務(wù)需要大量的運算,而且沒有阻塞,需要CPU一直全速運行,CPU密集任務(wù)只有在真正的多核CPU上才可能得到加速??梢允褂肦untime.availableProcessors方法獲取可用處理器的個數(shù)。
一般計算公式:CPU核數(shù) + 1個線程的線程池
ExecutorService THREAD_POOL = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(Runtime.getRuntime().availableProcessors() * 20), new ThreadPoolExecutor.CallerRunsPolicy());
《并發(fā)編程實戰(zhàn)》一書中對于IO密集型任務(wù)建議線程池大小設(shè)為Ncpu+1Ncpu+1,原因是當(dāng)計算密集型線程偶爾由于頁缺失故障或其他原因而暫停時,這個“額外的”線程也能確保這段時間內(nèi)的CPU始終周期不會被浪費。
對于計算密集型任務(wù),不要創(chuàng)建過多的線程,由于線程有執(zhí)行棧等內(nèi)存消耗,創(chuàng)建過多的線程不會加快計算速度,反而會消耗更多的內(nèi)存空間;另一方面線程過多,頻繁切換線程上下文也會影響線程池的性能。
IO密集型任務(wù):IO操作包括讀寫磁盤文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)請求等阻塞操作,執(zhí)行這些操作,線程將處于等待狀態(tài)。
io密集型由于需要等待IO操作,線程并不是一直在執(zhí)行任務(wù),則配置盡可能多的線程,如2*Ncpu。即該任務(wù)需要大量的IO,即大量的阻塞,這種類型分以下兩種情況設(shè)置
1、有等待io任務(wù):如果IO密集型任務(wù)線程并非一直在執(zhí)行任務(wù),則應(yīng)配置盡可能多的線程,如CPU核數(shù) * 2
2、需要一直運行的任務(wù):參考公式:CPU核數(shù) /(1 - 阻塞系數(shù) ) 阻塞系數(shù)在0.8~0.9之間
比如:8核CPU:8/(1 - 0.9) = 80個線程數(shù)
用sleep方式模擬IO阻塞:
public class IOThreadPoolTest { // 使用無限線程數(shù)的CacheThreadPool線程池 static ThreadPoolExecutor cachedThreadPool = (ThreadPoolExecutor) Executors.newCachedThreadPool(); static List<Callable<Object>> tasks; // 仍然是5000個任務(wù) static int taskNum = 5000; static { tasks = new ArrayList<>(taskNum); for (int i = 0; i < taskNum; i++) { tasks.add(Executors.callable(new IOTask())); } } public static void main(String[] args) throws InterruptedException { cachedThreadPool.invokeAll(tasks);// warm up all thread testExecutor(cachedThreadPool, tasks); // 看看執(zhí)行過程中創(chuàng)建了多少個線程 int largestPoolSize = cachedThreadPool.getLargestPoolSize(); System.out.println("largestPoolSize:" + largestPoolSize); cachedThreadPool.shutdown(); } private static void testExecutor(ExecutorService executor, List<Callable<Object>> tasks) throws InterruptedException { long start = System.currentTimeMillis(); executor.invokeAll(tasks); long end = System.currentTimeMillis(); System.out.println(end - start); } static class IOTask implements Runnable { @Override public void run() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } }
這里使用無線程數(shù)限制的CachedThreadPool線程池,也就是說這里的5000個任務(wù)會被5000個線程同時處理,由于所有的線程都只是阻塞而不消耗CPU資源,所以5000個任務(wù)在不到2秒的時間內(nèi)就執(zhí)行完了。
很明顯使用CachedThreadPool能有效提高IO密集型任務(wù)的吞吐量,而且由于CachedThreadPool中的線程會在空閑60秒自動回收,所以不會消耗過多的資源。
但是打開任務(wù)管理器你會發(fā)現(xiàn)執(zhí)行任務(wù)的同時內(nèi)存會飆升到接近400M,因為每個線程都消耗了一部分內(nèi)存,在5000個線程創(chuàng)建之后,內(nèi)存消耗達到了峰值。
所以使用CacheThreadPool的時候應(yīng)該避免提交大量長時間阻塞的任務(wù),以防止內(nèi)存溢出;另一種替代方案是,使用固定大小的線程池,并給一個較大的線程數(shù)(不會內(nèi)存溢出),同時為了在空閑時節(jié)省內(nèi)存資源,調(diào)用allowCoreThreadTimeOut允許核心線程超時。
線程執(zhí)行棧的大小可以通過-Xss*size*或-XX:ThreadStackSize參數(shù)調(diào)整
混合型的任務(wù):如果可以拆分,則將其拆分成一個CPU密集型任務(wù)和一個IO密集型任務(wù),只要這兩個任務(wù)執(zhí)行的時間相差不是太大,那么分解后執(zhí)行的吞吐率要高于串行執(zhí)行的吞吐率,如果這兩個任務(wù)執(zhí)行時間相差太大,則沒必要進行分解。我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當(dāng)前設(shè)備的CPU個數(shù)。
混合型任務(wù)要根據(jù)任務(wù)等待阻塞時間與CPU計算時間的比重來決定線程數(shù)量:
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
threads=cores1–blockingCoefficient=cores∗(1+waitTimecomputeTime)
比如一個任務(wù)包含一次數(shù)據(jù)庫讀寫(0.1ms),并在內(nèi)存中對讀取的數(shù)據(jù)進行分組過濾等操作(5μs),那么線程數(shù)應(yīng)該為80左右。
線程數(shù)與阻塞比例的關(guān)系圖大致如下:
當(dāng)阻塞比例為0,也就是純計算任務(wù),線程數(shù)等于核心數(shù)(這里是4);阻塞比例越大,線程池的線程數(shù)應(yīng)該更多。
《Java并發(fā)編程實戰(zhàn)》中最原始的公式是這樣的:
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
>Nthreads=Ncpu∗Ucpu∗(1+WC)>
NcpuNcpu代表CPU的個數(shù),UcpuUcpu代表CPU利用率的期望值(0<Ucpu<10<Ucpu<1),WCWC仍然是等待時間與計算時間的比例。
我上面提供的公式相當(dāng)于目標(biāo)CPU利用率為100%。
通常系統(tǒng)中不止一個線程池,所以實際配置線程數(shù)應(yīng)該將目標(biāo)CPU利用率計算進去。
優(yōu)先級不同的任務(wù)可以使用優(yōu)先級隊列PriorityBlockingQueue來處理。它可以讓優(yōu)先級高的任務(wù)先得到執(zhí)行,需要注意的是如果一直有優(yōu)先級高的任務(wù)提交到隊列里,那么優(yōu)先級低的任務(wù)可能永遠(yuǎn)不能執(zhí)行。
執(zhí)行時間不同的任務(wù)可以交給不同規(guī)模的線程池來處理,或者也可以使用優(yōu)先級隊列,讓執(zhí)行時間短的任務(wù)先執(zhí)行。
依賴數(shù)據(jù)庫連接池的任務(wù),因為線程提交SQL后需要等待數(shù)據(jù)庫返回結(jié)果,如果等待的時間越長CPU空閑時間就越長,那么線程數(shù)應(yīng)該設(shè)置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增加系統(tǒng)的穩(wěn)定性和預(yù)警能力,可以根據(jù)需要設(shè)大一點,比如幾千。有一次我們組使用的后臺任務(wù)線程池的隊列和線程池全滿了,不斷的拋出拋棄任務(wù)的異常,通過排查發(fā)現(xiàn)是數(shù)據(jù)庫出現(xiàn)了問題,導(dǎo)致執(zhí)行SQL變得非常緩慢,因為后臺任務(wù)線程池里的任務(wù)全是需要向數(shù)據(jù)庫查詢和插入數(shù)據(jù)的,所以導(dǎo)致線程池里的工作線程全部阻塞住,任務(wù)積壓在線程池里。如果當(dāng)時我們設(shè)置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內(nèi)存,導(dǎo)致整個系統(tǒng)不可用,而不只是后臺任務(wù)出現(xiàn)問題。當(dāng)然我們的系統(tǒng)所有的任務(wù)是用的單獨的服務(wù)器部署的,而我們使用不同規(guī)模的線程池跑不同類型的任務(wù),但是出現(xiàn)這樣問題時也會影響到其他任務(wù)。
總結(jié):線程池的大小取決于任務(wù)的類型以及系統(tǒng)的特性,避免“過大”和“過小”兩種極端。線程池過大,大量的線程將在相對更少的CPU和有限的內(nèi)存資源上競爭,這不僅影響并發(fā)性能,還會因過高的內(nèi)存消耗導(dǎo)致OOM;線程池過小,將導(dǎo)致處理器得不到充分利用,降低吞吐率。
要想正確的設(shè)置線程池大小,需要了解部署的系統(tǒng)中有多少個CPU,多大的內(nèi)存,提交的任務(wù)是計算密集型、IO密集型還是兩者兼有。
七、線程池創(chuàng)建注意
阿里巴巴編碼規(guī)范里面提到:
線程池最好不要使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學(xué)更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風(fēng)險。 說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要問題是堆積的請求處理隊列可能會耗費非常大的內(nèi)存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要問題是線程數(shù)最大數(shù)是Integer.MAX_VALUE,可能會創(chuàng)建數(shù)量非常多的線程,甚至OOM。
如果使用阿里巴巴的java編程規(guī)范插件,就會有這個提示:
ExecutorService newFixedThreadPool(int nThreads):固定大小線程池。
源碼可以看到,corePoolSize和maximumPoolSize的大小是一樣的(如果使用無界queue的話maximumPoolSize參數(shù)是沒有意義的),keepAliveTime和unit的設(shè)值表名什么?-就是該實現(xiàn)不想keep alive!最后的BlockingQueue選擇了LinkedBlockingQueue,該queue有一個特點,他是無界的。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
ExecutorService newSingleThreadExecutor():單線程。
可以看到,與fixedThreadPool很像,只不過fixedThreadPool中的入?yún)⒅苯油嘶癁?
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
ExecutorService newCachedThreadPool():無界線程池,可以進行自動線程回收。
這個實現(xiàn)就有意思了。首先是無界的線程池,所以我們可以發(fā)現(xiàn)maximumPoolSize為big big。其次BlockingQueue的選擇上使用SynchronousQueue??赡軐τ谠揃lockingQueue有些陌生,簡單說:該QUEUE中,每個插入操作必須等待另一個
線程的對應(yīng)移除操作。比如,我先添加一個元素,接下來如果繼續(xù)想嘗試添加則會阻塞,直到另一個線程取走一個元素,反之亦然。(想到什么?就是緩沖區(qū)為1的生產(chǎn)者消費者模式^_^)
注意到介紹中的自動回收線程的特性嗎,為什么呢?先不說,但注意到該實現(xiàn)中corePoolSize和maximumPoolSize的大小不同。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
總結(jié):
- ThreadPoolExecutor的使用還是很有技巧的。
- 使用無界queue可能會耗盡系統(tǒng)資源。
- 使用有界queue可能不能很好的滿足性能,需要調(diào)節(jié)線程數(shù)和queue大小
- 線程數(shù)自然也有開銷,所以需要根據(jù)不同應(yīng)用進行調(diào)節(jié)。
通常來說對于靜態(tài)任務(wù)可以歸為:
- 數(shù)量大,但是執(zhí)行時間很短
- 數(shù)量小,但是執(zhí)行時間較長
- 數(shù)量又大執(zhí)行時間又長
- 除了以上特點外,任務(wù)間還有些內(nèi)在關(guān)系
總結(jié)2:
- 對于需要保證所有提交的任務(wù)都要被執(zhí)行的情況,使用FixedThreadPool
- 如果限定只能使用一個線程進行任務(wù)處理,使用SingleThreadExecutor
- 如果希望提交的任務(wù)盡快分配線程執(zhí)行,使用CachedThreadPool
- 如果業(yè)務(wù)上允許任務(wù)執(zhí)行失敗,或者任務(wù)執(zhí)行過程可能出現(xiàn)執(zhí)行時間過長進而影響其他業(yè)務(wù)的應(yīng)用場景,可以通過使用限定線程數(shù)量的線程池以及限定長度的隊列進行容錯處理。
到此這篇關(guān)于java多線程和線程池的文章就介紹到這了,更多相關(guān)java多線程和線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot部署到Linux讀取resources下的文件及遇到的坑
本文主要給大家介紹SpringBoot部署到Linux讀取resources下的文件,在平時業(yè)務(wù)開發(fā)過程中,很多朋友在獲取到文件內(nèi)容亂碼或者文件讀取不到的問題,今天給大家分享小編遇到的坑及處理方案,感興趣的朋友跟隨小編一起看看吧2021-06-06教你如何使用Java8實現(xiàn)菜單樹形數(shù)據(jù)
今天給大家?guī)淼氖顷P(guān)于JAVA的相關(guān)知識,文中圍繞著如何使用Java8實現(xiàn)菜單樹形數(shù)據(jù)展開,文中有非常詳細(xì)的介紹及代碼示例,需要的朋友可以參考下2021-06-06Java常用正則表達式驗證工具類RegexUtils.java
相信大家對正則表達式一定都有所了解和研究,這篇文章主要為大家分享了Java 表單注冊常用正則表達式驗證工具類,具有一定的參考價值,感興趣的小伙伴們可以參考一下2016-11-11詳解Java中IO字節(jié)流基本操作(復(fù)制文件)并測試性能
這篇文章主要介紹了Java中IO字節(jié)流基本操作(復(fù)制文件)并測試性能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-04-045分鐘讓你快速掌握java8 stream常用開發(fā)技巧
這篇文章主要給大家介紹了關(guān)于java8 stream常用開發(fā)技巧的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12