Java實(shí)現(xiàn)手寫(xiě)乞丐版線程池的示例代碼
前言
在上篇文章線程池的前世今生當(dāng)中我們介紹了實(shí)現(xiàn)線程池的原理,在這篇文章當(dāng)中我們主要介紹實(shí)現(xiàn)一個(gè)非常簡(jiǎn)易版的線程池,深入的去理解其中的原理,麻雀雖小,五臟俱全。
線程池的具體實(shí)現(xiàn)
線程池實(shí)現(xiàn)思路
任務(wù)保存到哪里?
在上篇文章線程池的前世今生當(dāng)中我們具體去介紹了線程池當(dāng)中的原理。在線程池當(dāng)中我們有很多個(gè)線程不斷的從任務(wù)池(用戶在使用線程池的時(shí)候不斷的使用execute方法將任務(wù)添加到線程池當(dāng)中)里面去拿任務(wù)然后執(zhí)行,現(xiàn)在需要思考我們應(yīng)該用什么去實(shí)現(xiàn)任務(wù)池呢?
答案是阻塞隊(duì)列,因?yàn)槲覀冃枰WC在多個(gè)線程往任務(wù)池里面加入任務(wù)的時(shí)候并發(fā)安全,JDK已經(jīng)給我們提供了這樣的數(shù)據(jù)結(jié)構(gòu)——BlockingQueue,這個(gè)是一個(gè)并發(fā)安全的阻塞隊(duì)列,他之所以叫做阻塞隊(duì)列,是因?yàn)槲覀兛梢栽O(shè)置隊(duì)列當(dāng)中可以容納數(shù)據(jù)的個(gè)數(shù),當(dāng)加入到隊(duì)列當(dāng)中的數(shù)據(jù)超過(guò)這個(gè)值的時(shí)候,試圖將數(shù)據(jù)加入到阻塞隊(duì)列當(dāng)中的線程就會(huì)被掛起。當(dāng)隊(duì)列當(dāng)中為空的時(shí)候,試圖從隊(duì)列當(dāng)中取出數(shù)據(jù)的線程也會(huì)被掛起。
線程的設(shè)計(jì)
在我們自己實(shí)現(xiàn)的線程池當(dāng)中我們定一個(gè)Worker類去不斷的從任務(wù)池當(dāng)中取出任務(wù),然后進(jìn)行執(zhí)行。在我們自己定義的worker當(dāng)中還需要有一個(gè)變量isStopped表示線程是否停止工作。同時(shí)在worker當(dāng)中還需要保存當(dāng)前是哪個(gè)線程在執(zhí)行任務(wù),因此在我們自己設(shè)計(jì)的woker類當(dāng)中還需要有一個(gè)thisThread變量,保存正在執(zhí)行任務(wù)的線程,因此worker的整體設(shè)計(jì)如下:
package cscore.concurrent.java.threadpool; import java.util.concurrent.BlockingQueue; public class Worker implements Runnable { private Thread thisThread; // 表示正在執(zhí)行任務(wù)的線程 private BlockingQueue<Runnable> taskQueue; // 由線程池傳遞過(guò)來(lái)的任務(wù)隊(duì)列 private volatile boolean isStopped; // 表示 worker 是否停止工作 需要使用 volatile 保證線程之間的可見(jiàn)性 public Worker(BlockingQueue taskQueue) { // 這個(gè)構(gòu)造方法是在線程池的實(shí)現(xiàn)當(dāng)中會(huì)被調(diào)用 this.taskQueue = taskQueue; } // 線程執(zhí)行的函數(shù) @Override public void run() { thisThread = Thread.currentThread(); // 獲取執(zhí)行任務(wù)的線程 while (!isStopped) { // 當(dāng)線程沒(méi)有停止的時(shí)候就不斷的去任務(wù)池當(dāng)中取出任務(wù) try { Runnable task = taskQueue.take(); // 從任務(wù)池當(dāng)中取出任務(wù) 當(dāng)沒(méi)有任務(wù)的時(shí)候線程會(huì)被這個(gè)方法阻塞 task.run(); // 執(zhí)行任務(wù) 任務(wù)就是一個(gè) Runnable 對(duì)象 } catch (InterruptedException e) { // do nothing // 這個(gè)地方很重要 你有沒(méi)有思考過(guò)一個(gè)問(wèn)題當(dāng)任務(wù)池當(dāng)中沒(méi)有任務(wù)的時(shí)候 線程會(huì)被阻塞在 take 方法上 // 如果我們后面沒(méi)有任務(wù)提交拿他就會(huì)一直阻塞 那么我們?cè)撊绾螁拘阉? // 答案就在下面的函數(shù)當(dāng)中 調(diào)用線程的 interruput 方法 那么take方法就會(huì)產(chǎn)生一個(gè)異常 然后我們 // 捕獲到一異常 然后線程退出 } } } public synchronized void stopWorker() { if (isStopped) { throw new RuntimeException("thread has been interrupted"); } isStopped = true; thisThread.interrupt(); // 中斷線程產(chǎn)生異常 } public synchronized boolean isStopped() { return isStopped; } }
線程池的參數(shù)
在我們自己實(shí)現(xiàn)的線程池當(dāng)中,我們只需要定義兩個(gè)參數(shù)一個(gè)是線程的個(gè)數(shù),另外一個(gè)是阻塞隊(duì)列(任務(wù)池)當(dāng)中最大的任務(wù)個(gè)數(shù)。在我們自己實(shí)現(xiàn)的線程池當(dāng)中還需要有一個(gè)變量isStopped表示線程池是否停止工作了,因此線程池的初步設(shè)計(jì)大致如下:
private BlockingQueue taskQueue; // 任務(wù)池 private volatile boolean isStopped; // private final List<Worker> workers = new ArrayList<>();// 保存所所有的執(zhí)行任務(wù)的線程 public ThreadPool(int numThreads, int maxTasks) { this.taskQueue = new ArrayBlockingQueue(maxTasks); for (int i = 0; i < numThreads; i++) { workers.add(new Worker(this.taskQueue)); } int i = 1; // 這里產(chǎn)生線程 然后啟動(dòng)線程 for (Worker worker : workers) { new Thread(worker, "ThreadPool-" + i + "-thread").start(); i++; } }
線程池實(shí)現(xiàn)代碼
在上文當(dāng)中我們大致設(shè)計(jì)的線程池的初步結(jié)構(gòu),從上面的結(jié)果可以看出當(dāng)我們?cè)煲粋€(gè)ThreadPool對(duì)象的時(shí)候會(huì)產(chǎn)生指定線程的數(shù)目線程并且啟動(dòng)他們?nèi)?zhí)行任務(wù),現(xiàn)在我們還需要設(shè)計(jì)的就是如果關(guān)閉線程!我們?cè)陉P(guān)閉線程的時(shí)候還需要保證所有的任務(wù)都被執(zhí)行完成然后才關(guān)閉所有的線程,再退出,我們?cè)O(shè)計(jì)這個(gè)方法為shutDown。除此之外我們還設(shè)計(jì)一個(gè)函數(shù)可以強(qiáng)制退出,不用執(zhí)行所有的任務(wù)了,就直接退出,這個(gè)方法為stop。整個(gè)線程池實(shí)現(xiàn)的代碼如下:
package cscore.concurrent.java.threadpool; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class ThreadPool { private BlockingQueue taskQueue; private volatile boolean isStopped; private final List<Worker> workers = new ArrayList<>(); public ThreadPool(int numThreads, int maxTasks) { this.taskQueue = new ArrayBlockingQueue(maxTasks); for (int i = 0; i < numThreads; i++) { workers.add(new Worker(this.taskQueue)); } int i = 1; for (Worker worker : workers) { new Thread(worker, "ThreadPool-" + i + "-thread").start(); i++; } } // 下面這個(gè)方法是向線程池提交任務(wù) public void execute(Runnable runnable) throws InterruptedException { if (isStopped) { // 如果線程池已經(jīng)停下來(lái)了,就不在向任務(wù)隊(duì)列當(dāng)中提交任務(wù)了 System.err.println("thread pool has been stopped, so quit submitting task"); return; } taskQueue.put(runnable); } // 強(qiáng)制關(guān)閉線程池 public synchronized void stop() { isStopped = true; for (Worker worker : workers) { worker.stopWorker(); } } public synchronized void shutDown() { // 先表示關(guān)閉線程池 線程就不能再向線程池提交任務(wù) isStopped = true; // 先等待所有的任務(wù)執(zhí)行完成再關(guān)閉線程池 waitForAllTasks(); stop(); } private void waitForAllTasks() { // 當(dāng)線程池當(dāng)中還有任務(wù)的時(shí)候 就不退出循環(huán) while (taskQueue.size() > 0) Thread.yield(); } }
線程池測(cè)試代碼
package cscore.concurrent.java.threadpool; public class TestPool { public static void main(String[] args) throws InterruptedException { ThreadPool pool = new ThreadPool(3, 1024); for (int i = 0; i < 10; i++) { int tmp = i; pool.execute(() -> { System.out.println(Thread.currentThread().getName() + " say hello " + tmp); }); } pool.shutDown(); } }
上面的代碼輸出結(jié)果:
ThreadPool-2-thread say hello 1
ThreadPool-2-thread say hello 3
ThreadPool-2-thread say hello 4
ThreadPool-2-thread say hello 5
ThreadPool-2-thread say hello 6
ThreadPool-2-thread say hello 7
ThreadPool-2-thread say hello 8
ThreadPool-2-thread say hello 9
ThreadPool-3-thread say hello 2
ThreadPool-1-thread say hello 0
從上面的結(jié)果來(lái)看確實(shí)實(shí)現(xiàn)了線程池的效果。
雜談
可能你會(huì)有疑問(wèn),當(dāng)我們調(diào)用 interrupt的時(shí)候是如何產(chǎn)生異常的,我們仔細(xì)看一個(gè)阻塞隊(duì)列的實(shí)現(xiàn)。在ArrayBlockingQueue當(dāng)中take方法實(shí)現(xiàn)如下:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
在這個(gè)方法當(dāng)中調(diào)用的是鎖的lock.lockInterruptibly();方法,當(dāng)調(diào)用這個(gè)方法的時(shí)候線程是可以被interrupt方法中斷的,然后會(huì)拋出InterruptedException異常。
總結(jié)
在本篇文章當(dāng)中我們主要實(shí)現(xiàn)了一個(gè)乞丐版的線程池,這個(gè)線程池離JDK給我們提供的線程池還是有一點(diǎn)距離,JDK給我們提供給的線程池還有很多其他的參數(shù),我們將在后續(xù)的幾篇文章當(dāng)中繼續(xù)向JDK給我們提供的線程池靠近,直至實(shí)現(xiàn)一個(gè)盜版的JDK的線程池。本篇文章的代碼在下面的鏈接當(dāng)中也可以訪問(wèn)。
到此這篇關(guān)于Java實(shí)現(xiàn)手寫(xiě)乞丐版線程池的示例代碼的文章就介紹到這了,更多相關(guān)Java線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java實(shí)現(xiàn)手寫(xiě)一個(gè)線程池的示例代碼
- 一文了解Java?線程池的正確使用姿勢(shì)
- Java手寫(xiě)線程池之向JDK線程池進(jìn)發(fā)
- Java線程池源碼的深度解析
- 一篇文章帶你搞懂Java線程池實(shí)現(xiàn)原理
- 詳解Java線程池如何統(tǒng)計(jì)線程空閑時(shí)間
- Java中的異步與線程池解讀
- 詳解Java線程池隊(duì)列中的延遲隊(duì)列DelayQueue
- 一文帶你弄懂Java中線程池的原理
- java 線程池的實(shí)現(xiàn)原理、優(yōu)點(diǎn)與風(fēng)險(xiǎn)、以及4種線程池實(shí)現(xiàn)
相關(guān)文章
Springboot集成Elasticsearch的步驟與相關(guān)功能
ElasticSearch是開(kāi)源搜索平臺(tái)領(lǐng)域的一個(gè)新成員,?ElasticSearch是一個(gè)基于Lucene構(gòu)建的開(kāi)源,分布式,RESTful搜索引擎,這篇文章主要給大家介紹了關(guān)于Springboot集成Elasticsearch的相關(guān)資料,需要的朋友可以參考下2021-12-12java ReentrantLock條件鎖實(shí)現(xiàn)原理示例詳解
這篇文章主要為大家介紹了java ReentrantLock條件鎖實(shí)現(xiàn)原理示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01MyBatis的SQL執(zhí)行結(jié)果和客戶端執(zhí)行結(jié)果不一致問(wèn)題排查
本文主要介紹了MyBatis的SQL執(zhí)行結(jié)果和客戶端執(zhí)行結(jié)果不一致問(wèn)題排查,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04MySQL?MyBatis?默認(rèn)插入當(dāng)前時(shí)間方式
這篇文章主要介紹了MySQL?MyBatis?默認(rèn)插入當(dāng)前時(shí)間方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-10-10java新增關(guān)聯(lián)的三張表,每張表要求都插入集合,代碼實(shí)現(xiàn)方式
這篇文章主要介紹了java新增關(guān)聯(lián)的三張表,每張表要求都插入集合,代碼實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12Java中Date數(shù)據(jù)類型的數(shù)值轉(zhuǎn)換方式
這篇文章主要介紹了Java中Date數(shù)據(jù)類型的數(shù)值轉(zhuǎn)換方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07SpringBoot配置文件的優(yōu)先級(jí)順序、加載順序、bootstrap.yml與application.yml區(qū)別及說(shuō)明
在SpringBoot中,配置文件的優(yōu)先級(jí)順序是:application-{profile}.yml或.properties > application.yml或.properties > bootstrap.yml或.properties,{profile}代表不同環(huán)境,如dev、test、prod,加載順序是先加載bootstrap文件2024-09-09springboot如何接收application/x-www-form-urlencoded類型的請(qǐng)求
這篇文章主要介紹了springboot如何接收application/x-www-form-urlencoded類型的請(qǐng)求,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java?SpringTask定時(shí)自動(dòng)化處理方法
這篇文章主要介紹了Java?SpringTask定時(shí)自動(dòng)化處理,通過(guò)自動(dòng)化,不僅可以提高工作效率和準(zhǔn)確性,還可以釋放人力資源以專注于更高價(jià)值的工作,需要的朋友可以參考下2024-08-08