SpringBoot多線(xiàn)程進(jìn)行異步請(qǐng)求的處理方式
SpringBoot多線(xiàn)程進(jìn)行異步請(qǐng)求的處理
近期在協(xié)會(huì)博客園中,有人發(fā)布了博客,系統(tǒng)進(jìn)行查重的時(shí)候由于機(jī)器最低配置進(jìn)行大量計(jì)算時(shí)需要十秒左右時(shí)間才能處理完,由于一開(kāi)始是單例模式,導(dǎo)致,在某人查重的時(shí)候整個(gè)系統(tǒng)是不會(huì)再響應(yīng)別的請(qǐng)求的,導(dǎo)致了系統(tǒng)假死狀態(tài),那么我們就應(yīng)該使用多線(xiàn)程進(jìn)行處理,將這種不能快速返回結(jié)果的方法交給線(xiàn)程池進(jìn)行處理。
而我們自己使用Java Thread實(shí)現(xiàn)多線(xiàn)程又比較麻煩,在這里我們使用SpringBoot自帶的多線(xiàn)程支持,僅需兩步就可以對(duì)我們的查重方法利用多線(xiàn)程進(jìn)行處理
第一步:編寫(xiě)配置類(lèi)
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.Executor; @Configuration @EnableAsync // 啟用異步任務(wù) public class AsyncConfig { // 聲明一個(gè)線(xiàn)程池(并指定線(xiàn)程池的名字) @Bean("AsyncThread") public Executor asyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //核心線(xiàn)程數(shù)5:線(xiàn)程池創(chuàng)建時(shí)候初始化的線(xiàn)程數(shù) executor.setCorePoolSize(5); //最大線(xiàn)程數(shù)5:線(xiàn)程池最大的線(xiàn)程數(shù),只有在緩沖隊(duì)列滿(mǎn)了之后才會(huì)申請(qǐng)超過(guò)核心線(xiàn)程數(shù)的線(xiàn)程 executor.setMaxPoolSize(10); //緩沖隊(duì)列500:用來(lái)緩沖執(zhí)行任務(wù)的隊(duì)列 executor.setQueueCapacity(500); //允許線(xiàn)程的空閑時(shí)間60秒:當(dāng)超過(guò)了核心線(xiàn)程出之外的線(xiàn)程在空閑時(shí)間到達(dá)之后會(huì)被銷(xiāo)毀 executor.setKeepAliveSeconds(60); //線(xiàn)程池名的前綴:設(shè)置好了之后可以方便我們定位處理任務(wù)所在的線(xiàn)程池 executor.setThreadNamePrefix("AsyncThread-"); executor.initialize(); return executor; } }
第二步:對(duì)方法使用注解標(biāo)注為使用多線(xiàn)程進(jìn)行處理
并將方法改寫(xiě)返回類(lèi)型(因?yàn)椴荒芰⒓捶祷兀孕枰獙⒎祷刂蹈臑镃ompletableFuture<String>,
返回的時(shí)候使用return CompletableFuture.completedFuture(str);)
方法示范:
@Async("AsyncThread") @RequestMapping(value = "/addBlog") public CompletableFuture<String> addBlog(HttpSession httpSession, Blog blog, String blogId, boolean tempSave) { System.out.println("\n\n----------------------------------------------"); System.out.println(Thread.currentThread().getName() + "正在處理請(qǐng)求"); System.out.println("----------------------------------------------"); String result = "請(qǐng)求失敗"; //....你的業(yè)務(wù)邏輯 return CompletableFuture.completedFuture(result); }
這樣以后你的這個(gè)方法將會(huì)交由線(xiàn)程池去進(jìn)行處理,并將結(jié)果返回,一定要記得改返回值類(lèi)型,否則返回的將是空的。
SpringBoot請(qǐng)求線(xiàn)程優(yōu)化
在我們的實(shí)際生產(chǎn)中,常常會(huì)遇到下面的這種情況,某個(gè)請(qǐng)求非常耗時(shí)(大約5s返回),當(dāng)大量的訪(fǎng)問(wèn)該請(qǐng)求的時(shí)候,再請(qǐng)求其他服務(wù)時(shí),會(huì)造成沒(méi)有連接使用的情況,造成這種現(xiàn)象的主要原因是,我們的容器(tomcat)中線(xiàn)程的數(shù)量是一定的,例如500個(gè),當(dāng)這500個(gè)線(xiàn)程都用來(lái)請(qǐng)求服務(wù)的時(shí)候,再有請(qǐng)求進(jìn)來(lái),就沒(méi)有多余的連接可用了,只能拒絕連接。要是我們?cè)谡?qǐng)求耗時(shí)服務(wù)的時(shí)候,能夠異步請(qǐng)求(請(qǐng)求到controller中時(shí),則容器線(xiàn)程直接返回,然后使用系統(tǒng)內(nèi)部的線(xiàn)程來(lái)執(zhí)行耗時(shí)的服務(wù),等到服務(wù)有返回的時(shí)候,再將請(qǐng)求返回給客戶(hù)端),那么系統(tǒng)的吞吐量就會(huì)得到很大程度的提升了。當(dāng)然,大家可以直接使用Hystrix的資源隔離來(lái)實(shí)現(xiàn),今天我們的重點(diǎn)是spring mvc是怎么來(lái)實(shí)現(xiàn)這種異步請(qǐng)求的。
使用Callable來(lái)實(shí)現(xiàn)
controller如下:
@RestController public class HelloController {undefined private static final Logger logger = LoggerFactory.getLogger(HelloController.class); @Autowired private HelloService hello; @GetMapping("/helloworld") public String helloWorldController() {undefined return hello.sayHello(); } /** * 異步調(diào)用restful * 當(dāng)controller返回值是Callable的時(shí)候,springmvc就會(huì)啟動(dòng)一個(gè)線(xiàn)程將Callable交給TaskExecutor去處理 * 然后DispatcherServlet還有所有的spring攔截器都退出主線(xiàn)程,然后把response保持打開(kāi)的狀態(tài) * 當(dāng)Callable執(zhí)行結(jié)束之后,springmvc就會(huì)重新啟動(dòng)分配一個(gè)request請(qǐng)求,然后DispatcherServlet就重新 * 調(diào)用和處理Callable異步執(zhí)行的返回結(jié)果, 然后返回視圖 * * @return */ @GetMapping("/hello") public Callable<String> helloController() {undefined logger.info(Thread.currentThread().getName() + " 進(jìn)入helloController方法"); Callable<String> callable = new Callable<String>() {undefined @Override public String call() throws Exception {undefined logger.info(Thread.currentThread().getName() + " 進(jìn)入call方法"); String say = hello.sayHello(); logger.info(Thread.currentThread().getName() + " 從helloService方法返回"); return say; } }; logger.info(Thread.currentThread().getName() + " 從helloController方法返回"); return callable; } }
我們首先來(lái)看下上面這兩個(gè)請(qǐng)求的區(qū)別:
下面這個(gè)是沒(méi)有使用異步請(qǐng)求的
2017-12-07 18:05:42.351 INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 進(jìn)入helloWorldController方法
2017-12-07 18:05:42.351 INFO 3020 --- [nio-8060-exec-5] com.travelsky.service.HelloService : http-nio-8060-exec-5 進(jìn)入sayHello方法!
2017-12-07 18:05:44.351 INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 從helloWorldController方法返回
我們可以看到,請(qǐng)求從頭到尾都只有一個(gè)線(xiàn)程,并且整個(gè)請(qǐng)求耗費(fèi)了2s鐘的時(shí)間。
下面,我們?cè)賮?lái)看下使用Callable異步請(qǐng)求的結(jié)果:
2017-12-07 18:11:55.671 INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController : http-nio-8060-exec-1 進(jìn)入helloController方法
2017-12-07 18:11:55.672 INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController : http-nio-8060-exec-1 從helloController方法返回
2017-12-07 18:11:55.676 INFO 6196 --- [nio-8060-exec-1] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-1 進(jìn)入afterConcurrentHandlingStarted方法
2017-12-07 18:11:55.676 INFO 6196 --- [ MvcAsync1] c.travelsky.controller.HelloController : MvcAsync1 進(jìn)入call方法
2017-12-07 18:11:55.676 INFO 6196 --- [ MvcAsync1] com.travelsky.service.HelloService : MvcAsync1 進(jìn)入sayHello方法!
2017-12-07 18:11:57.677 INFO 6196 --- [ MvcAsync1] c.travelsky.controller.HelloController : MvcAsync1 從helloService方法返回
2017-12-07 18:11:57.721 INFO 6196 --- [nio-8060-exec-2] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-2服務(wù)調(diào)用完成,返回結(jié)果給客戶(hù)端
從上面的結(jié)果中,我們可以看出,容器的線(xiàn)程http-nio-8060-exec-1這個(gè)線(xiàn)程進(jìn)入controller之后,就立即返回了,具體的服務(wù)調(diào)用是通過(guò)MvcAsync2這個(gè)線(xiàn)程來(lái)做的,當(dāng)服務(wù)執(zhí)行完要返回后,容器會(huì)再啟一個(gè)新的線(xiàn)程http-nio-8060-exec-2來(lái)將結(jié)果返回給客戶(hù)端或?yàn)g覽器,整個(gè)過(guò)程response都是打開(kāi)的,當(dāng)有返回的時(shí)候,再?gòu)膕erver端推到response中去。
1、異步調(diào)用的另一種方式
上面的示例是通過(guò)callable來(lái)實(shí)現(xiàn)的異步調(diào)用,其實(shí)還可以通過(guò)WebAsyncTask,也能實(shí)現(xiàn)異步調(diào)用,下面看示例:
@RestController public class HelloController {undefined private static final Logger logger = LoggerFactory.getLogger(HelloController.class); @Autowired private HelloService hello; /** * 帶超時(shí)時(shí)間的異步請(qǐng)求 通過(guò)WebAsyncTask自定義客戶(hù)端超時(shí)間 * * @return */ @GetMapping("/world") public WebAsyncTask<String> worldController() {undefined logger.info(Thread.currentThread().getName() + " 進(jìn)入helloController方法"); // 3s鐘沒(méi)返回,則認(rèn)為超時(shí) WebAsyncTask<String> webAsyncTask = new WebAsyncTask<>(3000, new Callable<String>() {undefined @Override public String call() throws Exception {undefined logger.info(Thread.currentThread().getName() + " 進(jìn)入call方法"); String say = hello.sayHello(); logger.info(Thread.currentThread().getName() + " 從helloService方法返回"); return say; } }); logger.info(Thread.currentThread().getName() + " 從helloController方法返回"); webAsyncTask.onCompletion(new Runnable() {undefined @Override public void run() {undefined logger.info(Thread.currentThread().getName() + " 執(zhí)行完畢"); } }); webAsyncTask.onTimeout(new Callable<String>() {undefined @Override public String call() throws Exception {undefined logger.info(Thread.currentThread().getName() + " onTimeout"); // 超時(shí)的時(shí)候,直接拋異常,讓外層統(tǒng)一處理超時(shí)異常 throw new TimeoutException("調(diào)用超時(shí)"); } }); return webAsyncTask; } /** * 異步調(diào)用,異常處理,詳細(xì)的處理流程見(jiàn)MyExceptionHandler類(lèi) * * @return */ @GetMapping("/exception") public WebAsyncTask<String> exceptionController() {undefined logger.info(Thread.currentThread().getName() + " 進(jìn)入helloController方法"); Callable<String> callable = new Callable<String>() {undefined @Override public String call() throws Exception {undefined logger.info(Thread.currentThread().getName() + " 進(jìn)入call方法"); throw new TimeoutException("調(diào)用超時(shí)!"); } }; logger.info(Thread.currentThread().getName() + " 從helloController方法返回"); return new WebAsyncTask<>(20000, callable); } }
運(yùn)行結(jié)果如下:
2017-12-07 19:10:26.582 INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController : http-nio-8060-exec-4 進(jìn)入helloController方法
2017-12-07 19:10:26.585 INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController : http-nio-8060-exec-4 從helloController方法返回
2017-12-07 19:10:26.589 INFO 6196 --- [nio-8060-exec-4] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-4 進(jìn)入afterConcurrentHandlingStarted方法
2017-12-07 19:10:26.591 INFO 6196 --- [ MvcAsync2] c.travelsky.controller.HelloController : MvcAsync2 進(jìn)入call方法
2017-12-07 19:10:26.591 INFO 6196 --- [ MvcAsync2] com.travelsky.service.HelloService : MvcAsync2 進(jìn)入sayHello方法!
2017-12-07 19:10:28.591 INFO 6196 --- [ MvcAsync2] c.travelsky.controller.HelloController : MvcAsync2 從helloService方法返回
2017-12-07 19:10:28.600 INFO 6196 --- [nio-8060-exec-5] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-5服務(wù)調(diào)用完成,返回結(jié)果給客戶(hù)端
2017-12-07 19:10:28.601 INFO 6196 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 執(zhí)行完畢
這種方式和上面的callable方式最大的區(qū)別就是,WebAsyncTask支持超時(shí),并且還提供了兩個(gè)回調(diào)函數(shù),分別是onCompletion和onTimeout,顧名思義,這兩個(gè)回調(diào)函數(shù)分別在執(zhí)行完成和超時(shí)的時(shí)候回調(diào)。
2、Deferred方式實(shí)現(xiàn)異步調(diào)用
在我們是生產(chǎn)中,往往會(huì)遇到這樣的情景,controller中調(diào)用的方法很多都是和第三方有關(guān)的,例如JMS,定時(shí)任務(wù),隊(duì)列等,拿JMS來(lái)說(shuō),比如controller里面的服務(wù)需要從JMS中拿到返回值,才能給客戶(hù)端返回,而從JMS拿值這個(gè)過(guò)程也是異步的,這個(gè)時(shí)候,我們就可以通過(guò)Deferred來(lái)實(shí)現(xiàn)整個(gè)的異步調(diào)用。
首先,我們來(lái)模擬一個(gè)長(zhǎng)時(shí)間調(diào)用的任務(wù),代碼如下:
@Component public class LongTimeTask {undefined private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async public void execute(DeferredResult<String> deferred){undefined logger.info(Thread.currentThread().getName() + "進(jìn)入 taskService 的 execute方法"); try {undefined // 模擬長(zhǎng)時(shí)間任務(wù)調(diào)用,睡眠2s TimeUnit.SECONDS.sleep(2); // 2s后給Deferred發(fā)送成功消息,告訴Deferred,我這邊已經(jīng)處理完了,可以返回給客戶(hù)端了 deferred.setResult("world"); } catch (InterruptedException e) {undefined e.printStackTrace(); } } }
接著,我們就來(lái)實(shí)現(xiàn)異步調(diào)用,controller如下:
@RestController public class AsyncDeferredController {undefined private final Logger logger = LoggerFactory.getLogger(this.getClass()); private final LongTimeTask taskService; @Autowired public AsyncDeferredController(LongTimeTask taskService) {undefined this.taskService = taskService; } @GetMapping("/deferred") public DeferredResult<String> executeSlowTask() {undefined logger.info(Thread.currentThread().getName() + "進(jìn)入executeSlowTask方法"); DeferredResult<String> deferredResult = new DeferredResult<>(); // 調(diào)用長(zhǎng)時(shí)間執(zhí)行任務(wù) taskService.execute(deferredResult); // 當(dāng)長(zhǎng)時(shí)間任務(wù)中使用deferred.setResult("world");這個(gè)方法時(shí),會(huì)從長(zhǎng)時(shí)間任務(wù)中返回,繼續(xù)controller里面的流程 logger.info(Thread.currentThread().getName() + "從executeSlowTask方法返回"); // 超時(shí)的回調(diào)方法 deferredResult.onTimeout(new Runnable(){undefined @Override public void run() {undefined logger.info(Thread.currentThread().getName() + " onTimeout"); // 返回超時(shí)信息 deferredResult.setErrorResult("time out!"); } }); // 處理完成的回調(diào)方法,無(wú)論是超時(shí)還是處理成功,都會(huì)進(jìn)入這個(gè)回調(diào)方法 deferredResult.onCompletion(new Runnable(){undefined @Override public void run() {undefined logger.info(Thread.currentThread().getName() + " onCompletion"); } }); return deferredResult; } }
執(zhí)行結(jié)果如下:
2017-12-07 19:25:40.192 INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController : http-nio-8060-exec-7進(jìn)入executeSlowTask方法
2017-12-07 19:25:40.193 INFO 6196 --- [nio-8060-exec-7] .s.a.AnnotationAsyncExecutionInterceptor : No TaskExecutor bean found for async processing
2017-12-07 19:25:40.194 INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController : http-nio-8060-exec-7從executeSlowTask方法返回
2017-12-07 19:25:40.198 INFO 6196 --- [nio-8060-exec-7] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-7 進(jìn)入afterConcurrentHandlingStarted方法
2017-12-07 19:25:40.202 INFO 6196 --- [cTaskExecutor-1] com.travelsky.controller.LongTimeTask : SimpleAsyncTaskExecutor-1進(jìn)入 taskService 的 execute方法
2017-12-07 19:25:42.212 INFO 6196 --- [nio-8060-exec-8] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-8服務(wù)調(diào)用完成,返回結(jié)果給客戶(hù)端
2017-12-07 19:25:42.213 INFO 6196 --- [nio-8060-exec-8] c.t.controller.AsyncDeferredController : http-nio-8060-exec-8 onCompletion
從上面的執(zhí)行結(jié)果不難看出,容器線(xiàn)程會(huì)立刻返回,應(yīng)用程序使用線(xiàn)程池里面的cTaskExecutor-1線(xiàn)程來(lái)完成長(zhǎng)時(shí)間任務(wù)的調(diào)用,當(dāng)調(diào)用完成后,容器又啟了一個(gè)連接線(xiàn)程,來(lái)返回最終的執(zhí)行結(jié)果。
這種異步調(diào)用,在容器線(xiàn)程資源非常寶貴的時(shí)候,能夠大大的提高整個(gè)系統(tǒng)的吞吐量。
ps:異步調(diào)用可以使用AsyncHandlerInterceptor進(jìn)行攔截,使用示例如下:
@Component public class MyAsyncHandlerInterceptor implements AsyncHandlerInterceptor {undefined private static final Logger logger = LoggerFactory.getLogger(MyAsyncHandlerInterceptor.class); @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {undefined return true; } @Override public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) throws Exception {undefined // HandlerMethod handlerMethod = (HandlerMethod) handler; logger.info(Thread.currentThread().getName()+ "服務(wù)調(diào)用完成,返回結(jié)果給客戶(hù)端"); } @Override public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {undefined if(null != ex){undefined System.out.println("發(fā)生異常:"+ex.getMessage()); } } @Override public void afterConcurrentHandlingStarted(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {undefined // 攔截之后,重新寫(xiě)回?cái)?shù)據(jù),將原來(lái)的hello world換成如下字符串 String resp = "my name is chhliu!"; response.setContentLength(resp.length()); response.getOutputStream().write(resp.getBytes()); logger.info(Thread.currentThread().getName() + " 進(jìn)入afterConcurrentHandlingStarted方法"); } }
有興趣的可以了解下,我這里的主題是異步調(diào)用,其他的相關(guān)知識(shí)點(diǎn),以后在講解吧。希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java多線(xiàn)程局域網(wǎng)聊天室的實(shí)現(xiàn)
在學(xué)習(xí)了一個(gè)學(xué)期的java以后,搞了一個(gè)多線(xiàn)程的聊天室,熟悉了一下服務(wù)器和客戶(hù)機(jī)的操作。感興趣的小伙伴們可以參考一下2021-06-06一文搞懂接口參數(shù)簽名與驗(yàn)簽(附含java python php版)
這篇文章主要為大家介紹了java python php不同版的接口參數(shù)簽名與驗(yàn)簽示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(3)
下面小編就為大家?guī)?lái)一篇Java基礎(chǔ)的幾道練習(xí)題(分享)。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧,希望可以幫到你2021-07-07淺談spring-boot 允許接口跨域并實(shí)現(xiàn)攔截(CORS)
本篇文章主要介紹了淺談spring-boot 允許接口跨域并實(shí)現(xiàn)攔截(CORS),具有一定的參考價(jià)值,有興趣的可以了解一下2017-08-08Java基礎(chǔ)知識(shí)精選 你答對(duì)了幾道?
精選Java基礎(chǔ)知識(shí)講解,看看你能答對(duì)多少?2017-09-09java構(gòu)造器 默認(rèn)構(gòu)造方法及參數(shù)化構(gòu)造方法
構(gòu)造器也叫構(gòu)造方法、構(gòu)造函數(shù),是一種特殊類(lèi)型的方法,負(fù)責(zé)類(lèi)中成員變量(域)的初始化。構(gòu)造器的用處是在創(chuàng)建對(duì)象時(shí)執(zhí)行初始化,當(dāng)創(chuàng)建一個(gè)對(duì)象時(shí),系統(tǒng)會(huì)為這個(gè)對(duì)象的實(shí)例進(jìn)行默認(rèn)的初始化,下面文章將進(jìn)入講解,需要的朋友可以參考下2021-10-10Java抽象類(lèi)和抽象方法定義與用法實(shí)例詳解
這篇文章主要介紹了Java抽象類(lèi)和抽象方法定義與用法,結(jié)合實(shí)例形式詳細(xì)分析了Java抽象類(lèi)和抽象方法相關(guān)原理、定義、使用方法及操作注意事項(xiàng),需要的朋友可以參考下2019-11-11集成apollo動(dòng)態(tài)日志取締logback-spring.xml配置
這篇文章主要為大家介紹了集成apollo動(dòng)態(tài)日志取締logback-spring.xml配置的過(guò)程內(nèi)容詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-02-02Java超詳細(xì)講解多線(xiàn)程中的Process與Thread
進(jìn)程process:在一定的環(huán)境下,把靜態(tài)的程序代碼運(yùn)行起來(lái),通過(guò)使用不同的資源,來(lái)完成一定的任務(wù);線(xiàn)程thread:是程序中一個(gè)單一的順序控制流程。在單個(gè)進(jìn)程中同時(shí)運(yùn)行多個(gè)線(xiàn)程完成不同的工作,稱(chēng)為多線(xiàn)程2022-05-05