關(guān)于java自定義線程池的原理與實現(xiàn)
這一節(jié)來自定義一個簡單的線程池。
一、自定義阻塞隊列
生產(chǎn)者創(chuàng)建任務(wù)添加到線程池中,線程池中有若干線程來執(zhí)行任務(wù),如果任務(wù)數(shù)大于線程數(shù),線程池中要有一個地方來存儲多余的任務(wù)
線程池中需要一個存放任務(wù)的阻塞隊列,所以需要先定義一個阻塞隊列
class BlockingQueue<T> { static Logger LOG = LoggerFactory.getLogger(BlockingQueue.class); //隊列 private Deque<T> queue = new ArrayDeque<>(); //隊列的容量 private int capcity; private ReentrantLock lock= new ReentrantLock(); //獲取元素時隊列為空就到這個Condition中等待 private Condition emptySet = lock.newCondition(); // 添加元素時如果隊列已到達最大容量就到這個condition等待 private Condition fullSet = lock.newCondition(); public BlockingQueue(int capcity) { this.capcity = capcity; } //添加元素 public void put(T t){ //queue是共享變量,多線程操作要加鎖 try { lock.lock(); while(queue.size()==capcity){ //隊列中元素已達到最大容量,添加元素的線程等待 try { LOG.info("隊列元素已滿,添加元素線程等待"); fullSet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //走到這里表示隊列中有空位了 queue.addLast(t); LOG.info("元素添加成功"); //喚醒等待的獲取元素的線程 emptySet.signalAll(); } finally { lock.unlock(); } } //獲取元素的方法 public T take(){ try { lock.lock(); while (queue.size()==0){ //隊列中沒有元素 try { LOG.info("隊列為空,獲取元素線程等待"); emptySet.await(); } catch (InterruptedException e) { e.printStackTrace(); } } //走到這里表示隊列中有元素了 T t = queue.removeFirst(); //叫醒添加元素的等待線程 fullSet.signalAll(); return t; } finally { lock.unlock(); } } // 帶超時時間的獲取元素的方法 public T poll(long time, TimeUnit timeUnit){ try { lock.lock(); long nanos = timeUnit.toNanos(time); while (queue.size()==0){ //隊列中沒有元素 try { if(nanos<=0){ LOG.info("等待超時時間到,返回null"); return null; } LOG.info("隊列為空,獲取元素線程等待"); //這個方法的返回值表示剩余的等待時間,例如本來等待5s,等了三秒被叫醒了,返回值就是2 nanos = emptySet.awaitNanos(nanos); } catch (InterruptedException e) { e.printStackTrace(); } } //走到這里表示隊列中有元素了 T t = queue.removeFirst(); //叫醒添加元素的等待線程 fullSet.signalAll(); return t; } finally { lock.unlock(); } } }
我們的線程池中需要一個阻塞隊列來存放任務(wù),可以使用上邊定義的這個
二、自定義線程池
線程池的代碼
class ThreadPool { static Logger logger = LoggerFactory.getLogger(ThreadPool.class); //用來存儲任務(wù)的隊列 private BlockingQueue<Runnable> taskQueue; //核心數(shù),即線程中可以創(chuàng)建的最大線程數(shù) private int coreSize; //存放線程的集合 private HashSet<Worker> workers = new HashSet<>(); //線程的空閑時間,池中的一個線程如果在這段時間后還獲取不到任務(wù)就會自動終止 private long time; private TimeUnit timeUnit; /** * * @param coreSize * @param capacity 線程池中任務(wù)隊列的容量 * @param time * @param timeUnit */ public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit) { this.coreSize = coreSize; this.time = time; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(capacity); } //提交任務(wù)的方法 public void execute(Runnable task){ synchronized (workers){ if(workers.size()<coreSize){ //線程數(shù)小于核心數(shù),啟動新線程來執(zhí)行任務(wù) Worker worker = new Worker(task); workers.add(worker); logger.info("線程池新增線程執(zhí)行任務(wù)"); worker.start(); } else{ //線程數(shù)已達到核心數(shù),把任務(wù)添加到隊列中 taskQueue.put(task); logger.info("線程池添加任務(wù)到隊列中"); } } } //用來描述線程池中工作線程的類 class Worker extends Thread { private Runnable task; public Worker(Runnable task){ this.task =task; } @Override public void run() { //執(zhí)行任務(wù)的邏輯 //一個任務(wù)執(zhí)行完后繼續(xù)從任務(wù)隊列中獲取任務(wù)來執(zhí)行 while (task!=null || (task=taskQueue.poll(time,timeUnit))!=null){ try { task.run(); } catch (Exception e) { e.printStackTrace(); } finally { //任務(wù)執(zhí)行完后清空 task = null; } } //走到這里表示沒有從任務(wù)隊列中獲取到任務(wù),當前線程將要結(jié)束 synchronized (workers){ //從線程集合中刪除當前線程 workers.remove(this); } } } }
三、測試
public class Test9 { private static Logger LOG = LoggerFactory.getLogger(Test9.class); public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS); for (int i = 0; i < 4; i++) { int a=i; threadPool.execute(new Runnable() { @Override public void run() { LOG.info("第{}個任務(wù)",a); } }); } } }
四、拒絕策略
上邊的線程池存在一個問題,當有大量任務(wù)提交到線程池超過了任務(wù)隊列的容量時,提交任務(wù)的線程就會一直阻塞等待,
//核心1,隊列容量1 ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS); for (int i = 0; i < 4; i++) { int a=i; threadPool.execute(new Runnable() { @Override public void run() { LOG.info("第{}個任務(wù)",a); try { Thread.sleep(1000000); } catch (InterruptedException e) { e.printStackTrace(); } } }); }
像這樣把任務(wù)時間延長,提交的任務(wù)就會超過隊列容量,這時主線程就會阻塞住
實際上應(yīng)該提供一種拒絕策略來讓提交任務(wù)的線程自己決定是阻塞死等還是放棄執(zhí)行任務(wù),
為了實現(xiàn)這個功能,先抽象一個接口來封裝拒絕策略
interface RejectPolicy<T> { // 把任務(wù)隊列和當前要提交的任務(wù)作為參數(shù) public void applyPolicy(BlockingQueue<T> queue,T task); }
然后為了應(yīng)用拒絕策略需要在阻塞隊列BlockingQueue中添加一個tryPut方法
public void tryPut(RejectPolicy<T> rejectPolicy,T t){ try { lock.lock(); if(queue.size()>=capacity){ //應(yīng)用拒絕測試 rejectPolicy.applyPolicy(this,t); } else { //還有空間正常添加任務(wù) queue.addLast(t); } } finally { lock.unlock(); } }
然后修改線程池,添加一個rejectPolicy屬性,在構(gòu)造方法中由任務(wù)提交者來賦值
線程池新增的屬性
private RejectPolicy<Runnable> rejectPolicy;
線程池的構(gòu)造方法
public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy) { this.coreSize = coreSize; this.time = time; this.timeUnit = timeUnit; this.taskQueue = new BlockingQueue<>(capacity); this.rejectPolicy=rejectPolicy; }
線程池的execute方法
public void execute(Runnable task){ synchronized (workers){ if(workers.size()<coreSize){ //線程數(shù)小于核心數(shù),啟動新線程來執(zhí)行任務(wù) Worker worker = new Worker(task); workers.add(worker); logger.info("線程池新增線程執(zhí)行任務(wù)"); worker.start(); } else{ //線程數(shù)已達到核心數(shù),把任務(wù)添加到隊列中,傳遞拒絕策略 taskQueue.tryPut(rejectPolicy,task); logger.info("線程池添加任務(wù)到隊列中"); } } }
可以看到,拒絕策略是有提交任務(wù)的線程指定,最終是由阻塞隊列來執(zhí)行,阻塞隊列不知道拒絕策略具體是什么,這也是java多態(tài)的一種體現(xiàn),面向抽象編程。
到此這篇關(guān)于關(guān)于java自定義線程池的文章就介紹到這了,更多相關(guān)java自定義線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java番外雜談之每天掃的二維碼你了解它內(nèi)含的信息嗎
二維碼已經(jīng)成為我們?nèi)粘I钪斜夭豢缮俚慕M成部分了,登錄需要掃一掃二維碼、買東西付錢需要掃一掃二維碼、開會簽到也需要掃一掃二維碼,那么如此使用的二維碼技術(shù),背后的原理是怎樣的呢?本文將結(jié)合二維碼的發(fā)展歷程以及典型應(yīng)用場景,分析二維碼背后的技術(shù)原理2022-02-02詳解Java中while和do-while循環(huán)、break的使用
本文介紹了循環(huán)結(jié)構(gòu)語句while和do-while循環(huán)、break的使用,while循環(huán)語句通過流程圖和語法語句結(jié)合一個求1~10的整數(shù)和的例子來幫助大家理解while循環(huán)的用法,感興趣的朋友跟隨小編來看看吧2020-11-11Java利用opencv實現(xiàn)用字符展示視頻或圖片的方法
這篇文章主要介紹了Java利用opencv實現(xiàn)用字符展示視頻或圖片的方法,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12java通過DelayQueue實現(xiàn)延時任務(wù)
本文主要介紹了java通過DelayQueue實現(xiàn)延時任務(wù),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07SpringMVC 向jsp頁面?zhèn)鬟f數(shù)據(jù)庫讀取到的值方法
下面小編就為大家分享一篇SpringMVC 向jsp頁面?zhèn)鬟f數(shù)據(jù)庫讀取到的值方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-03-03