欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實(shí)現(xiàn)手寫(xiě)乞丐版線程池的示例代碼

 更新時(shí)間:2022年10月18日 08:44:50   作者:一無(wú)是處的研究僧  
在這篇文章當(dāng)中我們主要介紹實(shí)現(xiàn)一個(gè)非常簡(jiǎn)易版的線程池,深入的去理解其中的原理,麻雀雖小,五臟俱全,感興趣的小伙伴快跟隨小編一起學(xué)習(xí)學(xué)習(xí)吧

前言

在上篇文章線程池的前世今生當(dāng)中我們介紹了實(shí)現(xiàn)線程池的原理,在這篇文章當(dāng)中我們主要介紹實(shí)現(xiàn)一個(gè)非常簡(jiǎn)易版的線程池,深入的去理解其中的原理,麻雀雖小,五臟俱全。

線程池的具體實(shí)現(xiàn)

線程池實(shí)現(xiàn)思路

任務(wù)保存到哪里?

在上篇文章線程池的前世今生當(dāng)中我們具體去介紹了線程池當(dāng)中的原理。在線程池當(dāng)中我們有很多個(gè)線程不斷的從任務(wù)池(用戶在使用線程池的時(shí)候不斷的使用execute方法將任務(wù)添加到線程池當(dāng)中)里面去拿任務(wù)然后執(zhí)行,現(xiàn)在需要思考我們應(yīng)該用什么去實(shí)現(xiàn)任務(wù)池呢?

答案是阻塞隊(duì)列,因?yàn)槲覀冃枰WC在多個(gè)線程往任務(wù)池里面加入任務(wù)的時(shí)候并發(fā)安全,JDK已經(jīng)給我們提供了這樣的數(shù)據(jù)結(jié)構(gòu)——BlockingQueue,這個(gè)是一個(gè)并發(fā)安全的阻塞隊(duì)列,他之所以叫做阻塞隊(duì)列,是因?yàn)槲覀兛梢栽O(shè)置隊(duì)列當(dāng)中可以容納數(shù)據(jù)的個(gè)數(shù),當(dāng)加入到隊(duì)列當(dāng)中的數(shù)據(jù)超過(guò)這個(gè)值的時(shí)候,試圖將數(shù)據(jù)加入到阻塞隊(duì)列當(dāng)中的線程就會(huì)被掛起。當(dāng)隊(duì)列當(dāng)中為空的時(shí)候,試圖從隊(duì)列當(dāng)中取出數(shù)據(jù)的線程也會(huì)被掛起。

線程的設(shè)計(jì)

在我們自己實(shí)現(xiàn)的線程池當(dāng)中我們定一個(gè)Worker類去不斷的從任務(wù)池當(dāng)中取出任務(wù),然后進(jìn)行執(zhí)行。在我們自己定義的worker當(dāng)中還需要有一個(gè)變量isStopped表示線程是否停止工作。同時(shí)在worker當(dāng)中還需要保存當(dāng)前是哪個(gè)線程在執(zhí)行任務(wù),因此在我們自己設(shè)計(jì)的woker類當(dāng)中還需要有一個(gè)thisThread變量,保存正在執(zhí)行任務(wù)的線程,因此worker的整體設(shè)計(jì)如下:

package cscore.concurrent.java.threadpool;
 
import java.util.concurrent.BlockingQueue;
 
public class Worker implements Runnable {
 
  private Thread thisThread; // 表示正在執(zhí)行任務(wù)的線程
  private BlockingQueue<Runnable> taskQueue; // 由線程池傳遞過(guò)來(lái)的任務(wù)隊(duì)列
  private volatile boolean isStopped; // 表示 worker 是否停止工作 需要使用 volatile 保證線程之間的可見(jiàn)性
 
  public Worker(BlockingQueue taskQueue) { // 這個(gè)構(gòu)造方法是在線程池的實(shí)現(xiàn)當(dāng)中會(huì)被調(diào)用
    this.taskQueue = taskQueue;
  }
 
