欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實(shí)現(xiàn)手寫一個(gè)線程池的示例代碼

 更新時(shí)間:2022年10月18日 15:37:13   作者:JAVA旭陽(yáng)  
線程池技術(shù)想必大家都不陌生把,相信在平時(shí)的工作中沒(méi)有少用,而且這也是面試頻率非常高的一個(gè)知識(shí)點(diǎn),那么大家知道它的實(shí)現(xiàn)原理和細(xì)節(jié)嗎?本文就來(lái)通過(guò)手寫一個(gè)簡(jiǎn)單的線程池框架,去掌握線程池的基本原理,感興趣的可以學(xué)習(xí)一下

概述

線程池技術(shù)想必大家都不陌生把,相信在平時(shí)的工作中沒(méi)有少用,而且這也是面試頻率非常高的一個(gè)知識(shí)點(diǎn),那么大家知道它的實(shí)現(xiàn)原理和細(xì)節(jié)嗎?如果直接去看jdk源碼的話,可能有一定的難度,那么我們可以先通過(guò)手寫一個(gè)簡(jiǎn)單的線程池框架,去掌握線程池的基本原理后,再去看jdk的線程池源碼就會(huì)相對(duì)容易,而且不容易忘記。

線程池框架設(shè)計(jì)

我們都知道,線程資源的創(chuàng)建和銷毀并不是沒(méi)有代價(jià)的,甚至開(kāi)銷是非常高的。同時(shí),線程也不是任意多創(chuàng)建的,因?yàn)榛钴S的線程會(huì)消耗系統(tǒng)資源,特別是內(nèi)存,在一定的范圍內(nèi),增加線程可以提高系統(tǒng)的吞吐率,如果超過(guò)了這個(gè)范圍,反而會(huì)降低程序的執(zhí)行速度。

因此,設(shè)計(jì)一個(gè)容納多個(gè)線程的容器,容器中的線程可以重復(fù)使用,省去了頻繁創(chuàng)建和銷毀線程對(duì)象的操作, 達(dá)到下面的目標(biāo):

  • 降低資源消耗,減少了創(chuàng)建和銷毀線程的次數(shù),每個(gè)工作線程都可以被重復(fù)利用,可執(zhí)行多個(gè)任務(wù)
  • 提高響應(yīng)速度,當(dāng)任務(wù)到達(dá)時(shí),如果有線程可以直接用,不會(huì)出現(xiàn)系統(tǒng)僵死
  • 提高線程的可管理性,如果無(wú)限制的創(chuàng)建線程,不僅會(huì)消耗系統(tǒng)資源,還會(huì)降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控

線程池的核心思想: 線程復(fù)用,同一個(gè)線程可以被重復(fù)使用,來(lái)處理多個(gè)任務(wù)。

為了實(shí)現(xiàn)線程池功能,需要考慮下面幾個(gè)設(shè)計(jì)要點(diǎn):

  • 線程池可以接口外部提交的任務(wù)執(zhí)行
  • 線程池有工作線程的數(shù)量,有任務(wù)執(zhí)行,沒(méi)有任務(wù)也空閑在那,等待任務(wù)過(guò)來(lái),這樣既避免線程頻繁創(chuàng)建銷毀帶來(lái)的開(kāi)銷,同時(shí)也可以避免線程池?zé)o限制的創(chuàng)建線程
  • 如果線程池接受提交的任務(wù)超過(guò)工作線程的數(shù)量了,該怎么辦?可以用一個(gè)隊(duì)列把任務(wù)存下來(lái),等工作線程完成任務(wù)后去隊(duì)列中獲取任務(wù),執(zhí)行
  • 那如果任務(wù)實(shí)在是太多太多了,達(dá)到了我們認(rèn)為的隊(duì)列最大值,怎么辦,我們可以設(shè)計(jì)一種任務(wù)太多的策略,可以進(jìn)行切換,比如直接丟棄任務(wù)、報(bào)錯(cuò)等等

看了上面的設(shè)計(jì)目標(biāo)和要點(diǎn),是不是能立刻想到一個(gè)非常經(jīng)典的設(shè)計(jì)模型——生產(chǎn)者消費(fèi)者模型。

  • 阻塞隊(duì)列存儲(chǔ)執(zhí)行任務(wù),比如外部main函數(shù)作為生產(chǎn)者向隊(duì)列生產(chǎn)任務(wù)。
  • 線程池中的工作線程作為消費(fèi)者獲取任務(wù)執(zhí)行。

現(xiàn)在我們將我們的設(shè)計(jì)思路轉(zhuǎn)換為代碼。

代碼實(shí)現(xiàn)

阻塞隊(duì)列的實(shí)現(xiàn)

  • 阻塞隊(duì)列主要存放任務(wù),有容量限制
  • 阻塞隊(duì)列提供添加和刪除任務(wù)的API, 如果超過(guò)容量,阻塞不能添加任務(wù),如果沒(méi)有任務(wù),阻塞無(wú)法獲取任務(wù)。
