欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實現(xiàn)一個簡單的線程池代碼示例

 更新時間:2024年09月13日 10:16:27   作者:熊哈哈O_o  
線程池是管理線程的一個池子,通過阻塞隊列管理任務(wù),主要參數(shù)包括corePoolSize、maximumPoolSize、keepAliveTime等,這篇文章主要介紹了Java實現(xiàn)一個簡單的線程池的相關(guān)資料,需要的朋友可以參考下

一、線程池的模式

線程池顧名思義就是管理線程的一個池子,我們把創(chuàng)建線程的過程交給線程池來處理,而這個線程池當中的線程都會從阻塞隊列當中取獲取任務(wù)執(zhí)行。

我們不在直接把任務(wù)的創(chuàng)建過程寫到我們初始化的線程對象中,而是通過調(diào)用線程池的execute()方法,同時把我們的具體任務(wù)交作為參數(shù)傳給線程池,之后線程池就會把任務(wù)添加到阻塞隊列當中,而線程池當中的線程會從阻塞隊列當中獲取任務(wù)并執(zhí)行。

二、線程池的一些參數(shù) 

  • corePoolSize:線程池核心線程大小,即最小線程數(shù)(初始化線程數(shù))。線程池會維護當前數(shù)量的線程在線程池中,即使這些線程一直處于閑置狀態(tài),也不會被銷毀,除非設(shè)置了allowCoreThreadTimeOut。
  • maximumPoolSize:線程池最大線程數(shù)量。當任務(wù)提交到線程池后,如果當前線程數(shù)小于核心線程數(shù),則會創(chuàng)建新線程來處理任務(wù);如果當前線程數(shù)大于或等于核心線程數(shù),但小于最大線程數(shù),并且任務(wù)隊列已滿,則會創(chuàng)建新線程來處理任務(wù)。
  • keepAliveTime:空閑線程的存活時間。當線程池中的線程數(shù)量大于核心線程數(shù)且線程處于空閑狀態(tài)時,在指定時間后,這個空閑線程將會被銷毀,從而逐漸恢復(fù)到穩(wěn)定的核心線程數(shù)數(shù)量。
  • unit:keepAliveTime的存活時間的計量單位,通常使用TimeUnit枚舉類中的方法,如TimeUnit.SECONDS表示秒級。
  • workQueue:任務(wù)隊列。用于存放等待執(zhí)行的任務(wù),常見的實現(xiàn)類有LinkedBlockingQueue、ArrayBlockingQueue等。
  • threadFactory:線程工廠。用于創(chuàng)建新的線程,可以自定義線程的名稱、優(yōu)先級等。
  • handler:拒絕策略。當任務(wù)無法執(zhí)行(如線程池已滿)時,可以選擇的策略有:AbortPolicy(拋出異常)、CallerRunsPolicy(調(diào)用者運行)、DiscardOldestPolicy(丟棄最老的任務(wù))、DiscardPolicy(無聲丟棄)。

三、代碼實現(xiàn)

因為我們只是簡單的實現(xiàn),所以有一些情況和實際不太相似。

1.BlockingQueue

先來看看我們阻塞隊列當中的一些參數(shù),為了在多線程環(huán)境下防止并發(fā)問題,我使用了ReentrantLock,使用它的目的是為了創(chuàng)建多個不同的阻塞條件。

在我們調(diào)用一個對象的await()方法后,我們的當前線程就會加入到一個特定的隊列當中去等待,直到有調(diào)用了這個對象的notify()方法后才會從這個隊列中抽取一個線程喚醒。

舉個例子,我們?nèi)メt(yī)院的時候,一個醫(yī)生同一時間只能看一個病人,剩下的人都只能等待,如果只有一個大廳的話,看不同病的病人都只能等待在一個候診室中。使用ReentrentLock的意思就是為了創(chuàng)建多個不同的候診室,將不同醫(yī)生的病人分開在不同的候診室當中。

    //1.阻塞隊列
    private Deque<T> deque = new ArrayDeque<>();
    //2.實現(xiàn)阻塞的鎖
    private ReentrantLock lock = new ReentrantLock();
    //3. 生產(chǎn)者等待條件
    private Condition fullWaitSet = lock.newCondition();
    //4.消費者等待條件
    private Condition emptyWaitSet = lock.newCondition();
    //5.阻塞隊列的大小
    private  int CAPACITY;

在自定義的阻塞隊列中,我使用了一個雙向隊列來存儲任務(wù),并且設(shè)置了一個隊列大小的屬性,在我們創(chuàng)建這個隊列的時候我們可以進行初始化。

