實(shí)現(xiàn)java簡(jiǎn)單的線程池
拆分實(shí)現(xiàn)流程
請(qǐng)看下面這張圖
首先我們得對(duì)線程池進(jìn)行一個(gè)功能拆分
- Thread Pool 就是我們的線程池,t1,t2,t3代表三個(gè)線程
- Blocking Queue代表阻塞隊(duì)列
- main代表main方法的線程
- task1,task2,task3代表要執(zhí)行的每個(gè)任務(wù)
現(xiàn)在我們梳理一下執(zhí)行的流程,注意這里是簡(jiǎn)略版的,文章后面我會(huì)給出詳細(xì)版的
所以此時(shí),我們發(fā)現(xiàn)了需要?jiǎng)?chuàng)建幾個(gè)類(lèi),或者說(shuō)幾個(gè)角色,分別是
- 線程池
- 工作線程
- 阻塞隊(duì)列
- 拒絕策略(干嘛的?就是當(dāng)線程數(shù)已經(jīng)滿(mǎn)了,并且阻塞隊(duì)列也滿(mǎn)了,還有任務(wù)想進(jìn)入阻塞隊(duì)列的時(shí)候,就可以拒絕這個(gè)任務(wù))
實(shí)現(xiàn)方式
1.拒絕策略
/** * 拒絕策略 */ @FunctionalInterface interface RejectPolicy<T>{ //queue就是我們自己實(shí)現(xiàn)的阻塞隊(duì)列,task是任務(wù) void reject(BlockingQueue<T> queue,T task); }
2.阻塞隊(duì)列
我們需要實(shí)現(xiàn)四個(gè)方法,獲取和添加,超時(shí)獲取和超時(shí)添加,至于方法實(shí)現(xiàn)的細(xì)節(jié),我都備注了大量的注釋進(jìn)行解釋。
/** * 阻塞隊(duì)列 */ class BlockingQueue<T>{ //阻塞隊(duì)列 private Deque<T> queue = new ArrayDeque<>(); //鎖 private ReentrantLock lock = new ReentrantLock(); //生產(chǎn)者條件變量 private Condition fullWaitSet = lock.newCondition(); //消費(fèi)者條件變量 private Condition emptyWaitSet = lock.newCondition(); //容量 private int capacity; public BlockingQueue(int capacity){ this.capacity = capacity; } //帶有超時(shí)阻塞獲取 public T poll(long timeout, TimeUnit timeUnit){ lock.lock(); try { //將timeout統(tǒng)一轉(zhuǎn)換為納秒 long nanos = timeUnit.toNanos(timeout); while(queue.isEmpty()){ try { if(nanos <= 0){ //小于0,說(shuō)明上次沒(méi)有獲取到,代表已經(jīng)超時(shí)了 return null; } //返回值是剩余的時(shí)間 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } T t = queue.removeFirst(); //通知生產(chǎn)者 fullWaitSet.signal(); return t; }finally { lock.unlock(); } } //阻塞獲取 public T take(){ lock.lock(); try{ while(queue.isEmpty()){ //如果任務(wù)隊(duì)列為空,代表線程池沒(méi)有可以執(zhí)行的內(nèi)容 try { /* 也就說(shuō)此時(shí)進(jìn)來(lái)的線程是執(zhí)行不了任務(wù)的,所以此時(shí)emptyWaitSet消費(fèi)者要進(jìn)行阻塞狀態(tài) 等待下一次喚醒,然后繼續(xù)判斷隊(duì)列是否為空 */ emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /* 代碼執(zhí)行到這里。說(shuō)明任務(wù)隊(duì)列不為空,線程池就從任務(wù)隊(duì)列拿出一個(gè)任務(wù)出來(lái)執(zhí)行 也就是說(shuō)把阻塞隊(duì)列的一個(gè)任務(wù)出隊(duì) */ T t = queue.removeFirst(); /* 然后喚醒之前存放在生成者Condition休息室,因?yàn)橛捎谥白枞?duì)列已滿(mǎn),fullWaitSet才會(huì)進(jìn)入阻塞狀態(tài) 所以當(dāng)阻塞隊(duì)列刪除了任務(wù),就要喚醒之前進(jìn)入阻塞狀態(tài)的fullWaitSet */ fullWaitSet.signal(); //返回任務(wù) return t; }finally { lock.unlock(); } } //阻塞添加 public void put(T task){ lock.lock(); try { while(queue.size() == capacity){ //任務(wù)隊(duì)列滿(mǎn)了 try { System.out.println("等待加入任務(wù)隊(duì)列"+task); /* 也就說(shuō)此時(shí)進(jìn)來(lái)的任務(wù)是進(jìn)不了阻塞隊(duì)列的,已經(jīng)滿(mǎn)了,所以此時(shí)生產(chǎn)者Condition要進(jìn)入阻塞狀態(tài) 等待下一次喚醒,然后繼續(xù)判斷隊(duì)列是否為空 */ fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //任務(wù)隊(duì)列還未滿(mǎn) System.out.println("加入任務(wù)隊(duì)列"+task); //把任務(wù)加入阻塞隊(duì)列 queue.addLast(task); /* 然后喚醒之前存放在消費(fèi)者Condition休息室,因?yàn)橛捎谥白枞?duì)列為空,emptyWaitSet才會(huì)進(jìn)入阻塞狀態(tài) 所以當(dāng)阻塞隊(duì)列加入了任務(wù),就要喚醒之前進(jìn)入阻塞狀態(tài)的emptyWaitSet */ emptyWaitSet.signal(); }finally { lock.unlock(); } } //帶超時(shí)阻塞時(shí)間添加 public boolean offer(T task,long timeout,TimeUnit timeUnit){ lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while(queue.size() == capacity){ try { if(nanos < 0){ return false; } System.out.println("等待加入任務(wù)隊(duì)列"+task); //不會(huì)一直阻塞,超時(shí)就會(huì)繼續(xù)向下執(zhí)行 nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println("加入任務(wù)隊(duì)列"+task); queue.addLast(task); emptyWaitSet.signal(); return true; }finally { lock.unlock(); } } //獲取任務(wù)數(shù)量 public int size(){ lock.lock(); try{ return queue.size(); }finally { lock.unlock(); } } //嘗試添加任務(wù),如果阻塞隊(duì)列已經(jīng)滿(mǎn)了,就使用拒絕策略 public void tryPut(RejectPolicy<T> rejectPolicy, T task){ lock.lock(); try { //判斷隊(duì)列是否已滿(mǎn) if(queue.size() == capacity){ rejectPolicy.reject(this,task); }else{ //有空閑 System.out.println("加入任務(wù)隊(duì)列"+task); queue.addLast(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }
3.線程池和工作線程
我把工作線程當(dāng)成線程池的內(nèi)部類(lèi)去實(shí)現(xiàn)。方便調(diào)用變量。
/** * 線程池 */ class ThreadPool{ //阻塞隊(duì)列 private BlockingQueue<Runnable> taskQueue; //線程集合 private HashSet<Worker> workers = new HashSet<>(); //核心線程數(shù) private int coreSize; //獲取任務(wù)的超時(shí)時(shí)間 private long timeout; private TimeUnit timeUnit; private RejectPolicy<Runnable> rejectPolicy; public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapacity,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(queueCapacity); this.rejectPolicy = rejectPolicy; } //執(zhí)行任務(wù) public void execute(Runnable task){ synchronized (workers){ if(workers.size() <= coreSize){ //當(dāng)前的線程數(shù)小于核心線程數(shù) Worker worker = new Worker(task); workers.add(worker); //讓線程開(kāi)始工作,執(zhí)行它的run方法 worker.start(); }else{ // 1) 死等 // 2) 帶超時(shí)等待 // 3) 讓調(diào)用者放棄任務(wù)執(zhí)行 // 4) 讓調(diào)用者拋出異常 // 5) 讓調(diào)用者自己執(zhí)行任務(wù) taskQueue.tryPut(rejectPolicy,task); } } } /** * 工作線程,也就是線程池里面的線程 */ class Worker extends Thread{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { //執(zhí)行任務(wù) // 1) 當(dāng) task 不為空,執(zhí)行任務(wù) // 2) 當(dāng) task 執(zhí)行完畢,再接著從任務(wù)隊(duì)列獲取任務(wù)并執(zhí)行 while (task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) { try { System.out.println("正在執(zhí)行的任務(wù)" + task); task.run(); } catch (Exception e) { e.printStackTrace(); } finally { //代表這個(gè)任務(wù)已經(jīng)執(zhí)行完了 task = null; } } synchronized (workers) { System.out.println("worker 被移除" + this); workers.remove(this); } } } }
策略模式
細(xì)心的小伙伴已經(jīng)發(fā)現(xiàn),我在拒絕策略這里使用了23種設(shè)計(jì)模式的策略模式,因?yàn)槲覜](méi)有將拒絕的方式寫(xiě)死,而是交給了調(diào)用者去實(shí)現(xiàn)。
對(duì)比JDK的線程池
下面是JDK自帶的線程池
經(jīng)典的七大核心參數(shù)
- corePoolSize:核心線程數(shù)
- queueCapacity:任務(wù)隊(duì)列容量(阻塞隊(duì)列)
- maxPoolSize:最大線程數(shù)
- keepAliveTime:線程空閑時(shí)間
- TimeUnit unit:超時(shí)時(shí)間單位
- ThreadFactory threadFactory:線程工程
- rejectedExecutionHandler:任務(wù)拒絕處理器
實(shí)際上我們自己實(shí)現(xiàn)的也大同小異,只不過(guò)JDK官方的更為復(fù)雜。
JDK線程執(zhí)行的流程圖
線程池的狀態(tài)轉(zhuǎn)化
線程我們知道在操作系統(tǒng)層面有5種狀態(tài)
- 初始狀態(tài):僅是在語(yǔ)言層面創(chuàng)建了線程對(duì)象,還未與操作系統(tǒng)線程關(guān)聯(lián)
- 可運(yùn)行狀態(tài)(就緒狀態(tài)):指該線程已經(jīng)被創(chuàng)建(與操作系統(tǒng)線程關(guān)聯(lián)),可以由 CPU 調(diào)度執(zhí)行
- 運(yùn)行狀態(tài):指獲取了 CPU 時(shí)間片運(yùn)行中的狀態(tài),當(dāng) CPU 時(shí)間片用完,會(huì)從【運(yùn)行狀態(tài)】轉(zhuǎn)換至【可運(yùn)行狀態(tài)】,會(huì)導(dǎo)致線程的上下文切換
- 阻塞狀態(tài)
- 如果調(diào)用了阻塞 API,如 BIO 讀寫(xiě)文件,這時(shí)該線程實(shí)際不會(huì)用到 CPU,會(huì)導(dǎo)致線程上下文切換,進(jìn)入【阻塞狀態(tài)】
- 等 BIO 操作完畢,會(huì)由操作系統(tǒng)喚醒阻塞的線程,轉(zhuǎn)換至【可運(yùn)行狀態(tài)】
- 與【可運(yùn)行狀態(tài)】的區(qū)別是,對(duì)【阻塞狀態(tài)】的線程來(lái)說(shuō)只要它們一直不喚醒,調(diào)度器就一直不會(huì)考慮調(diào)度它們
- 終止?fàn)顟B(tài):表示線程已經(jīng)執(zhí)行完畢,生命周期已經(jīng)結(jié)束,不會(huì)再轉(zhuǎn)換為其它狀態(tài)
線程在Java API層面有6種狀態(tài)
- NEW 線程剛被創(chuàng)建,但是還沒(méi)有調(diào)用 start() 方法
- RUNNABLE 當(dāng)調(diào)用了 start() 方法之后,注意,Java API 層面的
- RUNNABLE 狀態(tài)涵蓋了 操作系統(tǒng) 層面的【可運(yùn)行狀態(tài)】、【運(yùn)行狀態(tài)】
- BLOCKED , WAITING , TIMED_WAITING 都是 Java API 層面對(duì)【阻塞狀態(tài)】的細(xì)分
- TERMINATED 當(dāng)線程代碼運(yùn)行結(jié)束
線程池有5種狀態(tài)
- RUNNING:能接受新任務(wù),并處理阻塞隊(duì)列中的任務(wù)
- SHUTDOWN:不接受新任務(wù),但是可以處理阻塞隊(duì)列中的任務(wù)
- STOP:不接受新任務(wù),并且不處理阻塞隊(duì)列中的任務(wù),并且還打斷正在運(yùn)行任務(wù)的線程,就是直接不干了!
- TIDYING:所有任務(wù)都終止,并且工作線程也為0,處于關(guān)閉之前的狀態(tài)
- TERMINATED:已關(guān)閉。
總結(jié)
本篇文章就到這里了,希望能給你帶來(lái)幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
JAVA學(xué)習(xí)筆記:注釋、變量的聲明和定義操作實(shí)例分析
這篇文章主要介紹了JAVA學(xué)習(xí)筆記:注釋、變量的聲明和定義操作,結(jié)合實(shí)例形式分析了Java注釋、變量的聲明和定義相關(guān)原理、實(shí)現(xiàn)方法及操作注意事項(xiàng),需要的朋友可以參考下2020-04-04詳解java中動(dòng)態(tài)代理實(shí)現(xiàn)機(jī)制
這篇文章主要為大家介紹了java中動(dòng)態(tài)代理實(shí)現(xiàn)機(jī)制的相關(guān)資料,需要的朋友可以參考下2016-01-01詳解java封裝實(shí)現(xiàn)Excel建表讀寫(xiě)操作
這篇文章給大家分享了java封裝實(shí)現(xiàn)Excel建表讀寫(xiě)操作的相關(guān)知識(shí)點(diǎn)內(nèi)容,有需要的朋友們可以學(xué)習(xí)下。2018-08-08Mybatis主配置文件的properties標(biāo)簽詳解
這篇文章主要介紹了Mybatis主配置文件的properties標(biāo)簽,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08SpringBoot詳解實(shí)現(xiàn)自定義異常處理頁(yè)面方法
SpringBoot是Spring全家桶的成員之一,是一種整合Spring技術(shù)棧的方式(或者說(shuō)是框架),同時(shí)也是簡(jiǎn)化Spring的一種快速開(kāi)發(fā)的腳手架2022-06-06Spring Security OAuth2 授權(quán)碼模式的實(shí)現(xiàn)
這篇文章主要介紹了Spring Security OAuth2 授權(quán)碼模式的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08SpringBoot如何取消內(nèi)置Tomcat啟動(dòng)并改用外接Tomcat
這篇文章主要介紹了SpringBoot如何取消內(nèi)置Tomcat啟動(dòng)并改用外接Tomcat,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11