Java實現手寫一個線程池的示例代碼
概述
線程池技術想必大家都不陌生把,相信在平時的工作中沒有少用,而且這也是面試頻率非常高的一個知識點,那么大家知道它的實現原理和細節(jié)嗎?如果直接去看jdk源碼的話,可能有一定的難度,那么我們可以先通過手寫一個簡單的線程池框架,去掌握線程池的基本原理后,再去看jdk的線程池源碼就會相對容易,而且不容易忘記。
線程池框架設計
我們都知道,線程資源的創(chuàng)建和銷毀并不是沒有代價的,甚至開銷是非常高的。同時,線程也不是任意多創(chuàng)建的,因為活躍的線程會消耗系統(tǒng)資源,特別是內存,在一定的范圍內,增加線程可以提高系統(tǒng)的吞吐率,如果超過了這個范圍,反而會降低程序的執(zhí)行速度。
因此,設計一個容納多個線程的容器,容器中的線程可以重復使用,省去了頻繁創(chuàng)建和銷毀線程對象的操作, 達到下面的目標:
- 降低資源消耗,減少了創(chuàng)建和銷毀線程的次數,每個工作線程都可以被重復利用,可執(zhí)行多個任務
- 提高響應速度,當任務到達時,如果有線程可以直接用,不會出現系統(tǒng)僵死
- 提高線程的可管理性,如果無限制的創(chuàng)建線程,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調優(yōu)和監(jiān)控
線程池的核心思想: 線程復用,同一個線程可以被重復使用,來處理多個任務。
為了實現線程池功能,需要考慮下面幾個設計要點:
- 線程池可以接口外部提交的任務執(zhí)行
- 線程池有工作線程的數量,有任務執(zhí)行,沒有任務也空閑在那,等待任務過來,這樣既避免線程頻繁創(chuàng)建銷毀帶來的開銷,同時也可以避免線程池無限制的創(chuàng)建線程
- 如果線程池接受提交的任務超過工作線程的數量了,該怎么辦?可以用一個隊列把任務存下來,等工作線程完成任務后去隊列中獲取任務,執(zhí)行
- 那如果任務實在是太多太多了,達到了我們認為的隊列最大值,怎么辦,我們可以設計一種任務太多的策略,可以進行切換,比如直接丟棄任務、報錯等等
看了上面的設計目標和要點,是不是能立刻想到一個非常經典的設計模型——生產者消費者模型。
- 阻塞隊列存儲執(zhí)行任務,比如外部main函數作為生產者向隊列生產任務。
- 線程池中的工作線程作為消費者獲取任務執(zhí)行。
現在我們將我們的設計思路轉換為代碼。
代碼實現
阻塞隊列的實現
- 阻塞隊列主要存放任務,有容量限制
- 阻塞隊列提供添加和刪除任務的API, 如果超過容量,阻塞不能添加任務,如果沒有任務,阻塞無法獲取任務。
/** * <p>自定義任務隊列, 用來存放任務 </p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 10:15 * @version: 1.0.0 */ @Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { // 容量 private int capcity; // 雙端任務隊列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入鎖 private ReentrantLock lock = new ReentrantLock(); // 生產者條件變量 private Condition fullWaitSet = lock.newCondition(); // 生產者條件變量 private Condition emptyWaitSet = lock.newCondition(); public BlockingQueue(int capcity) { this.capcity = capcity; } // 阻塞的方式添加任務 public void put(T task) { lock.lock(); try { // 通過while的方式 while (deque.size() >= capcity) { log.debug("wait to add queue"); try { fullWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } deque.offer(task); log.debug("task add successfully"); emptyWaitSet.signal(); } finally { lock.unlock(); } } // 阻塞獲取任務 public T take() { lock.lock(); try { // 通過while的方式 while (deque.isEmpty()) { try { log.debug("wait to take task"); emptyWaitSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); T task = deque.poll(); log.debug("take task successfully"); // 從隊列中獲取元素 return task; } finally { lock.unlock(); } } }
- put()方法是向阻塞隊列中添加任務
- take()方法是向阻塞隊列中獲取任務
線程池消費端實現
1.定義執(zhí)行器接口
/** * <p>定義一個執(zhí)行器的接口:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 12:31 * @version: 1.0.0 */ public interface Executor { /** * 提交任務執(zhí)行 * @param task 任務 */ void execute(Runnable task); }
2.定義線程池類實現該接口
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor { /** * 任務隊列 */ private BlockingQueue<Runnable> taskQueue; /** * 核心工作線程數 */ private int coreSize; /** * 工作線程集合 */ private Set<Worker> workers = new HashSet<>(); /** * 創(chuàng)建線程池 * @param coreSize 工作線程數量 * @param capcity 阻塞隊列容量 */ public ThreadPool(int coreSize, int capcity) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); } /** * 提交任務執(zhí)行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作線程數小于閾值,直接開始任務執(zhí)行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超過了閾值,加入到隊列中 taskQueue.put(task); } } } /** * 工作線程,對執(zhí)行的任務做了一層包裝處理 */ class Worker extends Thread { private Runnable task; public Worker(Runnable task) { this.task = task; } @Override public void run() { // 如果任務不為空,或者可以從隊列中獲取任務 while (task != null || (task = taskQueue.take()) != null) { try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { // 執(zhí)行完后,設置任務為空 task = null; } } // 移除工作線程 synchronized (workers){ log.debug("remove worker successfully"); workers.remove(this); } } } }
- Worker類是工作線程類,包裝了執(zhí)行任務,里面實現了從隊列獲取任務,然后執(zhí)行任務。
- execute方法的實現中,如果工作線程數量小于閾值的話,直接創(chuàng)建新的工作線程,否則將任務添加到隊列中。
3.演示
@Test public void testThreadPool1() throws InterruptedException { Executor executor = new ThreadPool(2, 4); // 提交任務 for (int i = 0; i < 6; i++) { final int j = i; executor.execute(() -> { try { Thread.sleep(10); log.info("run task {}", j); } catch (InterruptedException e) { e.printStackTrace(); } }); Thread.sleep(10); } Thread.sleep(10000); }
運行結果:
獲取任務超時設計
目前從隊列中獲取任務是永久阻塞等待的,可以改成阻塞一段時間沒有獲取任務,丟棄的策略。
@Slf4j(topic = "c.TimeoutBlockingQueue") public class TimeoutBlockingQueue<T> { // 容量 private int capcity; // 雙端任務隊列容器 private Deque<T> deque = new ArrayDeque<>(); // 重入鎖 private ReentrantLock lock = new ReentrantLock(); // 生產者條件變量 private Condition fullWaitSet = lock.newCondition(); // 生產者條件變量 private Condition emptyWaitSet = lock.newCondition(); public TimeoutBlockingQueue(int capcity) { this.capcity = capcity; } // 帶超時時間的獲取 public T poll(long timeout, TimeUnit unit){ lock.lock(); try{ // 將 timeout 統(tǒng)一轉換為 納秒 long nanos = unit.toNanos(timeout); while (deque.isEmpty()){ try { if (nanos<=0){ return null; } // 返回的是剩余的等待時間,更改navos的值,使虛假喚醒的時候可以繼續(xù)等待 nanos = emptyWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } fullWaitSet.signal(); return deque.getFirst(); }finally { lock.unlock(); } } // 帶超時時間的增加 public boolean offer(T task , long timeout , TimeUnit unit){ lock.lock(); try{ // 將 timeout 統(tǒng)一轉換為 納秒 long nanos = unit.toNanos(timeout); while (deque.size() == capcity){ try { if (nanos<=0){ return false; } // 更新剩余需要等待的時間 nanos = fullWaitSet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } log.debug("加入任務隊列 {}", task); deque.addLast(task); emptyWaitSet.signal(); return true; }finally { lock.unlock(); } } }
新加TimeoutBlockingQueue類,添加offer和poll待超時的添加和獲取任務的方法。
拒絕策略設計
目前的實現還是有個漏洞,無法自定義任務超出閾值的一個拒絕策略,我們可以通過利用函數式編程+策略模式去實現。
1.定義策略模式的函數式接口
/** * <p>拒絕策略的函數式接口:</p> * * @author: cxw (332059317@qq.com) * @date: 2022/10/18 13:15 * @version: 1.0.0 */ @FunctionalInterface public interface RejectPolicy<T> { /** * 拒絕策略的接口 * @param queue * @param task */ void reject(BlockingQueue<T> queue, T task); }
2.添加函數式接口的調用入口
我們可以在阻塞隊列添加任務新加一個api, 添加任務如果超過容量,調用函數式接口。
@Slf4j(topic = "c.BlockingQueue") public class BlockingQueue<T> { ........ /** * 嘗試添加任務 * @param rejectPolicy * @param task */ public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try{ // 如果隊列超過容量 if (deque.size()> capcity){ log.debug("task too much, do reject"); rejectPolicy.reject(this, task); }else { deque.offer(task); emptyWaitSet.signal(); } }finally { lock.unlock(); } } }
3.修改ThreadPool類
@Slf4j(topic = "c.ThreadPool") public class ThreadPool implements Executor { ..... /** * 拒絕策略 */ private RejectPolicy rejectPolicy; // 通過構造方法傳入執(zhí)行的拒絕策略 public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) { this.coreSize = coreSize; this.taskQueue = new BlockingQueue<>(capcity); this.rejectPolicy = rejectPolicy; } /** * 提交任務執(zhí)行 */ @Override public void execute(Runnable task) { synchronized (workers) { // 如果工作線程數小于閾值,直接開始任務執(zhí)行 if(workers.size() < coreSize) { Worker worker = new Worker(task); workers.add(worker); worker.start(); } else { // 如果超過了閾值,加入到隊列中 //taskQueue.put(task); // 調用tryPut的方式 taskQueue.tryPut(rejectPolicy, task); } } } .... }
通過構造方法的方式傳入要執(zhí)行的拒絕策略
調用tryPut方法添加任務
4.演示
以上就是Java實現手寫一個線程池的示例代碼的詳細內容,更多關于Java線程池的資料請關注腳本之家其它相關文章!
相關文章
Java listener簡介_動力節(jié)點Java學院整理
這篇文章主要介紹了Java listener簡介,可以用于統(tǒng)計用戶在線人數等,有興趣的可以了解一下2017-07-07SpringBoot中的五種對靜態(tài)資源的映射規(guī)則的實現
這篇文章主要介紹了SpringBoot中的五種對靜態(tài)資源的映射規(guī)則的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-12-12Java中?springcloud.openfeign應用案例解析
使用OpenFeign能讓編寫Web?Service客戶端更加簡單,使用時只需定義服務接口,然后在上面添加注解,OpenFeign也支持可拔插式的編碼和解碼器,這篇文章主要介紹了Java中?springcloud.openfeign應用案例解析,需要的朋友可以參考下2024-06-06