SpringBoot多線程進(jìn)行異步請求的處理方式
SpringBoot多線程進(jìn)行異步請求的處理
近期在協(xié)會博客園中,有人發(fā)布了博客,系統(tǒng)進(jìn)行查重的時(shí)候由于機(jī)器最低配置進(jìn)行大量計(jì)算時(shí)需要十秒左右時(shí)間才能處理完,由于一開始是單例模式,導(dǎo)致,在某人查重的時(shí)候整個(gè)系統(tǒng)是不會再響應(yīng)別的請求的,導(dǎo)致了系統(tǒng)假死狀態(tài),那么我們就應(yīng)該使用多線程進(jìn)行處理,將這種不能快速返回結(jié)果的方法交給線程池進(jìn)行處理。
而我們自己使用Java Thread實(shí)現(xiàn)多線程又比較麻煩,在這里我們使用SpringBoot自帶的多線程支持,僅需兩步就可以對我們的查重方法利用多線程進(jìn)行處理
第一步:編寫配置類
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableAsync // 啟用異步任務(wù)
public class AsyncConfig {
// 聲明一個(gè)線程池(并指定線程池的名字)
@Bean("AsyncThread")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心線程數(shù)5:線程池創(chuàng)建時(shí)候初始化的線程數(shù)
executor.setCorePoolSize(5);
//最大線程數(shù)5:線程池最大的線程數(shù),只有在緩沖隊(duì)列滿了之后才會申請超過核心線程數(shù)的線程
executor.setMaxPoolSize(10);
//緩沖隊(duì)列500:用來緩沖執(zhí)行任務(wù)的隊(duì)列
executor.setQueueCapacity(500);
//允許線程的空閑時(shí)間60秒:當(dāng)超過了核心線程出之外的線程在空閑時(shí)間到達(dá)之后會被銷毀
executor.setKeepAliveSeconds(60);
//線程池名的前綴:設(shè)置好了之后可以方便我們定位處理任務(wù)所在的線程池
executor.setThreadNamePrefix("AsyncThread-");
executor.initialize();
return executor;
}
}
第二步:對方法使用注解標(biāo)注為使用多線程進(jìn)行處理
并將方法改寫返回類型(因?yàn)椴荒芰⒓捶祷?,所以需要將返回值改為CompletableFuture<String>,
返回的時(shí)候使用return CompletableFuture.completedFuture(str);)
方法示范:
@Async("AsyncThread")
@RequestMapping(value = "/addBlog")
public CompletableFuture<String> addBlog(HttpSession httpSession, Blog blog, String blogId, boolean tempSave) {
System.out.println("\n\n----------------------------------------------");
System.out.println(Thread.currentThread().getName() + "正在處理請求");
System.out.println("----------------------------------------------");
String result = "請求失敗";
//....你的業(yè)務(wù)邏輯
return CompletableFuture.completedFuture(result);
}
這樣以后你的這個(gè)方法將會交由線程池去進(jìn)行處理,并將結(jié)果返回,一定要記得改返回值類型,否則返回的將是空的。
SpringBoot請求線程優(yōu)化
在我們的實(shí)際生產(chǎn)中,常常會遇到下面的這種情況,某個(gè)請求非常耗時(shí)(大約5s返回),當(dāng)大量的訪問該請求的時(shí)候,再請求其他服務(wù)時(shí),會造成沒有連接使用的情況,造成這種現(xiàn)象的主要原因是,我們的容器(tomcat)中線程的數(shù)量是一定的,例如500個(gè),當(dāng)這500個(gè)線程都用來請求服務(wù)的時(shí)候,再有請求進(jìn)來,就沒有多余的連接可用了,只能拒絕連接。要是我們在請求耗時(shí)服務(wù)的時(shí)候,能夠異步請求(請求到controller中時(shí),則容器線程直接返回,然后使用系統(tǒng)內(nèi)部的線程來執(zhí)行耗時(shí)的服務(wù),等到服務(wù)有返回的時(shí)候,再將請求返回給客戶端),那么系統(tǒng)的吞吐量就會得到很大程度的提升了。當(dāng)然,大家可以直接使用Hystrix的資源隔離來實(shí)現(xiàn),今天我們的重點(diǎn)是spring mvc是怎么來實(shí)現(xiàn)這種異步請求的。
使用Callable來實(shí)現(xiàn)
controller如下:
@RestController
public class HelloController {undefined
private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
@Autowired
private HelloService hello;
@GetMapping("/helloworld")
public String helloWorldController() {undefined
return hello.sayHello();
}
/**
* 異步調(diào)用restful
* 當(dāng)controller返回值是Callable的時(shí)候,springmvc就會啟動一個(gè)線程將Callable交給TaskExecutor去處理
* 然后DispatcherServlet還有所有的spring攔截器都退出主線程,然后把response保持打開的狀態(tài)
* 當(dāng)Callable執(zhí)行結(jié)束之后,springmvc就會重新啟動分配一個(gè)request請求,然后DispatcherServlet就重新
* 調(diào)用和處理Callable異步執(zhí)行的返回結(jié)果, 然后返回視圖
*
* @return
*/
@GetMapping("/hello")
public Callable<String> helloController() {undefined
logger.info(Thread.currentThread().getName() + " 進(jìn)入helloController方法");
Callable<String> callable = new Callable<String>() {undefined
@Override
public String call() throws Exception {undefined
logger.info(Thread.currentThread().getName() + " 進(jìn)入call方法");
String say = hello.sayHello();
logger.info(Thread.currentThread().getName() + " 從helloService方法返回");
return say;
}
};
logger.info(Thread.currentThread().getName() + " 從helloController方法返回");
return callable;
}
}
我們首先來看下上面這兩個(gè)請求的區(qū)別:
下面這個(gè)是沒有使用異步請求的
2017-12-07 18:05:42.351 INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 進(jìn)入helloWorldController方法
2017-12-07 18:05:42.351 INFO 3020 --- [nio-8060-exec-5] com.travelsky.service.HelloService : http-nio-8060-exec-5 進(jìn)入sayHello方法!
2017-12-07 18:05:44.351 INFO 3020 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 從helloWorldController方法返回
我們可以看到,請求從頭到尾都只有一個(gè)線程,并且整個(gè)請求耗費(fèi)了2s鐘的時(shí)間。
下面,我們再來看下使用Callable異步請求的結(jié)果:
2017-12-07 18:11:55.671 INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController : http-nio-8060-exec-1 進(jìn)入helloController方法
2017-12-07 18:11:55.672 INFO 6196 --- [nio-8060-exec-1] c.travelsky.controller.HelloController : http-nio-8060-exec-1 從helloController方法返回
2017-12-07 18:11:55.676 INFO 6196 --- [nio-8060-exec-1] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-1 進(jìn)入afterConcurrentHandlingStarted方法
2017-12-07 18:11:55.676 INFO 6196 --- [ MvcAsync1] c.travelsky.controller.HelloController : MvcAsync1 進(jìn)入call方法
2017-12-07 18:11:55.676 INFO 6196 --- [ MvcAsync1] com.travelsky.service.HelloService : MvcAsync1 進(jìn)入sayHello方法!
2017-12-07 18:11:57.677 INFO 6196 --- [ MvcAsync1] c.travelsky.controller.HelloController : MvcAsync1 從helloService方法返回
2017-12-07 18:11:57.721 INFO 6196 --- [nio-8060-exec-2] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-2服務(wù)調(diào)用完成,返回結(jié)果給客戶端
從上面的結(jié)果中,我們可以看出,容器的線程http-nio-8060-exec-1這個(gè)線程進(jìn)入controller之后,就立即返回了,具體的服務(wù)調(diào)用是通過MvcAsync2這個(gè)線程來做的,當(dāng)服務(wù)執(zhí)行完要返回后,容器會再啟一個(gè)新的線程http-nio-8060-exec-2來將結(jié)果返回給客戶端或?yàn)g覽器,整個(gè)過程response都是打開的,當(dāng)有返回的時(shí)候,再從server端推到response中去。
1、異步調(diào)用的另一種方式
上面的示例是通過callable來實(shí)現(xiàn)的異步調(diào)用,其實(shí)還可以通過WebAsyncTask,也能實(shí)現(xiàn)異步調(diào)用,下面看示例:
@RestController
public class HelloController {undefined
private static final Logger logger = LoggerFactory.getLogger(HelloController.class);
@Autowired
private HelloService hello;
/**
* 帶超時(shí)時(shí)間的異步請求 通過WebAsyncTask自定義客戶端超時(shí)間
*
* @return
*/
@GetMapping("/world")
public WebAsyncTask<String> worldController() {undefined
logger.info(Thread.currentThread().getName() + " 進(jìn)入helloController方法");
// 3s鐘沒返回,則認(rèn)為超時(shí)
WebAsyncTask<String> webAsyncTask = new WebAsyncTask<>(3000, new Callable<String>() {undefined
@Override
public String call() throws Exception {undefined
logger.info(Thread.currentThread().getName() + " 進(jìn)入call方法");
String say = hello.sayHello();
logger.info(Thread.currentThread().getName() + " 從helloService方法返回");
return say;
}
});
logger.info(Thread.currentThread().getName() + " 從helloController方法返回");
webAsyncTask.onCompletion(new Runnable() {undefined
@Override
public void run() {undefined
logger.info(Thread.currentThread().getName() + " 執(zhí)行完畢");
}
});
webAsyncTask.onTimeout(new Callable<String>() {undefined
@Override
public String call() throws Exception {undefined
logger.info(Thread.currentThread().getName() + " onTimeout");
// 超時(shí)的時(shí)候,直接拋異常,讓外層統(tǒng)一處理超時(shí)異常
throw new TimeoutException("調(diào)用超時(shí)");
}
});
return webAsyncTask;
}
/**
* 異步調(diào)用,異常處理,詳細(xì)的處理流程見MyExceptionHandler類
*
* @return
*/
@GetMapping("/exception")
public WebAsyncTask<String> exceptionController() {undefined
logger.info(Thread.currentThread().getName() + " 進(jìn)入helloController方法");
Callable<String> callable = new Callable<String>() {undefined
@Override
public String call() throws Exception {undefined
logger.info(Thread.currentThread().getName() + " 進(jìn)入call方法");
throw new TimeoutException("調(diào)用超時(shí)!");
}
};
logger.info(Thread.currentThread().getName() + " 從helloController方法返回");
return new WebAsyncTask<>(20000, callable);
}
}
運(yùn)行結(jié)果如下:
2017-12-07 19:10:26.582 INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController : http-nio-8060-exec-4 進(jìn)入helloController方法
2017-12-07 19:10:26.585 INFO 6196 --- [nio-8060-exec-4] c.travelsky.controller.HelloController : http-nio-8060-exec-4 從helloController方法返回
2017-12-07 19:10:26.589 INFO 6196 --- [nio-8060-exec-4] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-4 進(jìn)入afterConcurrentHandlingStarted方法
2017-12-07 19:10:26.591 INFO 6196 --- [ MvcAsync2] c.travelsky.controller.HelloController : MvcAsync2 進(jìn)入call方法
2017-12-07 19:10:26.591 INFO 6196 --- [ MvcAsync2] com.travelsky.service.HelloService : MvcAsync2 進(jìn)入sayHello方法!
2017-12-07 19:10:28.591 INFO 6196 --- [ MvcAsync2] c.travelsky.controller.HelloController : MvcAsync2 從helloService方法返回
2017-12-07 19:10:28.600 INFO 6196 --- [nio-8060-exec-5] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-5服務(wù)調(diào)用完成,返回結(jié)果給客戶端
2017-12-07 19:10:28.601 INFO 6196 --- [nio-8060-exec-5] c.travelsky.controller.HelloController : http-nio-8060-exec-5 執(zhí)行完畢
這種方式和上面的callable方式最大的區(qū)別就是,WebAsyncTask支持超時(shí),并且還提供了兩個(gè)回調(diào)函數(shù),分別是onCompletion和onTimeout,顧名思義,這兩個(gè)回調(diào)函數(shù)分別在執(zhí)行完成和超時(shí)的時(shí)候回調(diào)。
2、Deferred方式實(shí)現(xiàn)異步調(diào)用
在我們是生產(chǎn)中,往往會遇到這樣的情景,controller中調(diào)用的方法很多都是和第三方有關(guān)的,例如JMS,定時(shí)任務(wù),隊(duì)列等,拿JMS來說,比如controller里面的服務(wù)需要從JMS中拿到返回值,才能給客戶端返回,而從JMS拿值這個(gè)過程也是異步的,這個(gè)時(shí)候,我們就可以通過Deferred來實(shí)現(xiàn)整個(gè)的異步調(diào)用。
首先,我們來模擬一個(gè)長時(shí)間調(diào)用的任務(wù),代碼如下:
@Component
public class LongTimeTask {undefined
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Async
public void execute(DeferredResult<String> deferred){undefined
logger.info(Thread.currentThread().getName() + "進(jìn)入 taskService 的 execute方法");
try {undefined
// 模擬長時(shí)間任務(wù)調(diào)用,睡眠2s
TimeUnit.SECONDS.sleep(2);
// 2s后給Deferred發(fā)送成功消息,告訴Deferred,我這邊已經(jīng)處理完了,可以返回給客戶端了
deferred.setResult("world");
} catch (InterruptedException e) {undefined
e.printStackTrace();
}
}
}
接著,我們就來實(shí)現(xiàn)異步調(diào)用,controller如下:
@RestController
public class AsyncDeferredController {undefined
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final LongTimeTask taskService;
@Autowired
public AsyncDeferredController(LongTimeTask taskService) {undefined
this.taskService = taskService;
}
@GetMapping("/deferred")
public DeferredResult<String> executeSlowTask() {undefined
logger.info(Thread.currentThread().getName() + "進(jìn)入executeSlowTask方法");
DeferredResult<String> deferredResult = new DeferredResult<>();
// 調(diào)用長時(shí)間執(zhí)行任務(wù)
taskService.execute(deferredResult);
// 當(dāng)長時(shí)間任務(wù)中使用deferred.setResult("world");這個(gè)方法時(shí),會從長時(shí)間任務(wù)中返回,繼續(xù)controller里面的流程
logger.info(Thread.currentThread().getName() + "從executeSlowTask方法返回");
// 超時(shí)的回調(diào)方法
deferredResult.onTimeout(new Runnable(){undefined
@Override
public void run() {undefined
logger.info(Thread.currentThread().getName() + " onTimeout");
// 返回超時(shí)信息
deferredResult.setErrorResult("time out!");
}
});
// 處理完成的回調(diào)方法,無論是超時(shí)還是處理成功,都會進(jìn)入這個(gè)回調(diào)方法
deferredResult.onCompletion(new Runnable(){undefined
@Override
public void run() {undefined
logger.info(Thread.currentThread().getName() + " onCompletion");
}
});
return deferredResult;
}
}
執(zhí)行結(jié)果如下:
2017-12-07 19:25:40.192 INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController : http-nio-8060-exec-7進(jìn)入executeSlowTask方法
2017-12-07 19:25:40.193 INFO 6196 --- [nio-8060-exec-7] .s.a.AnnotationAsyncExecutionInterceptor : No TaskExecutor bean found for async processing
2017-12-07 19:25:40.194 INFO 6196 --- [nio-8060-exec-7] c.t.controller.AsyncDeferredController : http-nio-8060-exec-7從executeSlowTask方法返回
2017-12-07 19:25:40.198 INFO 6196 --- [nio-8060-exec-7] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-7 進(jìn)入afterConcurrentHandlingStarted方法
2017-12-07 19:25:40.202 INFO 6196 --- [cTaskExecutor-1] com.travelsky.controller.LongTimeTask : SimpleAsyncTaskExecutor-1進(jìn)入 taskService 的 execute方法
2017-12-07 19:25:42.212 INFO 6196 --- [nio-8060-exec-8] c.t.i.MyAsyncHandlerInterceptor : http-nio-8060-exec-8服務(wù)調(diào)用完成,返回結(jié)果給客戶端
2017-12-07 19:25:42.213 INFO 6196 --- [nio-8060-exec-8] c.t.controller.AsyncDeferredController : http-nio-8060-exec-8 onCompletion
從上面的執(zhí)行結(jié)果不難看出,容器線程會立刻返回,應(yīng)用程序使用線程池里面的cTaskExecutor-1線程來完成長時(shí)間任務(wù)的調(diào)用,當(dāng)調(diào)用完成后,容器又啟了一個(gè)連接線程,來返回最終的執(zhí)行結(jié)果。
這種異步調(diào)用,在容器線程資源非常寶貴的時(shí)候,能夠大大的提高整個(gè)系統(tǒng)的吞吐量。
ps:異步調(diào)用可以使用AsyncHandlerInterceptor進(jìn)行攔截,使用示例如下:
@Component
public class MyAsyncHandlerInterceptor implements AsyncHandlerInterceptor {undefined
private static final Logger logger = LoggerFactory.getLogger(MyAsyncHandlerInterceptor.class);
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {undefined
return true;
}
@Override
public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,
ModelAndView modelAndView) throws Exception {undefined
// HandlerMethod handlerMethod = (HandlerMethod) handler;
logger.info(Thread.currentThread().getName()+ "服務(wù)調(diào)用完成,返回結(jié)果給客戶端");
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex)
throws Exception {undefined
if(null != ex){undefined
System.out.println("發(fā)生異常:"+ex.getMessage());
}
}
@Override
public void afterConcurrentHandlingStarted(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {undefined
// 攔截之后,重新寫回?cái)?shù)據(jù),將原來的hello world換成如下字符串
String resp = "my name is chhliu!";
response.setContentLength(resp.length());
response.getOutputStream().write(resp.getBytes());
logger.info(Thread.currentThread().getName() + " 進(jìn)入afterConcurrentHandlingStarted方法");
}
}
有興趣的可以了解下,我這里的主題是異步調(diào)用,其他的相關(guān)知識點(diǎn),以后在講解吧。希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java多線程局域網(wǎng)聊天室的實(shí)現(xiàn)
在學(xué)習(xí)了一個(gè)學(xué)期的java以后,搞了一個(gè)多線程的聊天室,熟悉了一下服務(wù)器和客戶機(jī)的操作。感興趣的小伙伴們可以參考一下2021-06-06
一文搞懂接口參數(shù)簽名與驗(yàn)簽(附含java python php版)
這篇文章主要為大家介紹了java python php不同版的接口參數(shù)簽名與驗(yàn)簽示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-06-06
Java日常練習(xí)題,每天進(jìn)步一點(diǎn)點(diǎn)(3)
下面小編就為大家?guī)硪黄狫ava基礎(chǔ)的幾道練習(xí)題(分享)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧,希望可以幫到你2021-07-07
淺談spring-boot 允許接口跨域并實(shí)現(xiàn)攔截(CORS)
本篇文章主要介紹了淺談spring-boot 允許接口跨域并實(shí)現(xiàn)攔截(CORS),具有一定的參考價(jià)值,有興趣的可以了解一下2017-08-08
java構(gòu)造器 默認(rèn)構(gòu)造方法及參數(shù)化構(gòu)造方法
構(gòu)造器也叫構(gòu)造方法、構(gòu)造函數(shù),是一種特殊類型的方法,負(fù)責(zé)類中成員變量(域)的初始化。構(gòu)造器的用處是在創(chuàng)建對象時(shí)執(zhí)行初始化,當(dāng)創(chuàng)建一個(gè)對象時(shí),系統(tǒng)會為這個(gè)對象的實(shí)例進(jìn)行默認(rèn)的初始化,下面文章將進(jìn)入講解,需要的朋友可以參考下2021-10-10
集成apollo動態(tài)日志取締logback-spring.xml配置
這篇文章主要為大家介紹了集成apollo動態(tài)日志取締logback-spring.xml配置的過程內(nèi)容詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-02-02
Java超詳細(xì)講解多線程中的Process與Thread
進(jìn)程process:在一定的環(huán)境下,把靜態(tài)的程序代碼運(yùn)行起來,通過使用不同的資源,來完成一定的任務(wù);線程thread:是程序中一個(gè)單一的順序控制流程。在單個(gè)進(jìn)程中同時(shí)運(yùn)行多個(gè)線程完成不同的工作,稱為多線程2022-05-05

