基于springboot 長(zhǎng)輪詢的實(shí)現(xiàn)操作
springboot 長(zhǎng)輪詢實(shí)現(xiàn)
基于 @EnableAsync , @Sync
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
}
@RequestMapping("/async")
@RestController
public class AsyncRequestDemo {
@Autowired
private AsyncRequestService asyncRequestService;
@GetMapping("/value")
public String getValue() {
String msg = null;
Future<String> result = null;
try{
result = asyncRequestService.getValue();
msg = result.get(10, TimeUnit.SECONDS);
}catch (Exception e){
e.printStackTrace();
}finally {
if (result != null){
result.cancel(true);
}
}
return msg;
}
@PostMapping("/value")
public void postValue(String msg) {
asyncRequestService.postValue(msg);
}
}
@Service
public class AsyncRequestService {
private String msg = null;
@Async
public Future<String> getValue() throws InterruptedException {
while (true){
synchronized (this){
if (msg != null){
String resultMsg = msg;
msg = null;
return new AsyncResult(resultMsg);
}
}
Thread.sleep(100);
}
}
public synchronized void postValue(String msg) {
this.msg = msg;
}
}
備注
@EnableAsync 開啟異步
@Sync 標(biāo)記異步方法
Future 用于接收異步返回值
result.get(10, TimeUnit.SECONDS); 阻塞,超時(shí)獲取結(jié)果
Future.cancel() 中斷線程
補(bǔ)充:通過spring提供的DeferredResult實(shí)現(xiàn)長(zhǎng)輪詢服務(wù)端推送消息
DeferredResult字面意思就是推遲結(jié)果,是在servlet3.0以后引入了異步請(qǐng)求之后,spring封裝了一下提供了相應(yīng)的支持,也是一個(gè)很老的特性了。DeferredResult可以允許容器線程快速釋放以便可以接受更多的請(qǐng)求提升吞吐量,讓真正的業(yè)務(wù)邏輯在其他的工作線程中去完成。
最近再看apollo配置中心的實(shí)現(xiàn)原理,apollo的發(fā)布配置推送變更消息就是用DeferredResult實(shí)現(xiàn)的,apollo客戶端會(huì)像服務(wù)端發(fā)送長(zhǎng)輪訓(xùn)http請(qǐng)求,超時(shí)時(shí)間60秒,當(dāng)超時(shí)后返回客戶端一個(gè)304 httpstatus,表明配置沒有變更,客戶端繼續(xù)這個(gè)步驟重復(fù)發(fā)起請(qǐng)求,當(dāng)有發(fā)布配置的時(shí)候,服務(wù)端會(huì)調(diào)用DeferredResult.setResult返回200狀態(tài)碼,然后輪訓(xùn)請(qǐng)求會(huì)立即返回(不會(huì)超時(shí)),客戶端收到響應(yīng)結(jié)果后,會(huì)發(fā)起請(qǐng)求獲取變更后的配置信息。
下面我們自己寫一個(gè)簡(jiǎn)單的demo來演示這個(gè)過程
springboot啟動(dòng)類:
@SpringBootApplication
public class DemoApplication implements WebMvcConfigurer {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
}
@Bean
public ThreadPoolTaskExecutor mvcTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setQueueCapacity(100);
executor.setMaxPoolSize(25);
return executor;
}
//配置異步支持,設(shè)置了一個(gè)用來異步執(zhí)行業(yè)務(wù)邏輯的工作線程池,設(shè)置了默認(rèn)的超時(shí)時(shí)間是60秒
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(mvcTaskExecutor());
configurer.setDefaultTimeout(60000L);
}
}
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import java.util.Collection;
@RestController
public class ApolloController {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
//guava中的Multimap,多值map,對(duì)map的增強(qiáng),一個(gè)key可以保持多個(gè)value
private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create());
//模擬長(zhǎng)輪詢
@RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html")
public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
logger.info("Request received");
DeferredResult<String> deferredResult = new DeferredResult<>();
//當(dāng)deferredResult完成時(shí)(不論是超時(shí)還是異常還是正常完成),移除watchRequests中相應(yīng)的watch key
deferredResult.onCompletion(new Runnable() {
@Override
public void run() {
System.out.println("remove key:" + namespace);
watchRequests.remove(namespace, deferredResult);
}
});
watchRequests.put(namespace, deferredResult);
logger.info("Servlet thread released");
return deferredResult;
}
//模擬發(fā)布namespace配置
@RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html")
public Object publishConfig(@PathVariable("namespace") String namespace) {
if (watchRequests.containsKey(namespace)) {
Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace);
Long time = System.currentTimeMillis();
//通知所有watch這個(gè)namespace變更的長(zhǎng)輪訓(xùn)配置變更結(jié)果
for (DeferredResult<String> deferredResult : deferredResults) {
deferredResult.setResult(namespace + " changed:" + time);
}
}
return "success";
}
}
當(dāng)請(qǐng)求超時(shí)的時(shí)候會(huì)產(chǎn)生AsyncRequestTimeoutException,我們定義一個(gè)全局異常捕獲類:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ControllerAdvice
class GlobalControllerExceptionHandler {
protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
@ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304狀態(tài)碼
@ResponseBody
@ExceptionHandler(AsyncRequestTimeoutException.class) //捕獲特定異常
public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) {
System.out.println("handleAsyncRequestTimeoutException");
}
}
然后我們通過postman工具發(fā)送請(qǐng)求http://localhost:8080/watch/mynamespace,請(qǐng)求會(huì)掛起,60秒后,DeferredResult超時(shí),客戶端正常收到了304狀態(tài)碼,表明在這個(gè)期間配置沒有變更過。
然后我們?cè)谀M配置變更的情況,再次發(fā)起請(qǐng)求http://localhost:8080/watch/mynamespace,等待個(gè)10秒鐘(不要超過60秒),然后調(diào)用http://localhost:8080/publish/mynamespace,發(fā)布配置變更。這時(shí)postman會(huì)立刻收到response響應(yīng)結(jié)果:
mynamespace changed:1538880050147
表明在輪訓(xùn)期間有配置變更過。
這里我們用了一個(gè)MultiMap來存放所有輪訓(xùn)的請(qǐng)求,Key對(duì)應(yīng)的是namespace,value對(duì)應(yīng)的是所有watch這個(gè)namespace變更的異步請(qǐng)求DeferredResult,需要注意的是:在DeferredResult完成的時(shí)候記得移除MultiMap中相應(yīng)的key,避免內(nèi)存溢出請(qǐng)求。
采用這種長(zhǎng)輪詢的好處是,相比一直循環(huán)請(qǐng)求服務(wù)器,實(shí)例一多的話會(huì)對(duì)服務(wù)器產(chǎn)生很大的壓力,http長(zhǎng)輪詢的方式會(huì)在服務(wù)器變更的時(shí)候主動(dòng)推送給客戶端,其他時(shí)間客戶端是掛起請(qǐng)求的,這樣同時(shí)滿足了性能和實(shí)時(shí)性。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教。
相關(guān)文章
Spring mvc Controller和RestFul原理解析
這篇文章主要介紹了Spring mvc Controller和RestFul原理解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-03-03
深入分析JAVA 多線程--interrupt()和線程終止方式
這篇文章主要介紹了JAVA 多線程--interrupt()和線程終止方式的的相關(guān)資料,文中代碼非常細(xì)致,幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-06-06
Java super關(guān)鍵字調(diào)用父類過程解析
這篇文章主要介紹了Java super關(guān)鍵字調(diào)用父類過程解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
Springboot死信隊(duì)列?DLX?配置和使用思路分析
死信隊(duì)列簡(jiǎn)稱就是DLX,死信交換機(jī)和死信隊(duì)列和普通的沒有區(qū)別,當(dāng)消息成為死信后,如果該隊(duì)列綁定了死信交換機(jī),則消息會(huì)被死信交換機(jī)重新路由到死信隊(duì)列,本文給大家介紹Springboot死信隊(duì)列?DLX的相關(guān)知識(shí),感興趣的朋友一起看看吧2022-03-03

