SpringBoot線程池和Java線程池的使用和實現(xiàn)原理解析
SpringBoot線程池和Java線程池的用法和實現(xiàn)原理
使用默認(rèn)的線程池
方式一:通過@Async注解調(diào)用
public class AsyncTest {
@Async
public void async(String name) throws InterruptedException {
System.out.println("async" + name + " " + Thread.currentThread().getName());
Thread.sleep(1000);
}
}啟動類上需要添加@EnableAsync注解,否則不會生效。
@SpringBootApplication
//@EnableAsync
public class Test1Application {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args);
AsyncTest bean = run.getBean(AsyncTest.class);
for(int index = 0; index <= 10; ++index){
bean.async(String.valueOf(index));
}
}
}方式二:直接注入 ThreadPoolTaskExecutor
此時可不加 @EnableAsync注解
@SpringBootTest
class Test1ApplicationTests {
@Resource
ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Test
void contextLoads() {
Runnable runnable = () -> {
System.out.println(Thread.currentThread().getName());
};
for(int index = 0; index <= 10; ++index){
threadPoolTaskExecutor.submit(runnable);
}
}
}線程池默認(rèn)配置信息
SpringBoot線程池的常見配置:
spring:
task:
execution:
pool:
core-size: 8
max-size: 16 # 默認(rèn)是 Integer.MAX_VALUE
keep-alive: 60s # 當(dāng)線程池中的線程數(shù)量大于 corePoolSize 時,如果某線程空閑時間超過keepAliveTime,線程將被終止
allow-core-thread-timeout: true # 是否允許核心線程超時,默認(rèn)true
queue-capacity: 100 # 線程隊列的大小,默認(rèn)Integer.MAX_VALUE
shutdown:
await-termination: false # 線程關(guān)閉等待
thread-name-prefix: task- # 線程名稱的前綴SpringBoot 線程池的實現(xiàn)原理
TaskExecutionAutoConfiguration 類中定義了 ThreadPoolTaskExecutor,該類的內(nèi)部實現(xiàn)也是基于java原生的 ThreadPoolExecutor類。initializeExecutor()方法在其父類中被調(diào)用,但是在父類中 RejectedExecutionHandler 被定義為了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy(); ,并通過initialize()方法將AbortPolicy傳入initializeExecutor()中。
注意在TaskExecutionAutoConfiguration 類中,ThreadPoolTaskExecutor類的bean的名稱為: applicationTaskExecutor 和 taskExecutor。
// TaskExecutionAutoConfiguration#applicationTaskExecutor()
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAUL
T_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}// ThreadPoolTaskExecutor#initializeExecutor()
@Override
protected ExecutorService initializeExecutor(
ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
BlockingQueue<Runnable> queue = createQueue(this.queueCapacity);
ThreadPoolExecutor executor;
if (this.taskDecorator != null) {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler) {
@Override
public void execute(Runnable command) {
Runnable decorated = taskDecorator.decorate(command);
if (decorated != command) {
decoratedTaskMap.put(decorated, command);
}
super.execute(decorated);
}
};
}
else {
executor = new ThreadPoolExecutor(
this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS,
queue, threadFactory, rejectedExecutionHandler);
}
if (this.allowCoreThreadTimeOut) {
executor.allowCoreThreadTimeOut(true);
}
this.threadPoolExecutor = executor;
return executor;
}
// ExecutorConfigurationSupport#initialize()
public void initialize() {
if (logger.isInfoEnabled()) {
logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : ""));
}
if (!this.threadNamePrefixSet && this.beanName != null) {
setThreadNamePrefix(this.beanName + "-");
}
this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler);
}覆蓋默認(rèn)的線程池
覆蓋默認(rèn)的 taskExecutor對象,bean的返回類型可以是ThreadPoolTaskExecutor也可以是Executor。
@Configuration
public class ThreadPoolConfiguration {
@Bean("taskExecutor")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//設(shè)置線程池參數(shù)信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒絕策略為使用當(dāng)前線程執(zhí)行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化線程池
taskExecutor.initialize();
return taskExecutor;
}
}管理多個線程池
如果出現(xiàn)了多個線程池,例如再定義一個線程池 taskExecutor2,則直接執(zhí)行會報錯。此時需要指定bean的名稱即可。
@Bean("taskExecutor2")
public ThreadPoolTaskExecutor taskExecutor2() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
//設(shè)置線程池參數(shù)信息
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(50);
taskExecutor.setQueueCapacity(200);
taskExecutor.setKeepAliveSeconds(60);
taskExecutor.setThreadNamePrefix("myExecutor2--");
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
taskExecutor.setAwaitTerminationSeconds(60);
//修改拒絕策略為使用當(dāng)前線程執(zhí)行
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//初始化線程池
taskExecutor.initialize();
return taskExecutor;
}引用線程池時,需要將變量名更改為bean的名稱,這樣會按照名稱查找。
@Resource ThreadPoolTaskExecutor taskExecutor2;
對于使用@Async注解的多線程則在注解中指定bean的名字即可。
@Async("taskExecutor2")
public void async(String name) throws InterruptedException {
System.out.println("async" + name + " " + Thread.currentThread().getName());
Thread.sleep(1000);
}線程池的四種拒絕策略
JAVA常用的四種線程池
ThreadPoolExecutor 類的構(gòu)造函數(shù)如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}newCachedThreadPool
不限制最大線程數(shù)(maximumPoolSize=Integer.MAX_VALUE),如果有空閑的線程超過需要,則回收,否則重用已有的線程。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());newFixedThreadPool
定長線程池,超出線程數(shù)的任務(wù)會在隊列中等待。
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());newScheduledThreadPool
類似于newCachedThreadPool,線程數(shù)無上限,但是可以指定corePoolSize??蓪崿F(xiàn)延遲執(zhí)行、周期執(zhí)行。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}周期執(zhí)行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.scheduleAtFixedRate(()->{
System.out.println("rate");
}, 1, 1, TimeUnit.SECONDS);延時執(zhí)行:
scheduledThreadPool.schedule(()->{
System.out.println("delay 3 seconds");
}, 3, TimeUnit.SECONDS);newSingleThreadExecutor
單線程線程池,可以實現(xiàn)線程的順序執(zhí)行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}Java 線程池中的四種拒絕策略
CallerRunsPolicy:線程池讓調(diào)用者去執(zhí)行。AbortPolicy:如果線程池拒絕了任務(wù),直接報錯。DiscardPolicy:如果線程池拒絕了任務(wù),直接丟棄。DiscardOldestPolicy:如果線程池拒絕了任務(wù),直接將線程池中最舊的,未運行的任務(wù)丟棄,將新任務(wù)入隊。
CallerRunsPolicy
直接在主線程中執(zhí)行了run方法。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}效果類似于:
Runnable thread = ()->{
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(0);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
};
thread.run();AbortPolicy
直接拋出RejectedExecutionException異常,并指示任務(wù)的信息,線程池的信息。、
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}DiscardPolicy
什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}DiscardOldestPolicy
e.getQueue().poll(): 取出隊列最舊的任務(wù)。e.execute(r): 當(dāng)前任務(wù)入隊。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}Java 線程復(fù)用的原理
java的線程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker 對象,該對象在 被維護(hù)在private final HashSet<Worker> workers = new HashSet<Worker>();。workQueue是保存待執(zhí)行的任務(wù)的隊列,線程池中加入新的任務(wù)時,會將任務(wù)加入到workQueue隊列中。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
work對象的執(zhí)行依賴于 runWorker(),與我們平時寫的線程不同,該線程處在一個循環(huán)中,并不斷地從隊列中獲取新的任務(wù)執(zhí)行。因此線程池中的線程才可以復(fù)用,而不是像我們平常使用的線程一樣執(zhí)行完畢就結(jié)束。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
到此這篇關(guān)于SpringBoot線程池和Java線程池的用法和實現(xiàn)原理的文章就介紹到這了,更多相關(guān)SpringBoot線程池和Java線程池用法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring注解@Autowired的實現(xiàn)原理和使用方法
在使用Spring開發(fā)的時候,配置的方式主要有兩種,一種是xml的方式,另外一種是 java config的方式,在使用的過程中,我們使用最多的注解應(yīng)該就是@Autowired注解了,所以本文就給大家講講@Autowired注解是如何使用和實現(xiàn)的,需要的朋友可以參考下2023-07-07
java將XML文檔轉(zhuǎn)換成json格式數(shù)據(jù)的示例
本篇文章主要介紹了java將XML文檔轉(zhuǎn)換成json格式數(shù)據(jù)的示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12
struts2的國際化實現(xiàn)網(wǎng)站整體中英文切換實例代碼
本篇文章主要介紹了struts2的國際化實現(xiàn)網(wǎng)站整體中英文切換實例代碼,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-10-10
Java使用poi-tl1.9.1生成Word文檔的技巧分享
本文將簡單介紹poi-tl的相關(guān)知識,通過一個實際的案例實踐,充分介紹如何利用poi-tl進(jìn)行目標(biāo)文檔的生成,同時分享幾個不同的office版本如何進(jìn)行圖表生成的解決方案,需要的朋友可以參考下2023-09-09
Java設(shè)計模式編程中簡單工廠與抽象工廠模式的使用實例
這篇文章主要介紹了Java設(shè)計模式編程中簡單工廠與抽象工廠模式的使用實例,簡單工廠與抽象工廠都可以歸類于設(shè)計模式中的創(chuàng)建型模式,需要的朋友可以參考下2016-04-04
IDEA MyBatis Plugins自動生成實體類和mapper.xml
這篇文章主要介紹了IDEA MyBatis Plugins自動生成實體類和mapper.xml,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07