/**
 * <p>自定義任務(wù)隊(duì)列, 用來(lái)存放任務(wù) </p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  10:15
 * @version: 1.0.0
 */
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    // 容量
    private int capcity;
    // 雙端任務(wù)隊(duì)列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入鎖
    private ReentrantLock lock = new ReentrantLock();
    // 生產(chǎn)者條件變量
    private Condition fullWaitSet = lock.newCondition();
    // 生產(chǎn)者條件變量
    private Condition emptyWaitSet = lock.newCondition();

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 阻塞的方式添加任務(wù)
    public void put(T task) {
        lock.lock();
        try {
            // 通過(guò)while的方式
            while (deque.size() >= capcity) {
                log.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            log.debug("task add successfully");
            emptyWaitSet.signal();
        }  finally {
            lock.unlock();
        }
    }

    // 阻塞獲取任務(wù)
    public T take() {
        lock.lock();
        try {
            // 通過(guò)while的方式
            while (deque.isEmpty()) {
                try {
                    log.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            log.debug("take task successfully");
            // 從隊(duì)列中獲取元素
            return task;
        } finally {
            lock.unlock();
        }
    }
}
  • put()方法是向阻塞隊(duì)列中添加任務(wù)
  • take()方法是向阻塞隊(duì)列中獲取任務(wù)

線程池消費(fèi)端實(shí)現(xiàn)

1.定義執(zhí)行器接口

/**
 * <p>定義一個(gè)執(zhí)行器的接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  12:31
 * @version: 1.0.0
 */
public interface Executor {

    /**
     * 提交任務(wù)執(zhí)行
     * @param task 任務(wù)
     */
    void execute(Runnable task);
}

2.定義線程池類實(shí)現(xiàn)該接口

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {

    /**
     * 任務(wù)隊(duì)列
     */
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 核心工作線程數(shù)
     */
    private int coreSize;

    /**
     * 工作線程集合
     */
    private Set<Worker> workers = new HashSet<>();

    /**
     *  創(chuàng)建線程池
     * @param coreSize 工作線程數(shù)量
     * @param capcity 阻塞隊(duì)列容量
     */
    public ThreadPool(int coreSize, int capcity) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
    }

    /**
     * 提交任務(wù)執(zhí)行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作線程數(shù)小于閾值,直接開(kāi)始任務(wù)執(zhí)行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超過(guò)了閾值,加入到隊(duì)列中
                taskQueue.put(task);
            }
        }
    }

    /**
     * 工作線程,對(duì)執(zhí)行的任務(wù)做了一層包裝處理
     */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 如果任務(wù)不為空,或者可以從隊(duì)列中獲取任務(wù)
            while (task != null || (task = taskQueue.take()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 執(zhí)行完后,設(shè)置任務(wù)為空
                    task = null;
                }
            }

              // 移除工作線程
            synchronized (workers){
                log.debug("remove worker successfully");
                workers.remove(this);
            }
        }
    }
}
  • Worker類是工作線程類,包裝了執(zhí)行任務(wù),里面實(shí)現(xiàn)了從隊(duì)列獲取任務(wù),然后執(zhí)行任務(wù)。
  • execute方法的實(shí)現(xiàn)中,如果工作線程數(shù)量小于閾值的話,直接創(chuàng)建新的工作線程,否則將任務(wù)添加到隊(duì)列中。

3.演示

@Test
    public void testThreadPool1() throws InterruptedException {
        Executor executor = new ThreadPool(2, 4);
        // 提交任務(wù)
        for (int i = 0; i < 6; i++) {
            final  int j = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                    log.info("run task {}", j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(10);
        }

        Thread.sleep(10000);
    }

運(yùn)行結(jié)果:

獲取任務(wù)超時(shí)設(shè)計(jì)

目前從隊(duì)列中獲取任務(wù)是永久阻塞等待的,可以改成阻塞一段時(shí)間沒(méi)有獲取任務(wù),丟棄的策略。

@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
    // 容量
    private int capcity;
    // 雙端任務(wù)隊(duì)列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入鎖
    private ReentrantLock lock = new ReentrantLock();
    // 生產(chǎn)者條件變量
    private Condition fullWaitSet = lock.newCondition();
    // 生產(chǎn)者條件變量
    private Condition emptyWaitSet = lock.newCondition();

    public TimeoutBlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 帶超時(shí)時(shí)間的獲取
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()){
                try {
                    if (nanos<=0){
                        return null;
                    }
                    // 返回的是剩余的等待時(shí)間,更改navos的值,使虛假喚醒的時(shí)候可以繼續(xù)等待
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            return deque.getFirst();
        }finally {
            lock.unlock();
        }
    }

    // 帶超時(shí)時(shí)間的增加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try{
            // 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒
            long nanos = unit.toNanos(timeout);
            while (deque.size() == capcity){
                try {
                    if (nanos<=0){
                        return false;
                    }
                    // 更新剩余需要等待的時(shí)間
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任務(wù)隊(duì)列 {}", task);
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }
}

