jdk線程池的實現
jdk線程池ThreadPoolExecutor的7個參數
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
corePoolSize
核心線程個數 ,int類型
maximunPoolSize
最大線程數 ,int類型
keepAliveTime存活時間
傳long類型的值,
當線程池中的線程數大于corePoolSize核心線程個數,且線程是閑置狀態(tài),則這些空閑線程的最大存活時間是KeepAliveTime
TimeUnit
存活時間的單位, 有時/分/秒/毫秒等可選配置
workQueue
存放待執(zhí)行任務的阻塞隊列, 可傳入
arrayBlockingQueue 基于數組的有界阻塞隊列;
linkedBlockingQueue基于鏈表的無界阻塞隊列;
synchronousQueue最多只有1個元素的同步隊列, 隊列容量是1;
priorityBlockingQueue帶優(yōu)先級的無界阻塞隊列,出隊元素是優(yōu)先級最高或最低的元素;
DelayQueue 帶延遲功能的無界阻塞隊列, 過期元素才會出隊,隊頭元素是快要過期的元素.
以上幾個Queue都是BlockingQueue的實現類
threadFactory
創(chuàng)建線程的工廠,
jdk提供了DefaultThreadFactory默認工廠,
用Executors.defaultThreadFactory()就行.
RejectedExecutionHandler拒絕策略
當隊列滿且線程數達到maximunPoolSize最大線程數后采取的策略, 可傳入
AbortPolicy 拋出異常,這個是默認策略.
CallersRunPolicy 由調用者所在的線程執(zhí)行任務
DiscardOldestPolicy 丟棄最老的任務
DiscardPolicy 丟棄新任務,不拋出異常
jdk提供的Executors快速創(chuàng)建線程池的用法
jdk封裝了一個Executors類可以直接創(chuàng)建各種線程池,
用法形如
ExecutorService pool = Executors.newXXXXXPool()
可以用Executors類創(chuàng)建業(yè)務常用的3種線程池
固定線程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
創(chuàng)建一個核心線程數和最大線程數相同的線程池,都為nThreads,
且線程池的阻塞隊列長度是Integer.MAX_VALUE,
且keepAliveTime=0,說明只要線程個數比核心線程個數多并且當前空閑則回收.
單線程線程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
創(chuàng)建一個核心線程數和最大線程數都是1的線程池,
且線程池的阻塞隊列長度是Integer.MAX_VALUE,
且keepAliveTime=0,說明只要線程個數比核心線程個數多并且當前空閑則回收.
已緩存的線程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
創(chuàng)建一個按需創(chuàng)建線程的線程池,初始線程個數為0,最多線程個數為
Integer.MAX_VALUE,并且阻塞隊列為同步隊列.
keepAliveTime=60,說明當前線程在60s內空閑則回收.
CachedThreadPool的特殊之處在于,加入同步隊列的任務會被馬上執(zhí)行,同步隊列里邊最多只有1個任務.
使用創(chuàng)建好的ExecutorService 線程池執(zhí)行異步任務

