欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實現一個簡單的線程池代碼示例

 更新時間:2024年09月13日 10:16:27   作者:熊哈哈O_o  
線程池是管理線程的一個池子,通過阻塞隊列管理任務,主要參數包括corePoolSize、maximumPoolSize、keepAliveTime等,這篇文章主要介紹了Java實現一個簡單的線程池的相關資料,需要的朋友可以參考下

一、線程池的模式

線程池顧名思義就是管理線程的一個池子,我們把創(chuàng)建線程的過程交給線程池來處理,而這個線程池當中的線程都會從阻塞隊列當中取獲取任務執(zhí)行。

我們不在直接把任務的創(chuàng)建過程寫到我們初始化的線程對象中,而是通過調用線程池的execute()方法,同時把我們的具體任務交作為參數傳給線程池,之后線程池就會把任務添加到阻塞隊列當中,而線程池當中的線程會從阻塞隊列當中獲取任務并執(zhí)行。

二、線程池的一些參數 

  • corePoolSize:線程池核心線程大小,即最小線程數(初始化線程數)。線程池會維護當前數量的線程在線程池中,即使這些線程一直處于閑置狀態(tài),也不會被銷毀,除非設置了allowCoreThreadTimeOut。
  • maximumPoolSize:線程池最大線程數量。當任務提交到線程池后,如果當前線程數小于核心線程數,則會創(chuàng)建新線程來處理任務;如果當前線程數大于或等于核心線程數,但小于最大線程數,并且任務隊列已滿,則會創(chuàng)建新線程來處理任務。
  • keepAliveTime:空閑線程的存活時間。當線程池中的線程數量大于核心線程數且線程處于空閑狀態(tài)時,在指定時間后,這個空閑線程將會被銷毀,從而逐漸恢復到穩(wěn)定的核心線程數數量。
  • unit:keepAliveTime的存活時間的計量單位,通常使用TimeUnit枚舉類中的方法,如TimeUnit.SECONDS表示秒級。
  • workQueue:任務隊列。用于存放等待執(zhí)行的任務,常見的實現類有LinkedBlockingQueue、ArrayBlockingQueue等。
  • threadFactory:線程工廠。用于創(chuàng)建新的線程,可以自定義線程的名稱、優(yōu)先級等。
  • handler:拒絕策略。當任務無法執(zhí)行(如線程池已滿)時,可以選擇的策略有:AbortPolicy(拋出異常)、CallerRunsPolicy(調用者運行)、DiscardOldestPolicy(丟棄最老的任務)、DiscardPolicy(無聲丟棄)。

三、代碼實現

因為我們只是簡單的實現,所以有一些情況和實際不太相似。

1.BlockingQueue

先來看看我們阻塞隊列當中的一些參數,為了在多線程環(huán)境下防止并發(fā)問題,我使用了ReentrantLock,使用它的目的是為了創(chuàng)建多個不同的阻塞條件。

在我們調用一個對象的await()方法后,我們的當前線程就會加入到一個特定的隊列當中去等待,直到有調用了這個對象的notify()方法后才會從這個隊列中抽取一個線程喚醒。

舉個例子,我們去醫(yī)院的時候,一個醫(yī)生同一時間只能看一個病人,剩下的人都只能等待,如果只有一個大廳的話,看不同病的病人都只能等待在一個候診室中。使用ReentrentLock的意思就是為了創(chuàng)建多個不同的候診室,將不同醫(yī)生的病人分開在不同的候診室當中。

    //1.阻塞隊列
    private Deque<T> deque = new ArrayDeque<>();
    //2.實現阻塞的鎖
    private ReentrantLock lock = new ReentrantLock();
    //3. 生產者等待條件
    private Condition fullWaitSet = lock.newCondition();
    //4.消費者等待條件
    private Condition emptyWaitSet = lock.newCondition();
    //5.阻塞隊列的大小
    private  int CAPACITY;

在自定義的阻塞隊列中,我使用了一個雙向隊列來存儲任務,并且設置了一個隊列大小的屬性,在我們創(chuàng)建這個隊列的時候我們可以進行初始化。

