一文了解Java?線程池的正確使用姿勢
概述
線程池在平時的工作中出場率非常高,基本大家多多少少都要了解過,可能不是很全面,本文和大家基于jdk8學習下線程池的全面使用,以及分享下使用過程中遇到的一些坑。
線程池介紹
因為線程資源十分寶貴,每次創(chuàng)建和銷毀線程的開銷都比較大,另一方面,如果創(chuàng)建太多的線程,也會消耗系統(tǒng)大量資源,降低系統(tǒng)吞吐量,甚至導致服務不可用。為了解決這些問題,提出一種基于池化思想管理和使用線程的機制,就是我們的線程池。
線程池的核心思想就是能做到線程的復用,線程池中的線程執(zhí)行完成不會銷毀,而是存留在內(nèi)存里,等待執(zhí)行其他的任務。
jdk中的線程池采用的是一種生產(chǎn)者—消費者模型,如下圖:
- 外部提交任務到線程池中,如果線程數(shù)量小于指定閾值的話,直接創(chuàng)建線程
- 如果提交任務大于閾值,會存到隊列中
- 線程池中的工作線程執(zhí)行前面的任務完成后,不會銷毀,而是去從隊列中獲取任務,繼續(xù)執(zhí)行。
線程池創(chuàng)建
線程池提供如下2種方式創(chuàng)建方式:
ThreadPoolExecutor創(chuàng)建
下面是線程池類ThreadPoolExecutor最全參數(shù)的構造函數(shù)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
1.corePoolSize
: 核心線程數(shù),線程池中始終存活的線程數(shù)量
2.maximumPoolSize
:最大線程數(shù),線程池中容納最大的線程數(shù)量,這里引入一個"救急線程"的概念,可以想象為"臨時工",它的數(shù)量=maximumPoolSize
-corePoolSize
,這部分線程會超過一定時間后銷毀。
3.keepAliveTime
:"救急線程"的可以存活的時間,當超過這段時間這些線程沒有任務執(zhí)行,就會被回收。
4.unit
:單位,和keepAliveTime配合使用。
5.workQueue
: 阻塞隊列,用來存儲提交的多余的任務,等待工作線程執(zhí)行完畢后獲取,它有下面7個類型:
參數(shù) | 描述 |
---|---|
ArrayBlockingQueue | 一個由數(shù)組結構組成的有界阻塞隊列。 |
LinkedBlockingQueue | 一個由鏈表結構組成的有界阻塞隊列。 |
SynchronousQueue | 一個不存儲元素的阻塞隊列,即直接提交給線程不保持它們。 |
PriorityBlockingQueue | 一個支持優(yōu)先級排序的無界阻塞隊列。 |
DelayQueue | 一個使用優(yōu)先級隊列實現(xiàn)的無界阻塞隊列,只有在延遲期滿時才能從中提取元素。 |
LinkedTransferQueue | 一個由鏈表結構組成的無界阻塞隊列。與SynchronousQueue類似,還含有非阻塞方法。 |
LinkedBlockingDeque | 一個由鏈表結構組成的雙向阻塞隊列。 |
6.threadFactory
: 線程工廠,用于創(chuàng)建線程,可以指定線程名。
7.handler
: 拒絕策略,如果任務超限時執(zhí)行的策略,內(nèi)置了4種可選,默認AbortPolicy
,也可以自定義。
參數(shù) | 描述 |
---|---|
AbortPolicy | 拒絕并拋出異常。 |
CallerRunsPolicy | 重試提交當前的任務,即再次調(diào)用運行該任務的execute()方法。 |
DiscardOldestPolicy | 拋棄隊列頭部(最舊)的一個任務,并執(zhí)行當前任務。 |
DiscardPolicy | 拋棄當前任務。 |
通過這些參數(shù)創(chuàng)建好線程后,提交一個線程的執(zhí)行流程圖如下:
- 當線程數(shù)小于核心線程數(shù)時,創(chuàng)建線程。
- 當線程數(shù)大于等于核心線程數(shù),且任務隊列未滿時,將任務放入任務隊列。
- 當線程數(shù)大于等于核心線程數(shù),且任務隊列已滿, 若線程數(shù)小于最大線程數(shù),創(chuàng)建救急線程,否則執(zhí)行拒絕策略。
Executors創(chuàng)建
由于上面線程池的構造方法比較復雜,jdk也為我們提供了一種便利的方式,通過Executors工廠創(chuàng)建多種不同的線程池。
newFixedThreadPool
創(chuàng)建一個固定大小的線程池
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
- 核心線程數(shù)等于最大線程數(shù)
- 阻塞隊列是無界的,可以不限制任務數(shù)量,可能會因為任務太多OOM
- 使用默認的線程工廠和拒絕策略
- 適用于任務量已知、相對耗時的任務
newCachedThreadPool
創(chuàng)建一個核心線程為0,最大線程數(shù)不限的線程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
- 核心線程數(shù)為0,最大線程數(shù)不限制,來一個任務就會創(chuàng)建一個線程,過一段時間會銷毀,這樣可能會導致線程過多而導致系統(tǒng)資源耗盡。
- 隊列采用了 SynchronousQueue 實現(xiàn),它沒有容量,沒有線程來取是放不進去的(一手交錢、一手交貨)。
- 適合任務數(shù)比較密集,但每個任務執(zhí)行時間較短的情況。
newSingleThreadExecutor
創(chuàng)建只有一個線程的線程池。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
- 核心線程數(shù)和最大線程數(shù)都為1,任務數(shù)多于 1 時,會放入無界隊列排隊。
- 適用于只有一個任務執(zhí)行情況。
問題: newSingleThreadExecutor和newFixedThreadPool(1)區(qū)別是什么呢?
newSingleThreadExecutor中創(chuàng)建的線程通過FinalizableDelegatedExecutorService 實現(xiàn),采用裝飾器模式,只對外暴露了 ExecutorService 接口,后續(xù)也無法修改線程池的大小。而Executors.newFixedThreadPool(1) 初始時為1,以后還可以修改,對外暴露的是 ThreadPoolExecutor 對象,可以強轉(zhuǎn)后調(diào)用 setCorePoolSize 等方法進行修改。
newScheduledThreadPool
創(chuàng)建可以執(zhí)行延遲任務的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
適用于一些延遲執(zhí)行的調(diào)度任務
newWorkStealingPool
這是jdk8引入的一種方式,創(chuàng)建一個搶占式執(zhí)行的線程池(任務執(zhí)行順序不確定)。
public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
注意這個不是基于ThreadPoolExecutor 創(chuàng)建出來,而是基于ForkJoinPool 擴展,將任務按照工作線程均分。然后先工作完的線程去幫助沒處理完的線程工作。以實現(xiàn)最快完成工作。
適合處理很耗的任務。
線程池關鍵API和例子
提交執(zhí)行任務API
void execute(Runnable command)
提交執(zhí)行Runnable任務,無返回值
Future<T> submit(Callable<T> task)
提交任務 callable任務,用返回值 Future 獲得任務執(zhí)行結果,主線程可以執(zhí)行 FutureTask.get()
方法來阻塞等待任務執(zhí)行完成。
Future<?> submit(Runnable task)
提交Runnable任務,用返回值 Future 獲得任務執(zhí)行結果,主線程可以執(zhí)行 FutureTask.get()
方法來阻塞等待任務執(zhí)行完成。
Future<T> submit(Runnable task, T result)
提交Runnable任務,用返回值 Future 獲得任務執(zhí)行結果,返回傳入的result, 主線程可以執(zhí)行 FutureTask.get()
方法來阻塞等待任務執(zhí)行完成。
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
批量提交Callable任務,用返回值 Future 獲得任務執(zhí)行結果,主線程阻塞等待任務執(zhí)行完成。
List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
帶超時時間的批量提交Callable任務,用返回值 Future 獲得任務執(zhí)行結果,主線程阻塞等待任務執(zhí)行完成或者過了超時時間。
T invokeAny(Collection<? extends Callable<T>> tasks)
提交 tasks 中所有任務,哪個任務先成功執(zhí)行完畢,返回此任務執(zhí)行結果,其它任務取消
T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
提交 tasks 中所有任務,哪個任務先成功執(zhí)行完畢,返回此任務執(zhí)行結果,其它任務取消,帶超時時間
@Test public void test1() throws ExecutionException, InterruptedException { ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 100, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50), new ThreadPoolExecutor.DiscardPolicy()); for (int i = 0; i < 20; i++) { // execute的方式提交任務 threadPool.execute(() -> { log.info("execute ...."); }); } // submit runnable Future<String> futureCall = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return "callable Result"; } }); // 阻塞等待結果返回 String result = futureCall.get(); log.info("submit callable: {}", result); // submit runnable Future<String> future = threadPool.submit(new Runnable() { @Override public void run() { log.info("submit runnable ...."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }, "submit result"); // 阻塞等待結果返回 result = future.get(); log.info("submit runnable: {}", result); List<Callable<String>> callables = new ArrayList<>(); for (int i = 0; i < 5; i++) { final int j = i; callables.add(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return "callable" + j; } }); } List<Future<String>> futures = threadPool.invokeAll(callables); for (Future<String> stringFuture : futures) { String invoke = stringFuture.get(); log.info("invoke result: {}", invoke); } String invokeAny = threadPool.invokeAny(callables); log.info("invoke any: {}", invokeAny); }
關閉線程池API
shutdown()
優(yōu)雅關閉線程池,不會接收新任務,但已提交任務會執(zhí)行完,包括等待隊列里面的。
List<Runnable> shutdownNow()
立即關閉線程池,不會接收新任務,也不會執(zhí)行隊列中的任務,并用 interrupt 的方式中斷正在執(zhí)行的任務,返回隊列中的任務。
isShutdown()
返回線程池是否關閉
isTerminated()
如果在關閉后所有任務都已完成,則返回true。注意,除非先調(diào)用shutdown或shutdownNow,否則istterminated永遠不會為true。
boolean awaitTermination(long timeout, TimeUnit unit)
阻塞直到所有任務在關閉請求后完成執(zhí)行,或發(fā)生超時,或當前線程被中斷(以先發(fā)生的情況為準)。如果該執(zhí)行程序終止,則為True;如果在終止前超時,則為false。
線程池監(jiān)控API
long getTaskCount()
:獲取已經(jīng)執(zhí)行或正在執(zhí)行的任務數(shù)long getCompletedTaskCount()
: 獲取已經(jīng)執(zhí)行的任務數(shù)int getLargestPoolSize()
:獲取線程池曾經(jīng)創(chuàng)建過的最大線程數(shù),根據(jù)這個參數(shù),我們可以知道線程池是否滿過int getPoolSize():
獲取線程池線程數(shù)int getActiveCount()
: 獲取活躍線程數(shù)(正在執(zhí)行任務的線程數(shù))
擴展API
ThreadPoolExecutor
留下了3個擴展接口供我們使用。
protected void beforeExecute(Thread t, Runnable r)
: 任務執(zhí)行前被調(diào)用protected void afterExecute(Runnable r, Throwable t)
: 任務執(zhí)行后被調(diào)用protected void terminated()
: 線程池結束后被調(diào)用
@Test public void test3() { ExecutorService executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) { @Override protected void beforeExecute(Thread t, Runnable r) { System.out.println("beforeExecute is called"); } @Override protected void afterExecute(Runnable r, Throwable t) { System.out.println("afterExecute is called"); } @Override protected void terminated() { System.out.println("terminated is called"); } }; executor.submit(() -> System.out.println("this is a task")); executor.shutdown(); }
運行結果:
使用注意事項
1.創(chuàng)建線程池的時候,根據(jù)阿里巴巴規(guī)范,創(chuàng)建線程池的時候根據(jù)使用場景自定義ThreadPoolExecutor
的方式,盡量避免是使用Executors
。
2.只有當任務都是同類型并且互相獨立,線程池的性能才能達到最佳。
- 如果將運行時間較長的與運行時間較短的任務混合在一起,可能造成"擁塞"。
- 如果提交的任務依賴于其他任務,比如某任務等待另一任務的返回值或執(zhí)行結果,而這他們是提交到同一個Executor中,這種情況就會發(fā)生線程饑餓鎖。
3.在線程池中會導致從ThreadLocal中獲取數(shù)據(jù)發(fā)生混亂,應該盡量避免使用。
4.如果使用submit提交任務,會吞掉異常日志,在線程池中盡量使用try catch捕獲異常。
到此這篇關于一文了解Java 線程池的正確使用姿勢的文章就介紹到這了,更多相關Java 線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
如何使用Spring Boot實現(xiàn)自定義Spring Boot插件
在本文中,我們介紹了如何使用 Spring Boot 實現(xiàn)自定義插件,使用自定義插件可以幫助我們快速地添加一些額外的功能,提高系統(tǒng)的可擴展性和可維護性,感興趣的朋友跟隨小編一起看看吧2023-06-06深入淺析Java Object Serialization與 Hadoop 序列化
序列化是指將結構化對象轉(zhuǎn)化為字節(jié)流以便在網(wǎng)絡上傳輸或者寫到磁盤永久存儲的過程。下面通過本文給大家分享Java Object Serialization與 Hadoop 序列化,需要的朋友可以參考下2017-06-06Java注解的Retention和RetentionPolicy實例分析
這篇文章主要介紹了Java注解的Retention和RetentionPolicy,結合實例形式分析了Java注解Retention和RetentionPolicy的基本功能及使用方法,需要的朋友可以參考下2019-09-09