submit操作
提交一個任務, 任務參數可以是 Runnable實現類 或 Callable 實現類.
返回的類型是Future 表示異步計算的結果, 可以用future.get()方法拿到數據.
shutdown操作
調用shutdown方法后,線程池就不會再接受新的任務了,但是工作隊列里邊的任務還是要執(zhí)行的, 該方法會立刻返回,不等待隊列任務完成再返回.
使用線程池的情況下當程序結束時記得調用shutdown關閉線程池, 如果不關閉線程池,則會導致 線程池資源一直不被釋放.
shutdownNow操作
調用shutdownNow方法后,線程池就不會再接受新的任務了,并且會丟棄工作隊列里邊的任務,正在執(zhí)行的任務會被中斷,該方法會立刻返回,并不等待激活的任務執(zhí)行完成. 返回值為這時候隊列里面被丟棄的任務列表.
awaitTermination操作
當線程調用awaitTermination方法后,當前線程會被阻塞, 直到線程池狀態(tài)變?yōu)門ERMINATED 才返回,或者等待時間超時才返回.
案例1-測試FixedThreadPool執(zhí)行CallableTask任務
package cn.demo;
import cn.hutool.core.util.RandomUtil;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ExecutorTestsForCallableTask {
public static void main(String[] args) throws ExecutionException, InterruptedException {
String res1 = "";
String res2 = "";
String res3 = "";
String res4 = "";
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//submit 提交4個任務, 實際執(zhí)行時,任務是并發(fā)執(zhí)行的,執(zhí)行順序不固定
Future<String> submit1 = fixedThreadPool.submit(
new TestCallableTask(RandomUtil.randomInt(30,1000),"t1"));
Future<String> submit2 = fixedThreadPool.submit(
new TestCallableTask(RandomUtil.randomInt(100,400),"t2"));
Future<String> submit3 = fixedThreadPool.submit(
new TestCallableTask(RandomUtil.randomInt(30,350),"t3"));
Future<String> submit4 = fixedThreadPool.submit(
new TestCallableTask(RandomUtil.randomInt(310,500),"t4"));
res1 = submit1.get();
System.out.println(res1);
res2 = submit2.get();
System.out.println(res2);
res3 = submit3.get();
System.out.println(res3);
res4 = submit4.get();
System.out.println(res4);
fixedThreadPool.shutdown();
}
}package cn.demo;
import cn.hutool.core.util.RandomUtil;
import java.time.LocalDateTime;
import java.util.concurrent.Callable;
public class TestCallableTask implements Callable<String> {
private int testIntVal;
private String taskSeq;
public TestCallableTask(int testIntVal, String taskSeq) {
this.testIntVal = testIntVal;
this.taskSeq = taskSeq;
}
@Override
public String call() throws Exception {
String s = LocalDateTime.now().toString();
System.out.println(s+"->"+taskSeq+" run ....");
int i = testIntVal;
System.out.println(i);
try {
Thread.sleep(RandomUtil.randomInt(100,300));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (i>300){
return "300more";
}else {
return "300less";
}
}
}
案例2-測試FixedThreadPool執(zhí)行RunnableTask任務
package cn.demo;
import java.util.concurrent.*;
public class ExecutorTestsForRunnableTask {
public static void main(String[] args) throws ExecutionException, InterruptedException {
String res1 = "";
String res2 = "";
String res3 = "";
String res4 = "";
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
//submit 提交4個任務, 實際執(zhí)行時,任務是并發(fā)執(zhí)行的,執(zhí)行順序不固定
Task1Param task1Param = new Task1Param();
task1Param.setUrl("f23r3r");
task1Param.setName("1heg43t34t34t");
Future<String> stringFuture = fixedThreadPool.submit(
new TestTask1Runnable(task1Param), "success1 ok");
Task1Param t2 = new Task1Param();
t2.setUrl("gnsg2323");
t2.setName("2wwswer2r1asdaaws");
Future<String> f2 = fixedThreadPool.submit(new TestTask1Runnable(t2), "success2 ok");
Task1Param t3 = new Task1Param();
t3.setUrl("thwasr23r");
t3.setName("3erzawfe23rawsf");
Future<String> f3 = fixedThreadPool.submit(new TestTask1Runnable(t3), "success3 ok");
Task1Param t4 = new Task1Param();
t4.setUrl("mjkdsragt");
t4.setName("4tbertydraewrsfk");
Future<String> f4 = fixedThreadPool.submit(new TestTask1Runnable(t4), "success4 ok");
res1 = stringFuture.get();
System.out.println(res1);
res2 = f2.get();
System.out.println(res2);
res3 = f3.get();
System.out.println(res3);
res4 = f4.get();
System.out.println(res4);
fixedThreadPool.shutdown();
}
}package cn.demo;
import cn.hutool.core.util.RandomUtil;
import java.time.LocalDateTime;
public class TestTask1Runnable implements Runnable{
private Task1Param task1Param;
public TestTask1Runnable(Task1Param task1Param) {
this.task1Param = task1Param;
}
@Override
public void run() {
try {
Thread.sleep(RandomUtil.randomInt(200,600));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(task1Param.getName());
System.out.println(task1Param.getUrl());
String s = LocalDateTime.now().toString();
System.out.println(s+" TestTask1Runnable run ....");
}
}使用自定義的ThreadPoolExecutor來執(zhí)行異步任務
package cn.demo;
import cn.hutool.core.util.RandomUtil;
import java.util.concurrent.*;
public class TpeTest {
private final static ThreadPoolExecutor pool =
new ThreadPoolExecutor(
1,1,
1L, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(1),
new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
Future<String> submit1 = pool.submit(
new TestCallableTask(RandomUtil.randomInt(30,1000),"t1"));
Future<String> submit2 = pool.submit(
new TestCallableTask(RandomUtil.randomInt(100,400),"t2"));
Future<String> submit3 = pool.submit(
new TestCallableTask(RandomUtil.randomInt(30,350),"t3"));
Future<String> submit4 = pool.submit(
new TestCallableTask(RandomUtil.randomInt(310,500),"t4"));
System.out.println("task1-"+submit1.get());
System.out.println("task2-"+submit2.get());
System.out.println("task3-"+submit3.get());
System.out.println("task4-"+submit4.get());
pool.shutdown();
}
}線程池使用FutureTask時需要注意的事情
線程池使用FutureTask時,如果把拒絕策略設置為 DiscardPolicy 和 DiscardOldestPolicy,并且在被拒絕的任務的Future對象上調用了無參get方法,那么調用線程會一直被阻塞.
如上面的代碼,如果把CallerRunsPolicy替換成 DiscardPolicy 或 DiscardOldestPolicy ,就會導致任務一直被阻塞,一直無法取到future.get()的值.
到此這篇關于jdk線程池的實現的文章就介紹到這了,更多相關jdk線程池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
SpringMVC框架和SpringBoot項目中控制器的響應結果深入分析
這篇文章主要介紹了SpringMVC框架和SpringBoot項目中控制器的響應結果,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習吧2022-12-12
Spring中使用atomikos+druid實現經典分布式事務的方法
這篇文章主要介紹了Spring中使用atomikos+druid實現經典分布式事務的方法,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-06-06
Java線程創(chuàng)建靜態(tài)代理模式代碼實例
這篇文章主要介紹了Java線程創(chuàng)建靜態(tài)代理模式代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-11-11
Java怎樣創(chuàng)建集合才能避免造成內存泄漏你了解嗎
內存泄漏是指無用對象持續(xù)占有內存或無用對象的內存得不到及時釋放,從而造成內存空間的浪費稱為內存泄漏。長生命周期的對象持有短生命周期對象的引用就很可能發(fā)生內存泄漏,盡管短生命周期對象已經不再需要,但是因為長生命周期持有它的引用而導致不能被回收2021-09-09

