學(xué)生視角手把手帶你寫Java?線程池改良版
Java手寫線程池(第二代)
第二代線程池的優(yōu)化
1:新增了4種拒絕策略。分別為:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy
2:對(duì)線程池MyThreadPoolExecutor的構(gòu)造方法進(jìn)行優(yōu)化,增加了參數(shù)校驗(yàn),防止亂傳參數(shù)現(xiàn)象。
3:這是最重要的一個(gè)優(yōu)化。
- 移除線程池的線程預(yù)熱功能。因?yàn)榫€程預(yù)熱會(huì)極大的耗費(fèi)內(nèi)存,當(dāng)我們不用線程池時(shí)也會(huì)一直在運(yùn)行狀態(tài)。
- 換來的是在調(diào)用execute方法添加任務(wù)時(shí)通過檢查workers線程集合目前的大小與corePoolSize的值去比較,再通過new MyWorker()去創(chuàng)建添加線程到線程池,這樣好處就是當(dāng)我們創(chuàng)建線程池如果不使用的話則對(duì)當(dāng)前內(nèi)存沒有一點(diǎn)影響,當(dāng)使用了才會(huì)創(chuàng)建線程并放入線程池中進(jìn)行復(fù)用。
線程池構(gòu)造器
public MyThreadPoolExecutor(){ this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) { this(corePoolSize,waitingQueue,threadFactory,defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) { this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){ this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else { throw new NullPointerException("線程池參數(shù)不合法"); } }
線程池拒絕策略
策略接口:MyRejectedExecutionHandle
package com.springframework.concurrent; /** * 自定義拒絕策略 * @author 游政杰 */ public interface MyRejectedExecutionHandle { void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }
策略內(nèi)部實(shí)現(xiàn)類
/** * 實(shí)現(xiàn)自定義拒絕策略 */ //拋異常策略(默認(rèn)) public static class MyAbortPolicy implements MyRejectedExecutionHandle{ public MyAbortPolicy(){ } @Override public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) { throw new MyRejectedExecutionException("任務(wù)-> "+r.toString()+"被線程池-> "+t.toString()+" 拒絕"); } } //默默丟棄策略 public static class MyDiscardPolicy implements MyRejectedExecutionHandle{ public MyDiscardPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { } } //丟棄掉最老的任務(wù)策略 public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{ public MyDiscardOldestPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){ //如果線程池沒被關(guān)閉 threadPoolExecutor.getWaitingQueue().poll();//丟掉最老的任務(wù),此時(shí)就有位置當(dāng)新任務(wù)了 threadPoolExecutor.execute(runnable); //把新任務(wù)加入到隊(duì)列中 } } } //由調(diào)用者調(diào)用策略 public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{ public MyCallerRunsPolicy(){ } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){//判斷線程池是否被關(guān)閉 runnable.run(); } } }
封裝拒絕方法
protected final void reject(Runnable runnable){ this.handle.rejectedExecution(runnable, this); } protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){ this.handle.rejectedExecution(runnable, threadPoolExecutor); }
execute方法
@Override public boolean execute(Runnable runnable) { if (!this.waitingQueue.offer(runnable)) { this.reject(runnable); return false; } else { if(this.workers!=null&&this.workers.size()<corePoolSize){//這種情況才能添加線程 MyWorker worker = new MyWorker(); //通過構(gòu)造方法添加線程 } return true; } }
可以看出只有當(dāng)往線程池放任務(wù)時(shí)才會(huì)創(chuàng)建線程對(duì)象。
手寫線程池源碼
MyExecutorService
package com.springframework.concurrent; import java.util.concurrent.BlockingQueue; /** * 自定義線程池業(yè)務(wù)接口 * @author 游政杰 */ public interface MyExecutorService { boolean execute(Runnable runnable); void shutdown(); void shutdownNow(); boolean isShutdown(); BlockingQueue<Runnable> getWaitingQueue(); }
MyRejectedExecutionException
package com.springframework.concurrent; /** * 自定義拒絕異常 */ public class MyRejectedExecutionException extends RuntimeException { public MyRejectedExecutionException() { } public MyRejectedExecutionException(String message) { super(message); } public MyRejectedExecutionException(String message, Throwable cause) { super(message, cause); } public MyRejectedExecutionException(Throwable cause) { super(cause); } }
MyRejectedExecutionHandle
package com.springframework.concurrent; /** * 自定義拒絕策略 * @author 游政杰 */ public interface MyRejectedExecutionHandle { void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor); }
核心類MyThreadPoolExecutor
package com.springframework.concurrent; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; /** * 純手?jǐn)]線程池框架 * @author 游政杰 */ public class MyThreadPoolExecutor implements MyExecutorService{ private static final AtomicInteger taskcount=new AtomicInteger(0);//執(zhí)行任務(wù)次數(shù) private static final AtomicInteger threadNumber=new AtomicInteger(0); //線程編號(hào) private static volatile int corePoolSize; //核心線程數(shù) private final HashSet<MyWorker> workers; //工作線程 private final BlockingQueue<Runnable> waitingQueue; //等待隊(duì)列 private static final String THREADPOOL_NAME="MyThread-Pool-";//線程名稱 private volatile boolean isRunning=true; //是否運(yùn)行 private volatile boolean STOPNOW=false; //是否立刻停止 private volatile ThreadFactory threadFactory; //線程工廠 private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默認(rèn)拒絕策略 private volatile MyRejectedExecutionHandle handle; //拒絕紫略 public MyThreadPoolExecutor(){ this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) { this(corePoolSize,waitingQueue,threadFactory,defaultHandle); } public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) { this.workers=new HashSet<>(corePoolSize); if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){ this.corePoolSize=corePoolSize; this.waitingQueue=waitingQueue; this.threadFactory=threadFactory; this.handle=handle; }else { throw new NullPointerException("線程池參數(shù)不合法"); } } /** * 實(shí)現(xiàn)自定義拒絕策略 */ //拋異常策略(默認(rèn)) public static class MyAbortPolicy implements MyRejectedExecutionHandle{ public MyAbortPolicy(){ } @Override public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) { throw new MyRejectedExecutionException("任務(wù)-> "+r.toString()+"被線程池-> "+t.toString()+" 拒絕"); } } //默默丟棄策略 public static class MyDiscardPolicy implements MyRejectedExecutionHandle{ public MyDiscardPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { } } //丟棄掉最老的任務(wù)策略 public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{ public MyDiscardOldestPolicy() { } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){ //如果線程池沒被關(guān)閉 threadPoolExecutor.getWaitingQueue().poll();//丟掉最老的任務(wù),此時(shí)就有位置當(dāng)新任務(wù)了 threadPoolExecutor.execute(runnable); //把新任務(wù)加入到隊(duì)列中 } } } //由調(diào)用者調(diào)用策略 public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{ public MyCallerRunsPolicy(){ } @Override public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) { if(!threadPoolExecutor.isShutdown()){//判斷線程池是否被關(guān)閉 runnable.run(); } } } //call拒絕方法 protected final void reject(Runnable runnable){ this.handle.rejectedExecution(runnable, this); } protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){ this.handle.rejectedExecution(runnable, threadPoolExecutor); } /** * MyWorker就是我們每一個(gè)線程對(duì)象 */ private final class MyWorker implements Runnable{ final Thread thread; //為每個(gè)MyWorker MyWorker(){ Thread td = threadFactory.newThread(this); td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement()); this.thread=td; this.thread.start(); workers.add(this); } //執(zhí)行任務(wù) @Override public void run() { //循環(huán)接收任務(wù) while (true) { //循環(huán)退出條件: //1:當(dāng)isRunning為false并且waitingQueue的隊(duì)列大小為0(也就是無任務(wù)了),會(huì)優(yōu)雅的退出。 //2:當(dāng)STOPNOW為true,則說明調(diào)用了shutdownNow方法進(jìn)行暴力退出。 if((!isRunning&&waitingQueue.size()==0)||STOPNOW) { break; }else { //不斷取任務(wù),當(dāng)任務(wù)!=null時(shí)則調(diào)用run方法處理任務(wù) Runnable runnable = waitingQueue.poll(); if(runnable!=null){ runnable.run(); System.out.println("task==>"+taskcount.incrementAndGet()); } } } } } //往線程池中放任務(wù) @Override public boolean execute(Runnable runnable) { if (!this.waitingQueue.offer(runnable)) { this.reject(runnable); return false; } else { if(this.workers!=null&&this.workers.size()<corePoolSize){//這種情況才能添加線程 MyWorker worker = new MyWorker(); //通過構(gòu)造方法添加線程 } return true; } } //優(yōu)雅的關(guān)閉 @Override public void shutdown() { this.isRunning=false; } //暴力關(guān)閉 @Override public void shutdownNow() { this.STOPNOW=true; } //判斷線程池是否關(guān)閉 @Override public boolean isShutdown() { return !this.isRunning||STOPNOW; } //獲取等待隊(duì)列 @Override public BlockingQueue<Runnable> getWaitingQueue() { return this.waitingQueue; } }
線程池測(cè)試類
package com.springframework.test; import com.springframework.concurrent.MyThreadPoolExecutor; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; public class ThreadPoolTest { public static void main(String[] args) { // MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor // (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy()); // MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor // (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy()); // MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor // (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy()); MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy()); for(int i=0;i<11;i++){ int finalI = i; myThreadPoolExecutor.execute(()->{ System.out.println(Thread.currentThread().getName()+">>>>"+ finalI); }); } myThreadPoolExecutor.shutdown(); // myThreadPoolExecutor.shutdownNow(); } }
好了第二代線程池就優(yōu)化到這了,后面可能還會(huì)出第三代,不斷進(jìn)行優(yōu)化。
到此這篇關(guān)于學(xué)生視角手把手帶你寫Java?線程池改良版的文章就介紹到這了,更多相關(guān)Java?線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于springboot整合swagger問題及解決方法
這篇文章主要介紹了關(guān)于springboot整合swagger問題及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-04-04單臺(tái)Spring Cloud Eureka升級(jí)到三臺(tái)Eureka高可用集群
今天小編就為大家分享一篇關(guān)于單臺(tái)Spring Cloud Eureka升級(jí)到三臺(tái)Eureka高可用集群,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2018-12-12java Spring AOP詳解及簡(jiǎn)單實(shí)例
這篇文章主要介紹了java Spring AOP詳解及簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-05-05hibernate測(cè)試時(shí)遇到的幾個(gè)異常及解決方法匯總
今天小編就為大家分享一篇關(guān)于hibernate測(cè)試時(shí)遇到的幾個(gè)異常及解決方法匯總,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-03-03