欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實現手寫一個線程池的示例代碼

 更新時間:2022年10月18日 15:37:13   作者:JAVA旭陽  
線程池技術想必大家都不陌生把,相信在平時的工作中沒有少用,而且這也是面試頻率非常高的一個知識點,那么大家知道它的實現原理和細節(jié)嗎?本文就來通過手寫一個簡單的線程池框架,去掌握線程池的基本原理,感興趣的可以學習一下

概述

線程池技術想必大家都不陌生把,相信在平時的工作中沒有少用,而且這也是面試頻率非常高的一個知識點,那么大家知道它的實現原理和細節(jié)嗎?如果直接去看jdk源碼的話,可能有一定的難度,那么我們可以先通過手寫一個簡單的線程池框架,去掌握線程池的基本原理后,再去看jdk的線程池源碼就會相對容易,而且不容易忘記。

線程池框架設計

我們都知道,線程資源的創(chuàng)建和銷毀并不是沒有代價的,甚至開銷是非常高的。同時,線程也不是任意多創(chuàng)建的,因為活躍的線程會消耗系統(tǒng)資源,特別是內存,在一定的范圍內,增加線程可以提高系統(tǒng)的吞吐率,如果超過了這個范圍,反而會降低程序的執(zhí)行速度。

因此,設計一個容納多個線程的容器,容器中的線程可以重復使用,省去了頻繁創(chuàng)建和銷毀線程對象的操作, 達到下面的目標:

  • 降低資源消耗,減少了創(chuàng)建和銷毀線程的次數,每個工作線程都可以被重復利用,可執(zhí)行多個任務
  • 提高響應速度,當任務到達時,如果有線程可以直接用,不會出現系統(tǒng)僵死
  • 提高線程的可管理性,如果無限制的創(chuàng)建線程,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一的分配,調優(yōu)和監(jiān)控

線程池的核心思想: 線程復用,同一個線程可以被重復使用,來處理多個任務。

為了實現線程池功能,需要考慮下面幾個設計要點:

  • 線程池可以接口外部提交的任務執(zhí)行
  • 線程池有工作線程的數量,有任務執(zhí)行,沒有任務也空閑在那,等待任務過來,這樣既避免線程頻繁創(chuàng)建銷毀帶來的開銷,同時也可以避免線程池無限制的創(chuàng)建線程
  • 如果線程池接受提交的任務超過工作線程的數量了,該怎么辦?可以用一個隊列把任務存下來,等工作線程完成任務后去隊列中獲取任務,執(zhí)行
  • 那如果任務實在是太多太多了,達到了我們認為的隊列最大值,怎么辦,我們可以設計一種任務太多的策略,可以進行切換,比如直接丟棄任務、報錯等等

看了上面的設計目標和要點,是不是能立刻想到一個非常經典的設計模型——生產者消費者模型。

  • 阻塞隊列存儲執(zhí)行任務,比如外部main函數作為生產者向隊列生產任務。
  • 線程池中的工作線程作為消費者獲取任務執(zhí)行。

現在我們將我們的設計思路轉換為代碼。

代碼實現

阻塞隊列的實現

  • 阻塞隊列主要存放任務,有容量限制
  • 阻塞隊列提供添加和刪除任務的API, 如果超過容量,阻塞不能添加任務,如果沒有任務,阻塞無法獲取任務。
