Spring Boot配置線程池拒絕策略的場(chǎng)景分析(妥善處理好溢出的任務(wù))
通過之前三篇關(guān)于Spring Boot異步任務(wù)實(shí)現(xiàn)的博文,我們分別學(xué)會(huì)了用@Async創(chuàng)建異步任務(wù)、為異步任務(wù)配置線程池、使用多個(gè)線程池隔離不同的異步任務(wù)。今天這篇,我們繼續(xù)對(duì)上面的知識(shí)進(jìn)行完善和優(yōu)化!
如果你已經(jīng)看過上面幾篇內(nèi)容并已經(jīng)掌握之后,一起來思考下面這個(gè)問題:
假設(shè),線程池配置為核心線程數(shù)2、最大線程數(shù)2、緩沖隊(duì)列長(zhǎng)度2。此時(shí),有5個(gè)異步任務(wù)同時(shí)開始,會(huì)發(fā)生什么?
場(chǎng)景重現(xiàn)
我們先來把上面的假設(shè)用代碼實(shí)現(xiàn)一下:
第一步:創(chuàng)建Spring Boot應(yīng)用,根據(jù)上面的假設(shè)寫好線程池配置。
@EnableAsync @SpringBootApplication public class Chapter78Application { public static void main(String[] args) { SpringApplication.run(Chapter78Application.class, args); } @EnableAsync @Configuration class TaskPoolConfig { @Bean public Executor taskExecutor1() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(2); executor.setMaxPoolSize(2); executor.setQueueCapacity(2); executor.setKeepAliveSeconds(60); executor.setThreadNamePrefix("executor-1-"); return executor; } } }
第二步:用@Async
注解實(shí)現(xiàn)一個(gè)部分任務(wù)
@Slf4j @Component public class AsyncTasks { public static Random random = new Random(); @Async("taskExecutor1") public CompletableFuture<String> doTaskOne(String taskNo) throws Exception { log.info("開始任務(wù):{}", taskNo); long start = System.currentTimeMillis(); Thread.sleep(random.nextInt(10000)); long end = System.currentTimeMillis(); log.info("完成任務(wù):{},耗時(shí):{} 毫秒", taskNo, end - start); return CompletableFuture.completedFuture("任務(wù)完成"); } }
第三步:編寫測(cè)試用例
@Slf4j @SpringBootTest public class Chapter78ApplicationTests { @Autowired private AsyncTasks asyncTasks; @Test public void test2() throws Exception { // 線程池配置:core-2,max-2,queue=2,同時(shí)有5個(gè)任務(wù),出現(xiàn)下面異常: // org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@59901c4d[Running, pool size = 2, // active threads = 0, queued tasks = 2, completed tasks = 4]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@408e96d9 long start = System.currentTimeMillis(); // 線程池1 CompletableFuture<String> task1 = asyncTasks.doTaskOne("1"); CompletableFuture<String> task2 = asyncTasks.doTaskOne("2"); CompletableFuture<String> task3 = asyncTasks.doTaskOne("3"); CompletableFuture<String> task4 = asyncTasks.doTaskOne("4"); CompletableFuture<String> task5 = asyncTasks.doTaskOne("5"); // 一起執(zhí)行 CompletableFuture.allOf(task1, task2, task3, task4, task5).join(); long end = System.currentTimeMillis(); log.info("任務(wù)全部完成,總耗時(shí):" + (end - start) + "毫秒"); } }
執(zhí)行一下,可以類似下面這樣的日志信息:
2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-2] com.didispace.chapter78.AsyncTasks : 開始任務(wù):2
2021-09-22 17:33:08.159 INFO 21119 --- [ executor-1-1] com.didispace.chapter78.AsyncTasks : 開始任務(wù):1org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task: java.util.concurrent.CompletableFuture$AsyncSupply@64968732
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:324)
at java.util.concurrent.CompletableFuture.asyncSupplyStage(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture.supplyAsync(CompletableFuture.java:1830)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.doSubmit(AsyncExecutionAspectSupport.java:274)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.invoke(AsyncExecutionInterceptor.java:129)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
at com.didispace.chapter78.AsyncTasks$$EnhancerBySpringCGLIB$$c7e8d57b.doTaskOne(<generated>)
at com.didispace.chapter78.Chapter78ApplicationTests.test2(Chapter78ApplicationTests.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1255)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@64968732 rejected from java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:321)
... 74 more
從異常信息org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@3e1a3801[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]] did not accept task:
中,可以很明確的知道,第5個(gè)任務(wù)因?yàn)槌^了執(zhí)行線程+緩沖隊(duì)列長(zhǎng)度,而被拒絕了。
所有,默認(rèn)情況下,線程池的拒絕策略是:當(dāng)線程池隊(duì)列滿了,會(huì)丟棄這個(gè)任務(wù),并拋出異常。
配置拒絕策略
雖然線程池有默認(rèn)的拒絕策略,但實(shí)際開發(fā)過程中,有些業(yè)務(wù)場(chǎng)景,直接拒絕的策略往往并不適用,有時(shí)候我們可能會(huì)選擇舍棄最早開始執(zhí)行而未完成的任務(wù)、也可能會(huì)選擇舍棄剛開始執(zhí)行而未完成的任務(wù)等更貼近業(yè)務(wù)需要的策略。所以,為線程池配置其他拒絕策略或自定義拒絕策略是很常見的需求,那么這個(gè)要怎么實(shí)現(xiàn)呢?
下面就來具體說說今天的正題,如何為線程池配置拒絕策略、如何自定義拒絕策略。
看下面這段代碼的最后一行,setRejectedExecutionHandler
方法就是為線程池設(shè)置拒絕策略的方法:
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //...其他線程池配置 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
在ThreadPoolExecutor
中提供了4種線程的策略可以供開發(fā)者直接使用,你只需要像下面這樣設(shè)置即可:
// AbortPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // DiscardPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); // DiscardOldestPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); // CallerRunsPolicy策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
這四個(gè)策略對(duì)應(yīng)的含義分別是:
- AbortPolicy策略:默認(rèn)策略,如果線程池隊(duì)列滿了丟掉這個(gè)任務(wù)并且拋出RejectedExecutionException異常。
- DiscardPolicy策略:如果線程池隊(duì)列滿了,會(huì)直接丟掉這個(gè)任務(wù)并且不會(huì)有任何異常。
- DiscardOldestPolicy策略:如果隊(duì)列滿了,會(huì)將最早進(jìn)入隊(duì)列的任務(wù)刪掉騰出空間,再嘗試加入隊(duì)列。
- CallerRunsPolicy策略:如果添加到線程池失敗,那么主線程會(huì)自己去執(zhí)行該任務(wù),不會(huì)等待線程池中的線程去執(zhí)行。
而如果你要自定義一個(gè)拒絕策略,那么可以這樣寫:
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 拒絕策略的邏輯 } });
當(dāng)然如果你喜歡用Lamba表達(dá)式,也可以這樣寫:
executor.setRejectedExecutionHandler((r, executor1) -> { // 拒絕策略的邏輯 });
好了,今天的學(xué)習(xí)就到這里!
代碼示例
本文的完整工程可以查看下面?zhèn)}庫中2.x
目錄下的chapter7-8
工程:
- Github:https://github.com/dyc87112/SpringBoot-Learning/
- Gitee:https://gitee.com/didispace/SpringBoot-Learning/
到此這篇關(guān)于Spring Boot配置線程池拒絕策略,妥善處理好溢出的任務(wù)的文章就介紹到這了,更多相關(guān)Spring Boot配置線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- springboot中@Async默認(rèn)線程池導(dǎo)致OOM問題
- SpringBoot2線程池定義使用方法解析
- springboot2.0以上調(diào)度器配置線程池的實(shí)現(xiàn)
- Spring boot注解@Async線程池實(shí)例詳解
- Spring Boot中配置定時(shí)任務(wù)、線程池與多線程池執(zhí)行的方法
- SpringBoot策略模式的實(shí)踐使用
- 淺談Spring Boot: 接口壓測(cè)及簡(jiǎn)要優(yōu)化策略
- 詳解SpringBoot結(jié)合策略模式實(shí)戰(zhàn)套路
- 如何修改覆蓋spring boot默認(rèn)日志策略logback詳解
相關(guān)文章
Spring深入分析講解BeanUtils的實(shí)現(xiàn)
java知識(shí)體系統(tǒng)有很多數(shù)據(jù)實(shí)體,比較常用的DTO、BO、DO、VO等,其他類似POJO概念太老了現(xiàn)在基本廢棄掉了,本篇幅直接忽略,對(duì)于這幾種數(shù)據(jù)實(shí)體各自代表的含義和應(yīng)用場(chǎng)景先做一下簡(jiǎn)單描述和分析2022-06-06Flutter 通過Clipper實(shí)現(xiàn)各種自定義形狀的示例代碼
這篇文章主要介紹了Flutter 通過Clipper實(shí)現(xiàn)各種自定義形狀的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12JavaSE static final及abstract修飾符實(shí)例解析
這篇文章主要介紹了JavaSE static final及abstract修飾符實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06解決nacos報(bào)錯(cuò)java.lang.ClassNotFoundException: com.netflix.
這篇文章主要介紹了解決nacos報(bào)錯(cuò)java.lang.ClassNotFoundException: com.netflix.config.DynamicPropertyFactory的問題,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06Spring?WebMVC初始化Controller流程詳解
這篇文章主要介紹了Spring?WebMVC初始化Controller流程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-02-02Java中數(shù)組與集合的相互轉(zhuǎn)換實(shí)現(xiàn)解析
這篇文章主要介紹了Java中數(shù)組與集合的相互轉(zhuǎn)換實(shí)現(xiàn)解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-08-08Spring Kafka中@KafkaListener注解的參數(shù)與使用小結(jié)
@KafkaListener注解為開發(fā)者提供了一種聲明式的方式來定義消息監(jiān)聽器,本文主要介紹了Spring Kafka中@KafkaListener注解的參數(shù)與使用小結(jié),具有一定的參考價(jià)值,感興趣的可以了解一下2024-06-06