Java實現(xiàn)一個簡單的線程池代碼示例
一、線程池的模式
線程池顧名思義就是管理線程的一個池子,我們把創(chuàng)建線程的過程交給線程池來處理,而這個線程池當中的線程都會從阻塞隊列當中取獲取任務(wù)執(zhí)行。
我們不在直接把任務(wù)的創(chuàng)建過程寫到我們初始化的線程對象中,而是通過調(diào)用線程池的execute()方法,同時把我們的具體任務(wù)交作為參數(shù)傳給線程池,之后線程池就會把任務(wù)添加到阻塞隊列當中,而線程池當中的線程會從阻塞隊列當中獲取任務(wù)并執(zhí)行。
二、線程池的一些參數(shù)
- corePoolSize:線程池核心線程大小,即最小線程數(shù)(初始化線程數(shù))。線程池會維護當前數(shù)量的線程在線程池中,即使這些線程一直處于閑置狀態(tài),也不會被銷毀,除非設(shè)置了allowCoreThreadTimeOut。
- maximumPoolSize:線程池最大線程數(shù)量。當任務(wù)提交到線程池后,如果當前線程數(shù)小于核心線程數(shù),則會創(chuàng)建新線程來處理任務(wù);如果當前線程數(shù)大于或等于核心線程數(shù),但小于最大線程數(shù),并且任務(wù)隊列已滿,則會創(chuàng)建新線程來處理任務(wù)。
- keepAliveTime:空閑線程的存活時間。當線程池中的線程數(shù)量大于核心線程數(shù)且線程處于空閑狀態(tài)時,在指定時間后,這個空閑線程將會被銷毀,從而逐漸恢復(fù)到穩(wěn)定的核心線程數(shù)數(shù)量。
- unit:keepAliveTime的存活時間的計量單位,通常使用TimeUnit枚舉類中的方法,如TimeUnit.SECONDS表示秒級。
- workQueue:任務(wù)隊列。用于存放等待執(zhí)行的任務(wù),常見的實現(xiàn)類有LinkedBlockingQueue、ArrayBlockingQueue等。
- threadFactory:線程工廠。用于創(chuàng)建新的線程,可以自定義線程的名稱、優(yōu)先級等。
- handler:拒絕策略。當任務(wù)無法執(zhí)行(如線程池已滿)時,可以選擇的策略有:AbortPolicy(拋出異常)、CallerRunsPolicy(調(diào)用者運行)、DiscardOldestPolicy(丟棄最老的任務(wù))、DiscardPolicy(無聲丟棄)。
三、代碼實現(xiàn)
因為我們只是簡單的實現(xiàn),所以有一些情況和實際不太相似。
1.BlockingQueue
先來看看我們阻塞隊列當中的一些參數(shù),為了在多線程環(huán)境下防止并發(fā)問題,我使用了ReentrantLock,使用它的目的是為了創(chuàng)建多個不同的阻塞條件。
在我們調(diào)用一個對象的await()方法后,我們的當前線程就會加入到一個特定的隊列當中去等待,直到有調(diào)用了這個對象的notify()方法后才會從這個隊列中抽取一個線程喚醒。
舉個例子,我們?nèi)メt(yī)院的時候,一個醫(yī)生同一時間只能看一個病人,剩下的人都只能等待,如果只有一個大廳的話,看不同病的病人都只能等待在一個候診室中。使用ReentrentLock的意思就是為了創(chuàng)建多個不同的候診室,將不同醫(yī)生的病人分開在不同的候診室當中。
//1.阻塞隊列 private Deque<T> deque = new ArrayDeque<>(); //2.實現(xiàn)阻塞的鎖 private ReentrantLock lock = new ReentrantLock(); //3. 生產(chǎn)者等待條件 private Condition fullWaitSet = lock.newCondition(); //4.消費者等待條件 private Condition emptyWaitSet = lock.newCondition(); //5.阻塞隊列的大小 private int CAPACITY;
在自定義的阻塞隊列中,我使用了一個雙向隊列來存儲任務(wù),并且設(shè)置了一個隊列大小的屬性,在我們創(chuàng)建這個隊列的時候我們可以進行初始化。
先來看看阻塞隊列任務(wù)的添加過程。這個邏輯并不難,我們在代碼的上方上鎖,在finally中解鎖。如果這時我們的隊列是滿的,就無法在繼續(xù)添加任務(wù)了,這個時候我們就把當前線程掛起(注意我們的掛起條件)。如果隊列不是滿的話那我們就加入到隊尾,同時把另一類掛起的線程喚醒(這類線程在隊列為空的時候掛起,等待任務(wù)的添加)。
// 生產(chǎn)者放入數(shù)據(jù) public void put(T t) { lock.lock(); try { while (deque.size() == CAPACITY) { fullWaitSet.await(); } deque.addLast(t); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
在看看我們?nèi)∪蝿?wù)的過程。同樣加鎖,當我們的隊列為空的時候,線程掛起,等待任務(wù)的添加之后線程喚醒,如果隊列不為空的話,我們從隊列頭部取出一個任務(wù),并且喚起一類線程(這類線程在任務(wù)已經(jīng)滿了的時候無法在添加任務(wù)了,進行掛起,等待隊列不為滿)。
// 消費者從線程池當中獲取任務(wù) public T take(){ T t = null; lock.lock(); try { while(deque.size() == 0){ emptyWaitSet.await(); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; }
我們上邊的代碼展示的隊列的存取的過程都是死等狀態(tài),什么是死等狀態(tài)?就是任務(wù)添加不進去或者取不出來的時候,線程會被一直掛起。真實并不是如此,這里只是簡單的展示。
阻塞隊列需要的就是這兩個存取的過程。
2.ThreadPool
先看看線程池當中的屬性。把剛才創(chuàng)建的任務(wù)隊列加進去,因為線程池要時常和任務(wù)隊列溝通。然后創(chuàng)建了一個HashSet結(jié)構(gòu)用于存儲我們的線程。下邊的都是我們線程池需要的一些參數(shù)了,拒絕策略在這里沒有寫。
// 任務(wù)隊列 private BlockedQueue<Runnable> taskQueue; // 線程集合 private HashSet<Worker> workers = new HashSet<>(); //核心線程數(shù) private int coreSize; // 超時時間 private int timeout; // 超時單位 private TimeUnit timeUnit;
來看看我們的線程池是如何工作的吧,可以看到我們線程池保存的是Worker對象,我們來看看這個Worker對象是干啥的。這個Worker對象實現(xiàn)了Runnable接口,我們可以把這個類當作線程類,這個類中有一個task屬性,因為我們線程池當中的線程是要獲取任務(wù)執(zhí)行的,這個任務(wù)就用這個task屬性代表。
這個Worker類一直在干一件事情,就是不斷地從我們的任務(wù)隊列當中獲取任務(wù)(Worker類是ThreadPool的內(nèi)部類),如果獲取的任務(wù)不為空的話就執(zhí)行任務(wù),一旦沒有任務(wù)可以執(zhí)行那么就把當前的線程從線程池當中移除。
class Worker implements Runnable{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { while(task!=null || (task = taskQueue.take())!=null){ System.out.println("取出的任務(wù)是"+task); try { task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } synchronized (workers){ workers.remove(this); } } } }
那什么時候用到這個Worker類呢?當我們調(diào)用ThreadPool中的execute()方法時,線程池中的線程會就調(diào)用這個run()方法。
來看我們的execute()方法。當我們的線程數(shù)小于我們的核心線程數(shù)的時候,我們可以直接創(chuàng)建一個新的線程,并且把我們的任務(wù)直接交給這個核心線程。反之我們不能創(chuàng)建,而是把任務(wù)添加到我們的任務(wù)隊列當中,等待核心線程去執(zhí)行這個任務(wù)。
// 任務(wù)執(zhí)行 public void execute(Runnable task){ synchronized (workers){ if(workers.size() < coreSize){ // 創(chuàng)建核心線程 Worker worker = new Worker(task); workers.add(worker); Thread thread = new Thread(worker); thread.start(); }else { taskQueue.put(task); } } }
寫完了上邊的代碼我們測試一下。
public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.MILLISECONDS,10); for(int i = 0;i<12;i++){ int j = i; threadPool.execute(()->{ System.out.println("當前線程"+Thread.currentThread().getName()+"task "+j+" is running"); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } }
方法運行了之后,即使任務(wù)全部執(zhí)行,線程也不會結(jié)束。這是因為我們的worker類中的run方法調(diào)用了任務(wù)隊列的take()方法,而take方法是會一直掛起的。
我們現(xiàn)在換一種帶超時獲取,在規(guī)定時間內(nèi)獲取不到任務(wù)就自動結(jié)束任務(wù)。這時候就用到我們傳入的時間參數(shù)了,我們不再調(diào)用await()方法了,而是調(diào)用awaitNanos()方法,方法可以接收一個時間參數(shù),這個方法可以消耗我們的nanos時間,在這個時間內(nèi)如果獲取不到的話線程就不在掛起了,這時還會進入到我們的while循環(huán)當中,判斷我們的nanos是不是被消耗完了,如果被消耗完了就說明在規(guī)定時間內(nèi)獲取不到任務(wù),直接return結(jié)束線程。
// 帶超時獲取 public T poll(int timeout,TimeUnit timeUnit){ T t = null; lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while(deque.size() == 0){ if(nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; }
修改Worker類。
class Worker implements Runnable{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){ System.out.println("取出的任務(wù)是"+task); try { task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } synchronized (workers){ workers.remove(this); } } } }
現(xiàn)在就可以正常結(jié)束了。
四、拒絕策略
全部代碼如下。要使用拒絕策略,我們定義一個函數(shù)式接口,同時寫一個參數(shù)傳給線程池,參數(shù)的具體內(nèi)容就是拒絕策略的拒絕方法,是我們自己定義的。
同時我們的execute()方法不在使用put來添加任務(wù)了,而是使用tryPut,如果大家對這一塊感興趣的話,可以在bilibili上觀看黑馬程序員的課程學(xué)習一下。
/** * 自定義線程池 */ public class TestPool { public static void main(String[] args) { ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.SECONDS,10,((queue, task) -> {queue.put(task);})); for(int i = 0;i<12;i++){ int j = i; threadPool.execute(()->{ System.out.println("當前線程"+Thread.currentThread().getName()+"task "+j+" is running"); try { Thread.currentThread().sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } /** * 拒絕策略 */ @FunctionalInterface interface RejectPolicy<T>{ void reject(BlockedQueue<T> queue,T task); } /** * 阻塞隊列 */ class BlockedQueue <T>{ //1.阻塞隊列 private Deque<T> deque = new ArrayDeque<>(); //2.實現(xiàn)阻塞的鎖 private ReentrantLock lock = new ReentrantLock(); //3. 生產(chǎn)者等待條件 private Condition fullWaitSet = lock.newCondition(); //4.消費者等待條件 private Condition emptyWaitSet = lock.newCondition(); //5.阻塞隊列的大小 private int CAPACITY; public BlockedQueue(int queueCapacity) { this.CAPACITY = queueCapacity; } // 生產(chǎn)者放入數(shù)據(jù) public void put(T t) { lock.lock(); try { while (deque.size() == CAPACITY) { fullWaitSet.await(); } deque.addLast(t); emptyWaitSet.signal(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } // 帶超時添加 public boolean offer(T t,int timeout,TimeUnit timeUnit) { lock.lock(); long nanos = timeUnit.toNanos(timeout); try { while (deque.size() == CAPACITY) { if(nanos <= 0){ return false; } nanos = fullWaitSet.awaitNanos(nanos); } deque.addLast(t); emptyWaitSet.signal(); return true; } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } return true; } // 帶超時獲取 public T poll(int timeout,TimeUnit timeUnit){ T t = null; lock.lock(); try { long nanos = timeUnit.toNanos(timeout); while(deque.size() == 0){ if(nanos <= 0){ return null; } nanos = emptyWaitSet.awaitNanos(nanos); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; } // 消費者從線程池當中獲取任務(wù) public T take(){ T t = null; lock.lock(); try { while(deque.size() == 0){ emptyWaitSet.await(); } t = deque.removeFirst(); fullWaitSet.signal(); }catch (Exception e){ e.printStackTrace(); }finally { lock.unlock(); } return t; } public void tryPut(RejectPolicy<T> rejectPolicy, T task) { lock.lock(); try { if(deque.size()==CAPACITY){ rejectPolicy.reject(this,task); }else{ deque.addLast(task); emptyWaitSet.signal(); } } finally { lock.unlock(); } } } /** * 線程池 */ class ThreadPool{ // 任務(wù)隊列 private BlockedQueue<Runnable> taskQueue; // 線程集合 private HashSet<Worker> workers = new HashSet<>(); //核心線程數(shù) private int coreSize; // 超時時間 private int timeout; // 超時單位 private TimeUnit timeUnit; //拒絕策略 private RejectPolicy<Runnable> rejectPolicy; // 任務(wù)執(zhí)行 public void execute(Runnable task){ synchronized (workers){ if(workers.size() < coreSize){ // 創(chuàng)建核心線程 Worker worker = new Worker(task); workers.add(worker); Thread thread = new Thread(worker); thread.start(); }else { // 任務(wù)隊列 //taskQueue.offer(task,timeout,timeUnit); taskQueue.tryPut(rejectPolicy,task); //taskQueue.put(task); } } } public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){ this.coreSize = coreSize; this.timeout = timeout; this.timeUnit = timeUnit; this.taskQueue = new BlockedQueue<>(queueCapacity); this.rejectPolicy = rejectPolicy; } class Worker implements Runnable{ private Runnable task; public Worker(Runnable task){ this.task = task; } @Override public void run() { while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){ System.out.println("取出的任務(wù)是"+task); try { task.run(); }catch (Exception e){ e.printStackTrace(); }finally { task = null; } synchronized (workers){ workers.remove(this); } } } } }
這個代碼我自己覺得是有些問題,因為如果我的任務(wù)隊列大小有10的時候,我給出了13個任務(wù),兩個交給核心線程不占任務(wù)隊列大小,另外10個任務(wù)正好占滿,剩下一個放不進去,這時就會卡住不輸出。---------未解決
總結(jié)
到此這篇關(guān)于Java實現(xiàn)一個線程池的文章就介紹到這了,更多相關(guān)Java實現(xiàn)線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot?log4j2.xml如何讀取application.yml中屬性值
這篇文章主要介紹了springboot?log4j2.xml如何讀取application.yml中屬性值問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12SpringBoot 啟動報錯Unable to connect to 
這篇文章主要介紹了SpringBoot 啟動報錯Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379問題的解決方案,文中通過圖文結(jié)合的方式給大家講解的非常詳細,對大家解決問題有一定的幫助,需要的朋友可以參考下2024-10-10Spring 靜態(tài)變量/構(gòu)造函數(shù)注入失敗的解決方案
我們經(jīng)常會遇到一下問題:Spring對靜態(tài)變量的注入為空、在構(gòu)造函數(shù)中使用Spring容器中的Bean對象,得到的結(jié)果為空。不要擔心,本文將為大家介紹如何解決這些問題,跟隨小編來看看吧2021-11-11