/**
 * <p>自定義任務隊列, 用來存放任務 </p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  10:15
 * @version: 1.0.0
 */
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    // 容量
    private int capcity;
    // 雙端任務隊列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入鎖
    private ReentrantLock lock = new ReentrantLock();
    // 生產者條件變量
    private Condition fullWaitSet = lock.newCondition();
    // 生產者條件變量
    private Condition emptyWaitSet = lock.newCondition();

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 阻塞的方式添加任務
    public void put(T task) {
        lock.lock();
        try {
            // 通過while的方式
            while (deque.size() >= capcity) {
                log.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            log.debug("task add successfully");
            emptyWaitSet.signal();
        }  finally {
            lock.unlock();
        }
    }

    // 阻塞獲取任務
    public T take() {
        lock.lock();
        try {
            // 通過while的方式
            while (deque.isEmpty()) {
                try {
                    log.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            log.debug("take task successfully");
            // 從隊列中獲取元素
            return task;
        } finally {
            lock.unlock();
        }
    }
}
  • put()方法是向阻塞隊列中添加任務
  • take()方法是向阻塞隊列中獲取任務

線程池消費端實現

1.定義執(zhí)行器接口

/**
 * <p>定義一個執(zhí)行器的接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  12:31
 * @version: 1.0.0
 */
public interface Executor {

    /**
     * 提交任務執(zhí)行
     * @param task 任務
     */
    void execute(Runnable task);
}

2.定義線程池類實現該接口

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {

    /**
     * 任務隊列
     */
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 核心工作線程數
     */
    private int coreSize;

    /**
     * 工作線程集合
     */
    private Set<Worker> workers = new HashSet<>();

    /**
     *  創(chuàng)建線程池
     * @param coreSize 工作線程數量
     * @param capcity 阻塞隊列容量
     */
    public ThreadPool(int coreSize, int capcity) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
    }

    /**
     * 提交任務執(zhí)行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作線程數小于閾值,直接開始任務執(zhí)行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超過了閾值,加入到隊列中
                taskQueue.put(task);
            }
        }
    }

    /**
     * 工作線程,對執(zhí)行的任務做了一層包裝處理
     */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 如果任務不為空,或者可以從隊列中獲取任務
            while (task != null || (task = taskQueue.take()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 執(zhí)行完后,設置任務為空
                    task = null;
                }
            }

              // 移除工作線程
            synchronized (workers){
                log.debug("remove worker successfully");
                workers.remove(this);
            }
        }
    }
}
  • Worker類是工作線程類,包裝了執(zhí)行任務,里面實現了從隊列獲取任務,然后執(zhí)行任務。
  • execute方法的實現中,如果工作線程數量小于閾值的話,直接創(chuàng)建新的工作線程,否則將任務添加到隊列中。

3.演示

@Test
    public void testThreadPool1() throws InterruptedException {
        Executor executor = new ThreadPool(2, 4);
        // 提交任務
        for (int i = 0; i < 6; i++) {
            final  int j = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                    log.info("run task {}", j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(10);
        }

        Thread.sleep(10000);
    }

運行結果:

獲取任務超時設計

目前從隊列中獲取任務是永久阻塞等待的,可以改成阻塞一段時間沒有獲取任務,丟棄的策略。

@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
    // 容量
    private int capcity;
    // 雙端任務隊列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入鎖
    private ReentrantLock lock = new ReentrantLock();
    // 生產者條件變量
    private Condition fullWaitSet = lock.newCondition();
    // 生產者條件變量
    private Condition emptyWaitSet = lock.newCondition();

    public TimeoutBlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 帶超時時間的獲取
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 將 timeout 統(tǒng)一轉換為 納秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()){
                try {
                    if (nanos<=0){
                        return null;
                    }
                    // 返回的是剩余的等待時間,更改navos的值,使虛假喚醒的時候可以繼續(xù)等待
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            return deque.getFirst();
        }finally {
            lock.unlock();
        }
    }

    // 帶超時時間的增加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try{
            // 將 timeout 統(tǒng)一轉換為 納秒
            long nanos = unit.toNanos(timeout);
            while (deque.size() == capcity){
                try {
                    if (nanos<=0){
                        return false;
                    }
                    // 更新剩余需要等待的時間
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任務隊列 {}", task);
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }
}

新加TimeoutBlockingQueue類,添加offer和poll待超時的添加和獲取任務的方法。

拒絕策略設計

目前的實現還是有個漏洞,無法自定義任務超出閾值的一個拒絕策略,我們可以通過利用函數式編程+策略模式去實現。

1.定義策略模式的函數式接口

/**
 * <p>拒絕策略的函數式接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  13:15
 * @version: 1.0.0
 */
