Java中基于DeferredResult的異步服務(wù)詳解
一. 簡介
Servlet3.0提供了基于servlet的異步處理api,Spring MVC只是將這些api進(jìn)行了一系列的封裝,從而實(shí)現(xiàn)了DeferredResult。
DeferredResult字面意思是"延遲結(jié)果",它允許Spring MVC收到請求后,立即釋放(歸還)容器線程,以便容器可以接收更多的外部請求,提升吞吐量,與此同時(shí),DeferredResult將陷入阻塞,直到我們主動(dòng)將結(jié)果set到DeferredResult,最后,DeferredResult會(huì)重新申請容器線程,并將本次請求返回給客戶端。
二. 使用
1. 監(jiān)聽器 onTimeout()
當(dāng)deferredResult被創(chuàng)建出來之后,執(zhí)行setResult()之前,這之間的時(shí)間超過設(shè)定值時(shí)(比如下方案例中設(shè)置為5秒超時(shí)),則被判定為超時(shí)。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 設(shè)置超時(shí)事件
deferredResult.onTimeout(() -> {
System.out.println("異步線程執(zhí)行超時(shí), 異步線程的名稱: " + Thread.currentThread().getName());
deferredResult.setResult("異步線程執(zhí)行超時(shí)");
});2. 監(jiān)聽器 onError()
當(dāng)onTimeout()或onCompletion()等回調(diào)函數(shù)中的代碼報(bào)錯(cuò)時(shí),則會(huì)執(zhí)行監(jiān)聽器onError()的回調(diào)函數(shù)。
PS: DeferredResult之外的代碼報(bào)錯(cuò)不會(huì)影響到onError()。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 設(shè)置異常事件
deferredResult.onError((throwable) -> {
System.out.println("異步請求出現(xiàn)錯(cuò)誤,異步線程的名稱: " + Thread.currentThread().getName() + "異常: " + throwable);
deferredResult.setErrorResult("異步線程執(zhí)行出錯(cuò)");
});3. 監(jiān)聽器 onCompletion()
代碼任意位置調(diào)用了同一個(gè)DeferredResult的setResult()后,則會(huì)被DeferredResult的onCompletion()監(jiān)聽器捕獲到。
Spring會(huì)任選一條容器線程來執(zhí)行onCompletion( )中的代碼(由于請求線程已被釋放(歸還),所以此處可能再次由同一條請求線程來處理,也可能由其他線程來處理)。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 設(shè)置完成事件
deferredResult.onCompletion(() -> {
System.out.println("異步線程執(zhí)行完畢,異步線程的名稱: " + Thread.currentThread().getName());
});完整的代碼為:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@RestController
public class DemoController {
// 自定義線程池
public static ExecutorService exec = Executors.newCachedThreadPool();
@GetMapping("/demo")
public DeferredResult<String> demoResult() {
System.out.println("容器線程: " + Thread.currentThread().getName());
// 創(chuàng)建DeferredResult對(duì)象,設(shè)置超時(shí)時(shí)長 20秒
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L);
// 設(shè)置超時(shí)事件
deferredResult.onTimeout(() -> {
System.out.println("異步線程執(zhí)行超時(shí), 異步線程的名稱: " + Thread.currentThread().getName());
// throw new RuntimeException("超時(shí)事件報(bào)錯(cuò)了!");
deferredResult.setResult("異步線程執(zhí)行超時(shí)");
});
// 設(shè)置異常事件
deferredResult.onError((throwable) -> {
System.out.println("異步請求出現(xiàn)錯(cuò)誤,異步線程的名稱: " + Thread.currentThread().getName() + "異常: " + throwable);
deferredResult.setErrorResult("異步線程執(zhí)行出錯(cuò)");
});
// 設(shè)置完成事件
deferredResult.onCompletion(() -> {
System.out.println("異步線程執(zhí)行完畢,異步線程的名稱: " + Thread.currentThread().getName());
});
exec.execute(() -> {
System.out.println("[線程池] 異步線程的名稱: " + Thread.currentThread().getName());
deferredResult.setResult("異步線程執(zhí)行完畢");
});
System.out.println("Servlet thread release");
return deferredResult;
}
}三. 拓展
有些業(yè)務(wù)場景下,我們希望新的請求觸發(fā)(激活)之前陷入阻塞的請求,此外可以通過不同的key來區(qū)分不同的請求。
比如apollo在實(shí)現(xiàn)時(shí)就利用了DeferredResult。客戶端向服務(wù)器端發(fā)送輪詢請求,服務(wù)端收到請求后,會(huì)立刻釋放容器線程,并阻塞本次請求,若apollo托管的配置文件沒有發(fā)生任何改變,則輪詢請求會(huì)超時(shí)(返回304)。當(dāng)有新的配置發(fā)布時(shí),服務(wù)端會(huì)調(diào)用DeferredResult setResult()方法,進(jìn)入onCompletion(),并使尚未超時(shí)的輪尋請求正常返回(200)。
大概如下:
@SpringBootApplication
public class DemoApplication implements WebMvcConfigurer {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
@RestController
public class ApolloController {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//guava中的Multimap,多值map,對(duì)map的增強(qiáng),一個(gè)key可以保持多個(gè)value
private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create());
//模擬長輪詢
@RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html")
public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
logger.info("Request received");
DeferredResult<String> deferredResult = new DeferredResult<>();
//當(dāng)deferredResult完成時(shí)(不論是超時(shí)還是異常還是正常完成),移除watchRequests中相應(yīng)的watch key
deferredResult.onCompletion(new Runnable() {
@Override
public void run() {
System.out.println("remove key:" + namespace);
watchRequests.remove(namespace, deferredResult);
}
});
watchRequests.put(namespace, deferredResult);
logger.info("Servlet thread released");
return deferredResult;
}
//模擬發(fā)布namespace配置
@RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html")
public Object publishConfig(@PathVariable("namespace") String namespace) {
if (watchRequests.containsKey(namespace)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace);
Long time = System.currentTimeMillis();
//通知所有watch這個(gè)namespace變更的長輪訓(xùn)配置變更結(jié)果
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult(namespace + " changed:" + time);
}
}
return "success";
}
}當(dāng)請求超時(shí)的時(shí)候會(huì)產(chǎn)生AsyncRequestTimeoutException,我們定義一個(gè)全局異常捕獲類:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ControllerAdvice
class GlobalControllerExceptionHandler {
protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
@ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304狀態(tài)碼
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class) //捕獲特定異常
public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) {
System.out.println("handleAsyncRequestTimeoutException");
}
}然后我們通過postman工具發(fā)送請求//localhost:8080/watch/mynamespace,請求會(huì)掛起,60秒后,DeferredResult超時(shí),客戶端正常收到了304狀態(tài)碼,表明在這個(gè)期間配置沒有變更過。
然后我們在模擬配置變更的情況,再次發(fā)起請求//localhost:8080/watch/mynamespace,等待個(gè)10秒鐘(不要超過60秒),然后調(diào)用//localhost:8080/publish/mynamespace,發(fā)布配置變更。這時(shí)postman會(huì)立刻收到response響應(yīng)結(jié)果:
mynamespace changed:1538880050147
表明在輪訓(xùn)期間有配置變更過。
這里我們用了一個(gè)MultiMap來存放所有輪訓(xùn)的請求,Key對(duì)應(yīng)的是namespace,value對(duì)應(yīng)的是所有watch這個(gè)namespace變更的異步請求DeferredResult,需要注意的是:在DeferredResult完成的時(shí)候記得移除MultiMap中相應(yīng)的key,避免內(nèi)存溢出請求。
采用這種長輪詢的好處是,相比一直循環(huán)請求服務(wù)器,實(shí)例一多的話會(huì)對(duì)服務(wù)器產(chǎn)生很大的壓力,http長輪詢的方式會(huì)在服務(wù)器變更的時(shí)候主動(dòng)推送給客戶端,其他時(shí)間客戶端是掛起請求的,這樣同時(shí)滿足了性能和實(shí)時(shí)性。
四. DeferredResult與Callable的區(qū)別
DeferredResult和Callable都可以在Controller層的方法中直接返回,請求收到后,釋放容器線程,在另一個(gè)線程中通過異步的方式執(zhí)行任務(wù),最后將請求返回給客戶端。
不同之處在于,使用Callable時(shí),當(dāng)其它線程中的任務(wù)執(zhí)行完畢后,請求會(huì)立刻返回給客戶端,而DeferredResult則需要用戶在代碼中手動(dòng)set值到DeferredResult,否則即便異步線程中的任務(wù)執(zhí)行完畢,DeferredResult仍然不會(huì)向客戶端返回任何結(jié)果。
到此這篇關(guān)于Java中基于DeferredResult的異步服務(wù)詳解的文章就介紹到這了,更多相關(guān)基于DeferredResult的異步服務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Maven pom的distributionManagement配置方式
文章主要介紹了Maven的distributionManagement配置方式,以及它的作用、配置方法和重要性,distributionManagement用于指定構(gòu)件的發(fā)布位置,包括下載URL、狀態(tài)等,文章還詳細(xì)解釋了如何配置repository和snapshotRepository,以及它們的用途和區(qū)別2025-01-01
springboot前端傳參date類型后臺(tái)處理的方式
這篇文章主要介紹了springboot前端傳參date類型后臺(tái)處理的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07
基于HttpServletRequest 相關(guān)常用方法的應(yīng)用
本篇文章小編為大家介紹,基于HttpServletRequest 相關(guān)常用方法的應(yīng)用,需要的朋友參考下2013-04-04
SpringBoot整合Pulsar的實(shí)現(xiàn)示例
本文主要介紹了SpringBoot整合Pulsar的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07