先來看看阻塞隊列任務(wù)的添加過程。這個邏輯并不難,我們在代碼的上方上鎖,在finally中解鎖。如果這時我們的隊列是滿的,就無法在繼續(xù)添加任務(wù)了,這個時候我們就把當前線程掛起(注意我們的掛起條件)。如果隊列不是滿的話那我們就加入到隊尾,同時把另一類掛起的線程喚醒(這類線程在隊列為空的時候掛起,等待任務(wù)的添加)

 // 生產(chǎn)者放入數(shù)據(jù)
    public void put(T t) {
        lock.lock();
        try {
            while (deque.size() == CAPACITY) {
                fullWaitSet.await();
            }
            deque.addLast(t);
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

在看看我們?nèi)∪蝿?wù)的過程。同樣加鎖,當我們的隊列為空的時候,線程掛起,等待任務(wù)的添加之后線程喚醒,如果隊列不為空的話,我們從隊列頭部取出一個任務(wù),并且喚起一類線程(這類線程在任務(wù)已經(jīng)滿了的時候無法在添加任務(wù)了,進行掛起,等待隊列不為滿)。

  // 消費者從線程池當中獲取任務(wù)
    public T take(){
        T t = null;
        lock.lock();
        try {
            while(deque.size() == 0){
                emptyWaitSet.await();
            }
            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

我們上邊的代碼展示的隊列的存取的過程都是死等狀態(tài),什么是死等狀態(tài)?就是任務(wù)添加不進去或者取不出來的時候,線程會被一直掛起。真實并不是如此,這里只是簡單的展示。

阻塞隊列需要的就是這兩個存取的過程。

2.ThreadPool

先看看線程池當中的屬性。把剛才創(chuàng)建的任務(wù)隊列加進去,因為線程池要時常和任務(wù)隊列溝通。然后創(chuàng)建了一個HashSet結(jié)構(gòu)用于存儲我們的線程。下邊的都是我們線程池需要的一些參數(shù)了,拒絕策略在這里沒有寫。

    // 任務(wù)隊列
    private BlockedQueue<Runnable> taskQueue;

    // 線程集合
    private HashSet<Worker> workers = new HashSet<>();

    //核心線程數(shù)
    private int coreSize;

    // 超時時間
    private int timeout;

    // 超時單位
    private TimeUnit timeUnit;

來看看我們的線程池是如何工作的吧,可以看到我們線程池保存的是Worker對象,我們來看看這個Worker對象是干啥的。這個Worker對象實現(xiàn)了Runnable接口,我們可以把這個類當作線程類,這個類中有一個task屬性,因為我們線程池當中的線程是要獲取任務(wù)執(zhí)行的,這個任務(wù)就用這個task屬性代表。

這個Worker類一直在干一件事情,就是不斷地從我們的任務(wù)隊列當中獲取任務(wù)(Worker類是ThreadPool的內(nèi)部類),如果獲取的任務(wù)不為空的話就執(zhí)行任務(wù),一旦沒有任務(wù)可以執(zhí)行那么就把當前的線程從線程池當中移除。

class Worker implements Runnable{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task!=null || (task = taskQueue.take())!=null){
                System.out.println("取出的任務(wù)是"+task);
                try {
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }
    }

那什么時候用到這個Worker類呢?當我們調(diào)用ThreadPool中的execute()方法時,線程池中的線程會就調(diào)用這個run()方法。

來看我們的execute()方法。當我們的線程數(shù)小于我們的核心線程數(shù)的時候,我們可以直接創(chuàng)建一個新的線程,并且把我們的任務(wù)直接交給這個核心線程。反之我們不能創(chuàng)建,而是把任務(wù)添加到我們的任務(wù)隊列當中,等待核心線程去執(zhí)行這個任務(wù)。

 // 任務(wù)執(zhí)行
    public void execute(Runnable task){
        synchronized (workers){
            if(workers.size() < coreSize){
                // 創(chuàng)建核心線程
                Worker worker = new Worker(task);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.start();
            }else {
               
                taskQueue.put(task);
            }
        }

    }

寫完了上邊的代碼我們測試一下。

 public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.MILLISECONDS,10);
        for(int i = 0;i<12;i++){
            int j = i;
            threadPool.execute(()->{
                System.out.println("當前線程"+Thread.currentThread().getName()+"task "+j+" is running");
                try {
                    Thread.currentThread().sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }

方法運行了之后,即使任務(wù)全部執(zhí)行,線程也不會結(jié)束。這是因為我們的worker類中的run方法調(diào)用了任務(wù)隊列的take()方法,而take方法是會一直掛起的。

我們現(xiàn)在換一種帶超時獲取,在規(guī)定時間內(nèi)獲取不到任務(wù)就自動結(jié)束任務(wù)。這時候就用到我們傳入的時間參數(shù)了,我們不再調(diào)用await()方法了,而是調(diào)用awaitNanos()方法,方法可以接收一個時間參數(shù),這個方法可以消耗我們的nanos時間,在這個時間內(nèi)如果獲取不到的話線程就不在掛起了,這時還會進入到我們的while循環(huán)當中,判斷我們的nanos是不是被消耗完了,如果被消耗完了就說明在規(guī)定時間內(nèi)獲取不到任務(wù),直接return結(jié)束線程。

 // 帶超時獲取
    public T poll(int timeout,TimeUnit timeUnit){
        T t = null;
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while(deque.size() == 0){
                if(nanos <= 0){
                    return null;
                }
                nanos = emptyWaitSet.awaitNanos(nanos);
            }

            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

 修改Worker類。

 class Worker implements Runnable{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){
                System.out.println("取出的任務(wù)是"+task);
                try {
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }
    }

現(xiàn)在就可以正常結(jié)束了。

四、拒絕策略

全部代碼如下。要使用拒絕策略,我們定義一個函數(shù)式接口,同時寫一個參數(shù)傳給線程池,參數(shù)的具體內(nèi)容就是拒絕策略的拒絕方法,是我們自己定義的。

同時我們的execute()方法不在使用put來添加任務(wù)了,而是使用tryPut,如果大家對這一塊感興趣的話,可以在bilibili上觀看黑馬程序員的課程學(xué)習一下。

/**
 * 自定義線程池
 */
public class TestPool {
    public static void main(String[] args) {
        ThreadPool threadPool = new ThreadPool(2,10,TimeUnit.SECONDS,10,((queue, task) -> {queue.put(task);}));
        for(int i = 0;i<12;i++){
            int j = i;
            threadPool.execute(()->{
                System.out.println("當前線程"+Thread.currentThread().getName()+"task "+j+" is running");
                try {
                    Thread.currentThread().sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

/**
 * 拒絕策略
 */
@FunctionalInterface
interface RejectPolicy<T>{
    void reject(BlockedQueue<T> queue,T task);
}
/**
 * 阻塞隊列
 */
class BlockedQueue <T>{
    //1.阻塞隊列
    private Deque<T> deque = new ArrayDeque<>();
    //2.實現(xiàn)阻塞的鎖
    private ReentrantLock lock = new ReentrantLock();
    //3. 生產(chǎn)者等待條件
    private Condition fullWaitSet = lock.newCondition();
    //4.消費者等待條件
    private Condition emptyWaitSet = lock.newCondition();
    //5.阻塞隊列的大小
    private  int CAPACITY;

    public BlockedQueue(int queueCapacity) {
        this.CAPACITY = queueCapacity;
    }

    // 生產(chǎn)者放入數(shù)據(jù)
    public void put(T t) {
        lock.lock();
        try {
            while (deque.size() == CAPACITY) {
                fullWaitSet.await();
            }
            deque.addLast(t);
            emptyWaitSet.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    // 帶超時添加
    public boolean offer(T t,int timeout,TimeUnit timeUnit) {
        lock.lock();
        long nanos = timeUnit.toNanos(timeout);
        try {
            while (deque.size() == CAPACITY) {
                if(nanos <= 0){
                    return  false;
                }
                nanos = fullWaitSet.awaitNanos(nanos);
            }
            deque.addLast(t);
            emptyWaitSet.signal();
            return  true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        return true;
    }
    // 帶超時獲取
    public T poll(int timeout,TimeUnit timeUnit){
        T t = null;
        lock.lock();
        try {
            long nanos = timeUnit.toNanos(timeout);
            while(deque.size() == 0){
                if(nanos <= 0){
                    return null;
                }
                nanos = emptyWaitSet.awaitNanos(nanos);
            }

            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }
    // 消費者從線程池當中獲取任務(wù)
    public T take(){
        T t = null;
        lock.lock();
        try {
            while(deque.size() == 0){
                emptyWaitSet.await();
            }
            t = deque.removeFirst();
            fullWaitSet.signal();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        return t;
    }

    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try {
            if(deque.size()==CAPACITY){
                rejectPolicy.reject(this,task);
            }else{
                deque.addLast(task);
                emptyWaitSet.signal();

            }
        } finally {
            lock.unlock();
        }
    }
}
/**
 * 線程池
 */
class ThreadPool{
    // 任務(wù)隊列
    private BlockedQueue<Runnable> taskQueue;

    // 線程集合
    private HashSet<Worker> workers = new HashSet<>();

    //核心線程數(shù)
    private int coreSize;

    // 超時時間
    private int timeout;

    // 超時單位
    private TimeUnit timeUnit;

    //拒絕策略
    private RejectPolicy<Runnable> rejectPolicy;

    // 任務(wù)執(zhí)行
    public void execute(Runnable task){
        synchronized (workers){
            if(workers.size() < coreSize){
                // 創(chuàng)建核心線程
                Worker worker = new Worker(task);
                workers.add(worker);
                Thread thread = new Thread(worker);
                thread.start();
            }else {
                // 任務(wù)隊列
                //taskQueue.offer(task,timeout,timeUnit);
                taskQueue.tryPut(rejectPolicy,task);
                //taskQueue.put(task);
            }
        }

    }
    public ThreadPool(int coreSize, int timeout, TimeUnit timeUnit,int queueCapacity,RejectPolicy<Runnable> rejectPolicy){
        this.coreSize = coreSize;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.taskQueue = new BlockedQueue<>(queueCapacity);

        this.rejectPolicy = rejectPolicy;
    }


    class Worker implements Runnable{
        private Runnable task;
        public Worker(Runnable task){
            this.task = task;
        }
        @Override
        public void run() {
            while(task!=null || (task = taskQueue.poll(timeout,timeUnit))!=null){
                System.out.println("取出的任務(wù)是"+task);
                try {
                    task.run();
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    task = null;
                }
                synchronized (workers){
                    workers.remove(this);
                }
            }
        }
    }
}

這個代碼我自己覺得是有些問題,因為如果我的任務(wù)隊列大小有10的時候,我給出了13個任務(wù),兩個交給核心線程不占任務(wù)隊列大小,另外10個任務(wù)正好占滿,剩下一個放不進去,這時就會卡住不輸出。---------未解決

總結(jié)

到此這篇關(guān)于Java實現(xiàn)一個線程池的文章就介紹到這了,更多相關(guān)Java實現(xiàn)線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • springboot?log4j2.xml如何讀取application.yml中屬性值

    springboot?log4j2.xml如何讀取application.yml中屬性值

    這篇文章主要介紹了springboot?log4j2.xml如何讀取application.yml中屬性值問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • java分割字符串多種方法(附例子)

    java分割字符串多種方法(附例子)

    這篇文章主要給大家介紹了關(guān)于java分割字符串多種方法的相關(guān)資料,Java中有多種方法可以實現(xiàn)字符串分割,文中將每張方法都給出了代碼示例,需要的朋友可以參考下
    2023-10-10
  • springboot工程如何使用阿里云OSS傳輸文件

    springboot工程如何使用阿里云OSS傳輸文件

    阿里云對象存儲OSS(Object Storage Service)是一款海量、安全、低成本、高可靠的云存儲服務(wù),多種存儲類型供選擇,全面優(yōu)化存儲成本,非常適合存儲非結(jié)構(gòu)化數(shù)據(jù),本文給大家介紹springboot工程使用阿里云OSS傳輸文件的操作,感興趣的朋友一起看看吧
    2023-08-08
  • SpringBoot 啟動報錯Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379問題的解決方案

    SpringBoot 啟動報錯Unable to connect to 

    這篇文章主要介紹了SpringBoot 啟動報錯Unable to connect to Redis server: 127.0.0.1/127.0.0.1:6379問題的解決方案,文中通過圖文結(jié)合的方式給大家講解的非常詳細,對大家解決問題有一定的幫助,需要的朋友可以參考下
    2024-10-10
  • 詳解5種Java中常見限流算法

    詳解5種Java中常見限流算法

    在高并發(fā)系統(tǒng)中,出于系統(tǒng)保護角度考慮,通常會對流量進行限流;不但在工作中要頻繁使用,而且也是面試中的高頻考點。本文就為大家整理了5種Java中常見限流算法,需要的可以參考一下
    2023-04-04
  • Java中List遍歷刪除元素remove()的方法

    Java中List遍歷刪除元素remove()的方法

    這篇文章主要介紹了Java中List遍歷刪除元素remove()的方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧
    2020-11-11
  • Spring 靜態(tài)變量/構(gòu)造函數(shù)注入失敗的解決方案

    Spring 靜態(tài)變量/構(gòu)造函數(shù)注入失敗的解決方案

    我們經(jīng)常會遇到一下問題:Spring對靜態(tài)變量的注入為空、在構(gòu)造函數(shù)中使用Spring容器中的Bean對象,得到的結(jié)果為空。不要擔心,本文將為大家介紹如何解決這些問題,跟隨小編來看看吧
    2021-11-11
  • springboot與mybatis整合實例詳解

    springboot與mybatis整合實例詳解

    這篇文章主要為大家詳細介紹了springboot與mybatis整合實例,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2017-11-11
  • idea中如何查看類中所有方法列表

    idea中如何查看類中所有方法列表

    這篇文章主要介紹了idea中如何查看類中所有方法列表問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-03-03
  • Java MD5加密(實例講解)

    Java MD5加密(實例講解)

    下面小編就為大家?guī)硪黄狫ava MD5加密(實例講解)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2017-08-08

最新評論