SpringBoot執(zhí)行有返回值的異步任務問題
SpringBoot執(zhí)行有返回值異步任務
Springboot如何使用多線程處理異步任務,并且是代返回值的。
比如,我一個Controller層的接口,調用到Service層,在對應的Service方法中有三個方法,這三個方法都是去調Dao層查詢數(shù)據(jù)庫數(shù)據(jù),每個查詢平均耗時5s,最后將這三個查詢的結果進行合并計算。如果不采用異步查詢的至少需要15s的時間來處理這個請求。此時我們可以使用異步任務方式來操作,可以減少10s左右的時間。
在Springboot中使用異步需要開啟異步支持(@EnableAsync),代碼如下
@SpringBootApplication @EnableAsync public class PublisherApplication { ? ? public static void main(String[] args) { ? ? ? ? SpringApplication.run(PublisherApplication.class, args); ? ? } }
然后增加一個配置類,不過這個Springboot也會默認配置,一般我們使用線程池是都不使用默認的,而是使用自定義的:
/** ?* @author Mr. Zhang ?* @description 異步線程池 ?* @date 2019-04-19 14:21 ?* @website https://www.zhangguimin.cn ?*/ @Configuration public class AsyncConfig { ? ? private static final int THREADS = Runtime.getRuntime().availableProcessors(); ? ? @Bean("taskExecutor") ? ? public Executor execute() { ? ? ? ? System.out.println(THREADS); ? ? ? ? Executor executor = new ThreadPoolExecutor(THREADS, 2 * THREADS, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1024)); ? ? ? ? return executor; ? ? } }
此配置為IO密集型服務器參數(shù),即最大線程數(shù)為CPU核數(shù)*2(THREADS為CPU核數(shù)),其他拒絕策略默認即可。
在之后的異步任務中使用此出定義的線程池taskExecutor。
然后就是異步任務,在對應的任務方法上標注@Async(“taskExecutor”)即可,表示使用taskExecutor線程池中線程執(zhí)行異步任務。代碼參考:
@Service public class TaskServer { ? ? @Async("taskExecutor") ? ? public Future<Integer> asyncTask(CountDownLatch latch) { ? ? ? ? try { ? ? ? ? ? ? Thread.sleep(3000L); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? }finally { ? ? ? ? ? ? latch.countDown(); ? ? ? ? } ? ? ? ? return new AsyncResult<>(10); ? ? } ? ? @Async("taskExecutor") ? ? public Future<Integer> asyncTask2(CountDownLatch latch) { ? ? ? ? try { ? ? ? ? ? ? Thread.sleep(3000L); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? }finally { ? ? ? ? ? ? latch.countDown(); ? ? ? ? } ? ? ? ? return new AsyncResult<>(20); ? ? } ? ? @Async("taskExecutor") ? ? public Future<Integer> asyncTask3(CountDownLatch latch) { ? ? ? ? try { ? ? ? ? ? ? Thread.sleep(3000L); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? }finally { ? ? ? ? ? ? latch.countDown(); ? ? ? ? } ? ? ? ? return new AsyncResult<>(30); ? ? } }
注意的是,需要在類上標注為組件注解,此處為@Service。
在這三個方法中執(zhí)行sleep,使線程都是sleep 3秒的時間。然后參數(shù)為CountDownLatch,在每次執(zhí)行完就countDown(),然后返回值為Future,如果執(zhí)行的異步方法沒有返回值可以不需要CountDownLatch和返回值就行。
在對應的測試類中執(zhí)行
@RunWith(SpringRunner.class) @SpringBootTest public class PublisherApplicationTests { ? ? @Autowired ? ? private TaskServer taskServer; ? ? @Test ? ? public void test() { ? ? ? ? CountDownLatch latch = new CountDownLatch(3); ? ? ? ? LocalTime time = LocalTime.now(); ? ? ? ? Future<Integer> integerFuture = taskServer.asyncTask(latch); ? ? ? ? Future<Integer> integerFuture1 = taskServer.asyncTask2(latch); ? ? ? ? Future<Integer> integerFuture2 = taskServer.asyncTask3(latch); ? ? ? ? try { ? ? ? ? ? ? latch.await(); ? ? ? ? ? ? LocalTime end =LocalTime.now(); ? ? ? ? ? ? System.out.println(Duration.between(time, end)); ? ? ? ? ? ? Integer integer = integerFuture.get(); ? ? ? ? ? ? Integer integer1 = integerFuture1.get(); ? ? ? ? ? ? Integer integer2 = integerFuture2.get(); ? ? ? ? ? ? int i = integer + integer1 + integer2; ? ? ? ? ? ? System.out.println(i); ? ? ? ? } catch (InterruptedException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } catch (ExecutionException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? } }
注意CountDownLatch latch = new CountDownLatch(3);有三個異步方法傳入的值為3,在調用的異步方法后面需要latch.await();
一定不要在latch.await();前去get()結果值,這樣會導致異步方法強制執(zhí)行。
另外,有的人也使用while循環(huán),在所有結果出來前一致在循環(huán)等待,類似自旋鎖的原理,對資源消耗較高,還有,如果一個任務出現(xiàn)錯誤,就可能會造成一只循環(huán)。而CountDownLatch可在await()方法中傳入等待時間,如果超過這個時間就會結束等待,直接完成下面的操作。
SpringBoot開啟有返回值的異步調用:三步搞定
1、線程池配置
package com.listen.demo.config; /** * @author liuxd * @version 1.0 * @date 2019-12-25 15:46 */ import com.listen.demo.service.MyTaskServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; import java.util.concurrent.ThreadPoolExecutor; @Configuration //@ComponentScan({"com.listen.demo"}) @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數(shù) executor.setCorePoolSize(5); //配置最大線程數(shù) executor.setMaxPoolSize(5); //配置隊列大小 executor.setQueueCapacity(99999); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-service-"); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執(zhí)行任務,而是有調用者所在的線程來執(zhí)行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執(zhí)行初始化 executor.initialize(); return executor; } @Bean public MyTaskServer myTaskServer(){ return new MyTaskServer(); } }
2、業(yè)務操作類
package com.listen.demo.service; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.stereotype.Service; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; /** * @author liuxd * @version 1.0 * @date 2019-12-25 15:48 */ @Service public class MyTaskServer { @Async("asyncServiceExecutor") public Future<Integer> asyncTask(CountDownLatch countDownLatch) throws Exception { Thread.sleep(1000L); countDownLatch.countDown(); return new AsyncResult<>(10); } @Async("asyncServiceExecutor") public Future<Integer> asyncTask2(CountDownLatch countDownLatch) throws Exception { Thread.sleep(2000L); countDownLatch.countDown(); return new AsyncResult<>(20); } @Async("asyncServiceExecutor") public Future<Integer> asyncTask3(CountDownLatch countDownLatch) throws Exception { Thread.sleep(3000L); countDownLatch.countDown(); return new AsyncResult<>(30); } }
3、異步+等待+有返回的測試類
package com.listen.demo; import com.listen.demo.config.ExecutorConfig; import com.listen.demo.service.MyTaskServer; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; /** * @author liuxd * @version 1.0 * @date 2019-12-25 16:03 */ public class TestAsyncTask { public static void main(String[] args) throws Exception { System.out.println("主線程:" + Thread.currentThread().getName() + "開始執(zhí)行調用任務..."); AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ExecutorConfig.class); MyTaskServer myTaskServer = context.getBean(MyTaskServer.class); CountDownLatch countDownLatch = new CountDownLatch(3); Future<Integer> future1 = myTaskServer.asyncTask(countDownLatch); Future<Integer> future2 = myTaskServer.asyncTask2(countDownLatch); Future<Integer> future3 = myTaskServer.asyncTask3(countDownLatch); countDownLatch.await(); Integer num1 = future1.get(); Integer num2 = future2.get(); Integer num3 = future3.get(); int data = num1 + num2 + num3; System.out.println("最終匯總計算結果:" + data); context.close(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("主線程:" + Thread.currentThread().getName() + "程序結束!!"); } }
總結
以上為個人經驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
Java中ResponseBodyEmitter的實現(xiàn)
這篇文章主要介紹了Java中ResponseBodyEmitter的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-03-03Springboot實現(xiàn)多線程注入bean的工具類操作
這篇文章主要介紹了Springboot實現(xiàn)多線程注入bean的工具類操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-08-08SpringBoot3整合Hutool-captcha實現(xiàn)圖形驗證碼
在整合技術框架的時候,想找一個圖形驗證碼相關的框架,看到很多驗證碼的maven庫不再更新了或中央倉庫下載不下來,還需要多引入依賴,后面看到了Hutool圖形驗證碼(Hutool-captcha)中對驗證碼的實現(xiàn),所以本文介紹了SpringBoot3整合Hutool-captcha實現(xiàn)圖形驗證碼2024-11-11Java實現(xiàn)定時讀取json文件里內容的示例代碼
有時候我們會需要定時來讀取JSON配置文件里的內容,來執(zhí)行一些業(yè)務邏輯上的操作,本文就介紹了Java實現(xiàn)定時讀取json文件里內容的示例代碼,感興趣的可以了解一下2023-08-08java靜態(tài)工具類注入service出現(xiàn)NullPointerException異常處理
如果我們要在我們自己封裝的Utils工具類中或者非controller普通類中使用@Autowired注解注入Service或者Mapper接口,直接注入是報錯的,因Utils用了靜態(tài)方法,我們無法直接用非靜態(tài)接口的,遇到這問題,我們要想法解決,下面小編就簡單介紹解決辦法,需要的朋友可參考下2021-09-09springboot過濾器執(zhí)行兩次的解決及跨域過濾器問題
這篇文章主要介紹了springboot過濾器執(zhí)行兩次的解決及跨域過濾器問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12