  // 線程執(zhí)行的函數(shù)
  @Override
  public void run() {
    thisThread = Thread.currentThread(); // 獲取執(zhí)行任務(wù)的線程
    while (!isStopped) { // 當(dāng)線程沒(méi)有停止的時(shí)候就不斷的去任務(wù)池當(dāng)中取出任務(wù)
      try {
        Runnable task = taskQueue.take(); // 從任務(wù)池當(dāng)中取出任務(wù) 當(dāng)沒(méi)有任務(wù)的時(shí)候線程會(huì)被這個(gè)方法阻塞
        task.run(); // 執(zhí)行任務(wù) 任務(wù)就是一個(gè) Runnable 對(duì)象
      } catch (InterruptedException e) {
        // do nothing
        // 這個(gè)地方很重要 你有沒(méi)有思考過(guò)一個(gè)問(wèn)題當(dāng)任務(wù)池當(dāng)中沒(méi)有任務(wù)的時(shí)候 線程會(huì)被阻塞在 take 方法上
        // 如果我們后面沒(méi)有任務(wù)提交拿他就會(huì)一直阻塞 那么我們?cè)撊绾螁拘阉?
        // 答案就在下面的函數(shù)當(dāng)中 調(diào)用線程的 interruput 方法 那么take方法就會(huì)產(chǎn)生一個(gè)異常 然后我們
        // 捕獲到一異常 然后線程退出
      }
    }
  }
 
  public synchronized void stopWorker() {
    if (isStopped) {
      throw new RuntimeException("thread has been interrupted");
    }
    isStopped = true;
    thisThread.interrupt(); // 中斷線程產(chǎn)生異常
  }
 
  public synchronized boolean isStopped() {
    return isStopped;
  }
}

線程池的參數(shù)

在我們自己實(shí)現(xiàn)的線程池當(dāng)中,我們只需要定義兩個(gè)參數(shù)一個(gè)是線程的個(gè)數(shù),另外一個(gè)是阻塞隊(duì)列(任務(wù)池)當(dāng)中最大的任務(wù)個(gè)數(shù)。在我們自己實(shí)現(xiàn)的線程池當(dāng)中還需要有一個(gè)變量isStopped表示線程池是否停止工作了,因此線程池的初步設(shè)計(jì)大致如下:

  private BlockingQueue taskQueue; // 任務(wù)池
  private volatile boolean isStopped; // 
  private final List<Worker> workers = new ArrayList<>();// 保存所所有的執(zhí)行任務(wù)的線程
 
  public ThreadPool(int numThreads, int maxTasks) {
    this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {
      workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    // 這里產(chǎn)生線程 然后啟動(dòng)線程
    for (Worker worker : workers) {
      new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }

線程池實(shí)現(xiàn)代碼

在上文當(dāng)中我們大致設(shè)計(jì)的線程池的初步結(jié)構(gòu),從上面的結(jié)果可以看出當(dāng)我們?cè)煲粋€(gè)ThreadPool對(duì)象的時(shí)候會(huì)產(chǎn)生指定線程的數(shù)目線程并且啟動(dòng)他們?nèi)?zhí)行任務(wù),現(xiàn)在我們還需要設(shè)計(jì)的就是如果關(guān)閉線程!我們?cè)陉P(guān)閉線程的時(shí)候還需要保證所有的任務(wù)都被執(zhí)行完成然后才關(guān)閉所有的線程,再退出,我們?cè)O(shè)計(jì)這個(gè)方法為shutDown。除此之外我們還設(shè)計(jì)一個(gè)函數(shù)可以強(qiáng)制退出,不用執(zhí)行所有的任務(wù)了,就直接退出,這個(gè)方法為stop。整個(gè)線程池實(shí)現(xiàn)的代碼如下:

package cscore.concurrent.java.threadpool;
 
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
 
public class ThreadPool {
 
  private BlockingQueue taskQueue;
  private volatile boolean isStopped;
  private final List<Worker> workers = new ArrayList<>();
 
