實現(xiàn)java簡單的線程池
拆分實現(xiàn)流程
請看下面這張圖

首先我們得對線程池進行一個功能拆分
- Thread Pool 就是我們的線程池,t1,t2,t3代表三個線程
- Blocking Queue代表阻塞隊列
- main代表main方法的線程
- task1,task2,task3代表要執(zhí)行的每個任務(wù)
現(xiàn)在我們梳理一下執(zhí)行的流程,注意這里是簡略版的,文章后面我會給出詳細(xì)版的

所以此時,我們發(fā)現(xiàn)了需要創(chuàng)建幾個類,或者說幾個角色,分別是
- 線程池
- 工作線程
- 阻塞隊列
- 拒絕策略(干嘛的?就是當(dāng)線程數(shù)已經(jīng)滿了,并且阻塞隊列也滿了,還有任務(wù)想進入阻塞隊列的時候,就可以拒絕這個任務(wù))
實現(xiàn)方式
1.拒絕策略
/**
* 拒絕策略
*/
@FunctionalInterface
interface RejectPolicy<T>{
//queue就是我們自己實現(xiàn)的阻塞隊列,task是任務(wù)
void reject(BlockingQueue<T> queue,T task);
}
2.阻塞隊列
我們需要實現(xiàn)四個方法,獲取和添加,超時獲取和超時添加,至于方法實現(xiàn)的細(xì)節(jié),我都備注了大量的注釋進行解釋。
/**
* 阻塞隊列
*/
class BlockingQueue<T>{
//阻塞隊列
private Deque<T> queue = new ArrayDeque<>();
//鎖
private ReentrantLock lock = new ReentrantLock();
//生產(chǎn)者條件變量
private Condition fullWaitSet = lock.newCondition();
//消費者條件變量
private Condition emptyWaitSet = lock.newCondition();
//容量
private int capacity;
public BlockingQueue(int capacity){
this.capacity = capacity;
}
//帶有超時阻塞獲取
public T poll(long timeout, TimeUnit timeUnit){
lock.lock();
try {
//將timeout統(tǒng)一轉(zhuǎn)換為納秒
long nanos = timeUnit.toNanos(timeout);
while(queue.isEmpty()){
try {
if(nanos <= 0){
//小于0,說明上次沒有獲取到,代表已經(jīng)超時了
return null;
}
//返回值是剩余的時間
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T t = queue.removeFirst();
//通知生產(chǎn)者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
//阻塞獲取
public T take(){
lock.lock();
try{
while(queue.isEmpty()){ //如果任務(wù)隊列為空,代表線程池沒有可以執(zhí)行的內(nèi)容
try {
/*
也就說此時進來的線程是執(zhí)行不了任務(wù)的,所以此時emptyWaitSet消費者要進行阻塞狀態(tài)
等待下一次喚醒,然后繼續(xù)判斷隊列是否為空
*/
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*
代碼執(zhí)行到這里。說明任務(wù)隊列不為空,線程池就從任務(wù)隊列拿出一個任務(wù)出來執(zhí)行
也就是說把阻塞隊列的一個任務(wù)出隊
*/
T t = queue.removeFirst();
/*
然后喚醒之前存放在生成者Condition休息室,因為由于之前阻塞隊列已滿,fullWaitSet才會進入阻塞狀態(tài)
所以當(dāng)阻塞隊列刪除了任務(wù),就要喚醒之前進入阻塞狀態(tài)的fullWaitSet
*/
fullWaitSet.signal();
//返回任務(wù)
return t;
}finally {
lock.unlock();
}
}
//阻塞添加
public void put(T task){
lock.lock();
try {
while(queue.size() == capacity){ //任務(wù)隊列滿了
try {
System.out.println("等待加入任務(wù)隊列"+task);
/*
也就說此時進來的任務(wù)是進不了阻塞隊列的,已經(jīng)滿了,所以此時生產(chǎn)者Condition要進入阻塞狀態(tài)
等待下一次喚醒,然后繼續(xù)判斷隊列是否為空
*/
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//任務(wù)隊列還未滿
System.out.println("加入任務(wù)隊列"+task);
//把任務(wù)加入阻塞隊列
queue.addLast(task);
/*
然后喚醒之前存放在消費者Condition休息室,因為由于之前阻塞隊列為空,emptyWaitSet才會進入阻塞狀態(tài)
所以當(dāng)阻塞隊列加入了任務(wù),就要喚醒之前進入阻塞狀態(tài)的emptyWaitSet
*/
emptyWaitSet.signal();
}finally {
lock.unlock();
}
}
//帶超時阻塞時間添加
public boolean offer(T task,long timeout,TimeUnit timeUnit){
lock.lock();
try {
long nanos = timeUnit.toNanos(timeout);
while(queue.size() == capacity){
try {
if(nanos < 0){
return false;
}
System.out.println("等待加入任務(wù)隊列"+task);
//不會一直阻塞,超時就會繼續(xù)向下執(zhí)行
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("加入任務(wù)隊列"+task);
queue.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
//獲取任務(wù)數(shù)量
public int size(){
lock.lock();
try{
return queue.size();
}finally {
lock.unlock();
}
}
//嘗試添加任務(wù),如果阻塞隊列已經(jīng)滿了,就使用拒絕策略
public void tryPut(RejectPolicy<T> rejectPolicy, T task){
lock.lock();
try {
//判斷隊列是否已滿
if(queue.size() == capacity){
rejectPolicy.reject(this,task);
}else{ //有空閑
System.out.println("加入任務(wù)隊列"+task);
queue.addLast(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}
3.線程池和工作線程
我把工作線程當(dāng)成線程池的內(nèi)部類去實現(xiàn)。方便調(diào)用變量。
/**
* 線程池
*/
class ThreadPool{
//阻塞隊列
private BlockingQueue<Runnable> taskQueue;
//線程集合
private HashSet<Worker> workers = new HashSet<>();
//核心線程數(shù)
private int coreSize;
//獲取任務(wù)的超時時間
private long timeout;
private TimeUnit timeUnit;
private RejectPolicy<Runnable> rejectPolicy;
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(queueCapacity);
this.rejectPolicy = rejectPolicy;
}
//執(zhí)行任務(wù)
public void execute(Runnable task){
synchronized (workers){
if(workers.size() <= coreSize){ //當(dāng)前的線程數(shù)小于核心線程數(shù)
Worker worker = new Worker(task);
workers.add(worker);
//讓線程開始工作,執(zhí)行它的run方法
worker.start();
}else{
// 1) 死等
// 2) 帶超時等待
// 3) 讓調(diào)用者放棄任務(wù)執(zhí)行
// 4) 讓調(diào)用者拋出異常
// 5) 讓調(diào)用者自己執(zhí)行任務(wù)
taskQueue.tryPut(rejectPolicy,task);
}
}
}
/**
* 工作線程,也就是線程池里面的線程
*/
class Worker extends Thread{
private Runnable task;
public Worker(Runnable task){
this.task = task;
}
@Override
public void run() {
//執(zhí)行任務(wù)
// 1) 當(dāng) task 不為空,執(zhí)行任務(wù)
// 2) 當(dāng) task 執(zhí)行完畢,再接著從任務(wù)隊列獲取任務(wù)并執(zhí)行
while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {
try {
System.out.println("正在執(zhí)行的任務(wù)" + task);
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
//代表這個任務(wù)已經(jīng)執(zhí)行完了
task = null;
}
}
synchronized (workers) {
System.out.println("worker 被移除" + this);
workers.remove(this);
}
}
}
}
策略模式
細(xì)心的小伙伴已經(jīng)發(fā)現(xiàn),我在拒絕策略這里使用了23種設(shè)計模式的策略模式,因為我沒有將拒絕的方式寫死,而是交給了調(diào)用者去實現(xiàn)。
對比JDK的線程池
下面是JDK自帶的線程池

經(jīng)典的七大核心參數(shù)
- corePoolSize:核心線程數(shù)
- queueCapacity:任務(wù)隊列容量(阻塞隊列)
- maxPoolSize:最大線程數(shù)
- keepAliveTime:線程空閑時間
- TimeUnit unit:超時時間單位
- ThreadFactory threadFactory:線程工程
- rejectedExecutionHandler:任務(wù)拒絕處理器
實際上我們自己實現(xiàn)的也大同小異,只不過JDK官方的更為復(fù)雜。
JDK線程執(zhí)行的流程圖


線程池的狀態(tài)轉(zhuǎn)化
線程我們知道在操作系統(tǒng)層面有5種狀態(tài)

- 初始狀態(tài):僅是在語言層面創(chuàng)建了線程對象,還未與操作系統(tǒng)線程關(guān)聯(lián)
- 可運行狀態(tài)(就緒狀態(tài)):指該線程已經(jīng)被創(chuàng)建(與操作系統(tǒng)線程關(guān)聯(lián)),可以由 CPU 調(diào)度執(zhí)行
- 運行狀態(tài):指獲取了 CPU 時間片運行中的狀態(tài),當(dāng) CPU 時間片用完,會從【運行狀態(tài)】轉(zhuǎn)換至【可運行狀態(tài)】,會導(dǎo)致線程的上下文切換
- 阻塞狀態(tài)
- 如果調(diào)用了阻塞 API,如 BIO 讀寫文件,這時該線程實際不會用到 CPU,會導(dǎo)致線程上下文切換,進入【阻塞狀態(tài)】
- 等 BIO 操作完畢,會由操作系統(tǒng)喚醒阻塞的線程,轉(zhuǎn)換至【可運行狀態(tài)】
- 與【可運行狀態(tài)】的區(qū)別是,對【阻塞狀態(tài)】的線程來說只要它們一直不喚醒,調(diào)度器就一直不會考慮調(diào)度它們
- 終止?fàn)顟B(tài):表示線程已經(jīng)執(zhí)行完畢,生命周期已經(jīng)結(jié)束,不會再轉(zhuǎn)換為其它狀態(tài)
線程在Java API層面有6種狀態(tài)

- NEW 線程剛被創(chuàng)建,但是還沒有調(diào)用 start() 方法
- RUNNABLE 當(dāng)調(diào)用了 start() 方法之后,注意,Java API 層面的
- RUNNABLE 狀態(tài)涵蓋了 操作系統(tǒng) 層面的【可運行狀態(tài)】、【運行狀態(tài)】
- BLOCKED , WAITING , TIMED_WAITING 都是 Java API 層面對【阻塞狀態(tài)】的細(xì)分
- TERMINATED 當(dāng)線程代碼運行結(jié)束
線程池有5種狀態(tài)
- RUNNING:能接受新任務(wù),并處理阻塞隊列中的任務(wù)
- SHUTDOWN:不接受新任務(wù),但是可以處理阻塞隊列中的任務(wù)
- STOP:不接受新任務(wù),并且不處理阻塞隊列中的任務(wù),并且還打斷正在運行任務(wù)的線程,就是直接不干了!
- TIDYING:所有任務(wù)都終止,并且工作線程也為0,處于關(guān)閉之前的狀態(tài)
- TERMINATED:已關(guān)閉。

總結(jié)
本篇文章就到這里了,希望能給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
JAVA學(xué)習(xí)筆記:注釋、變量的聲明和定義操作實例分析
這篇文章主要介紹了JAVA學(xué)習(xí)筆記:注釋、變量的聲明和定義操作,結(jié)合實例形式分析了Java注釋、變量的聲明和定義相關(guān)原理、實現(xiàn)方法及操作注意事項,需要的朋友可以參考下2020-04-04
Mybatis主配置文件的properties標(biāo)簽詳解
這篇文章主要介紹了Mybatis主配置文件的properties標(biāo)簽,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08
SpringBoot詳解實現(xiàn)自定義異常處理頁面方法
SpringBoot是Spring全家桶的成員之一,是一種整合Spring技術(shù)棧的方式(或者說是框架),同時也是簡化Spring的一種快速開發(fā)的腳手架2022-06-06
Spring Security OAuth2 授權(quán)碼模式的實現(xiàn)
這篇文章主要介紹了Spring Security OAuth2 授權(quán)碼模式的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
SpringBoot如何取消內(nèi)置Tomcat啟動并改用外接Tomcat
這篇文章主要介紹了SpringBoot如何取消內(nèi)置Tomcat啟動并改用外接Tomcat,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-11-11

