Java實(shí)現(xiàn)手寫一個(gè)線程池的示例代碼
概述
線程池技術(shù)想必大家都不陌生把,相信在平時(shí)的工作中沒有少用,而且這也是面試頻率非常高的一個(gè)知識點(diǎn),那么大家知道它的實(shí)現(xiàn)原理和細(xì)節(jié)嗎?如果直接去看jdk源碼的話,可能有一定的難度,那么我們可以先通過手寫一個(gè)簡單的線程池框架,去掌握線程池的基本原理后,再去看jdk的線程池源碼就會相對容易,而且不容易忘記。
線程池框架設(shè)計(jì)
我們都知道,線程資源的創(chuàng)建和銷毀并不是沒有代價(jià)的,甚至開銷是非常高的。同時(shí),線程也不是任意多創(chuàng)建的,因?yàn)榛钴S的線程會消耗系統(tǒng)資源,特別是內(nèi)存,在一定的范圍內(nèi),增加線程可以提高系統(tǒng)的吞吐率,如果超過了這個(gè)范圍,反而會降低程序的執(zhí)行速度。
因此,設(shè)計(jì)一個(gè)容納多個(gè)線程的容器,容器中的線程可以重復(fù)使用,省去了頻繁創(chuàng)建和銷毀線程對象的操作, 達(dá)到下面的目標(biāo):
- 降低資源消耗,減少了創(chuàng)建和銷毀線程的次數(shù),每個(gè)工作線程都可以被重復(fù)利用,可執(zhí)行多個(gè)任務(wù)
- 提高響應(yīng)速度,當(dāng)任務(wù)到達(dá)時(shí),如果有線程可以直接用,不會出現(xiàn)系統(tǒng)僵死
- 提高線程的可管理性,如果無限制的創(chuàng)建線程,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性,使用線程池可以進(jìn)行統(tǒng)一的分配,調(diào)優(yōu)和監(jiān)控
線程池的核心思想: 線程復(fù)用,同一個(gè)線程可以被重復(fù)使用,來處理多個(gè)任務(wù)。
為了實(shí)現(xiàn)線程池功能,需要考慮下面幾個(gè)設(shè)計(jì)要點(diǎn):
- 線程池可以接口外部提交的任務(wù)執(zhí)行
- 線程池有工作線程的數(shù)量,有任務(wù)執(zhí)行,沒有任務(wù)也空閑在那,等待任務(wù)過來,這樣既避免線程頻繁創(chuàng)建銷毀帶來的開銷,同時(shí)也可以避免線程池?zé)o限制的創(chuàng)建線程
- 如果線程池接受提交的任務(wù)超過工作線程的數(shù)量了,該怎么辦?可以用一個(gè)隊(duì)列把任務(wù)存下來,等工作線程完成任務(wù)后去隊(duì)列中獲取任務(wù),執(zhí)行
- 那如果任務(wù)實(shí)在是太多太多了,達(dá)到了我們認(rèn)為的隊(duì)列最大值,怎么辦,我們可以設(shè)計(jì)一種任務(wù)太多的策略,可以進(jìn)行切換,比如直接丟棄任務(wù)、報(bào)錯(cuò)等等
看了上面的設(shè)計(jì)目標(biāo)和要點(diǎn),是不是能立刻想到一個(gè)非常經(jīng)典的設(shè)計(jì)模型——生產(chǎn)者消費(fèi)者模型。