新加TimeoutBlockingQueue類,添加offer和poll待超時(shí)的添加和獲取任務(wù)的方法。

拒絕策略設(shè)計(jì)

目前的實(shí)現(xiàn)還是有個(gè)漏洞,無(wú)法自定義任務(wù)超出閾值的一個(gè)拒絕策略,我們可以通過(guò)利用函數(shù)式編程+策略模式去實(shí)現(xiàn)。

1.定義策略模式的函數(shù)式接口

/**
 * <p>拒絕策略的函數(shù)式接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  13:15
 * @version: 1.0.0
 */
@FunctionalInterface
public interface RejectPolicy<T> {

    /**
     * 拒絕策略的接口
     * @param queue
     * @param task
     */
    void reject(BlockingQueue<T> queue, T task);
}

2.添加函數(shù)式接口的調(diào)用入口

我們可以在阻塞隊(duì)列添加任務(wù)新加一個(gè)api, 添加任務(wù)如果超過(guò)容量,調(diào)用函數(shù)式接口。

@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    ........

    /**
     * 嘗試添加任務(wù)
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 如果隊(duì)列超過(guò)容量
            if (deque.size()> capcity){
                log.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            }else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

3.修改ThreadPool類

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
    .....

    /**
     * 拒絕策略
     */
    private RejectPolicy rejectPolicy;

    // 通過(guò)構(gòu)造方法傳入執(zhí)行的拒絕策略
    public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 提交任務(wù)執(zhí)行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作線程數(shù)小于閾值,直接開(kāi)始任務(wù)執(zhí)行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超過(guò)了閾值,加入到隊(duì)列中
                //taskQueue.put(task);
                
                // 調(diào)用tryPut的方式
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

   ....
}

通過(guò)構(gòu)造方法的方式傳入要執(zhí)行的拒絕策略

調(diào)用tryPut方法添加任務(wù)

4.演示

以上就是Java實(shí)現(xiàn)手寫一個(gè)線程池的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Java線程中斷interrupt的常用方法

    Java線程中斷interrupt的常用方法

    本文主要介紹了Java線程中斷interrupt的常用方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2022-06-06
  • 淺談一下Java中的悲觀鎖和樂(lè)觀鎖

    淺談一下Java中的悲觀鎖和樂(lè)觀鎖

    這篇文章主要介紹了一下Java中的悲觀鎖和樂(lè)觀鎖,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展

    Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展

    本文主要介紹了Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • Java中String類常用方法總結(jié)詳解

    Java中String類常用方法總結(jié)詳解

    String類是一個(gè)很常用的類,是Java語(yǔ)言的核心類,用來(lái)保存代碼中的字符串常量的,并且封裝了很多操作字符串的方法。本文為大家總結(jié)了一些String類常用方法的使用,感興趣的可以了解一下
    2022-08-08
  • Java listener簡(jiǎn)介_(kāi)動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    Java listener簡(jiǎn)介_(kāi)動(dòng)力節(jié)點(diǎn)Java學(xué)院整理

    這篇文章主要介紹了Java listener簡(jiǎn)介,可以用于統(tǒng)計(jì)用戶在線人數(shù)等,有興趣的可以了解一下
    2017-07-07
  • SpringBoot中的五種對(duì)靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn)

    SpringBoot中的五種對(duì)靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn)

    這篇文章主要介紹了SpringBoot中的五種對(duì)靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-12-12
  • Java如何實(shí)現(xiàn)可折疊Panel方法示例

    Java如何實(shí)現(xiàn)可折疊Panel方法示例

    這篇文章主要給大家介紹了關(guān)于利用Java如何實(shí)現(xiàn)可折疊Panel的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2018-07-07
  • IntelliJ IDEA 2020下載與安裝教程圖文詳解

    IntelliJ IDEA 2020下載與安裝教程圖文詳解

    這篇文章主要介紹了IDEA 2020下載與安裝的教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-05-05
  • java文件操作練習(xí)代碼 讀取某個(gè)盤符下的文件

    java文件操作練習(xí)代碼 讀取某個(gè)盤符下的文件

    這篇文章主要介紹了java讀取某個(gè)盤符下的文件示例,代碼中要求的是絕對(duì)路徑,編譯過(guò)程中要注意絕對(duì)路徑問(wèn)題和異常的抓取
    2014-01-01
  • Java中?springcloud.openfeign應(yīng)用案例解析

    Java中?springcloud.openfeign應(yīng)用案例解析

    使用OpenFeign能讓編寫Web?Service客戶端更加簡(jiǎn)單,使用時(shí)只需定義服務(wù)接口,然后在上面添加注解,OpenFeign也支持可拔插式的編碼和解碼器,這篇文章主要介紹了Java中?springcloud.openfeign應(yīng)用案例解析,需要的朋友可以參考下
    2024-06-06

最新評(píng)論