SpringBoot線程池和Java線程池的使用和實(shí)現(xiàn)原理解析
SpringBoot線程池和Java線程池的用法和實(shí)現(xiàn)原理
使用默認(rèn)的線程池
方式一:通過(guò)@Async注解調(diào)用
public class AsyncTest { @Async public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); } }
啟動(dòng)類上需要添加@EnableAsync
注解,否則不會(huì)生效。
@SpringBootApplication //@EnableAsync public class Test1Application { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext run = SpringApplication.run(Test1Application.class, args); AsyncTest bean = run.getBean(AsyncTest.class); for(int index = 0; index <= 10; ++index){ bean.async(String.valueOf(index)); } } }
方式二:直接注入 ThreadPoolTaskExecutor
此時(shí)可不加 @EnableAsync
注解
@SpringBootTest class Test1ApplicationTests { @Resource ThreadPoolTaskExecutor threadPoolTaskExecutor; @Test void contextLoads() { Runnable runnable = () -> { System.out.println(Thread.currentThread().getName()); }; for(int index = 0; index <= 10; ++index){ threadPoolTaskExecutor.submit(runnable); } } }
線程池默認(rèn)配置信息
SpringBoot線程池的常見(jiàn)配置:
spring: task: execution: pool: core-size: 8 max-size: 16 # 默認(rèn)是 Integer.MAX_VALUE keep-alive: 60s # 當(dāng)線程池中的線程數(shù)量大于 corePoolSize 時(shí),如果某線程空閑時(shí)間超過(guò)keepAliveTime,線程將被終止 allow-core-thread-timeout: true # 是否允許核心線程超時(shí),默認(rèn)true queue-capacity: 100 # 線程隊(duì)列的大小,默認(rèn)Integer.MAX_VALUE shutdown: await-termination: false # 線程關(guān)閉等待 thread-name-prefix: task- # 線程名稱的前綴
SpringBoot 線程池的實(shí)現(xiàn)原理
TaskExecutionAutoConfiguration
類中定義了 ThreadPoolTaskExecutor
,該類的內(nèi)部實(shí)現(xiàn)也是基于java原生的 ThreadPoolExecutor
類。initializeExecutor()
方法在其父類中被調(diào)用,但是在父類中 RejectedExecutionHandler
被定義為了 private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();
,并通過(guò)initialize()
方法將AbortPolicy
傳入initializeExecutor()
中。
注意在TaskExecutionAutoConfiguration
類中,ThreadPoolTaskExecutor
類的bean的名稱為: applicationTaskExecutor
和 taskExecutor
。
// TaskExecutionAutoConfiguration#applicationTaskExecutor() @Lazy @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME, AsyncAnnotationBeanPostProcessor.DEFAUL T_TASK_EXECUTOR_BEAN_NAME }) @ConditionalOnMissingBean(Executor.class) public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) { return builder.build(); }
// ThreadPoolTaskExecutor#initializeExecutor() @Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor; if (this.taskDecorator != null) { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler) { @Override public void execute(Runnable command) { Runnable decorated = taskDecorator.decorate(command); if (decorated != command) { decoratedTaskMap.put(decorated, command); } super.execute(decorated); } }; } else { executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); } if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
// ExecutorConfigurationSupport#initialize() public void initialize() { if (logger.isInfoEnabled()) { logger.info("Initializing ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (!this.threadNamePrefixSet && this.beanName != null) { setThreadNamePrefix(this.beanName + "-"); } this.executor = initializeExecutor(this.threadFactory, this.rejectedExecutionHandler); }
覆蓋默認(rèn)的線程池
覆蓋默認(rèn)的 taskExecutor
對(duì)象,bean的返回類型可以是ThreadPoolTaskExecutor
也可以是Executor
。
@Configuration public class ThreadPoolConfiguration { @Bean("taskExecutor") public ThreadPoolTaskExecutor taskExecutor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //設(shè)置線程池參數(shù)信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒絕策略為使用當(dāng)前線程執(zhí)行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化線程池 taskExecutor.initialize(); return taskExecutor; } }
管理多個(gè)線程池
如果出現(xiàn)了多個(gè)線程池,例如再定義一個(gè)線程池 taskExecutor2
,則直接執(zhí)行會(huì)報(bào)錯(cuò)。此時(shí)需要指定bean的名稱即可。
@Bean("taskExecutor2") public ThreadPoolTaskExecutor taskExecutor2() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //設(shè)置線程池參數(shù)信息 taskExecutor.setCorePoolSize(10); taskExecutor.setMaxPoolSize(50); taskExecutor.setQueueCapacity(200); taskExecutor.setKeepAliveSeconds(60); taskExecutor.setThreadNamePrefix("myExecutor2--"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60); //修改拒絕策略為使用當(dāng)前線程執(zhí)行 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //初始化線程池 taskExecutor.initialize(); return taskExecutor; }
引用線程池時(shí),需要將變量名更改為bean的名稱,這樣會(huì)按照名稱查找。
@Resource ThreadPoolTaskExecutor taskExecutor2;
對(duì)于使用@Async
注解的多線程則在注解中指定bean的名字即可。
@Async("taskExecutor2") public void async(String name) throws InterruptedException { System.out.println("async" + name + " " + Thread.currentThread().getName()); Thread.sleep(1000); }
線程池的四種拒絕策略
JAVA常用的四種線程池
ThreadPoolExecutor
類的構(gòu)造函數(shù)如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); }
newCachedThreadPool
不限制最大線程數(shù)(maximumPoolSize=Integer.MAX_VALUE
),如果有空閑的線程超過(guò)需要,則回收,否則重用已有的線程。
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
newFixedThreadPool
定長(zhǎng)線程池,超出線程數(shù)的任務(wù)會(huì)在隊(duì)列中等待。
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
newScheduledThreadPool
類似于newCachedThreadPool
,線程數(shù)無(wú)上限,但是可以指定corePoolSize
??蓪?shí)現(xiàn)延遲執(zhí)行、周期執(zhí)行。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
周期執(zhí)行:
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate(()->{ System.out.println("rate"); }, 1, 1, TimeUnit.SECONDS);
延時(shí)執(zhí)行:
scheduledThreadPool.schedule(()->{ System.out.println("delay 3 seconds"); }, 3, TimeUnit.SECONDS);
newSingleThreadExecutor
單線程線程池,可以實(shí)現(xiàn)線程的順序執(zhí)行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
Java 線程池中的四種拒絕策略
CallerRunsPolicy
:線程池讓調(diào)用者去執(zhí)行。AbortPolicy
:如果線程池拒絕了任務(wù),直接報(bào)錯(cuò)。DiscardPolicy
:如果線程池拒絕了任務(wù),直接丟棄。DiscardOldestPolicy
:如果線程池拒絕了任務(wù),直接將線程池中最舊的,未運(yùn)行的任務(wù)丟棄,將新任務(wù)入隊(duì)。
CallerRunsPolicy
直接在主線程中執(zhí)行了run方法。
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
效果類似于:
Runnable thread = ()->{ System.out.println(Thread.currentThread().getName()); try { Thread.sleep(0); } catch (InterruptedException e) { throw new RuntimeException(e); } }; thread.run();
AbortPolicy
直接拋出RejectedExecutionException
異常,并指示任務(wù)的信息,線程池的信息。、
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy
什么也不做。
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy
e.getQueue().poll()
: 取出隊(duì)列最舊的任務(wù)。e.execute(r)
: 當(dāng)前任務(wù)入隊(duì)。
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
Java 線程復(fù)用的原理
java
的線程池中保存的是 java.util.concurrent.ThreadPoolExecutor.Worker
對(duì)象,該對(duì)象在 被維護(hù)在private final HashSet<Worker> workers = new HashSet<Worker>();
。workQueue
是保存待執(zhí)行的任務(wù)的隊(duì)列,線程池中加入新的任務(wù)時(shí),會(huì)將任務(wù)加入到workQueue
隊(duì)列中。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
work對(duì)象的執(zhí)行依賴于 runWorker()
,與我們平時(shí)寫的線程不同,該線程處在一個(gè)循環(huán)中,并不斷地從隊(duì)列中獲取新的任務(wù)執(zhí)行。因此線程池中的線程才可以復(fù)用,而不是像我們平常使用的線程一樣執(zhí)行完畢就結(jié)束。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
到此這篇關(guān)于SpringBoot線程池和Java線程池的用法和實(shí)現(xiàn)原理的文章就介紹到這了,更多相關(guān)SpringBoot線程池和Java線程池用法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Spring注解@Autowired的實(shí)現(xiàn)原理和使用方法
在使用Spring開發(fā)的時(shí)候,配置的方式主要有兩種,一種是xml的方式,另外一種是 java config的方式,在使用的過(guò)程中,我們使用最多的注解應(yīng)該就是@Autowired注解了,所以本文就給大家講講@Autowired注解是如何使用和實(shí)現(xiàn)的,需要的朋友可以參考下2023-07-07java將XML文檔轉(zhuǎn)換成json格式數(shù)據(jù)的示例
本篇文章主要介紹了java將XML文檔轉(zhuǎn)換成json格式數(shù)據(jù)的示例,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-12-12struts2的國(guó)際化實(shí)現(xiàn)網(wǎng)站整體中英文切換實(shí)例代碼
本篇文章主要介紹了struts2的國(guó)際化實(shí)現(xiàn)網(wǎng)站整體中英文切換實(shí)例代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-10-10Java使用poi-tl1.9.1生成Word文檔的技巧分享
本文將簡(jiǎn)單介紹poi-tl的相關(guān)知識(shí),通過(guò)一個(gè)實(shí)際的案例實(shí)踐,充分介紹如何利用poi-tl進(jìn)行目標(biāo)文檔的生成,同時(shí)分享幾個(gè)不同的office版本如何進(jìn)行圖表生成的解決方案,需要的朋友可以參考下2023-09-09簡(jiǎn)單實(shí)現(xiàn)java音樂(lè)播放器
這篇文章主要為大家詳細(xì)介紹了java實(shí)現(xiàn)音樂(lè)播放器的相關(guān)代碼,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-06-06Java設(shè)計(jì)模式編程中簡(jiǎn)單工廠與抽象工廠模式的使用實(shí)例
這篇文章主要介紹了Java設(shè)計(jì)模式編程中簡(jiǎn)單工廠與抽象工廠模式的使用實(shí)例,簡(jiǎn)單工廠與抽象工廠都可以歸類于設(shè)計(jì)模式中的創(chuàng)建型模式,需要的朋友可以參考下2016-04-04IDEA MyBatis Plugins自動(dòng)生成實(shí)體類和mapper.xml
這篇文章主要介紹了IDEA MyBatis Plugins自動(dòng)生成實(shí)體類和mapper.xml,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07Java JDK11基于嵌套的訪問(wèn)控制的實(shí)現(xiàn)
這篇文章主要介紹了Java JDK11基于嵌套的訪問(wèn)控制的實(shí)現(xiàn),小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2019-01-01