SpringBoot中實現(xiàn)異步調(diào)用@Async詳解
為什么要用異步框架,它解決什么問題?
在SpringBoot的日常開發(fā)中,一般都是同步調(diào)用的。但實際中有很多場景非常適合使用異步來處理,如:注冊新用戶,送100個積分;或下單成功,發(fā)送push消息等等。
就拿注冊新用戶這個用例來說,為什么要異步處理?
- 第一個原因:容錯性、健壯性,如果送積分出現(xiàn)異常,不能因為送積分而導(dǎo)致用戶注冊失??;因為用戶注冊是主要功能,送積分是次要功能,即使送積分異常也要提示用戶注冊成功,然后后面在針對積分異常做補(bǔ)償處理。
- 第二個原因:提升性能,例如注冊用戶花了20毫秒,送積分花費(fèi)50毫秒,如果用同步的話,總耗時70毫秒,用異步的話,無需等待積分,故耗時20毫秒。
故,異步能解決2個問題,性能和容錯性。
SpringBoot如何實現(xiàn)異步調(diào)用?
對于異步方法調(diào)用,從Spring3開始提供了@Async注解,我們只需要在方法上標(biāo)注此注解,此方法即可實現(xiàn)異步調(diào)用。
當(dāng)然,我們還需要一個配置類,通過Enable模塊驅(qū)動注解@EnableAsync來開啟異步功能。
實現(xiàn)異步調(diào)用
第一步:新建配置類,開啟@Async功能支持
使用@EnableAsync來開啟異步任務(wù)支持,@EnableAsync注解可以直接放在SpringBoot啟動類上,也可以單獨放在其他配置類上。我們這里選擇使用單獨的配置類AsyncConfiguration。
至于為什么使用線程池,后面會講到。
import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 描述:異步配置 */ @Slf4j @Configuration @EnableAsync // 可放在啟動類上或單獨的配置類 public class AsyncConfiguration implements AsyncConfigurer { @Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心線程數(shù) taskExecutor.setCorePoolSize(10); //線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊列滿了之后才會申請超過核心線程數(shù)的線程 taskExecutor.setMaxPoolSize(100); //緩存隊列 taskExecutor.setQueueCapacity(50); //設(shè)置線程的空閑時間,當(dāng)超過了核心線程出之外的線程在空閑時間到達(dá)之后會被銷毀 taskExecutor.setKeepAliveSeconds(200); //異步方法內(nèi)部線程名稱 taskExecutor.setThreadNamePrefix("async-"); /** * 當(dāng)線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動重復(fù)調(diào)用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } /** * 指定默認(rèn)線程池 * The {@link Executor} instance to be used when processing async method invocations. */ @Override public Executor getAsyncExecutor() { return executor(); } /** * The {@link AsyncUncaughtExceptionHandler} instance to be used * when an exception is thrown during an asynchronous method execution * with {@code void} return type. */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)送未知錯誤, 執(zhí)行方法:{}", method.getName(), ex); } }
第二步:在方法上標(biāo)記異步調(diào)用
增加一個Component類,用來進(jìn)行業(yè)務(wù)處理,同時添加@Async注解,代表該方法為異步處理。
import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; /** * 描述:異步方法調(diào)用 */ @Slf4j @Component public class AsyncTask { @SneakyThrows @Async // 在異步配置類設(shè)置了默認(rèn)線程池則不需要再指定線程池名稱 // @Async("asyncPoolTaskExecutor") public void doTask1() { long t1 = System.currentTimeMillis(); Thread.sleep(2000); long t2 = System.currentTimeMillis(); log.info("task1方法耗時 {} ms" , t2-t1); } @SneakyThrows @Async // @Async("otherPoolTaskExecutor") // 其他線程池的名稱 public void doTask2() { long t1 = System.currentTimeMillis(); Thread.sleep(3000); long t2 = System.currentTimeMillis(); log.info("task2方法耗時 {} ms" , t2-t1); } }
第三步:在Controller中進(jìn)行異步方法調(diào)用
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @Slf4j @RestController @RequestMapping("/async") public class AsyncController { @Autowired private AsyncTask asyncTask; @RequestMapping("/task") public void task() throws InterruptedException { long t1 = System.currentTimeMillis(); asyncTask.doTask1(); asyncTask.doTask2(); Thread.sleep(1000); long t2 = System.currentTimeMillis(); log.info("main方法耗時{} ms", t2-t1); } }
通過訪問//localhost:8080/async/task查看控制臺日志:
[2021-12-01 20:21:36.036] [INFO] [http-nio-8080-exec-1] - task(AsyncController.java:27) - main方法耗時 1009 ms
[2021-12-01 20:21:37.037] [INFO] [async-1] - doTask1(AsyncTask.java:23) - task1方法耗時 2004 ms
[2021-12-01 20:21:38.038] [INFO] [async-2] - doTask2(AsyncTask.java:32) - task2方法耗時 3003 ms
通過日志可以看到:主線程不需要等待異步方法執(zhí)行完成,減少了響應(yīng)時間,提高了接口性能。
通過上面三步我們就可以在SpringBoot中使用異步方法來提高我們接口性能了。
為什么要給@Async自定義線程池?
使用@Async注解,在默認(rèn)情況下用的是SimpleAsyncTaskExecutor線程池,該線程池不是真正意義上的線程池。
使用此線程池?zé)o法實現(xiàn)線程重用,每次調(diào)用都會新建一條線程。若系統(tǒng)中不斷的創(chuàng)建線程,最終會導(dǎo)致系統(tǒng)占用內(nèi)存過高,引發(fā)OutOfMemoryError錯誤,關(guān)鍵代碼如下:
public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task; //判斷是否開啟限流,默認(rèn)為否 if (this.isThrottleActive() && startTimeout > 0L) { //執(zhí)行前置操作,進(jìn)行限流 this.concurrencyThrottle.beforeAccess(); this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse)); } else { //未限流的情況,執(zhí)行線程任務(wù) this.doExecute(taskToUse); } } protected void doExecute(Runnable task) { //不斷創(chuàng)建線程 Thread thread = this.threadFactory != null ? this.threadFactory.newThread(task) : this.createThread(task); thread.start(); } //創(chuàng)建線程 public Thread createThread(Runnable runnable) { //指定線程名,task-1,task-2... Thread thread = new Thread(this.getThreadGroup(), runnable, this.nextThreadName()); thread.setPriority(this.getThreadPriority()); thread.setDaemon(this.isDaemon()); return thread; }
我們也可以直接通過上面的控制臺日志觀察,每次打印的線程名都是[task-1]、[task-2]、[task-3]、[task-4]…遞增的。
正因如此,所以我們在使用Spring中的@Async異步框架時一定要自定義線程池,替代默認(rèn)的SimpleAsyncTaskExecutor。
Spring提供了多種線程池
- SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程,每次調(diào)用都會創(chuàng)建一個新的線程。
- SyncTaskExecutor:這個類沒有實現(xiàn)異步調(diào)用,只是一個同步操作。只適用于不需要多線程的地
- ConcurrentTaskExecutor:Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類
- ThreadPoolTaskScheduler:可以使用cron表達(dá)式
- ThreadPoolTaskExecutor:最常使用,推薦。其實質(zhì)是對java.util.concurrent.ThreadPoolExecutor的包裝
為@Async實現(xiàn)一個自定義線程池
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ThreadPoolExecutor; /** * 描述:異步配置2 */ @Configuration @EnableAsync // 可放在啟動類上或單獨的配置類 public class AsyncConfiguration2 { @Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心線程數(shù) taskExecutor.setCorePoolSize(10); //線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊列滿了之后才會申請超過核心線程數(shù)的線程 taskExecutor.setMaxPoolSize(100); //緩存隊列 taskExecutor.setQueueCapacity(50); //許的空閑時間,當(dāng)超過了核心線程出之外的線程在空閑時間到達(dá)之后會被銷毀 taskExecutor.setKeepAliveSeconds(200); //異步方法內(nèi)部線程名稱 taskExecutor.setThreadNamePrefix("async-"); /** * 當(dāng)線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動重復(fù)調(diào)用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } }
配置自定義線程池以后我們就可以大膽的使用@Async提供的異步處理能力了。
多個線程池處理
在現(xiàn)實的互聯(lián)網(wǎng)項目開發(fā)中,針對高并發(fā)的請求,一般的做法是高并發(fā)接口單獨線程池隔離處理。
假設(shè)現(xiàn)在2個高并發(fā)接口:一個是修改用戶信息接口,刷新用戶redis緩存;一個是下訂單接口,發(fā)送app push信息。往往會根據(jù)接口特征定義兩個線程池,這時候我們在使用@Async時就需要通過指定線程池名稱進(jìn)行區(qū)分。
為@Async指定線程池名字
@SneakyThrows @Async("asyncPoolTaskExecutor") public void doTask1() { long t1 = System.currentTimeMillis(); Thread.sleep(2000); long t2 = System.currentTimeMillis(); log.info("task1方法耗時 {} ms" , t2-t1); }
當(dāng)系統(tǒng)存在多個線程池時,我們也可以配置一個默認(rèn)線程池,對于非默認(rèn)的異步任務(wù)再通過@Async(“otherTaskExecutor”)來指定線程池名稱。
配置默認(rèn)線程池
可以修改配置類讓其實現(xiàn)AsyncConfigurer,并重寫getAsyncExecutor()方法,指定默認(rèn)線程池:
import lombok.extern.slf4j.Slf4j; import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.AsyncConfigurer; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; /** * 描述:異步配置 */ @Slf4j @Configuration @EnableAsync // 可放在啟動類上或單獨的配置類 public class AsyncConfiguration implements AsyncConfigurer { @Bean(name = "asyncPoolTaskExecutor") public ThreadPoolTaskExecutor executor() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); //核心線程數(shù) taskExecutor.setCorePoolSize(10); //線程池維護(hù)線程的最大數(shù)量,只有在緩沖隊列滿了之后才會申請超過核心線程數(shù)的線程 taskExecutor.setMaxPoolSize(100); //緩存隊列 taskExecutor.setQueueCapacity(50); //設(shè)置線程的空閑時間,當(dāng)超過了核心線程出之外的線程在空閑時間到達(dá)之后會被銷毀 taskExecutor.setKeepAliveSeconds(200); //異步方法內(nèi)部線程名稱 taskExecutor.setThreadNamePrefix("async-"); /** * 當(dāng)線程池的任務(wù)緩存隊列已滿并且線程池中的線程數(shù)目達(dá)到maximumPoolSize,如果還有任務(wù)到來就會采取任務(wù)拒絕策略 * 通常有以下四種策略: * ThreadPoolExecutor.AbortPolicy:丟棄任務(wù)并拋出RejectedExecutionException異常。 * ThreadPoolExecutor.DiscardPolicy:也是丟棄任務(wù),但是不拋出異常。 * ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務(wù),然后重新嘗試執(zhí)行任務(wù)(重復(fù)此過程) * ThreadPoolExecutor.CallerRunsPolicy:重試添加當(dāng)前的任務(wù),自動重復(fù)調(diào)用 execute() 方法,直到成功 */ taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); taskExecutor.initialize(); return taskExecutor; } /** * 指定默認(rèn)線程池 * The {@link Executor} instance to be used when processing async method invocations. */ @Override public Executor getAsyncExecutor() { return executor(); } /** * The {@link AsyncUncaughtExceptionHandler} instance to be used * when an exception is thrown during an asynchronous method execution * with {@code void} return type. */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return (ex, method, params) -> log.error("線程池執(zhí)行任務(wù)發(fā)送未知錯誤, 執(zhí)行方法:{}", method.getName(), ex); } }
如下,doTask1()方法使用默認(rèn)使用線程池asyncPoolTaskExecutor,doTask2()使用線程池otherTaskExecutor,非常靈活。
@SneakyThrows @Async public void doTask1() { long t1 = System.currentTimeMillis(); Thread.sleep(2000); long t2 = System.currentTimeMillis(); log.info("task1方法耗時 {} ms" , t2-t1); } @SneakyThrows @Async("otherTaskExecutor") public void doTask2() { long t1 = System.currentTimeMillis(); Thread.sleep(3000); long t2 = System.currentTimeMillis(); log.info("task2方法耗時 {} ms" , t2-t1); }
使用@Async注解可能會導(dǎo)致的問題
如果serviceA、serviceB對象之間相互依賴,serviceA和serviceB總一個一個會先實例化,而serviceA或serviceB里面使用了@Async注解,會導(dǎo)致循環(huán)依賴異常:org.springframework.beans.factory.BeanCurrentlyInCreationException
在springboot中,以上報錯被捕捉,拋出的異常是: The dependencies of some of the beans in the application context form a cycle
原因
我們知道,spring三級緩存一定程度上解決了循環(huán)依賴問題。A對象在實例化之后,屬性賦值【opulateBean(beanName, mbd, instanceWrapper)】執(zhí)行之前,將ObjectFactory添加至三級緩存中,從而使得在B對象實例化后的屬性賦值過程中,能從三級緩存拿到ObjectFactory,調(diào)用getObject()方法拿到A的引用,B由此能順利完成初始化并加入到IOC容器。此時A對象完成屬性賦值之后,將會執(zhí)行初始化【initializeBean(beanName, exposedObject, mbd)方法】,重點是@Async注解的處理正是在這地方完成的,其對應(yīng)的后置處理器AsyncAnnotationBeanPostProcessor,在postProcessAfterInitialization方法中將返回代理對象,此代理對象與B中持有的A對象引用不同,導(dǎo)致了以上報錯。
解決辦法
1.在A類上加@Lazy,保證A對象實例化晚于B對象
2.不使用@Async注解,通過自定義異步工具類發(fā)起異步線程(線程池)
3.不要讓@Async的Bean參與循環(huán)依賴
到此這篇關(guān)于SpringBoot中實現(xiàn)異步調(diào)用@Async詳解的文章就介紹到這了,更多相關(guān)SpringBoot異步調(diào)用@Async內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
intellij idea中安裝、配置mybatis插件Free Mybatis plugin的教程詳解
這篇文章主要介紹了intellij idea中安裝、配置mybatis插件Free Mybatis plugin的教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-09-09Spring security用戶URL權(quán)限FilterSecurityInterceptor使用解析
這篇文章主要介紹了Spring security用戶URL權(quán)限FilterSecurityInterceptor使用解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12java鏈表數(shù)據(jù)結(jié)構(gòu)LinkedList插入刪除元素時間復(fù)雜度面試精講
這篇文章主要為大家介紹了java LinkedList插入和刪除元素的時間復(fù)雜度面試精講,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10SpringBoot整合Quartz及異步調(diào)用的案例
Quartz是一個完全由java編寫的開源作業(yè)調(diào)度框架、它的簡單易用受到業(yè)內(nèi)人士的一致好評,這篇文章主要介紹了SpringBoot整合Quartz及異步調(diào)用,需要的朋友可以參考下2023-03-03SpringBoot使用Sharding-JDBC實現(xiàn)數(shù)據(jù)分片和讀寫分離的方法
本文主要介紹了SpringBoot使用Sharding-JDBC實現(xiàn)數(shù)據(jù)分片和讀寫分離,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-10-10