Java 線程池詳解
系統(tǒng)啟動(dòng)一個(gè)線程的成本是比較高的,因?yàn)樗婕暗脚c操作系統(tǒng)的交互,使用線程池的好處是提高性能,當(dāng)系統(tǒng)中包含大量并發(fā)的線程時(shí),會(huì)導(dǎo)致系統(tǒng)性能劇烈下降,甚至導(dǎo)致JVM崩潰,而線程池的最大線程數(shù)參數(shù)可以控制系統(tǒng)中并發(fā)線程數(shù)不超過次數(shù)。
一、Executors 工廠類用來產(chǎn)生線程池,該工廠類包含以下幾個(gè)靜態(tài)工廠方法來創(chuàng)建對(duì)應(yīng)的線程池。創(chuàng)建的線程池是一個(gè)ExecutorService對(duì)象,使用該對(duì)象的submit方法或者是execute方法執(zhí)行相應(yīng)的Runnable或者是Callable任務(wù)。線程池本身在不再需要的時(shí)候調(diào)用shutdown()方法停止線程池,調(diào)用該方法后,該線程池將不再允許任務(wù)添加進(jìn)來,但是會(huì)直到已添加的所有任務(wù)執(zhí)行完成后才死亡。
1、newCachedThreadPool(),創(chuàng)建一個(gè)具有緩存功能的線程池,提交到該線程池的任務(wù)(Runnable或Callable對(duì)象)創(chuàng)建的線程,如果執(zhí)行完成,會(huì)被緩存到CachedThreadPool中,供后面需要執(zhí)行的任務(wù)使用。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class CacheThreadPool { static class Task implements Runnable { @Override public void run() { System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: " + Thread.currentThread().getAllStackTraces().size()); } } public static void main(String[] args) { ExecutorService cacheThreadPool = Executors.newCachedThreadPool(); //先添加三個(gè)任務(wù)到線程池 for(int i = 0 ; i < 3; i++) { cacheThreadPool.execute(new Task()); } //等三個(gè)線程執(zhí)行完成后,再次添加三個(gè)任務(wù)到線程池 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } for(int i = 0 ; i < 3; i++) { cacheThreadPool.execute(new Task()); } } }
執(zhí)行結(jié)果如下:
CacheThreadPool$Task@2d312eb9 pool-1-thread-1 AllStackTraces map size: 7 CacheThreadPool$Task@59522b86 pool-1-thread-3 AllStackTraces map size: 7 CacheThreadPool$Task@73dbb89f pool-1-thread-2 AllStackTraces map size: 7 CacheThreadPool$Task@5795cedc pool-1-thread-3 AllStackTraces map size: 7 CacheThreadPool$Task@256d5600 pool-1-thread-1 AllStackTraces map size: 7 CacheThreadPool$Task@7d1c5894 pool-1-thread-2 AllStackTraces map size: 7
線程池中的線程對(duì)象進(jìn)行了緩存,當(dāng)有新任務(wù)執(zhí)行時(shí)進(jìn)行了復(fù)用。但是如果有特別多的并發(fā)時(shí),緩存線程池還是會(huì)創(chuàng)建很多個(gè)線程對(duì)象。
2、newFixedThreadPool(int nThreads) 創(chuàng)建一個(gè)指定線程個(gè)數(shù),線程可復(fù)用的線程池。
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FixedThreadPool { static class Task implements Runnable { @Override public void run() { System.out.println(this + " " + Thread.currentThread().getName() + " AllStackTraces map size: " + Thread.currentThread().getAllStackTraces().size()); } } public static void main(String[] args) { ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3); // 先添加三個(gè)任務(wù)到線程池 for (int i = 0; i < 5; i++) { fixedThreadPool.execute(new Task()); } // 等三個(gè)線程執(zhí)行完成后,再次添加三個(gè)任務(wù)到線程池 try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 3; i++) { fixedThreadPool.execute(new Task()); } } }
執(zhí)行結(jié)果:
FixedThreadPool$Task@7045c12d pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@50fa0bef pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@ccb1870 pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@7392b4e3 pool-1-thread-1 AllStackTraces map size: 7 FixedThreadPool$Task@5bdeff18 pool-1-thread-2 AllStackTraces map size: 7 FixedThreadPool$Task@7d5554e1 pool-1-thread-1 AllStackTraces map size: 7 FixedThreadPool$Task@24468092 pool-1-thread-3 AllStackTraces map size: 7 FixedThreadPool$Task@fa7b978 pool-1-thread-2 AllStackTraces map size: 7
3、newSingleThreadExecutor(),創(chuàng)建一個(gè)只有單線程的線程池,相當(dāng)于調(diào)用newFixedThreadPool(1)
4、newSheduledThreadPool(int corePoolSize),創(chuàng)建指定線程數(shù)的線程池,它可以在指定延遲后執(zhí)行線程。也可以以某一周期重復(fù)執(zhí)行某一線程,知道調(diào)用shutdown()關(guān)閉線程池。
示例如下:
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; public class ScheduledThreadPool { static class Task implements Runnable { @Override public void run() { System.out.println("time " + System.currentTimeMillis() + " " + Thread.currentThread().getName() + " AllStackTraces map size: " + Thread.currentThread().getAllStackTraces().size()); } } public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3); scheduledExecutorService.schedule(new Task(), 3, TimeUnit.SECONDS); scheduledExecutorService.scheduleAtFixedRate(new Task(), 3, 5, TimeUnit.SECONDS); try { Thread.sleep(30 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } scheduledExecutorService.shutdown(); } }
運(yùn)行結(jié)果如下:
time 1458921795240 pool-1-thread-1 AllStackTraces map size: 6 time 1458921795241 pool-1-thread-2 AllStackTraces map size: 6 time 1458921800240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921805240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921810240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921815240 pool-1-thread-1 AllStackTraces map size: 7 time 1458921820240 pool-1-thread-1 AllStackTraces map size: 7
由運(yùn)行時(shí)間可看出,任務(wù)是按照5秒的周期執(zhí)行的。
5、newSingleThreadScheduledExecutor() 創(chuàng)建一個(gè)只有一個(gè)線程的線程池,同調(diào)用newScheduledThreadPool(1)。
二、ForkJoinPool和ForkJoinTask
ForkJoinPool是ExecutorService的實(shí)現(xiàn)類,支持將一個(gè)任務(wù)劃分為多個(gè)小任務(wù)并行計(jì)算,在把多個(gè)小任務(wù)的計(jì)算結(jié)果合并成總的計(jì)算結(jié)果。它有兩個(gè)構(gòu)造函數(shù)
ForkJoinPool(int parallelism)創(chuàng)建一個(gè)包含parallelism個(gè)并行線程的ForkJoinPool。
ForkJoinPool(),以Runtime.availableProcessors()方法返回值作為parallelism參數(shù)來創(chuàng)建ForkJoinPool。
ForkJoinTask 代表一個(gè)可以并行,合并的任務(wù)。它是實(shí)現(xiàn)了Future<T>接口的抽象類,它有兩個(gè)抽象子類,代表無返回值任務(wù)的RecuriveAction和有返回值的RecursiveTask??筛鶕?jù)具體需求繼承這兩個(gè)抽象類實(shí)現(xiàn)自己的對(duì)象,然后調(diào)用ForkJoinPool的submit 方法執(zhí)行。
RecuriveAction 示例如下,實(shí)現(xiàn)并行輸出0-300的數(shù)字。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveAction; import java.util.concurrent.TimeUnit; public class ActionForkJoinTask { static class PrintTask extends RecursiveAction { private static final int THRESHOLD = 50; private int start; private int end; public PrintTask(int start, int end) { this.start = start; this.end = end; } @Override protected void compute() { if (end - start < THRESHOLD) { for(int i = start; i < end; i++) { System.out.println(Thread.currentThread().getName() + " " + i); } } else { int middle = (start + end) / 2; PrintTask left = new PrintTask(start, middle); PrintTask right = new PrintTask(middle, end); left.fork(); right.fork(); } } } public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(); pool.submit(new PrintTask(0, 300)); try { pool.awaitTermination(2, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } pool.shutdown(); } }
在拆分小任務(wù)后,調(diào)用任務(wù)的fork()方法,加入到ForkJoinPool中并行執(zhí)行。
RecursiveTask示例,實(shí)現(xiàn)并行計(jì)算100個(gè)整數(shù)求和。拆分為每20個(gè)數(shù)求和后獲取結(jié)果,在最后合并為最后的結(jié)果。
import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class TaskForkJoinTask { static class CalTask extends RecursiveTask<Integer> { private static final int THRESHOLD = 20; private int arr[]; private int start; private int end; public CalTask(int[] arr, int start, int end) { this.arr = arr; this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; if (end - start < THRESHOLD) { for (int i = start; i < end; i++) { sum += arr[i]; } System.out.println(Thread.currentThread().getName() + " sum:" + sum); return sum; } else { int middle = (start + end) / 2; CalTask left = new CalTask(arr, start, middle); CalTask right = new CalTask(arr, middle, end); left.fork(); right.fork(); return left.join() + right.join(); } } } public static void main(String[] args) { int arr[] = new int[100]; Random random = new Random(); int total = 0; for (int i = 0; i < arr.length; i++) { int tmp = random.nextInt(20); total += (arr[i] = tmp); } System.out.println("total " + total); ForkJoinPool pool = new ForkJoinPool(4); Future<Integer> future = pool.submit(new CalTask(arr, 0, arr.length)); try { System.out.println("cal result: " + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } pool.shutdown(); } }
執(zhí)行結(jié)果如下:
total 912 ForkJoinPool-1-worker-2 sum:82 ForkJoinPool-1-worker-2 sum:123 ForkJoinPool-1-worker-2 sum:144 ForkJoinPool-1-worker-3 sum:119 ForkJoinPool-1-worker-2 sum:106 ForkJoinPool-1-worker-2 sum:128 ForkJoinPool-1-worker-2 sum:121 ForkJoinPool-1-worker-3 sum:89 cal result: 912
子任務(wù)執(zhí)行完后,調(diào)用任務(wù)的join()方法獲取子任務(wù)執(zhí)行結(jié)果,再相加獲得最后的結(jié)果。
相關(guān)文章
Java 常見的限流算法詳細(xì)分析并實(shí)現(xiàn)
大數(shù)據(jù)量高并發(fā)訪問時(shí),經(jīng)常出現(xiàn)服務(wù)或接口面對(duì)暴漲的請(qǐng)求而不可用的情況,甚至引發(fā)連鎖反映導(dǎo)致整個(gè)系統(tǒng)崩潰。此時(shí)你需要使用的技術(shù)手段之一就是限流,當(dāng)請(qǐng)求達(dá)到一定的并發(fā)數(shù)或速率,就進(jìn)行等待、排隊(duì)、降級(jí)、拒絕服務(wù)等。限流時(shí),常見算法是計(jì)數(shù)器、漏斗、令牌桶算法2022-04-04關(guān)于Java中finalize析構(gòu)方法的作用詳解
構(gòu)造方法用于創(chuàng)建和初始化類對(duì)象,也就是說,構(gòu)造方法負(fù)責(zé)”生出“一個(gè)類對(duì)象,并可以在對(duì)象出生時(shí)進(jìn)行必要的操作,在這篇文章中會(huì)給大家簡(jiǎn)單介紹一下析構(gòu)方法,需要的朋友可以參考下2023-05-05SpringBoot中實(shí)現(xiàn)Redis緩存預(yù)熱
緩存預(yù)熱是一種在系統(tǒng)啟動(dòng)后,但在實(shí)際使用前將數(shù)據(jù)加載到緩存中的技術(shù),本文主要來和大家一起探討如何在Spring Boot應(yīng)用程序中實(shí)現(xiàn)Redis緩存預(yù)熱,以確保系統(tǒng)在處理請(qǐng)求前就已經(jīng)處于最佳狀態(tài),感興趣的可以了解下2023-11-11myeclipse中使用maven前常見錯(cuò)誤及解決辦法
這篇文章主要介紹了myeclipse中使用maven前常見錯(cuò)誤及解決辦法 的相關(guān)資料,需要的朋友可以參考下2016-05-05SpringBoot集成Tomcat服務(wù)架構(gòu)配置
這篇文章主要為大家介紹了SpringBoot集成Tomcat服務(wù)架構(gòu)配置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-02-02使用Runtime 調(diào)用Process.waitfor導(dǎo)致的阻塞問題
這篇文章主要介紹了使用Runtime 調(diào)用Process.waitfor導(dǎo)致的阻塞問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12詳解JAVA中的Collection接口和其主要實(shí)現(xiàn)的類
這篇文章主要介紹了JAVA中的Collection接口和其主要實(shí)現(xiàn)的類,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03使用@TransactionalEventListener監(jiān)聽事務(wù)教程
這篇文章主要介紹了使用@TransactionalEventListener監(jiān)聽事務(wù)教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09如何用匿名內(nèi)部類實(shí)現(xiàn) Java 同步回調(diào)
這篇文章主要介紹了如何用匿名內(nèi)部類實(shí)現(xiàn) Java 同步回調(diào),幫助大家更好的理解和學(xué)習(xí)Java,感興趣的朋友可以了解下2020-10-10