Java中基于DeferredResult的異步服務(wù)詳解
一. 簡(jiǎn)介
Servlet3.0提供了基于servlet的異步處理api,Spring MVC只是將這些api進(jìn)行了一系列的封裝,從而實(shí)現(xiàn)了DeferredResult。
DeferredResult字面意思是"延遲結(jié)果",它允許Spring MVC收到請(qǐng)求后,立即釋放(歸還)容器線程,以便容器可以接收更多的外部請(qǐng)求,提升吞吐量,與此同時(shí),DeferredResult將陷入阻塞,直到我們主動(dòng)將結(jié)果set到DeferredResult,最后,DeferredResult會(huì)重新申請(qǐng)容器線程,并將本次請(qǐng)求返回給客戶端。
二. 使用
1. 監(jiān)聽(tīng)器 onTimeout()
當(dāng)deferredResult被創(chuàng)建出來(lái)之后,執(zhí)行setResult()之前,這之間的時(shí)間超過(guò)設(shè)定值時(shí)(比如下方案例中設(shè)置為5秒超時(shí)),則被判定為超時(shí)。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置超時(shí)事件 deferredResult.onTimeout(() -> { System.out.println("異步線程執(zhí)行超時(shí), 異步線程的名稱: " + Thread.currentThread().getName()); deferredResult.setResult("異步線程執(zhí)行超時(shí)"); });
2. 監(jiān)聽(tīng)器 onError()
當(dāng)onTimeout()或onCompletion()等回調(diào)函數(shù)中的代碼報(bào)錯(cuò)時(shí),則會(huì)執(zhí)行監(jiān)聽(tīng)器onError()的回調(diào)函數(shù)。
PS: DeferredResult之外的代碼報(bào)錯(cuò)不會(huì)影響到onError()。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置異常事件 deferredResult.onError((throwable) -> { System.out.println("異步請(qǐng)求出現(xiàn)錯(cuò)誤,異步線程的名稱: " + Thread.currentThread().getName() + "異常: " + throwable); deferredResult.setErrorResult("異步線程執(zhí)行出錯(cuò)"); });
3. 監(jiān)聽(tīng)器 onCompletion()
代碼任意位置調(diào)用了同一個(gè)DeferredResult的setResult()后,則會(huì)被DeferredResult的onCompletion()監(jiān)聽(tīng)器捕獲到。
Spring會(huì)任選一條容器線程來(lái)執(zhí)行onCompletion( )中的代碼(由于請(qǐng)求線程已被釋放(歸還),所以此處可能再次由同一條請(qǐng)求線程來(lái)處理,也可能由其他線程來(lái)處理)。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置完成事件 deferredResult.onCompletion(() -> { System.out.println("異步線程執(zhí)行完畢,異步線程的名稱: " + Thread.currentThread().getName()); });
完整的代碼為:
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @RestController public class DemoController { // 自定義線程池 public static ExecutorService exec = Executors.newCachedThreadPool(); @GetMapping("/demo") public DeferredResult<String> demoResult() { System.out.println("容器線程: " + Thread.currentThread().getName()); // 創(chuàng)建DeferredResult對(duì)象,設(shè)置超時(shí)時(shí)長(zhǎng) 20秒 DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置超時(shí)事件 deferredResult.onTimeout(() -> { System.out.println("異步線程執(zhí)行超時(shí), 異步線程的名稱: " + Thread.currentThread().getName()); // throw new RuntimeException("超時(shí)事件報(bào)錯(cuò)了!"); deferredResult.setResult("異步線程執(zhí)行超時(shí)"); }); // 設(shè)置異常事件 deferredResult.onError((throwable) -> { System.out.println("異步請(qǐng)求出現(xiàn)錯(cuò)誤,異步線程的名稱: " + Thread.currentThread().getName() + "異常: " + throwable); deferredResult.setErrorResult("異步線程執(zhí)行出錯(cuò)"); }); // 設(shè)置完成事件 deferredResult.onCompletion(() -> { System.out.println("異步線程執(zhí)行完畢,異步線程的名稱: " + Thread.currentThread().getName()); }); exec.execute(() -> { System.out.println("[線程池] 異步線程的名稱: " + Thread.currentThread().getName()); deferredResult.setResult("異步線程執(zhí)行完畢"); }); System.out.println("Servlet thread release"); return deferredResult; } }
三. 拓展
有些業(yè)務(wù)場(chǎng)景下,我們希望新的請(qǐng)求觸發(fā)(激活)之前陷入阻塞的請(qǐng)求,此外可以通過(guò)不同的key來(lái)區(qū)分不同的請(qǐng)求。
比如apollo在實(shí)現(xiàn)時(shí)就利用了DeferredResult??蛻舳讼蚍?wù)器端發(fā)送輪詢請(qǐng)求,服務(wù)端收到請(qǐng)求后,會(huì)立刻釋放容器線程,并阻塞本次請(qǐng)求,若apollo托管的配置文件沒(méi)有發(fā)生任何改變,則輪詢請(qǐng)求會(huì)超時(shí)(返回304)。當(dāng)有新的配置發(fā)布時(shí),服務(wù)端會(huì)調(diào)用DeferredResult setResult()方法,進(jìn)入onCompletion(),并使尚未超時(shí)的輪尋請(qǐng)求正常返回(200)。
大概如下:
@SpringBootApplication public class DemoApplication implements WebMvcConfigurer { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import java.util.Collection; @RestController public class ApolloController { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //guava中的Multimap,多值map,對(duì)map的增強(qiáng),一個(gè)key可以保持多個(gè)value private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create()); //模擬長(zhǎng)輪詢 @RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html") public DeferredResult<String> watch(@PathVariable("namespace") String namespace) { logger.info("Request received"); DeferredResult<String> deferredResult = new DeferredResult<>(); //當(dāng)deferredResult完成時(shí)(不論是超時(shí)還是異常還是正常完成),移除watchRequests中相應(yīng)的watch key deferredResult.onCompletion(new Runnable() { @Override public void run() { System.out.println("remove key:" + namespace); watchRequests.remove(namespace, deferredResult); } }); watchRequests.put(namespace, deferredResult); logger.info("Servlet thread released"); return deferredResult; } //模擬發(fā)布namespace配置 @RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html") public Object publishConfig(@PathVariable("namespace") String namespace) { if (watchRequests.containsKey(namespace)) { Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace); Long time = System.currentTimeMillis(); //通知所有watch這個(gè)namespace變更的長(zhǎng)輪訓(xùn)配置變更結(jié)果 for (DeferredResult<String> deferredResult : deferredResults) { deferredResult.setResult(namespace + " changed:" + time); } } return "success"; } }
當(dāng)請(qǐng)求超時(shí)的時(shí)候會(huì)產(chǎn)生AsyncRequestTimeoutException,我們定義一個(gè)全局異常捕獲類:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @ControllerAdvice class GlobalControllerExceptionHandler { protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class); @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304狀態(tài)碼 @ResponseBody @ExceptionHandler(AsyncRequestTimeoutException.class) //捕獲特定異常 public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) { System.out.println("handleAsyncRequestTimeoutException"); } }
然后我們通過(guò)postman工具發(fā)送請(qǐng)求//localhost:8080/watch/mynamespace,請(qǐng)求會(huì)掛起,60秒后,DeferredResult超時(shí),客戶端正常收到了304狀態(tài)碼,表明在這個(gè)期間配置沒(méi)有變更過(guò)。
然后我們?cè)谀M配置變更的情況,再次發(fā)起請(qǐng)求//localhost:8080/watch/mynamespace,等待個(gè)10秒鐘(不要超過(guò)60秒),然后調(diào)用//localhost:8080/publish/mynamespace,發(fā)布配置變更。這時(shí)postman會(huì)立刻收到response響應(yīng)結(jié)果:
mynamespace changed:1538880050147
表明在輪訓(xùn)期間有配置變更過(guò)。
這里我們用了一個(gè)MultiMap來(lái)存放所有輪訓(xùn)的請(qǐng)求,Key對(duì)應(yīng)的是namespace,value對(duì)應(yīng)的是所有watch這個(gè)namespace變更的異步請(qǐng)求DeferredResult,需要注意的是:在DeferredResult完成的時(shí)候記得移除MultiMap中相應(yīng)的key,避免內(nèi)存溢出請(qǐng)求。
采用這種長(zhǎng)輪詢的好處是,相比一直循環(huán)請(qǐng)求服務(wù)器,實(shí)例一多的話會(huì)對(duì)服務(wù)器產(chǎn)生很大的壓力,http長(zhǎng)輪詢的方式會(huì)在服務(wù)器變更的時(shí)候主動(dòng)推送給客戶端,其他時(shí)間客戶端是掛起請(qǐng)求的,這樣同時(shí)滿足了性能和實(shí)時(shí)性。
四. DeferredResult與Callable的區(qū)別
DeferredResult和Callable都可以在Controller層的方法中直接返回,請(qǐng)求收到后,釋放容器線程,在另一個(gè)線程中通過(guò)異步的方式執(zhí)行任務(wù),最后將請(qǐng)求返回給客戶端。
不同之處在于,使用Callable時(shí),當(dāng)其它線程中的任務(wù)執(zhí)行完畢后,請(qǐng)求會(huì)立刻返回給客戶端,而DeferredResult則需要用戶在代碼中手動(dòng)set值到DeferredResult,否則即便異步線程中的任務(wù)執(zhí)行完畢,DeferredResult仍然不會(huì)向客戶端返回任何結(jié)果。
到此這篇關(guān)于Java中基于DeferredResult的異步服務(wù)詳解的文章就介紹到這了,更多相關(guān)基于DeferredResult的異步服務(wù)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Maven pom的distributionManagement配置方式
文章主要介紹了Maven的distributionManagement配置方式,以及它的作用、配置方法和重要性,distributionManagement用于指定構(gòu)件的發(fā)布位置,包括下載URL、狀態(tài)等,文章還詳細(xì)解釋了如何配置repository和snapshotRepository,以及它們的用途和區(qū)別2025-01-01springboot前端傳參date類型后臺(tái)處理的方式
這篇文章主要介紹了springboot前端傳參date類型后臺(tái)處理的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-07-07基于HttpServletRequest 相關(guān)常用方法的應(yīng)用
本篇文章小編為大家介紹,基于HttpServletRequest 相關(guān)常用方法的應(yīng)用,需要的朋友參考下2013-04-04Java實(shí)現(xiàn)簡(jiǎn)單的貪吃蛇游戲
這篇文章主要介紹了Java實(shí)現(xiàn)簡(jiǎn)單的貪吃蛇游戲,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-07-07SpringBoot整合Pulsar的實(shí)現(xiàn)示例
本文主要介紹了SpringBoot整合Pulsar的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07