@FunctionalInterface
public interface RejectPolicy<T> {

    /**
     * 拒絕策略的接口
     * @param queue
     * @param task
     */
    void reject(BlockingQueue<T> queue, T task);
}

2.添加函數式接口的調用入口

我們可以在阻塞隊列添加任務新加一個api, 添加任務如果超過容量,調用函數式接口。

@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    ........

    /**
     * 嘗試添加任務
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 如果隊列超過容量
            if (deque.size()> capcity){
                log.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            }else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

3.修改ThreadPool類

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
    .....

    /**
     * 拒絕策略
     */
    private RejectPolicy rejectPolicy;

    // 通過構造方法傳入執(zhí)行的拒絕策略
    public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 提交任務執(zhí)行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作線程數小于閾值,直接開始任務執(zhí)行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超過了閾值,加入到隊列中
                //taskQueue.put(task);
                
                // 調用tryPut的方式
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

   ....
}

通過構造方法的方式傳入要執(zhí)行的拒絕策略

調用tryPut方法添加任務

4.演示

以上就是Java實現手寫一個線程池的示例代碼的詳細內容,更多關于Java線程池的資料請關注腳本之家其它相關文章!

相關文章

  • Java線程中斷interrupt的常用方法

    Java線程中斷interrupt的常用方法

    本文主要介紹了Java線程中斷interrupt的常用方法,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-06-06
  • 淺談一下Java中的悲觀鎖和樂觀鎖

    淺談一下Java中的悲觀鎖和樂觀鎖

    這篇文章主要介紹了一下Java中的悲觀鎖和樂觀鎖,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-04-04
  • Springboot自定義mybatis攔截器實現擴展

    Springboot自定義mybatis攔截器實現擴展

    本文主要介紹了Springboot自定義mybatis攔截器實現擴展,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • Java中String類常用方法總結詳解

    Java中String類常用方法總結詳解

    String類是一個很常用的類,是Java語言的核心類,用來保存代碼中的字符串常量的,并且封裝了很多操作字符串的方法。本文為大家總結了一些String類常用方法的使用,感興趣的可以了解一下
    2022-08-08
  • Java listener簡介_動力節(jié)點Java學院整理

    Java listener簡介_動力節(jié)點Java學院整理

    這篇文章主要介紹了Java listener簡介,可以用于統(tǒng)計用戶在線人數等,有興趣的可以了解一下
    2017-07-07
  • SpringBoot中的五種對靜態(tài)資源的映射規(guī)則的實現

    SpringBoot中的五種對靜態(tài)資源的映射規(guī)則的實現

    這篇文章主要介紹了SpringBoot中的五種對靜態(tài)資源的映射規(guī)則的實現,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2019-12-12
  • Java如何實現可折疊Panel方法示例

    Java如何實現可折疊Panel方法示例

    這篇文章主要給大家介紹了關于利用Java如何實現可折疊Panel的相關資料,文中通過示例代碼介紹的非常詳細,對大家學習或者使用java具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2018-07-07
  • IntelliJ IDEA 2020下載與安裝教程圖文詳解

    IntelliJ IDEA 2020下載與安裝教程圖文詳解

    這篇文章主要介紹了IDEA 2020下載與安裝的教程,本文通過圖文并茂的形式給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-05-05
  • java文件操作練習代碼 讀取某個盤符下的文件

    java文件操作練習代碼 讀取某個盤符下的文件

    這篇文章主要介紹了java讀取某個盤符下的文件示例,代碼中要求的是絕對路徑,編譯過程中要注意絕對路徑問題和異常的抓取
    2014-01-01
  • Java中?springcloud.openfeign應用案例解析

    Java中?springcloud.openfeign應用案例解析

    使用OpenFeign能讓編寫Web?Service客戶端更加簡單,使用時只需定義服務接口,然后在上面添加注解,OpenFeign也支持可拔插式的編碼和解碼器,這篇文章主要介紹了Java中?springcloud.openfeign應用案例解析,需要的朋友可以參考下
    2024-06-06

最新評論