- 阻塞隊(duì)列存儲執(zhí)行任務(wù),比如外部main函數(shù)作為生產(chǎn)者向隊(duì)列生產(chǎn)任務(wù)。
- 線程池中的工作線程作為消費(fèi)者獲取任務(wù)執(zhí)行。
現(xiàn)在我們將我們的設(shè)計(jì)思路轉(zhuǎn)換為代碼。
代碼實(shí)現(xiàn)
阻塞隊(duì)列的實(shí)現(xiàn)
- 阻塞隊(duì)列主要存放任務(wù),有容量限制
- 阻塞隊(duì)列提供添加和刪除任務(wù)的API, 如果超過容量,阻塞不能添加任務(wù),如果沒有任務(wù),阻塞無法獲取任務(wù)。
/**
* <p>自定義任務(wù)隊(duì)列, 用來存放任務(wù) </p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 10:15
* @version: 1.0.0
*/
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
// 容量
private int capcity;
// 雙端任務(wù)隊(duì)列容器
private Deque<T> deque = new ArrayDeque<>();
// 重入鎖
private ReentrantLock lock = new ReentrantLock();
// 生產(chǎn)者條件變量
private Condition fullWaitSet = lock.newCondition();
// 生產(chǎn)者條件變量
private Condition emptyWaitSet = lock.newCondition();
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
// 阻塞的方式添加任務(wù)
public void put(T task) {
lock.lock();
try {
// 通過while的方式
while (deque.size() >= capcity) {
log.debug("wait to add queue");
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
deque.offer(task);
log.debug("task add successfully");
emptyWaitSet.signal();
} finally {
lock.unlock();
}
}
// 阻塞獲取任務(wù)
public T take() {
lock.lock();
try {
// 通過while的方式
while (deque.isEmpty()) {
try {
log.debug("wait to take task");
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
T task = deque.poll();
log.debug("take task successfully");
// 從隊(duì)列中獲取元素
return task;
} finally {
lock.unlock();
}
}
}- put()方法是向阻塞隊(duì)列中添加任務(wù)
- take()方法是向阻塞隊(duì)列中獲取任務(wù)
線程池消費(fèi)端實(shí)現(xiàn)
1.定義執(zhí)行器接口
/**
* <p>定義一個(gè)執(zhí)行器的接口:</p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 12:31
* @version: 1.0.0
*/
public interface Executor {
/**
* 提交任務(wù)執(zhí)行
* @param task 任務(wù)
*/
void execute(Runnable task);
}2.定義線程池類實(shí)現(xiàn)該接口
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
/**
* 任務(wù)隊(duì)列
*/
private BlockingQueue<Runnable> taskQueue;
/**
* 核心工作線程數(shù)
*/
private int coreSize;
/**
* 工作線程集合
*/
private Set<Worker> workers = new HashSet<>();
/**
* 創(chuàng)建線程池
* @param coreSize 工作線程數(shù)量
* @param capcity 阻塞隊(duì)列容量
*/
public ThreadPool(int coreSize, int capcity) {
this.coreSize = coreSize;
this.taskQueue = new BlockingQueue<>(capcity);
}
/**
* 提交任務(wù)執(zhí)行
*/
@Override
public void execute(Runnable task) {
synchronized (workers) {
// 如果工作線程數(shù)小于閾值,直接開始任務(wù)執(zhí)行
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
// 如果超過了閾值,加入到隊(duì)列中
taskQueue.put(task);
}
}
}
/**
* 工作線程,對執(zhí)行的任務(wù)做了一層包裝處理
*/
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task) {
this.task = task;
}
@Override
public void run() {
// 如果任務(wù)不為空,或者可以從隊(duì)列中獲取任務(wù)
while (task != null || (task = taskQueue.take()) != null) {
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 執(zhí)行完后,設(shè)置任務(wù)為空
task = null;
}
}
// 移除工作線程
synchronized (workers){
log.debug("remove worker successfully");
workers.remove(this);
}
}
}
}- Worker類是工作線程類,包裝了執(zhí)行任務(wù),里面實(shí)現(xiàn)了從隊(duì)列獲取任務(wù),然后執(zhí)行任務(wù)。
- execute方法的實(shí)現(xiàn)中,如果工作線程數(shù)量小于閾值的話,直接創(chuàng)建新的工作線程,否則將任務(wù)添加到隊(duì)列中。
3.演示
@Test
public void testThreadPool1() throws InterruptedException {
Executor executor = new ThreadPool(2, 4);
// 提交任務(wù)
for (int i = 0; i < 6; i++) {
final int j = i;
executor.execute(() -> {
try {
Thread.sleep(10);
log.info("run task {}", j);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread.sleep(10);
}
Thread.sleep(10000);
}運(yùn)行結(jié)果:

獲取任務(wù)超時(shí)設(shè)計(jì)
目前從隊(duì)列中獲取任務(wù)是永久阻塞等待的,可以改成阻塞一段時(shí)間沒有獲取任務(wù),丟棄的策略。
@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
// 容量
private int capcity;
// 雙端任務(wù)隊(duì)列容器
private Deque<T> deque = new ArrayDeque<>();
// 重入鎖
private ReentrantLock lock = new ReentrantLock();
// 生產(chǎn)者條件變量
private Condition fullWaitSet = lock.newCondition();
// 生產(chǎn)者條件變量
private Condition emptyWaitSet = lock.newCondition();
public TimeoutBlockingQueue(int capcity) {
this.capcity = capcity;
}
// 帶超時(shí)時(shí)間的獲取
public T poll(long timeout, TimeUnit unit){
lock.lock();
try{
// 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒
long nanos = unit.toNanos(timeout);
while (deque.isEmpty()){
try {
if (nanos<=0){
return null;
}
// 返回的是剩余的等待時(shí)間,更改navos的值,使虛假喚醒的時(shí)候可以繼續(xù)等待
nanos = emptyWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fullWaitSet.signal();
return deque.getFirst();
}finally {
lock.unlock();
}
}
// 帶超時(shí)時(shí)間的增加
public boolean offer(T task , long timeout , TimeUnit unit){
lock.lock();
try{
// 將 timeout 統(tǒng)一轉(zhuǎn)換為 納秒
long nanos = unit.toNanos(timeout);
while (deque.size() == capcity){
try {
if (nanos<=0){
return false;
}
// 更新剩余需要等待的時(shí)間
nanos = fullWaitSet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("加入任務(wù)隊(duì)列 {}", task);
deque.addLast(task);
emptyWaitSet.signal();
return true;
}finally {
lock.unlock();
}
}
}新加TimeoutBlockingQueue類,添加offer和poll待超時(shí)的添加和獲取任務(wù)的方法。
拒絕策略設(shè)計(jì)
目前的實(shí)現(xiàn)還是有個(gè)漏洞,無法自定義任務(wù)超出閾值的一個(gè)拒絕策略,我們可以通過利用函數(shù)式編程+策略模式去實(shí)現(xiàn)。
1.定義策略模式的函數(shù)式接口
/**
* <p>拒絕策略的函數(shù)式接口:</p>
*
* @author: cxw (332059317@qq.com)
* @date: 2022/10/18 13:15
* @version: 1.0.0
*/
@FunctionalInterface
public interface RejectPolicy<T> {
/**
* 拒絕策略的接口
* @param queue
* @param task
*/
void reject(BlockingQueue<T> queue, T task);
}2.添加函數(shù)式接口的調(diào)用入口
我們可以在阻塞隊(duì)列添加任務(wù)新加一個(gè)api, 添加任務(wù)如果超過容量,調(diào)用函數(shù)式接口。
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
........
/**
* 嘗試添加任務(wù)
* @param rejectPolicy
* @param task
*/
public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
lock.lock();
try{
// 如果隊(duì)列超過容量
if (deque.size()> capcity){
log.debug("task too much, do reject");
rejectPolicy.reject(this, task);
}else {
deque.offer(task);
emptyWaitSet.signal();
}
}finally {
lock.unlock();
}
}
}3.修改ThreadPool類
@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
.....
/**
* 拒絕策略
*/
private RejectPolicy rejectPolicy;
// 通過構(gòu)造方法傳入執(zhí)行的拒絕策略
public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
this.coreSize = coreSize;
this.taskQueue = new BlockingQueue<>(capcity);
this.rejectPolicy = rejectPolicy;
}
/**
* 提交任務(wù)執(zhí)行
*/
@Override
public void execute(Runnable task) {
synchronized (workers) {
// 如果工作線程數(shù)小于閾值,直接開始任務(wù)執(zhí)行
if(workers.size() < coreSize) {
Worker worker = new Worker(task);
workers.add(worker);
worker.start();
} else {
// 如果超過了閾值,加入到隊(duì)列中
//taskQueue.put(task);
// 調(diào)用tryPut的方式
taskQueue.tryPut(rejectPolicy, task);
}
}
}
....
}通過構(gòu)造方法的方式傳入要執(zhí)行的拒絕策略
調(diào)用tryPut方法添加任務(wù)
4.演示

以上就是Java實(shí)現(xiàn)手寫一個(gè)線程池的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Java線程池的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展
本文主要介紹了Springboot自定義mybatis攔截器實(shí)現(xiàn)擴(kuò)展,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12
Java listener簡介_動力節(jié)點(diǎn)Java學(xué)院整理
這篇文章主要介紹了Java listener簡介,可以用于統(tǒng)計(jì)用戶在線人數(shù)等,有興趣的可以了解一下2017-07-07
SpringBoot中的五種對靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot中的五種對靜態(tài)資源的映射規(guī)則的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
Java如何實(shí)現(xiàn)可折疊Panel方法示例
這篇文章主要給大家介紹了關(guān)于利用Java如何實(shí)現(xiàn)可折疊Panel的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用java具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-07-07
java文件操作練習(xí)代碼 讀取某個(gè)盤符下的文件
這篇文章主要介紹了java讀取某個(gè)盤符下的文件示例,代碼中要求的是絕對路徑,編譯過程中要注意絕對路徑問題和異常的抓取2014-01-01
Java中?springcloud.openfeign應(yīng)用案例解析
使用OpenFeign能讓編寫Web?Service客戶端更加簡單,使用時(shí)只需定義服務(wù)接口,然后在上面添加注解,OpenFeign也支持可拔插式的編碼和解碼器,這篇文章主要介紹了Java中?springcloud.openfeign應(yīng)用案例解析,需要的朋友可以參考下2024-06-06