先來看看阻塞隊列任務的添加過程。這個邏輯并不難,我們在代碼的上方上鎖,在finally中解鎖。如果這時我們的隊列是滿的,就無法在繼續(xù)添加任務了,這個時候我們就把當前線程掛起(注意我們的掛起條件)。如果隊列不是滿的話那我們就加入到隊尾,同時把另一類掛起的線程喚醒(這類線程在隊列為空的時候掛起,等待任務的添加)

 // 生產者放入數據
    public void put(T t) {
        lock.lock();
        try {
            while (deque.size() == CAPACITY) {
                fullWaitSet.await();
            }
            deque.addLast(t);
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

在看看我們取任務的過程。同樣加鎖,當我們的隊列為空的時候,線程掛起,等待任務的添加之后線程喚醒,如果隊列不為空的話,我們從隊列頭部取出一個任務,并且喚起一類線程(這類線程在任務已經滿了的時候無法在添加任務了,進行掛起,等待隊列不為滿)。

  // 消費者從線程池當中獲取任務
    public T take(){
        T t = null;
        lock.lock();
        try {
            while(deque.size() == 0){
                emptyWaitSet.await();
            }
            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

我們上邊的代碼展示的隊列的存取的過程都是死等狀態(tài),什么是死等狀態(tài)?就是任務添加不進去或者取不出來的時候,線程會被一直掛起。真實并不是如此,這里只是簡單的展示。

阻塞隊列需要的就是這兩個存取的過程。

2.ThreadPool

先看看線程池當中的屬性。把剛才創(chuàng)建的任務隊列加進去,因為線程池要時常和任務隊列溝通。然后創(chuàng)建了一個HashSet結構用于存儲我們的線程。下邊的都是我們線程池需要的一些參數了,拒絕策略在這里沒有寫。

    // 任務隊列
    private BlockedQueue<Runnable> taskQueue;

    // 線程集合
    private HashSet<Worker> workers = new HashSet<>();

    //核心線程數
    private int coreSize;

    // 超時時間
    private int timeout;

    // 超時單位
    private TimeUnit timeUnit;

來看看我們的線程池是如何工作的吧,可以看到我們線程池保存的是Worker對象,我們來看看這個Worker對象是干啥的。這個Worker對象實現了Runnable接口,我們可以把這個類當作線程類,這個類中有一個task屬性,因為我們線程池當中的線程是要獲取任務執(zhí)行的,這個任務就用這個task屬性代表。

這個Worker類一直在干一件事情,就是不斷地從我們的任務隊列當中獲取任務(Worker類是ThreadPool的內部類),如果獲取的任務不為空的話就執(zhí)行任務,一旦沒有任務可以執(zhí)行那么就把當前的線程從線程池當中移除。

class Worker implements Runnable{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task!=null || (task = taskQueue.take())!=null){
                System.out.println("取出的任務是"+task);
                try {
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }
    }

那什么時候用到這個Worker類呢?當我們調用ThreadPool中的execute()方法時,線程池中的線程會就調用這個run()方法。

來看我們的execute()方法。當我們的線程數小于我們的核心線程數的時候,我們可以直接創(chuàng)建一個新的線程,并且把我們的任務直接交給這個核心線程。反之我們不能創(chuàng)建,而是把任務添加到我們的任務隊列當中,等待核心線程去執(zhí)行這個任務。

 // 任務執(zhí)行
    public void execute(Runnable task){
        synchronized (workers){
            if(workers.size() < coreSize){
                // 創(chuàng)建核心線程
                Worker worker = new Worker(task);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.start();
            }else {
               
                taskQueue.put(task);
            }
        }

    }

寫完了上邊的代碼我們測試一下。

 public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.MILLISECONDS,10);
        for(int i = 0;i<12;i++){
            int j = i;
            threadPool.execute(()->{
                System.out.println("當前線程"+Thread.currentThread().getName()+"task "+j+" is running");
                try {
                    Thread.currentThread().sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

方法運行了之后,即使任務全部執(zhí)行,線程也不會結束。這是因為我們的worker類中的run方法調用了任務隊列的take()方法,而take方法是會一直掛起的。

我們現在換一種帶超時獲取,在規(guī)定時間內獲取不到任務就自動結束任務。這時候就用到我們傳入的時間參數了,我們不再調用await()方法了,而是調用awaitNanos()方法,方法可以接收一個時間參數,這個方法可以消耗我們的nanos時間,在這個時間內如果獲取不到的話線程就不在掛起了,這時還會進入到我們的while循環(huán)當中,判斷我們的nanos是不是被消耗完了,如果被消耗完了就說明在規(guī)定時間內獲取不到任務,直接return結束線程。

 // 帶超時獲取
    public T poll(int timeout,TimeUnit timeUnit){
        T t = null;
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while(deque.size() == 0){
                if(nanos <= 0){
                    return null;
                }
                nanos = emptyWaitSet.awaitNanos(nanos);
            }

            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

 修改Worker類。

 class Worker implements Runnable{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){
                System.out.println("取出的任務是"+task);
                try {
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }
    }

現在就可以正常結束了。

四、拒絕策略

全部代碼如下。要使用拒絕策略,我們定義一個函數式接口,同時寫一個參數傳給線程池,參數的具體內容就是拒絕策略的拒絕方法,是我們自己定義的。

同時我們的execute()方法不在使用put來添加任務了,而是使用tryPut,如果大家對這一塊感興趣的話,可以在bilibili上觀看黑馬程序員的課程學習一下。

/**
 * 自定義線程池
 */
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.SECONDS,10,((queue, task) -> {queue.put(task);}));
        for(int i = 0;i<12;i++){
            int j = i;
            threadPool.execute(()->{
                System.out.println("當前線程"+Thread.currentThread().getName()+"task "+j+" is running");
                try {
                    Thread.currentThread().sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

/**
 * 拒絕策略
 */
@FunctionalInterface
interface RejectPolicy<T>{
    void reject(BlockedQueue<T> queue,T task);
}
/**
 * 阻塞隊列
 */
class BlockedQueue <T>{
    //1.阻塞隊列
    private Deque<T> deque = new ArrayDeque<>();
    //2.實現阻塞的鎖
    private ReentrantLock lock = new ReentrantLock();
    //3. 生產者等待條件
    private Condition fullWaitSet = lock.newCondition();
    //4.消費者等待條件
    private Condition emptyWaitSet = lock.newCondition();
    //5.阻塞隊列的大小
    private  int CAPACITY;

    public BlockedQueue(int queueCapacity) {
        this.CAPACITY = queueCapacity;
    }

    // 生產者放入數據
    public void put(T t) {
        lock.lock();
        try {
            while (deque.size() == CAPACITY) {
                fullWaitSet.await();
            }
            deque.addLast(t);
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    // 帶超時添加
    public boolean offer(T t,int timeout,TimeUnit timeUnit) {
        lock.lock();
        long nanos = timeUnit.toNanos(timeout);
        try {
            while (deque.size() == CAPACITY) {
                if(nanos <= 0){
                    return  false;
                }
                nanos = fullWaitSet.awaitNanos(nanos);
            }
            deque.addLast(t);
            emptyWaitSet.signal();
            return  true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return true;
    }
    // 帶超時獲取
    public T poll(int timeout,TimeUnit timeUnit){
        T t = null;
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while(deque.size() == 0){
                if(nanos <= 0){
                    return null;
                }
                nanos = emptyWaitSet.awaitNanos(nanos);
            }

            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }
    // 消費者從線程池當中獲取任務
    public T take(){
        T t = null;
        lock.lock();
        try {
            while(deque.size() == 0){
                emptyWaitSet.await();
            }
            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if(deque.size()==CAPACITY){
                rejectPolicy.reject(this,task);
            }else{
                deque.addLast(task);
                emptyWaitSet.signal();

            }
        } finally {
            lock.unlock();
        }
    }
}
/**
 * 線程池
 */
class ThreadPool{
    // 任務隊列
    private BlockedQueue<Runnable> taskQueue;

    // 線程集合
    private HashSet<Worker> workers = new HashSet<>();

    //核心線程數
    private int coreSize;

    // 超時時間
    private int timeout;

    // 超時單位
    private TimeUnit timeUnit;

    //拒絕策略
    private RejectPolicy<Runnable> rejectPolicy;

    // 任務執(zhí)行
    public void execute(Runnable task){
        synchronized (workers){
            if(workers.size() < coreSize){
                // 創(chuàng)建核心線程
                Worker worker = new Worker(task);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.start();
            }else {
                // 任務隊列
                //taskQueue.offer(task,timeout,timeUnit);
                taskQueue.tryPut(rejectPolicy,task);
                //taskQueue.put(task);
            }
        }

    }
    public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockedQueue<>(queueCapacity);

        this.rejectPolicy = rejectPolicy;
    }


    class Worker implements Runnable{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){
                System.out.println("取出的任務是"+task);
                try {
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }
    }
}

這個代碼我自己覺得是有些問題,因為如果我的任務隊列大小有10的時候,我給出了13個任務,兩個交給核心線程不占任務隊列大小,另外10個任務正好占滿,剩下一個放不進去,這時就會卡住不輸出。---------未解決

總結

到此這篇關于Java實現一個線程池的文章就介紹到這了,更多相關Java實現線程池內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • springboot?log4j2.xml如何讀取application.yml中屬性值

    springboot?log4j2.xml如何讀取application.yml中屬性值

    這篇文章主要介紹了springboot?log4j2.xml如何讀取application.yml中屬性值問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • java分割字符串多種方法(附例子)

    java分割字符串多種方法(附例子)

    這篇文章主要給大家介紹了關于java分割字符串多種方法的相關資料,Java中有多種方法可以實現字符串分割,文中將每張方法都給出了代碼示例,需要的朋友可以參考下
    2023-10-10
  • springboot工程如何使用阿里云OSS傳輸文件

    springboot工程如何使用阿里云OSS傳輸文件

    阿里云對象存儲OSS(Object Storage Service)是一款海量、安全、低成本、高可靠的云存儲服務,多種存儲類型供選擇,全面優(yōu)化存儲成本,非常適合存儲非結構化數據,本文給大家介紹springboot工程使用阿里云OSS傳輸文件的操作,感興趣的朋友一起看看吧
    2023-08-08
  • SpringBoot 啟動報錯Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379問題的解決方案

    SpringBoot 啟動報錯Unable to connect to 

    這篇文章主要介紹了SpringBoot 啟動報錯Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379問題的解決方案,文中通過圖文結合的方式給大家講解的非常詳細,對大家解決問題有一定的幫助,需要的朋友可以參考下
    2024-10-10
  • 詳解5種Java中常見限流算法

    詳解5種Java中常見限流算法

    在高并發(fā)系統(tǒng)中,出于系統(tǒng)保護角度考慮,通常會對流量進行限流;不但在工作中要頻繁使用,而且也是面試中的高頻考點。本文就為大家整理了5種Java中常見限流算法,需要的可以參考一下
    2023-04-04
  • Java中List遍歷刪除元素remove()的方法

    Java中List遍歷刪除元素remove()的方法

    這篇文章主要介紹了Java中List遍歷刪除元素remove()的方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-11-11
  • Spring 靜態(tài)變量/構造函數注入失敗的解決方案

    Spring 靜態(tài)變量/構造函數注入失敗的解決方案

    我們經常會遇到一下問題:Spring對靜態(tài)變量的注入為空、在構造函數中使用Spring容器中的Bean對象,得到的結果為空。不要擔心,本文將為大家介紹如何解決這些問題,跟隨小編來看看吧
    2021-11-11
  • springboot與mybatis整合實例詳解

    springboot與mybatis整合實例詳解

    這篇文章主要為大家詳細介紹了springboot與mybatis整合實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-11-11
  • idea中如何查看類中所有方法列表

    idea中如何查看類中所有方法列表

    這篇文章主要介紹了idea中如何查看類中所有方法列表問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-03-03
  • Java MD5加密(實例講解)

    Java MD5加密(實例講解)

    下面小編就為大家?guī)硪黄狫ava MD5加密(實例講解)。小編覺得挺不錯的,現在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08

最新評論