學(xué)生視角手把手帶你寫Java?線程池改良版
Java手寫線程池(第二代)
第二代線程池的優(yōu)化
1:新增了4種拒絕策略。分別為:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy
2:對線程池MyThreadPoolExecutor的構(gòu)造方法進(jìn)行優(yōu)化,增加了參數(shù)校驗(yàn),防止亂傳參數(shù)現(xiàn)象。
3:這是最重要的一個優(yōu)化。
- 移除線程池的線程預(yù)熱功能。因?yàn)榫€程預(yù)熱會極大的耗費(fèi)內(nèi)存,當(dāng)我們不用線程池時也會一直在運(yùn)行狀態(tài)。
- 換來的是在調(diào)用execute方法添加任務(wù)時通過檢查workers線程集合目前的大小與corePoolSize的值去比較,再通過new MyWorker()去創(chuàng)建添加線程到線程池,這樣好處就是當(dāng)我們創(chuàng)建線程池如果不使用的話則對當(dāng)前內(nèi)存沒有一點(diǎn)影響,當(dāng)使用了才會創(chuàng)建線程并放入線程池中進(jìn)行復(fù)用。
線程池構(gòu)造器
public MyThreadPoolExecutor(){
this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
}
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
}
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
this.workers=new HashSet<>(corePoolSize);
if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
this.corePoolSize=corePoolSize;
this.waitingQueue=waitingQueue;
this.threadFactory=threadFactory;
this.handle=handle;
}else {
throw new NullPointerException("線程池參數(shù)不合法");
}
}
線程池拒絕策略
策略接口:MyRejectedExecutionHandle
package com.springframework.concurrent;
/**
* 自定義拒絕策略
* @author 游政杰
*/
public interface MyRejectedExecutionHandle {
void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);
}
策略內(nèi)部實(shí)現(xiàn)類
/**
* 實(shí)現(xiàn)自定義拒絕策略
*/
//拋異常策略(默認(rèn))
public static class MyAbortPolicy implements MyRejectedExecutionHandle{
public MyAbortPolicy(){
}
@Override
public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
throw new MyRejectedExecutionException("任務(wù)-> "+r.toString()+"被線程池-> "+t.toString()+" 拒絕");
}
}
//默默丟棄策略
public static class MyDiscardPolicy implements MyRejectedExecutionHandle{
public MyDiscardPolicy() {
}
@Override
public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
}
}
//丟棄掉最老的任務(wù)策略
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
public MyDiscardOldestPolicy() {
}
@Override
public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
if(!threadPoolExecutor.isShutdown()){ //如果線程池沒被關(guān)閉
threadPoolExecutor.getWaitingQueue().poll();//丟掉最老的任務(wù),此時就有位置當(dāng)新任務(wù)了
threadPoolExecutor.execute(runnable); //把新任務(wù)加入到隊列中
}
}
}
//由調(diào)用者調(diào)用策略
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
public MyCallerRunsPolicy(){
}
@Override
public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
if(!threadPoolExecutor.isShutdown()){//判斷線程池是否被關(guān)閉
runnable.run();
}
}
}
封裝拒絕方法
protected final void reject(Runnable runnable){
this.handle.rejectedExecution(runnable, this);
}
protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
this.handle.rejectedExecution(runnable, threadPoolExecutor);
}
execute方法
@Override
public boolean execute(Runnable runnable)
{
if (!this.waitingQueue.offer(runnable)) {
this.reject(runnable);
return false;
}
else {
if(this.workers!=null&&this.workers.size()<corePoolSize){//這種情況才能添加線程
MyWorker worker = new MyWorker(); //通過構(gòu)造方法添加線程
}
return true;
}
}
可以看出只有當(dāng)往線程池放任務(wù)時才會創(chuàng)建線程對象。
手寫線程池源碼
MyExecutorService
package com.springframework.concurrent;
import java.util.concurrent.BlockingQueue;
/**
* 自定義線程池業(yè)務(wù)接口
* @author 游政杰
*/
public interface MyExecutorService {
boolean execute(Runnable runnable);
void shutdown();
void shutdownNow();
boolean isShutdown();
BlockingQueue<Runnable> getWaitingQueue();
}
MyRejectedExecutionException
package com.springframework.concurrent;
/**
* 自定義拒絕異常
*/
public class MyRejectedExecutionException extends RuntimeException {
public MyRejectedExecutionException() {
}
public MyRejectedExecutionException(String message) {
super(message);
}
public MyRejectedExecutionException(String message, Throwable cause) {
super(message, cause);
}
public MyRejectedExecutionException(Throwable cause) {
super(cause);
}
}
MyRejectedExecutionHandle
package com.springframework.concurrent;
/**
* 自定義拒絕策略
* @author 游政杰
*/
public interface MyRejectedExecutionHandle {
void rejectedExecution(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor);
}
核心類MyThreadPoolExecutor
package com.springframework.concurrent;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 純手?jǐn)]線程池框架
* @author 游政杰
*/
public class MyThreadPoolExecutor implements MyExecutorService{
private static final AtomicInteger taskcount=new AtomicInteger(0);//執(zhí)行任務(wù)次數(shù)
private static final AtomicInteger threadNumber=new AtomicInteger(0); //線程編號
private static volatile int corePoolSize; //核心線程數(shù)
private final HashSet<MyWorker> workers; //工作線程
private final BlockingQueue<Runnable> waitingQueue; //等待隊列
private static final String THREADPOOL_NAME="MyThread-Pool-";//線程名稱
private volatile boolean isRunning=true; //是否運(yùn)行
private volatile boolean STOPNOW=false; //是否立刻停止
private volatile ThreadFactory threadFactory; //線程工廠
private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默認(rèn)拒絕策略
private volatile MyRejectedExecutionHandle handle; //拒絕紫略
public MyThreadPoolExecutor(){
this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);
}
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
this(corePoolSize,waitingQueue,threadFactory,defaultHandle);
}
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {
this.workers=new HashSet<>(corePoolSize);
if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){
this.corePoolSize=corePoolSize;
this.waitingQueue=waitingQueue;
this.threadFactory=threadFactory;
this.handle=handle;
}else {
throw new NullPointerException("線程池參數(shù)不合法");
}
}
/**
* 實(shí)現(xiàn)自定義拒絕策略
*/
//拋異常策略(默認(rèn))
public static class MyAbortPolicy implements MyRejectedExecutionHandle{
public MyAbortPolicy(){
}
@Override
public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
throw new MyRejectedExecutionException("任務(wù)-> "+r.toString()+"被線程池-> "+t.toString()+" 拒絕");
}
}
//默默丟棄策略
public static class MyDiscardPolicy implements MyRejectedExecutionHandle{
public MyDiscardPolicy() {
}
@Override
public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
}
}
//丟棄掉最老的任務(wù)策略
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{
public MyDiscardOldestPolicy() {
}
@Override
public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
if(!threadPoolExecutor.isShutdown()){ //如果線程池沒被關(guān)閉
threadPoolExecutor.getWaitingQueue().poll();//丟掉最老的任務(wù),此時就有位置當(dāng)新任務(wù)了
threadPoolExecutor.execute(runnable); //把新任務(wù)加入到隊列中
}
}
}
//由調(diào)用者調(diào)用策略
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{
public MyCallerRunsPolicy(){
}
@Override
public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
if(!threadPoolExecutor.isShutdown()){//判斷線程池是否被關(guān)閉
runnable.run();
}
}
}
//call拒絕方法
protected final void reject(Runnable runnable){
this.handle.rejectedExecution(runnable, this);
}
protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){
this.handle.rejectedExecution(runnable, threadPoolExecutor);
}
/**
* MyWorker就是我們每一個線程對象
*/
private final class MyWorker implements Runnable{
final Thread thread; //為每個MyWorker
MyWorker(){
Thread td = threadFactory.newThread(this);
td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
this.thread=td;
this.thread.start();
workers.add(this);
}
//執(zhí)行任務(wù)
@Override
public void run() {
//循環(huán)接收任務(wù)
while (true)
{
//循環(huán)退出條件:
//1:當(dāng)isRunning為false并且waitingQueue的隊列大小為0(也就是無任務(wù)了),會優(yōu)雅的退出。
//2:當(dāng)STOPNOW為true,則說明調(diào)用了shutdownNow方法進(jìn)行暴力退出。
if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
{
break;
}else {
//不斷取任務(wù),當(dāng)任務(wù)!=null時則調(diào)用run方法處理任務(wù)
Runnable runnable = waitingQueue.poll();
if(runnable!=null){
runnable.run();
System.out.println("task==>"+taskcount.incrementAndGet());
}
}
}
}
}
//往線程池中放任務(wù)
@Override
public boolean execute(Runnable runnable)
{
if (!this.waitingQueue.offer(runnable)) {
this.reject(runnable);
return false;
}
else {
if(this.workers!=null&&this.workers.size()<corePoolSize){//這種情況才能添加線程
MyWorker worker = new MyWorker(); //通過構(gòu)造方法添加線程
}
return true;
}
}
//優(yōu)雅的關(guān)閉
@Override
public void shutdown()
{
this.isRunning=false;
}
//暴力關(guān)閉
@Override
public void shutdownNow()
{
this.STOPNOW=true;
}
//判斷線程池是否關(guān)閉
@Override
public boolean isShutdown() {
return !this.isRunning||STOPNOW;
}
//獲取等待隊列
@Override
public BlockingQueue<Runnable> getWaitingQueue() {
return this.waitingQueue;
}
}
線程池測試類
package com.springframework.test;
import com.springframework.concurrent.MyThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
public class ThreadPoolTest {
public static void main(String[] args) {
// MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
// (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyAbortPolicy());
// MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
// (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardPolicy());
// MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
// (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyDiscardOldestPolicy());
MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
(5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory(),new MyThreadPoolExecutor.MyCallerRunsPolicy());
for(int i=0;i<11;i++){
int finalI = i;
myThreadPoolExecutor.execute(()->{
System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
});
}
myThreadPoolExecutor.shutdown();
// myThreadPoolExecutor.shutdownNow();
}
}
好了第二代線程池就優(yōu)化到這了,后面可能還會出第三代,不斷進(jìn)行優(yōu)化。
到此這篇關(guān)于學(xué)生視角手把手帶你寫Java?線程池改良版的文章就介紹到這了,更多相關(guān)Java?線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
關(guān)于springboot整合swagger問題及解決方法
這篇文章主要介紹了關(guān)于springboot整合swagger問題及解決方法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-04-04
單臺Spring Cloud Eureka升級到三臺Eureka高可用集群
今天小編就為大家分享一篇關(guān)于單臺Spring Cloud Eureka升級到三臺Eureka高可用集群,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12

