Java實(shí)現(xiàn)手寫一個(gè)線程池的示例代碼
概述
線程池技術(shù)想必大家都不陌生把,相信在平時(shí)的工作中沒(méi)有少用,而且這也是面試頻率非常高的一個(gè)知識(shí)點(diǎn),那么大家知道它的實(shí)現(xiàn)原理和細(xì)節(jié)嗎?如果直接去看jdk源碼的話,可能有一定的難度,那么我們可以先通過(guò)手寫一個(gè)簡(jiǎn)單的線程池框架,去掌握線程池的基本原理后,再去看jdk的線程池源碼就會(huì)相對(duì)容易,而且不容易忘記。
線程池框架設(shè)計(jì)
我們都知道,線程資源的創(chuàng)建和銷毀并不是沒(méi)有代價(jià)的,甚至開(kāi)銷是非常高的。同時(shí),線程也不是任意多創(chuàng)建的,因?yàn)榛钴S的線程會(huì)消耗系統(tǒng)資源,特別是內(nèi)存,在一定的范圍內(nèi),增加線程可以提高系統(tǒng)的吞吐率,如果超過(guò)了這個(gè)范圍,反而會(huì)降低程序的執(zhí)行速度。
因此,設(shè)計(jì)一個(gè)容納多個(gè)線程的容器,容器中的線程可以重復(fù)使用,省去了頻繁創(chuàng)建和銷毀線程對(duì)象的操作, 達(dá)到下面的目標(biāo):
- 降低資源消耗,減少了創(chuàng)建和銷毀線程的次數(shù),每個(gè)工作線程都可以被重復(fù)利用,可執(zhí)行多個(gè)任務(wù)
- 提高響應(yīng)速度,當(dāng)任務(wù)到達(dá)時(shí),如果有線程可以直接用,不會(huì)出現(xiàn)系統(tǒng)僵死
- 提高線程的可管理性,如果無(wú)限制的創(chuàng)建線程,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控
線程池的核心思想: 線程復(fù)用,同一個(gè)線程可以被重復(fù)使用,來(lái)處理多個(gè)任務(wù)。
為了實(shí)現(xiàn)線程池功能,需要考慮下面幾個(gè)設(shè)計(jì)要點(diǎn):
- 線程池可以接口外部提交的任務(wù)執(zhí)行
- 線程池有工作線程的數(shù)量,有任務(wù)執(zhí)行,沒(méi)有任務(wù)也空閑在那,等待任務(wù)過(guò)來(lái),這樣既避免線程頻繁創(chuàng)建銷毀帶來(lái)的開(kāi)銷,同時(shí)也可以避免線程池?zé)o限制的創(chuàng)建線程
- 如果線程池接受提交的任務(wù)超過(guò)工作線程的數(shù)量了,該怎么辦?可以用一個(gè)隊(duì)列把任務(wù)存下來(lái),等工作線程完成任務(wù)后去隊(duì)列中獲取任務(wù),執(zhí)行
- 那如果任務(wù)實(shí)在是太多太多了,達(dá)到了我們認(rèn)為的隊(duì)列最大值,怎么辦,我們可以設(shè)計(jì)一種任務(wù)太多的策略,可以進(jìn)行切換,比如直接丟棄任務(wù)、報(bào)錯(cuò)等等
看了上面的設(shè)計(jì)目標(biāo)和要點(diǎn),是不是能立刻想到一個(gè)非常經(jīng)典的設(shè)計(jì)模型——生產(chǎn)者消費(fèi)者模型。
- 阻塞隊(duì)列存儲(chǔ)執(zhí)行任務(wù),比如外部main函數(shù)作為生產(chǎn)者向隊(duì)列生產(chǎn)任務(wù)。
- 線程池中的工作線程作為消費(fèi)者獲取任務(wù)執(zhí)行。
現(xiàn)在我們將我們的設(shè)計(jì)思路轉(zhuǎn)換為代碼。
代碼實(shí)現(xiàn)
阻塞隊(duì)列的實(shí)現(xiàn)
- 阻塞隊(duì)列主要存放任務(wù),有容量限制
- 阻塞隊(duì)列提供添加和刪除任務(wù)的API, 如果超過(guò)容量,阻塞不能添加任務(wù),如果沒(méi)有任務(wù),阻塞無(wú)法獲取任務(wù)。
/** * <p>自定義任務(wù)隊(duì)列, 用來(lái)存放任務(wù) </p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 10:15 * @version: 1.0.0 */ @Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { // 容量 private int capcity; // 雙端任務(wù)隊(duì)列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入鎖 private ReentrantLock lock = new ReentrantLock(); // 生產(chǎn)者條件變量 private Condition fullWaitSet = lock.newCondition(); // 生產(chǎn)者條件變量 private Condition emptyWaitSet = lock.newCondition(); public BlockingQueue(int capcity) { this.capcity = capcity; } // 阻塞的方式添加任務(wù) public void put(T task) { lock.lock(); try { // 通過(guò)while的方式 while (deque.size() >= capcity) { log.debug("wait to add queue"); try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } deque.offer(task); log.debug("task add successfully"); emptyWaitSet.signal(); } finally { lock.unlock(); } } // 阻塞獲取任務(wù) public T take() { lock.lock(); try { // 通過(guò)while的方式 while (deque.isEmpty()) { try { log.debug("wait to take task"); emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); T task = deque.poll(); log.debug("take task successfully"); // 從隊(duì)列中獲取元素 return task; } finally { lock.unlock(); } } }
- put()方法是向阻塞隊(duì)列中添加任務(wù)
- take()方法是向阻塞隊(duì)列中獲取任務(wù)
線程池消費(fèi)端實(shí)現(xiàn)
1.定義執(zhí)行器接口
/** * <p>定義一個(gè)執(zhí)行器的接口:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 12:31 * @version: 1.0.0 */ public interface Executor { /** * 提交任務(wù)執(zhí)行 * @param task 任務(wù) */ void execute(Runnable task); }
2.定義線程池類實(shí)現(xiàn)該接口
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor { /** * 任務(wù)隊(duì)列 */ private BlockingQueue<Runnable> taskQueue; /** * 核心工作線程數(shù) */ private int coreSize; /** * 工作線程集合 */ private Set<Worker> workers = new HashSet<>(); /** * 創(chuàng)建線程池 * @param coreSize 工作線程數(shù)量 * @param capcity 阻塞隊(duì)列容量 */ public ThreadPool(int coreSize, int capcity) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); } /** * 提交任務(wù)執(zhí)行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作線程數(shù)小于閾值,直接開(kāi)始任務(wù)執(zhí)行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超過(guò)了閾值,加入到隊(duì)列中 taskQueue.put(task); } } } /** * 工作線程,對(duì)執(zhí)行的任務(wù)做了一層包裝處理 */ class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 如果任務(wù)不為空,或者可以從隊(duì)列中獲取任務(wù) while (task != null || (task = taskQueue.take()) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { // 執(zhí)行完后,設(shè)置任務(wù)為空 task = null; } } // 移除工作線程 synchronized (workers){ log.debug("remove worker successfully"); workers.remove(this); } } } }
- Worker類是工作線程類,包裝了執(zhí)行任務(wù),里面實(shí)現(xiàn)了從隊(duì)列獲取任務(wù),然后執(zhí)行任務(wù)。
- execute方法的實(shí)現(xiàn)中,如果工作線程數(shù)量小于閾值的話,直接創(chuàng)建新的工作線程,否則將任務(wù)添加到隊(duì)列中。
3.演示
@Test public void testThreadPool1() throws InterruptedException { Executor executor = new ThreadPool(2, 4); // 提交任務(wù) for (int i = 0; i < 6; i++) { final int j = i; executor.execute(() -> { try { Thread.sleep(10); log.info("run task {}", j); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread.sleep(10); } Thread.sleep(10000); }
運(yùn)行結(jié)果:
獲取任務(wù)超時(shí)設(shè)計(jì)
目前從隊(duì)列中獲取任務(wù)是永久阻塞等待的,可以改成阻塞一段時(shí)間沒(méi)有獲取任務(wù),丟棄的策略。
@Slf4j(topic = "c.TimeoutBlockingQueue") public class TimeoutBlockingQueue<T> { // 容量 private int capcity; // 雙端任務(wù)隊(duì)列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入鎖 private ReentrantLock lock = new ReentrantLock(); // 生產(chǎn)者條件變量 private Condition fullWaitSet = lock.newCondition(); // 生產(chǎn)者條件變量 private Condition emptyWaitSet = lock.newCondition(); public TimeoutBlockingQueue(int capcity) { this.capcity = capcity; } // 帶超時(shí)時(shí)間的獲取 public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ // 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒 long nanos = unit.toNanos(timeout); while (deque.isEmpty()){ try { if (nanos<=0){ return null; } // 返回的是剩余的等待時(shí)間,更改navos的值,使虛假喚醒的時(shí)候可以繼續(xù)等待 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return deque.getFirst(); }finally { lock.unlock(); } } // 帶超時(shí)時(shí)間的增加 public boolean offer(T task , long timeout , TimeUnit unit){ lock.lock(); try{ // 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒 long nanos = unit.toNanos(timeout); while (deque.size() == capcity){ try { if (nanos<=0){ return false; } // 更新剩余需要等待的時(shí)間 nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任務(wù)隊(duì)列 {}", task); deque.addLast(task); emptyWaitSet.signal(); return true; }finally { lock.unlock(); } } }
新加TimeoutBlockingQueue類,添加offer和poll待超時(shí)的添加和獲取任務(wù)的方法。
拒絕策略設(shè)計(jì)
目前的實(shí)現(xiàn)還是有個(gè)漏洞,無(wú)法自定義任務(wù)超出閾值的一個(gè)拒絕策略,我們可以通過(guò)利用函數(shù)式編程+策略模式去實(shí)現(xiàn)。
1.定義策略模式的函數(shù)式接口
/** * <p>拒絕策略的函數(shù)式接口:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 13:15 * @version: 1.0.0 */ @FunctionalInterface public interface RejectPolicy<T> { /** * 拒絕策略的接口 * @param queue * @param task */ void reject(BlockingQueue<T> queue, T task); }
2.添加函數(shù)式接口的調(diào)用入口
我們可以在阻塞隊(duì)列添加任務(wù)新加一個(gè)api, 添加任務(wù)如果超過(guò)容量,調(diào)用函數(shù)式接口。
@Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { ........ /** * 嘗試添加任務(wù) * @param rejectPolicy * @param task */ public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try{ // 如果隊(duì)列超過(guò)容量 if (deque.size()> capcity){ log.debug("task too much, do reject"); rejectPolicy.reject(this, task); }else { deque.offer(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }
3.修改ThreadPool類
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor { ..... /** * 拒絕策略 */ private RejectPolicy rejectPolicy; // 通過(guò)構(gòu)造方法傳入執(zhí)行的拒絕策略 public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); this.rejectPolicy = rejectPolicy; } /** * 提交任務(wù)執(zhí)行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作線程數(shù)小于閾值,直接開(kāi)始任務(wù)執(zhí)行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超過(guò)了閾值,加入到隊(duì)列中 //taskQueue.put(task); // 調(diào)用tryPut的方式 taskQueue.tryPut(rejectPolicy, task); } } } .... }
通過(guò)構(gòu)造方法的方式傳入要執(zhí)行的拒絕策略
調(diào)用tryPut方法添加任務(wù)
4.演示
以上就是Java實(shí)現(xiàn)手寫一個(gè)線程池的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Java實(shí)現(xiàn)手寫乞丐版線程池的示例代碼
- 一文了解Java?線程池的正確使用姿勢(shì)
- Java手寫線程池之向JDK線程池進(jìn)發(fā)
- Java線程池源碼的深度解析
- 一篇文章帶你搞懂Java線程池實(shí)現(xiàn)原理
- 詳解Java線程池如何統(tǒng)計(jì)線程空閑時(shí)間
- Java中的異步與線程池解讀
- 詳解Java線程池隊(duì)列中的延遲隊(duì)列DelayQueue
- 一文帶你弄懂Java中線程池的原理
- java 線程池的實(shí)現(xiàn)原理、優(yōu)點(diǎn)與風(fēng)險(xiǎn)、以及4種線程池實(shí)現(xiàn)
相關(guān)文章
Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展
本文主要介紹了Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12Java listener簡(jiǎn)介_(kāi)動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java listener簡(jiǎn)介,可以用于統(tǒng)計(jì)用戶在線人數(shù)等,有興趣的可以了解一下2017-07-07SpringBoot中的五種對(duì)靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot中的五種對(duì)靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12Java如何實(shí)現(xiàn)可折疊Panel方法示例
這篇文章主要給大家介紹了關(guān)于利用Java如何實(shí)現(xiàn)可折疊Panel的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2018-07-07java文件操作練習(xí)代碼 讀取某個(gè)盤符下的文件
這篇文章主要介紹了java讀取某個(gè)盤符下的文件示例,代碼中要求的是絕對(duì)路徑,編譯過(guò)程中要注意絕對(duì)路徑問(wèn)題和異常的抓取2014-01-01Java中?springcloud.openfeign應(yīng)用案例解析
使用OpenFeign能讓編寫Web?Service客戶端更加簡(jiǎn)單,使用時(shí)只需定義服務(wù)接口,然后在上面添加注解,OpenFeign也支持可拔插式的編碼和解碼器,這篇文章主要介紹了Java中?springcloud.openfeign應(yīng)用案例解析,需要的朋友可以參考下2024-06-06