SpringBoot執(zhí)行有返回值的異步任務(wù)問(wèn)題
SpringBoot執(zhí)行有返回值異步任務(wù)
Springboot如何使用多線程處理異步任務(wù),并且是代返回值的。
比如,我一個(gè)Controller層的接口,調(diào)用到Service層,在對(duì)應(yīng)的Service方法中有三個(gè)方法,這三個(gè)方法都是去調(diào)Dao層查詢數(shù)據(jù)庫(kù)數(shù)據(jù),每個(gè)查詢平均耗時(shí)5s,最后將這三個(gè)查詢的結(jié)果進(jìn)行合并計(jì)算。如果不采用異步查詢的至少需要15s的時(shí)間來(lái)處理這個(gè)請(qǐng)求。此時(shí)我們可以使用異步任務(wù)方式來(lái)操作,可以減少10s左右的時(shí)間。
在Springboot中使用異步需要開(kāi)啟異步支持(@EnableAsync),代碼如下
@SpringBootApplication @EnableAsync public class PublisherApplication { ? ? public static void main(String[] args) { ? ? ? ? SpringApplication.run(PublisherApplication.class, args); ? ? } }
然后增加一個(gè)配置類,不過(guò)這個(gè)Springboot也會(huì)默認(rèn)配置,一般我們使用線程池是都不使用默認(rèn)的,而是使用自定義的:
/** ?* @author Mr. Zhang ?* @description 異步線程池 ?* @date 2019-04-19 14:21 ?* @website https://www.zhangguimin.cn ?*/ @Configuration public class AsyncConfig { ? ? private static final int THREADS = Runtime.getRuntime().availableProcessors(); ? ? @Bean("taskExecutor") ? ? public Executor execute() { ? ? ? ? System.out.println(THREADS); ? ? ? ? Executor executor = new ThreadPoolExecutor(THREADS, 2 * THREADS, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024)); ? ? ? ? return executor; ? ? } }
此配置為IO密集型服務(wù)器參數(shù),即最大線程數(shù)為CPU核數(shù)*2(THREADS為CPU核數(shù)),其他拒絕策略默認(rèn)即可。
在之后的異步任務(wù)中使用此出定義的線程池taskExecutor。
然后就是異步任務(wù),在對(duì)應(yīng)的任務(wù)方法上標(biāo)注@Async(“taskExecutor”)即可,表示使用taskExecutor線程池中線程執(zhí)行異步任務(wù)。代碼參考:
@Service public class TaskServer { ? ? @Async("taskExecutor") ? ? public Future<Integer> asyncTask(CountDownLatch latch) { ? ? ? ? try { ? ? ? ? ? ? Thread.sleep(3000L); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? }finally { ? ? ? ? ? ? latch.countDown(); ? ? ? ? } ? ? ? ? return new AsyncResult<>(10); ? ? } ? ? @Async("taskExecutor") ? ? public Future<Integer> asyncTask2(CountDownLatch latch) { ? ? ? ? try { ? ? ? ? ? ? Thread.sleep(3000L); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? }finally { ? ? ? ? ? ? latch.countDown(); ? ? ? ? } ? ? ? ? return new AsyncResult<>(20); ? ? } ? ? @Async("taskExecutor") ? ? public Future<Integer> asyncTask3(CountDownLatch latch) { ? ? ? ? try { ? ? ? ? ? ? Thread.sleep(3000L); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? }finally { ? ? ? ? ? ? latch.countDown(); ? ? ? ? } ? ? ? ? return new AsyncResult<>(30); ? ? } }
注意的是,需要在類上標(biāo)注為組件注解,此處為@Service。
在這三個(gè)方法中執(zhí)行sleep,使線程都是sleep 3秒的時(shí)間。然后參數(shù)為CountDownLatch,在每次執(zhí)行完就countDown(),然后返回值為Future,如果執(zhí)行的異步方法沒(méi)有返回值可以不需要CountDownLatch和返回值就行。
在對(duì)應(yīng)的測(cè)試類中執(zhí)行
@RunWith(SpringRunner.class) @SpringBootTest public class PublisherApplicationTests { ? ? @Autowired ? ? private TaskServer taskServer; ? ? @Test ? ? public void test() { ? ? ? ? CountDownLatch latch = new CountDownLatch(3); ? ? ? ? LocalTime time = LocalTime.now(); ? ? ? ? Future<Integer> integerFuture = taskServer.asyncTask(latch); ? ? ? ? Future<Integer> integerFuture1 = taskServer.asyncTask2(latch); ? ? ? ? Future<Integer> integerFuture2 = taskServer.asyncTask3(latch); ? ? ? ? try { ? ? ? ? ? ? latch.await(); ? ? ? ? ? ? LocalTime end =LocalTime.now(); ? ? ? ? ? ? System.out.println(Duration.between(time, end)); ? ? ? ? ? ? Integer integer = integerFuture.get(); ? ? ? ? ? ? Integer integer1 = integerFuture1.get(); ? ? ? ? ? ? Integer integer2 = integerFuture2.get(); ? ? ? ? ? ? int i = integer + integer1 + integer2; ? ? ? ? ? ? System.out.println(i); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } catch (ExecutionException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? } }
注意CountDownLatch latch = new CountDownLatch(3);有三個(gè)異步方法傳入的值為3,在調(diào)用的異步方法后面需要latch.await();
一定不要在latch.await();前去get()結(jié)果值,這樣會(huì)導(dǎo)致異步方法強(qiáng)制執(zhí)行。
另外,有的人也使用while循環(huán),在所有結(jié)果出來(lái)前一致在循環(huán)等待,類似自旋鎖的原理,對(duì)資源消耗較高,還有,如果一個(gè)任務(wù)出現(xiàn)錯(cuò)誤,就可能會(huì)造成一只循環(huán)。而CountDownLatch可在await()方法中傳入等待時(shí)間,如果超過(guò)這個(gè)時(shí)間就會(huì)結(jié)束等待,直接完成下面的操作。
SpringBoot開(kāi)啟有返回值的異步調(diào)用:三步搞定
1、線程池配置
package com.listen.demo.config; /** * @author liuxd * @version 1.0 * @date 2019-12-25 15:46 */ import com.listen.demo.service.MyTaskServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration //@ComponentScan({"com.listen.demo"}) @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(5); //配置最大線程數(shù) executor.setMaxPoolSize(5); //配置隊(duì)列大小 executor.setQueueCapacity(99999); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-service-"); // rejection-policy:當(dāng)pool已經(jīng)達(dá)到max size的時(shí)候,如何處理新任務(wù) // CALLER_RUNS:不在新線程中執(zhí)行任務(wù),而是有調(diào)用者所在的線程來(lái)執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } @Bean public MyTaskServer myTaskServer(){ return new MyTaskServer(); } }
2、業(yè)務(wù)操作類
package com.listen.demo.service; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; /** * @author liuxd * @version 1.0 * @date 2019-12-25 15:48 */ @Service public class MyTaskServer { @Async("asyncServiceExecutor") public Future<Integer> asyncTask(CountDownLatch countDownLatch) throws Exception { Thread.sleep(1000L); countDownLatch.countDown(); return new AsyncResult<>(10); } @Async("asyncServiceExecutor") public Future<Integer> asyncTask2(CountDownLatch countDownLatch) throws Exception { Thread.sleep(2000L); countDownLatch.countDown(); return new AsyncResult<>(20); } @Async("asyncServiceExecutor") public Future<Integer> asyncTask3(CountDownLatch countDownLatch) throws Exception { Thread.sleep(3000L); countDownLatch.countDown(); return new AsyncResult<>(30); } }
3、異步+等待+有返回的測(cè)試類
package com.listen.demo; import com.listen.demo.config.ExecutorConfig; import com.listen.demo.service.MyTaskServer; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; /** * @author liuxd * @version 1.0 * @date 2019-12-25 16:03 */ public class TestAsyncTask { public static void main(String[] args) throws Exception { System.out.println("主線程:" + Thread.currentThread().getName() + "開(kāi)始執(zhí)行調(diào)用任務(wù)..."); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ExecutorConfig.class); MyTaskServer myTaskServer = context.getBean(MyTaskServer.class); CountDownLatch countDownLatch = new CountDownLatch(3); Future<Integer> future1 = myTaskServer.asyncTask(countDownLatch); Future<Integer> future2 = myTaskServer.asyncTask2(countDownLatch); Future<Integer> future3 = myTaskServer.asyncTask3(countDownLatch); countDownLatch.await(); Integer num1 = future1.get(); Integer num2 = future2.get(); Integer num3 = future3.get(); int data = num1 + num2 + num3; System.out.println("最終匯總計(jì)算結(jié)果:" + data); context.close(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主線程:" + Thread.currentThread().getName() + "程序結(jié)束!!"); } }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot+Redis哨兵模式的實(shí)現(xiàn)
本文主要介紹了SpringBoot+Redis哨兵模式的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-05-05Java中ResponseBodyEmitter的實(shí)現(xiàn)
這篇文章主要介紹了Java中ResponseBodyEmitter的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03Springboot實(shí)現(xiàn)多線程注入bean的工具類操作
這篇文章主要介紹了Springboot實(shí)現(xiàn)多線程注入bean的工具類操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-08-08SpringBoot3整合Hutool-captcha實(shí)現(xiàn)圖形驗(yàn)證碼
在整合技術(shù)框架的時(shí)候,想找一個(gè)圖形驗(yàn)證碼相關(guān)的框架,看到很多驗(yàn)證碼的maven庫(kù)不再更新了或中央倉(cāng)庫(kù)下載不下來(lái),還需要多引入依賴,后面看到了Hutool圖形驗(yàn)證碼(Hutool-captcha)中對(duì)驗(yàn)證碼的實(shí)現(xiàn),所以本文介紹了SpringBoot3整合Hutool-captcha實(shí)現(xiàn)圖形驗(yàn)證碼2024-11-11Java實(shí)現(xiàn)定時(shí)讀取json文件里內(nèi)容的示例代碼
有時(shí)候我們會(huì)需要定時(shí)來(lái)讀取JSON配置文件里的內(nèi)容,來(lái)執(zhí)行一些業(yè)務(wù)邏輯上的操作,本文就介紹了Java實(shí)現(xiàn)定時(shí)讀取json文件里內(nèi)容的示例代碼,感興趣的可以了解一下2023-08-08java靜態(tài)工具類注入service出現(xiàn)NullPointerException異常處理
如果我們要在我們自己封裝的Utils工具類中或者非controller普通類中使用@Autowired注解注入Service或者M(jìn)apper接口,直接注入是報(bào)錯(cuò)的,因Utils用了靜態(tài)方法,我們無(wú)法直接用非靜態(tài)接口的,遇到這問(wèn)題,我們要想法解決,下面小編就簡(jiǎn)單介紹解決辦法,需要的朋友可參考下2021-09-09springboot過(guò)濾器執(zhí)行兩次的解決及跨域過(guò)濾器問(wèn)題
這篇文章主要介紹了springboot過(guò)濾器執(zhí)行兩次的解決及跨域過(guò)濾器問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12