Java實(shí)現(xiàn)一個(gè)簡單的長輪詢的示例代碼
分析一下長輪詢的實(shí)現(xiàn)方式
現(xiàn)在各大中間件都使用了長輪詢的數(shù)據(jù)交互方式,目前比較流行的例如Nacos的配置中心,RocketMQ Pull(拉模式)消息等,它們都是采用了長輪詢方的式實(shí)現(xiàn)。就例如Nacos的配置中心,如何做到服務(wù)端感知配置變化實(shí)時(shí)推送給客戶端的呢?
長輪詢與短輪詢
說到長輪詢,肯定存在和它相對(duì)立的,我們暫且叫它短輪詢吧,我們簡單介紹一下短輪詢:
短輪詢也是拉模式。是指不管服務(wù)端數(shù)據(jù)有無更新,客戶端每隔定長時(shí)間請求拉取一次數(shù)據(jù),可能有更新數(shù)據(jù)返回,也可能什么都沒有。如果配置中心使用這樣的方式,會(huì)存在以下問題:
由于配置數(shù)據(jù)并不會(huì)頻繁變更,若是一直發(fā)請求,勢必會(huì)對(duì)服務(wù)端造成很大壓力。還會(huì)造成推送數(shù)據(jù)的延遲,比如:每10s請求一次配置,如果在第11s時(shí)配置更新了,那么推送將會(huì)延遲9s,等待下一次請求;
無法在推送延遲和服務(wù)端壓力兩者之間中和。降低輪詢的間隔,延遲降低,壓力增加;增加輪詢的間隔,壓力降低,延遲增高。
長輪詢為了解決短輪詢存在的問題,客戶端發(fā)起長輪詢,如果服務(wù)端的數(shù)據(jù)沒有發(fā)生變更,會(huì)hold住請求,直到服務(wù)端的數(shù)據(jù)發(fā)生變化,或者等待一定時(shí)間超時(shí)才會(huì)返回。返回后,客戶端再發(fā)起下一次長輪詢請求監(jiān)聽。
這樣設(shè)計(jì)的好處:
- 相對(duì)于低延時(shí),客戶端發(fā)起長輪詢,服務(wù)端感知到數(shù)據(jù)發(fā)生變更后,能立刻返回響應(yīng)給客戶端。
- 服務(wù)端的壓力減小,客戶端發(fā)起長輪詢,如果數(shù)據(jù)沒有發(fā)生變更,服務(wù)端會(huì)hold住此次客戶端的請求,hold住請求的時(shí)間一般會(huì)設(shè)置到30s或者60s,并且服務(wù)端hold住請求不會(huì)消耗太多服務(wù)端的資源。
下面借用圖片來說明一下流程:
- 首先客戶端發(fā)起長輪詢請求,服務(wù)端收到客戶端的請求,這時(shí)會(huì)掛起客戶端的請求,如果在服務(wù)端設(shè)計(jì)的30s之內(nèi)都沒有發(fā)生變更,服務(wù)端會(huì)響應(yīng)回客戶端數(shù)據(jù)沒有變更,客戶端會(huì)繼續(xù)發(fā)送請求。
- 如果在30s之內(nèi)服務(wù)數(shù)據(jù)發(fā)生了變更,服務(wù)端會(huì)推送變更的數(shù)據(jù)到客戶端。
配置中心長輪詢設(shè)計(jì)
上面我們已經(jīng)介紹了整個(gè)思路,下面我們用代碼實(shí)現(xiàn)一下:
- 首先客戶端發(fā)送一個(gè)HTTP請求到服務(wù)端;服務(wù)端會(huì)開啟一個(gè)異步線程,如果一直沒有數(shù)據(jù)變更會(huì)掛起當(dāng)前請求(一個(gè) Tomcat 也就 200 個(gè)線程,長輪詢也不應(yīng)該阻塞 Tomcat 的業(yè)務(wù)線程,所以需要配置中心在實(shí)現(xiàn)長輪詢時(shí)往往采用異步響應(yīng)的方式來實(shí)現(xiàn),而比較方便實(shí)現(xiàn)異步 HTTP 的常見手段便是 Servlet3.0 提供的 AsyncContext 機(jī)制。)
- 在服務(wù)端設(shè)置的超時(shí)時(shí)間內(nèi)仍然沒有數(shù)據(jù)變更,那就返回客戶端一個(gè)沒有變更的標(biāo)識(shí)。例如響應(yīng)304狀態(tài)碼;
- 在服務(wù)端設(shè)置的超時(shí)時(shí)間內(nèi)有數(shù)據(jù)變更了,就返回客戶端變更的內(nèi)容;
配置中心長輪詢實(shí)現(xiàn)
下面用代碼實(shí)現(xiàn)長輪詢:
客戶端實(shí)現(xiàn)
@Slf4j public class ConfigClientWorker { ? private final CloseableHttpClient httpClient; ? private final ScheduledExecutorService executorService; ? public ConfigClientWorker(String url, String dataId) { this.executorService = Executors.newSingleThreadScheduledExecutor(runnable -> { Thread thread = new Thread(runnable); thread.setName("client.worker.executor-%d"); thread.setDaemon(true); return thread; }); ? // ① httpClient 客戶端超時(shí)時(shí)間要大于長輪詢約定的超時(shí)時(shí)間 RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(40000).build(); this.httpClient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build(); ? executorService.execute(new LongPollingRunnable(url, dataId)); } ? class LongPollingRunnable implements Runnable { ? private final String url; private final String dataId; ? public LongPollingRunnable(String url, String dataId) { this.url = url; this.dataId = dataId; } ? @SneakyThrows @Override public void run() { String endpoint = url + "?dataId=" + dataId; log.info("endpoint: {}", endpoint); HttpGet request = new HttpGet(endpoint); CloseableHttpResponse response = httpClient.execute(request); switch (response.getStatusLine().getStatusCode()) { case 200: { BufferedReader rd = new BufferedReader(new InputStreamReader(response.getEntity() .getContent())); StringBuilder result = new StringBuilder(); String line; while ((line = rd.readLine()) != null) { result.append(line); } response.close(); String configInfo = result.toString(); log.info("dataId: [{}] changed, receive configInfo: {}", dataId, configInfo); break; } // ② 304 響應(yīng)碼標(biāo)記配置未變更 case 304: { log.info("longPolling dataId: [{}] once finished, configInfo is unchanged, longPolling again", dataId); break; } default: { throw new RuntimeException("unExcepted HTTP status code"); } } executorService.execute(this); } } ? public static void main(String[] args) throws IOException { ? new ConfigClientWorker("http://127.0.0.1:8080/listener", "user"); System.in.read(); } }
- httpClient 客戶端超時(shí)時(shí)間要大于長輪詢約定的超時(shí)時(shí)間,不然還沒等到服務(wù)端返回,客戶端自己就超時(shí)了。
- 304 響應(yīng)碼標(biāo)記配置未變更;
- http://127.0.0.1:8080/listener 是服務(wù)端地址;
服務(wù)端實(shí)現(xiàn)
@RestController @Slf4j @SpringBootApplication public class ConfigServer { ? @Data private static class AsyncTask { // 長輪詢請求的上下文,包含請求和響應(yīng)體 private AsyncContext asyncContext; // 超時(shí)標(biāo)記 private boolean timeout; ? public AsyncTask(AsyncContext asyncContext, boolean timeout) { this.asyncContext = asyncContext; this.timeout = timeout; } } ? // guava 提供的多值 Map,一個(gè) key 可以對(duì)應(yīng)多個(gè) value private Multimap<String, AsyncTask> dataIdContext = Multimaps.synchronizedSetMultimap(HashMultimap.create()); ? private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("longPolling-timeout-checker-%d") .build(); private ScheduledExecutorService timeoutChecker = new ScheduledThreadPoolExecutor(1, threadFactory); ? // 配置監(jiān)聽接入點(diǎn) @RequestMapping("/listener") public void addListener(HttpServletRequest request, HttpServletResponse response) { ? String dataId = request.getParameter("dataId"); ? // 開啟異步?。?! AsyncContext asyncContext = request.startAsync(request, response); AsyncTask asyncTask = new AsyncTask(asyncContext, true); ? // 維護(hù) dataId 和異步請求上下文的關(guān)聯(lián) dataIdContext.put(dataId, asyncTask); ? // 啟動(dòng)定時(shí)器,30s 后寫入 304 響應(yīng) timeoutChecker.schedule(() -> { if (asyncTask.isTimeout()) { dataIdContext.remove(dataId, asyncTask); response.setStatus(HttpServletResponse.SC_NOT_MODIFIED); // 標(biāo)志此次異步線程完成結(jié)束!?。? asyncContext.complete(); } }, 30000, TimeUnit.MILLISECONDS); } ? // 配置發(fā)布接入點(diǎn) @RequestMapping("/publishConfig") @SneakyThrows public String publishConfig(String dataId, String configInfo) { log.info("publish configInfo dataId: [{}], configInfo: {}", dataId, configInfo); Collection<AsyncTask> asyncTasks = dataIdContext.removeAll(dataId); for (AsyncTask asyncTask : asyncTasks) { asyncTask.setTimeout(false); HttpServletResponse response = (HttpServletResponse)asyncTask.getAsyncContext().getResponse(); response.setStatus(HttpServletResponse.SC_OK); response.getWriter().println(configInfo); asyncTask.getAsyncContext().complete(); } return "success"; } ? public static void main(String[] args) { SpringApplication.run(ConfigServer.class, args); } }
- 客戶端請求過來,首先開啟一個(gè)異步線程
request.startAsync(request, response);
保證不占用Tomcat線程。此時(shí)Tomcat線程以及釋放。配合asyncContext.complete()
使用。 dataIdContext.put(dataId, asyncTask);
會(huì)將 dataId 和異步請求上下文給關(guān)聯(lián)起來,方便配置發(fā)布時(shí),拿到對(duì)應(yīng)的上下文Multimap<String, AsyncTask> dataIdContext
它是一個(gè)多值 Map,一個(gè) key 可以對(duì)應(yīng)多個(gè) value,你也可以理解為Map<String,List<AsyncTask>>
timeoutChecker.schedule()
啟動(dòng)定時(shí)器,30s 后寫入 304 響應(yīng)@RequestMapping("/publishConfig")
,配置發(fā)布的入口。配置變更后,根據(jù) dataId 一次拿出所有的長輪詢,為之寫入變更的響應(yīng)。asyncTask.getAsyncContext().complete();
表示這次異步請求結(jié)束了。
啟動(dòng)配置監(jiān)聽
先啟動(dòng) ConfigServer,再啟動(dòng) ConfigClient。30s之后控制臺(tái)打印第一次超時(shí)之后收到服務(wù)端304的狀態(tài)碼
16:41:14.824 [client.worker.executor-%d] INFO cn.haoxiaoyong.poll.ConfigClientWorker - longPolling dataId: [user] once finished, configInfo is unchanged, longPolling again
請求一下配置發(fā)布,請求localhost:8080/publishConfig?dataId=user&configInfo=helloworld
服務(wù)端打印日志:
2022-08-25 16:45:56.663 INFO 90650 --- [nio-8080-exec-2] cn.haoxiaoyong.poll.ConfigServer : publish configInfo dataId: [user], configInfo: helloworld
到此這篇關(guān)于Java實(shí)現(xiàn)一個(gè)簡單的長輪詢的示例代碼的文章就介紹到這了,更多相關(guān)Java長輪詢內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- java?常規(guī)輪詢長輪詢Long?polling實(shí)現(xiàn)示例詳解
- Java servlet通過事件驅(qū)動(dòng)進(jìn)行高性能長輪詢詳解
- Java?輪詢鎖使用時(shí)遇到問題解決方案
- Java?死鎖解決方案順序鎖和輪詢鎖
- Java實(shí)現(xiàn)平滑加權(quán)輪詢算法之降權(quán)和提權(quán)詳解
- Java負(fù)載均衡算法實(shí)現(xiàn)之輪詢和加權(quán)輪詢
- Java如何使用ReentrantLock實(shí)現(xiàn)長輪詢
- Java 利用DeferredResult實(shí)現(xiàn)http輪詢實(shí)時(shí)返回?cái)?shù)據(jù)接口
- 基于Rxjava實(shí)現(xiàn)輪詢定時(shí)器
- 告別無盡等待:Java中的輪詢終止技巧
相關(guān)文章
Java詳細(xì)講解Math和Random類中有哪些常用方法
Math類位于java.lang包中,包含很多用于科學(xué)計(jì)算的類方法,這些方法可以直接通過類名調(diào)用。Random類獲取隨機(jī)數(shù),位于java.util包中,本篇帶你了解它們的常用方法2022-05-05基于Spring Boot 排除自動(dòng)配置的4個(gè)方法
這篇文章主要介紹了Spring Boot 排除自動(dòng)配置的4個(gè)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08SpringBoot之那些注入不了的Spring占位符(${}表達(dá)式)問題
這篇文章主要介紹了SpringBoot之那些注入不了的Spring占位符(${}表達(dá)式)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04解決idea中@Data標(biāo)簽getset不起作用的問題
這篇文章主要介紹了解決idea中@Data標(biāo)簽getset不起作用的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-02-02