  public ThreadPool(int numThreads, int maxTasks) {
    this.taskQueue = new ArrayBlockingQueue(maxTasks);
    for (int i = 0; i < numThreads; i++) {
      workers.add(new Worker(this.taskQueue));
    }
    int i = 1;
    for (Worker worker : workers) {
      new Thread(worker, "ThreadPool-" + i + "-thread").start();
      i++;
    }
  }
 
  // 下面這個(gè)方法是向線程池提交任務(wù)
  public void execute(Runnable runnable) throws InterruptedException {
    if (isStopped) {
      // 如果線程池已經(jīng)停下來(lái)了,就不在向任務(wù)隊(duì)列當(dāng)中提交任務(wù)了
      System.err.println("thread pool has been stopped, so quit submitting task");
      return;
    }
    taskQueue.put(runnable);
  }
 
  // 強(qiáng)制關(guān)閉線程池
  public synchronized void stop() {
    isStopped = true;
    for (Worker worker : workers) {
      worker.stopWorker();
    }
  }
 
  public synchronized void shutDown() {
    // 先表示關(guān)閉線程池 線程就不能再向線程池提交任務(wù)
    isStopped = true;
    // 先等待所有的任務(wù)執(zhí)行完成再關(guān)閉線程池
    waitForAllTasks();
    stop();
  }
 
  private void waitForAllTasks() {
    // 當(dāng)線程池當(dāng)中還有任務(wù)的時(shí)候 就不退出循環(huán)
    while (taskQueue.size() > 0)
      Thread.yield();
  }
}

線程池測(cè)試代碼

package cscore.concurrent.java.threadpool;
 
public class TestPool {
 
  public static void main(String[] args) throws InterruptedException {
    ThreadPool pool = new ThreadPool(3, 1024);
 
    for (int i = 0; i < 10; i++) {
      int tmp = i;
      pool.execute(() -> {
        System.out.println(Thread.currentThread().getName() + " say hello " + tmp);
      });
    }
    pool.shutDown();
  }
}

上面的代碼輸出結(jié)果:

ThreadPool-2-thread say hello 1
ThreadPool-2-thread say hello 3
ThreadPool-2-thread say hello 4
ThreadPool-2-thread say hello 5
ThreadPool-2-thread say hello 6
ThreadPool-2-thread say hello 7
ThreadPool-2-thread say hello 8
ThreadPool-2-thread say hello 9
ThreadPool-3-thread say hello 2
ThreadPool-1-thread say hello 0

從上面的結(jié)果來(lái)看確實(shí)實(shí)現(xiàn)了線程池的效果。

雜談

可能你會(huì)有疑問(wèn),當(dāng)我們調(diào)用 interrupt的時(shí)候是如何產(chǎn)生異常的,我們仔細(xì)看一個(gè)阻塞隊(duì)列的實(shí)現(xiàn)。在ArrayBlockingQueue當(dāng)中take方法實(shí)現(xiàn)如下:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

在這個(gè)方法當(dāng)中調(diào)用的是鎖的lock.lockInterruptibly();方法,當(dāng)調(diào)用這個(gè)方法的時(shí)候線程是可以被interrupt方法中斷的,然后會(huì)拋出InterruptedException異常。

總結(jié)

在本篇文章當(dāng)中我們主要實(shí)現(xiàn)了一個(gè)乞丐版的線程池,這個(gè)線程池離JDK給我們提供的線程池還是有一點(diǎn)距離,JDK給我們提供給的線程池還有很多其他的參數(shù),我們將在后續(xù)的幾篇文章當(dāng)中繼續(xù)向JDK給我們提供的線程池靠近,直至實(shí)現(xiàn)一個(gè)盜版的JDK的線程池。本篇文章的代碼在下面的鏈接當(dāng)中也可以訪問(wèn)。

到此這篇關(guān)于Java實(shí)現(xiàn)手寫(xiě)乞丐版線程池的示例代碼的文章就介紹到這了,更多相關(guān)Java線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論