Java手寫線程池之向JDK線程池進(jìn)發(fā)
前言
在前面的文章自己動手寫乞丐版線程池中,我們寫了一個非常簡單的線程池實現(xiàn),這個只是一個非常簡單的實現(xiàn),在本篇文章當(dāng)中我們將要實現(xiàn)一個和JDK內(nèi)部實現(xiàn)的線程池非常相似的線程池。
JDK線程池一瞥
我們首先看一個JDK給我們提供的線程池ThreadPoolExecutor
的構(gòu)造函數(shù)的參數(shù):
public?ThreadPoolExecutor(int?corePoolSize, ??????????????????????????????int?maximumPoolSize, ??????????????????????????????long?keepAliveTime, ??????????????????????????????TimeUnit?unit, ??????????????????????????????BlockingQueue<Runnable>?workQueue, ??????????????????????????????ThreadFactory?threadFactory, ??????????????????????????????RejectedExecutionHandler?handler)?
參數(shù)解釋:
1.corePoolSize:這個參數(shù)你可以理解為線程池當(dāng)中至少需要 corePoolSize 個線程,初始時線程池當(dāng)中線程的個數(shù)為0,當(dāng)線程池當(dāng)中線程的個數(shù)小于 corePoolSize 每次提交一個任務(wù)都會創(chuàng)建一個線程,并且先執(zhí)行這個提交的任務(wù),然后再去任務(wù)隊列里面去獲取新的任務(wù),然后再執(zhí)行。
2.maximumPoolSize:這個參數(shù)指的是線程池當(dāng)中能夠允許的最大的線程的數(shù)目,當(dāng)任務(wù)隊列滿了之后如果這個時候有新的任務(wù)想要加入隊列當(dāng)中,當(dāng)發(fā)現(xiàn)隊列滿了之后就創(chuàng)建新的線程去執(zhí)行任務(wù),但是需要滿足最大的線程的個數(shù)不能夠超過 maximumPoolSize 。
3.keepAliveTime 和 unit:這個主要是用于時間的表示,當(dāng)隊列當(dāng)中多長時間沒有數(shù)據(jù)的時候線程自己退出,前面談到了線程池當(dāng)中任務(wù)過多的時候會超過 corePoolSize ,當(dāng)線程池閑下來的時候這些多余的線程就可以退出了。
4.workQueue:這個就是用于保存任務(wù)的阻塞隊列。
5.threadFactory:這個參數(shù)倒不是很重要,線程工廠。
6.handler:這個表示拒絕策略,JDK給我們提供了四種策略:
- AbortPolicy:拋出異常。
- DiscardPolicy:放棄這個任務(wù)。
- CallerRunPolicy:提交任務(wù)的線程執(zhí)行。
- DiscardOldestPolicy:放棄等待時間最長的任務(wù)。
如果上面的參數(shù)你不能夠理解,可以先閱讀這篇文章自己動手寫乞丐版線程池。基于上面談到的參數(shù),線程池當(dāng)中提交任務(wù)的流程大致如下圖所示:
自己動手實現(xiàn)線程池
根據(jù)前面的參數(shù)分析我們自己實現(xiàn)的線程池需要實現(xiàn)一下功能:
- 能夠提交Runnable的任務(wù)和Callable的任務(wù)。
- 線程池能夠自己實現(xiàn)動態(tài)的擴容和所容,動態(tài)調(diào)整線程池當(dāng)中線程的數(shù)目,當(dāng)任務(wù)多的時候能夠增加線程的數(shù)目,當(dāng)任務(wù)少的時候多出來的線程能夠自動退出。
- 有自己的拒絕策略,當(dāng)任務(wù)隊列滿了,線程數(shù)也達(dá)到最大的時候,需要拒絕提交的任務(wù)。
線程池參數(shù)介紹
??private?AtomicInteger?ct?=?new?AtomicInteger(0);?//?當(dāng)前在執(zhí)行任務(wù)的線程個數(shù) ??private?int?corePoolSize; ??private?int?maximumPoolSize; ??private?long?keepAliveTime; ??private?TimeUnit?unit; ??private?BlockingQueue<Runnable>?taskQueue; ??private?RejectPolicy?policy; ??private?ArrayList<Worker>?workers?=?new?ArrayList<>(); ??private?volatile?boolean?isStopped; ??private?boolean?useTimed;
參數(shù)解釋如下:
- ct:表示當(dāng)前線程池當(dāng)中線程的個數(shù)。
- corePoolSize:線程池當(dāng)中核心線程的個數(shù),意義和上面談到的JDK的線程池意義一致。
- maximumPoolSize:線程池當(dāng)中最大的線程個數(shù),意義和上面談到的JDK的線程池意義一致。
- keepAliveTime 和 unit:和JDK線程池的參數(shù)意義一致。
- taskQueue:任務(wù)隊列,用不保存提交的任務(wù)。
- policy:拒絕策略,主要有一下四種策略:
public?enum?RejectPolicy?{ ??ABORT, ??CALLER_RUN, ??DISCARD_OLDEST, ??DISCARD }
workers:用于保存工作線程。
isStopped:線程池是否被關(guān)閉了。
useTimed:主要是用于表示是否使用上面的 keepAliveTime 和 unit,如果使用就是在一定的時間內(nèi),如果沒有從任務(wù)隊列當(dāng)中獲取到任務(wù),線程就從線程池退出,但是需要保證線程池當(dāng)中最小的線程個數(shù)不小于 corePoolSize 。
實現(xiàn)Runnable
??//?下面這個方法是向線程池提交任務(wù) ??public?void?execute(Runnable?runnable)?throws?InterruptedException?{ ????checkPoolState(); ????if?(addWorker(runnable,?false)??//?如果能夠加入新的線程執(zhí)行任務(wù)?加入成功就直接返回 ????????????||?!taskQueue.offer(runnable)?//?如果?taskQueue.offer(runnable)?返回?false?說明提交任務(wù)失敗?任務(wù)隊列已經(jīng)滿了 ????????????||?addWorker(runnable,?true))?//?使用能夠使用的最大的線程數(shù)?(maximumPoolSize)?看是否能夠產(chǎn)生新的線程 ??????return; ????//?如果任務(wù)隊列滿了而且不能夠加入新的線程?則拒絕這個任務(wù) ????if?(!taskQueue.offer(runnable)) ??????reject(runnable); ??}
在上面的代碼當(dāng)中:
checkPoolState函數(shù)是檢查線程池的狀態(tài),當(dāng)線程池被停下來之后就不能夠在提交任務(wù):
??private?void?checkPoolState()?{ ????if?(isStopped)?{ ??????//?如果線程池已經(jīng)停下來了,就不在向任務(wù)隊列當(dāng)中提交任務(wù)了 ??????throw?new?RuntimeException("thread?pool?has?been?stopped,?so?quit?submitting?task"); ????} ??}
addWorker函數(shù)是往線程池當(dāng)中提交任務(wù)并且產(chǎn)生一個線程,并且這個線程執(zhí)行的第一個任務(wù)就是傳遞的參數(shù)。max表示線程的最大數(shù)目,max == true 的時候表示使用 maximumPoolSize 否則使用 corePoolSize,當(dāng)返回值等于 true 的時候表示執(zhí)行成功,否則表示執(zhí)行失敗。
??/** ???* ???*?@param?runnable?需要被執(zhí)行的任務(wù) ???*?@param?max?是否使用?maximumPoolSize ???*?@return?boolean ???*/ ??public?synchronized?boolean?addWorker(Runnable?runnable,?boolean?max)?{ ????if?(ct.get()?>=?corePoolSize?&&?!max) ??????return?false; ????if?(ct.get()?>=?maximumPoolSize?&&?max) ??????return?false; ????Worker?worker?=?new?Worker(runnable); ????workers.add(worker); ????Thread?thread?=?new?Thread(worker,?"ThreadPool-"?+?"Thread-"?+?ct.addAndGet(1)); ????thread.start(); ????return?true; ??}
實現(xiàn)Callable
這個函數(shù)其實比較簡單,只需要將傳入的Callable對象封裝成一個FutureTask對象即可,因為FutureTask實現(xiàn)了Callable和Runnable兩個接口,然后將這個結(jié)果返回即可,得到這個對象,再調(diào)用對象的 get 方法就能夠得到結(jié)果。
??public?<V>?RunnableFuture<V>?submit(Callable<V>?task)?throws?InterruptedException?{ ????checkPoolState(); ????FutureTask<V>?futureTask?=?new?FutureTask<>(task); ????execute(futureTask); ????return?futureTask; ??}
拒絕策略的實現(xiàn)
根據(jù)前面提到的各種策略的具體實現(xiàn)方式,具體的代碼實現(xiàn)如下所示:
??private?void?reject(Runnable?runnable)?throws?InterruptedException?{ ????switch?(policy)?{ ??????case?ABORT: ????????throw?new?RuntimeException("task?queue?is?full"); ??????case?CALLER_RUN: ????????runnable.run(); ??????case?DISCARD:?//?直接放棄這個任務(wù) ????????return; ??????case?DISCARD_OLDEST: ????????//?放棄等待時間最長的任務(wù)?也就是隊列當(dāng)中的第一個任務(wù) ????????taskQueue.poll(); ????????execute(runnable);?//?重新執(zhí)行這個任務(wù) ????} ??}
線程池關(guān)閉實現(xiàn)
一共兩種方式實現(xiàn)線程池關(guān)閉:
- 直接關(guān)閉線程池,不管任務(wù)隊列當(dāng)中的任務(wù)是否被全部執(zhí)行完成。
- 安全關(guān)閉線程池,先等待任務(wù)隊列當(dāng)中所有的任務(wù)被執(zhí)行完成,再關(guān)閉線程池,但是在這個過程當(dāng)中不允許繼續(xù)提交任務(wù)了,這一點已經(jīng)在函數(shù) checkPoolState 當(dāng)中實現(xiàn)了。
??//?強制關(guān)閉線程池 ??public?synchronized?void?stop()?{ ????isStopped?=?true; ????for?(Worker?worker?:?workers)?{ ??????worker.stopWorker(); ????} ??} ??public?synchronized?void?shutDown()?{ ????//?先表示關(guān)閉線程池?線程就不能再向線程池提交任務(wù) ????isStopped?=?true; ????//?先等待所有的任務(wù)執(zhí)行完成再關(guān)閉線程池 ????waitForAllTasks(); ????stop(); ??} ??private?void?waitForAllTasks()?{ ????//?當(dāng)線程池當(dāng)中還有任務(wù)的時候?就不退出循環(huán) ????while?(taskQueue.size()?>?0)?{ ??????Thread.yield(); ??????try?{ ????????Thread.sleep(1000); ??????}?catch?(InterruptedException?e)?{ ????????e.printStackTrace(); ??????} ????} ??}
工作線程的工作實現(xiàn)
????@Override ????public?void?run()?{ ??????//?先執(zhí)行傳遞過來的第一個任務(wù)?這里是一個小的優(yōu)化?讓線程直接執(zhí)行第一個任務(wù)?不需要 ??????//?放入任務(wù)隊列再取出來執(zhí)行了 ??????firstTask.run(); ??????thisThread?=?Thread.currentThread(); ??????while?(!isStopped)?{ ????????try?{ ??????????//?是否使用時間就在這里顯示出來了 ??????????Runnable?task?=?useTimed???taskQueue.poll(keepAliveTime,?unit)?:?taskQueue.take(); ??????????if?(task?==?null)?{ ????????????int?i; ????????????boolean?exit?=?true; ????????????//?如果當(dāng)前線程數(shù)大于核心線程數(shù)?則使用?CAS?去退出?用于保證在線程安全下的退出 ????????????//?且保證線程的個數(shù)不小于?corePoolSize?下面這段代碼需要仔細(xì)分析一下 ????????????if?(ct.get()?>?corePoolSize)?{ ??????????????do{ ????????????????i?=?ct.get(); ????????????????if?(i?<=?corePoolSize)?{ ??????????????????exit?=?false; ??????????????????break; ????????????????} ??????????????}while?(!ct.compareAndSet(i,?i?-?1)); ??????????????if?(exit)?{ ????????????????return; ??????????????} ????????????} ??????????}else?{ ????????????task.run(); ??????????} ????????}?catch?(InterruptedException?e)?{ ??????????//?do?nothing ????????} ??????} ????}
我們現(xiàn)在來仔細(xì)分析一下,線程退出線程池的時候是如何保證線程池當(dāng)中總的線程數(shù)是不小于 corePoolSize 的!首先整體的框架是使用 CAS 進(jìn)行實現(xiàn),具體代碼為 do ... while 操作,然后在 while 操作里面使用 CAS 進(jìn)行測試替換,如果沒有成功再次獲取 ,當(dāng)線程池當(dāng)中核心線程的數(shù)目小于等于 corePoolSize 的時候也需要退出循環(huán),因為線程池當(dāng)中線程的個數(shù)不能小于 corePoolSize 。因此使用 break 跳出循環(huán)的線程是不會退出線程池的。
線程池實現(xiàn)的BUG
在我們自己實現(xiàn)的線程池當(dāng)中當(dāng)線程退出的時候,workers 當(dāng)中還保存這指向這個線程的對象,但是當(dāng)線程退出的時候我們還沒有在 workers 當(dāng)中刪除這個對象,因此這個線程對象不會被垃圾回收器收集掉,但是我們這個只是一個線程池實現(xiàn)的例子而已,并不用于生產(chǎn)環(huán)境,只是為了幫助大家理解線程池的原理。
完整代碼
package?cscore.concurrent.java.threadpoolv2; import?java.util.ArrayList; import?java.util.concurrent.*; import?java.util.concurrent.atomic.AtomicInteger; public?class?ThreadPool?{ ??private?AtomicInteger?ct?=?new?AtomicInteger(0);?//?當(dāng)前在執(zhí)行任務(wù)的線程個數(shù) ??private?int?corePoolSize; ??private?int?maximumPoolSize; ??private?long?keepAliveTime; ??private?TimeUnit?unit; ??private?BlockingQueue<Runnable>?taskQueue; ??private?RejectPolicy?policy; ??private?ArrayList<Worker>?workers?=?new?ArrayList<>(); ??private?volatile?boolean?isStopped; ??private?boolean?useTimed; ??public?int?getCt()?{ ????return?ct.get(); ??} ??public?ThreadPool(int?corePoolSize,?int?maximumPoolSize,?TimeUnit?unit,?long?keepAliveTime,?RejectPolicy?policy ??????????,?int?maxTasks)?{ ????//?please?add?-ea?to?vm?options?to?make?assert?keyword?enable ????assert?corePoolSize?>?0; ????assert?maximumPoolSize?>?0; ????assert?keepAliveTime?>=?0; ????assert?maxTasks?>?0; ????this.corePoolSize?=?corePoolSize; ????this.maximumPoolSize?=?maximumPoolSize; ????this.unit?=?unit; ????this.policy?=?policy; ????this.keepAliveTime?=?keepAliveTime; ????taskQueue?=?new?ArrayBlockingQueue<Runnable>(maxTasks); ????useTimed?=?keepAliveTime?!=?0; ??} ??/** ???* ???*?@param?runnable?需要被執(zhí)行的任務(wù) ???*?@param?max?是否使用?maximumPoolSize ???*?@return?boolean ???*/ ??public?synchronized?boolean?addWorker(Runnable?runnable,?boolean?max)?{ ????if?(ct.get()?>=?corePoolSize?&&?!max) ??????return?false; ????if?(ct.get()?>=?maximumPoolSize?&&?max) ??????return?false; ????Worker?worker?=?new?Worker(runnable); ????workers.add(worker); ????Thread?thread?=?new?Thread(worker,?"ThreadPool-"?+?"Thread-"?+?ct.addAndGet(1)); ????thread.start(); ????return?true; ??} ??//?下面這個方法是向線程池提交任務(wù) ??public?void?execute(Runnable?runnable)?throws?InterruptedException?{ ????checkPoolState(); ????if?(addWorker(runnable,?false)??//?如果能夠加入新的線程執(zhí)行任務(wù)?加入成功就直接返回 ????????????||?!taskQueue.offer(runnable)?//?如果?taskQueue.offer(runnable)?返回?false?說明提交任務(wù)失敗?任務(wù)隊列已經(jīng)滿了 ????????????||?addWorker(runnable,?true))?//?使用能夠使用的最大的線程數(shù)?(maximumPoolSize)?看是否能夠產(chǎn)生新的線程 ??????return; ????//?如果任務(wù)隊列滿了而且不能夠加入新的線程?則拒絕這個任務(wù) ????if?(!taskQueue.offer(runnable)) ??????reject(runnable); ??} ??private?void?reject(Runnable?runnable)?throws?InterruptedException?{ ????switch?(policy)?{ ??????case?ABORT: ????????throw?new?RuntimeException("task?queue?is?full"); ??????case?CALLER_RUN: ????????runnable.run(); ??????case?DISCARD: ????????return; ??????case?DISCARD_OLDEST: ????????//?放棄等待時間最長的任務(wù) ????????taskQueue.poll(); ????????execute(runnable); ????} ??} ??private?void?checkPoolState()?{ ????if?(isStopped)?{ ??????//?如果線程池已經(jīng)停下來了,就不在向任務(wù)隊列當(dāng)中提交任務(wù)了 ??????throw?new?RuntimeException("thread?pool?has?been?stopped,?so?quit?submitting?task"); ????} ??} ??public?<V>?RunnableFuture<V>?submit(Callable<V>?task)?throws?InterruptedException?{ ????checkPoolState(); ????FutureTask<V>?futureTask?=?new?FutureTask<>(task); ????execute(futureTask); ????return?futureTask; ??} ??//?強制關(guān)閉線程池 ??public?synchronized?void?stop()?{ ????isStopped?=?true; ????for?(Worker?worker?:?workers)?{ ??????worker.stopWorker(); ????} ??} ??public?synchronized?void?shutDown()?{ ????//?先表示關(guān)閉線程池?線程就不能再向線程池提交任務(wù) ????isStopped?=?true; ????//?先等待所有的任務(wù)執(zhí)行完成再關(guān)閉線程池 ????waitForAllTasks(); ????stop(); ??} ??private?void?waitForAllTasks()?{ ????//?當(dāng)線程池當(dāng)中還有任務(wù)的時候?就不退出循環(huán) ????while?(taskQueue.size()?>?0)?{ ??????Thread.yield(); ??????try?{ ????????Thread.sleep(1000); ??????}?catch?(InterruptedException?e)?{ ????????e.printStackTrace(); ??????} ????} ??} ??class?Worker?implements?Runnable?{ ????private?Thread?thisThread; ????private?final?Runnable?firstTask; ????private?volatile?boolean?isStopped; ????public?Worker(Runnable?firstTask)?{ ??????this.firstTask?=?firstTask; ????} ????@Override ????public?void?run()?{ ??????//?先執(zhí)行傳遞過來的第一個任務(wù)?這里是一個小的優(yōu)化?讓線程直接執(zhí)行第一個任務(wù)?不需要 ??????//?放入任務(wù)隊列再取出來執(zhí)行了 ??????firstTask.run(); ??????thisThread?=?Thread.currentThread(); ??????while?(!isStopped)?{ ????????try?{ ??????????Runnable?task?=?useTimed???taskQueue.poll(keepAliveTime,?unit)?:?taskQueue.take(); ??????????if?(task?==?null)?{ ????????????int?i; ????????????boolean?exit?=?true; ????????????if?(ct.get()?>?corePoolSize)?{ ??????????????do{ ????????????????i?=?ct.get(); ????????????????if?(i?<=?corePoolSize)?{ ??????????????????exit?=?false; ??????????????????break; ????????????????} ??????????????}while?(!ct.compareAndSet(i,?i?-?1)); ??????????????if?(exit)?{ ????????????????return; ??????????????} ????????????} ??????????}else?{ ????????????task.run(); ??????????} ????????}?catch?(InterruptedException?e)?{ ??????????//?do?nothing ????????} ??????} ????} ????public?synchronized?void?stopWorker()?{ ??????if?(isStopped)?{ ????????throw?new?RuntimeException("thread?has?been?interrupted"); ??????} ??????isStopped?=?true; ??????thisThread.interrupt(); ????} ??} }
線程池測試
package?cscore.concurrent.java.threadpoolv2; import?java.util.concurrent.ExecutionException; import?java.util.concurrent.RunnableFuture; import?java.util.concurrent.TimeUnit; public?class?Test?{ ??public?static?void?main(String[]?args)?throws?InterruptedException,?ExecutionException?{ ????var?pool?=?new?ThreadPool(2,?5,?TimeUnit.SECONDS,?10,?RejectPolicy.ABORT,?100000); ????for?(int?i?=?0;?i?<?10;?i++)?{ ??????RunnableFuture<Integer>?submit?=?pool.submit(()?->?{ ????????System.out.println(Thread.currentThread().getName()?+?"?output?a"); ????????try?{ ??????????Thread.sleep(10); ????????}?catch?(InterruptedException?e)?{ ??????????e.printStackTrace(); ????????} ????????return?0; ??????}); ??????System.out.println(submit.get()); ????} ????int?n?=?15; ????while?(n--?>?0)?{ ??????System.out.println("Number?Threads?=?"?+?pool.getCt()); ??????Thread.sleep(1000); ????} ????pool.shutDown(); ??} }
上面測試代碼的輸出結(jié)果如下所示:
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
Number Threads = 5
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
ThreadPool-Thread-3 output a
ThreadPool-Thread-5 output a
ThreadPool-Thread-2 output a
ThreadPool-Thread-1 output a
ThreadPool-Thread-4 output a
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 5
Number Threads = 3
Number Threads = 2
Number Threads = 2
Number Threads = 2
Number Threads = 2
從上面的代碼可以看出我們實現(xiàn)了正確的任務(wù)實現(xiàn)結(jié)果,同時線程池當(dāng)中的核心線程數(shù)從 2 變到了 5 ,當(dāng)線程池當(dāng)中任務(wù)隊列全部別執(zhí)行完成之后,線程的數(shù)目重新降下來了,這確實是我們想要達(dá)到的結(jié)果。
總結(jié)
在本篇文章當(dāng)中主要給大家介紹了如何實現(xiàn)一個類似于JDK中的線程池,里面有非常多的實現(xiàn)細(xì)節(jié),大家可以仔細(xì)捋一下其中的流程,對線程池的理解將會非常有幫助。
以上就是Java手寫線程池之向JDK線程池進(jìn)發(fā)的詳細(xì)內(nèi)容,更多關(guān)于Java手寫線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot使用Shiro實現(xiàn)動態(tài)加載權(quán)限詳解流程
本文小編將基于?SpringBoot?集成?Shiro?實現(xiàn)動態(tài)uri權(quán)限,由前端vue在頁面配置uri,Java后端動態(tài)刷新權(quán)限,不用重啟項目,以及在頁面分配給用戶?角色?、?按鈕?、uri?權(quán)限后,后端動態(tài)分配權(quán)限,用戶無需在頁面重新登錄才能獲取最新權(quán)限,一切權(quán)限動態(tài)加載,靈活配置2022-07-07java判斷l(xiāng)ist不為空的實現(xiàn),和限制條數(shù)不要在一起寫
這篇文章主要介紹了java判斷l(xiāng)ist不為空的實現(xiàn),和限制條數(shù)不要在一起寫。具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-01-01java新增關(guān)聯(lián)的三張表,每張表要求都插入集合,代碼實現(xiàn)方式
這篇文章主要介紹了java新增關(guān)聯(lián)的三張表,每張表要求都插入集合,代碼實現(xiàn)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12使用Java實現(xiàn)動態(tài)生成MySQL數(shù)據(jù)庫
這篇文章主要為大家詳細(xì)介紹了如何使用Java實現(xiàn)動態(tài)生成MySQL數(shù)據(jù)庫,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-02-02