Java高并發(fā)下請(qǐng)求合并處理方式
Java高并發(fā)下請(qǐng)求合并處理
場(chǎng)景描述
在大并發(fā)量下每秒有一萬(wàn)個(gè)請(qǐng)求向后端查詢(xún)數(shù)據(jù),這樣我們就需要向后端請(qǐng)求一萬(wàn)次,甚至查詢(xún)一萬(wàn)次數(shù)據(jù)庫(kù)。
我們要做的請(qǐng)求合并就是每隔一段時(shí)間(10ms)將這段時(shí)間內(nèi)的請(qǐng)求合并到一起進(jìn)行批量查詢(xún),減少查詢(xún)數(shù)據(jù)庫(kù)的次數(shù)。
思考
1、如何存放一段時(shí)間內(nèi)的請(qǐng)求?這里我們可以用隊(duì)列。
2、如何每隔一段時(shí)間執(zhí)行任務(wù)?用定時(shí)任務(wù)線(xiàn)程池。
3、每個(gè)請(qǐng)求都是單獨(dú)的線(xiàn)程,如何保證各個(gè)請(qǐng)求能得到自己的查詢(xún)結(jié)果?這里我們使用callable返回查詢(xún)結(jié)果,在沒(méi)有查到結(jié)果前阻塞線(xiàn)程。
下面來(lái)看看具體實(shí)現(xiàn)的demo
package cn.codingxiaxw.combine; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; import java.util.stream.Collectors; @Service public class QueryService { //用來(lái)存放請(qǐng)求的隊(duì)列,我們將請(qǐng)求封裝成了一個(gè)Request對(duì)象 private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>() ; //這個(gè)是我們的單個(gè)的查詢(xún)方法,假設(shè)每隔請(qǐng)求都根據(jù)唯一的code進(jìn)行查詢(xún) public Map<String,Object> query(String code){ //這個(gè)request是我們自定義的內(nèi)部類(lèi) Request request = new Request(); request.code = code; CompletableFuture<Map<String,Object>> future = new CompletableFuture<>(); request.future = future; queue.add(request); //阻塞 直到返回結(jié)果 try { return future.get(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } return null; } //這個(gè)是個(gè)模擬批量查詢(xún)的方法 public List<Map<String,Object>> batchQuery(List<String> codes){ return null; } //封裝的請(qǐng)求 class Request { String code; CompletableFuture<Map<String,Object>> future; } @PostConstruct public void init(){ //在init方法中初始化一個(gè)定時(shí)任務(wù)線(xiàn)程,去定時(shí)執(zhí)行我們的查詢(xún)?nèi)蝿?wù).具體的任務(wù)實(shí)現(xiàn)是我們根據(jù)唯一code查詢(xún)出來(lái)的結(jié)果集,以code為key轉(zhuǎn)成map,然后我們隊(duì)列中的每個(gè)Request對(duì)象都有自己的唯一code,我們根據(jù)code一一對(duì)應(yīng),給相應(yīng)的future返回對(duì)應(yīng)的查詢(xún)結(jié)果。 ScheduledExecutorService poolExecutor = new ScheduledThreadPoolExecutor(1); poolExecutor.scheduleAtFixedRate(()->{ int size = queue.size(); //如果沒(méi)有請(qǐng)求直接返回 if(size==0) return ; List<Request> list = new ArrayList<>(); for (int i = 0; i < size;i++){ Request request = queue.poll(); list.add(request); } System.out.println("批量處理:"+size); List<String> codes = list.stream().map(s->s.code).collect(Collectors.toList()); //合并之后的結(jié)果集 List<Map<String, Object>> batchResult = batchQuery(codes); Map<String,Map<String,Object>> responseMap = new HashMap<>(); for (Map<String,Object> result : batchResult) { String code = result.get("code").toString(); responseMap.put(code,result); } //返回對(duì)應(yīng)的請(qǐng)求結(jié)果 for (Request request : list) { Map<String, Object> response = responseMap.get(request.code); request.future.complete(response); } },0,10,TimeUnit.MILLISECONDS); } }
利用請(qǐng)求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢(shì)
需求分析
我們經(jīng)常會(huì)寫(xiě)一些查詢(xún)接口,假設(shè)現(xiàn)在我們需要寫(xiě)一個(gè)查詢(xún)用戶(hù)信息的接口,要求傳入用戶(hù)Id,返回用戶(hù)名稱(chēng)。
那么最簡(jiǎn)化的流程就是:用戶(hù)發(fā)送請(qǐng)求->controller層->service層->dao層->數(shù)據(jù)庫(kù)。
每次請(qǐng)求就相當(dāng)于請(qǐng)求一條用戶(hù)信息。
當(dāng)這個(gè)接口被用戶(hù)頻繁請(qǐng)求時(shí),此接口就不斷的在做“請(qǐng)求”到“返回”的操作,服務(wù)端同時(shí)會(huì)開(kāi)辟許多線(xiàn)程幫我們執(zhí)行這些操作,這么多的線(xiàn)程會(huì)消耗許多系統(tǒng)資源,服務(wù)端承受了巨大壓力。
//單查詢(xún)接口 @GetMapping("/getUser") public String getUser(Long key){ long currentMillis = System.currentTimeMillis(); //單查詢(xún)service,大量線(xiàn)程懟到這個(gè)service上去 String userName = userService.getUser(key); System.out.printf("##############################################\n"); System.out.printf("用戶(hù)名為:" + userName + "---線(xiàn)程名為:" + Thread.currentThread().getName() + "---執(zhí)行時(shí)間為:" + (System.currentTimeMillis() - currentMillis) + "\n"); return userName; }
那么我們有什么方式可以?xún)?yōu)化這種操作呢?
我目前能想到的就是利用緩存(緩存熱點(diǎn)數(shù)據(jù))、消息隊(duì)列(接收請(qǐng)求慢慢消費(fèi)達(dá)到流量削峰)、多個(gè)服務(wù)實(shí)例(分散請(qǐng)求壓力提高計(jì)算能力)等方式應(yīng)對(duì)高并發(fā)場(chǎng)景。
在本文中,我利用另一種思路:把多個(gè)請(qǐng)求合并為一個(gè)請(qǐng)求,把單查詢(xún)變?yōu)榕坎樵?xún),這樣就能有效減少開(kāi)辟線(xiàn)程的數(shù)量。
具體實(shí)現(xiàn)
首先我們定義一個(gè)用戶(hù)請(qǐng)求類(lèi)Request:
//用戶(hù)請(qǐng)求類(lèi) public class RequestTest { //請(qǐng)求條件 private Long key; //傳話(huà)人 private CompletableFuture<String> future; public CompletableFuture<String> getFuture() { return future; } public void setFuture(CompletableFuture<String> future) { this.future = future; } public Long getKey() { return key; } public void setKey(Long key) { this.key = key; } }
接著是請(qǐng)求合并的主要代碼:
//存放請(qǐng)求的隊(duì)列 LinkedBlockingDeque<RequestTest> queue = new LinkedBlockingDeque<>(); //初始化方法 @PostConstruct public void init() { //定時(shí)執(zhí)行的線(xiàn)程池,每隔5毫秒執(zhí)行一次(間隔時(shí)間可以由業(yè)務(wù)決定),把所有堆積的請(qǐng)求 ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1); executorService.scheduleAtFixedRate(() -> { //在這里具體執(zhí)行批量查詢(xún)邏輯 int size = queue.size(); if (size == 0) { //若沒(méi)有請(qǐng)求堆積,直接返回,等10毫秒再執(zhí)行一次 return; } //若有請(qǐng)求堆積把所有請(qǐng)求都拿出來(lái) List<RequestTest> requestTests = new ArrayList<>(); for (int i = 0; i < size; i++) { //把請(qǐng)求拿出來(lái) RequestTest poll = queue.poll(); requestTests.add(poll); } //至此請(qǐng)求已經(jīng)被合并了 System.out.printf("##############################################\n"); System.out.printf("請(qǐng)求合并了" + requestTests.size() + "條!\n"); //組裝批量查詢(xún)條件 List<Long> keyList = new ArrayList<>(); for (RequestTest requestTest : requestTests) { keyList.add(requestTest.getKey()); } //進(jìn)行批量查詢(xún) List<User> nameList = userService.getUserList(keyList); //把批查結(jié)果放入一個(gè)map Map<Long,String> map = new HashMap<>(); for(User user:nameList){ map.put(user.getId(),user.getName()); } for (RequestTest requestTest : requestTests){ //把放在map中的結(jié)果集放回給對(duì)應(yīng)的線(xiàn)程 //future是對(duì)應(yīng)每個(gè)請(qǐng)求的,因?yàn)槭敲總€(gè)請(qǐng)求線(xiàn)程都傳了自己的future是對(duì)應(yīng)的過(guò)來(lái) requestTest.getFuture().complete(map.get(requestTest.getKey())); } }, 0, 5, TimeUnit.MILLISECONDS); } //請(qǐng)求合并 @GetMapping("/requestMerge/getUser") public String getUserRequestMerge(Long key) throws InterruptedException, ExecutionException { long currentMillis = System.currentTimeMillis(); //CompletableFuture可以使一個(gè)線(xiàn)程執(zhí)行操作后,主動(dòng)返回值給另一個(gè)線(xiàn)程 CompletableFuture<String> future = new CompletableFuture<>(); RequestTest requestTest = new RequestTest(); //把future(把future可以認(rèn)為是線(xiàn)程間的"傳話(huà)人")放到等待隊(duì)列中去,讓定時(shí)調(diào)度的線(xiàn)程池執(zhí)行并返回值 requestTest.setFuture(future); requestTest.setKey(key); //把requestTest加入等待隊(duì)列(LinkedBlockingDeque) queue.add(requestTest); //future(傳話(huà)人)阻塞直到有值返回 String userName = future.get(); System.out.printf("用戶(hù)名為:" + userName + "---線(xiàn)程名為:"+Thread.currentThread().getName()+ "---執(zhí)行時(shí)間為:"+(System.currentTimeMillis() - currentMillis)+"\n"); return userName; }
到這里我們就完成了一個(gè)請(qǐng)求合并的demo,接著我們測(cè)試運(yùn)行結(jié)果。
這里我用了jemeter(jemeter的用法可以網(wǎng)上找一找)進(jìn)行測(cè)試,對(duì)單查詢(xún)接口和請(qǐng)求合并接口分別進(jìn)行了20000次的請(qǐng)求,以下是結(jié)果對(duì)比:
(1)單查詢(xún)運(yùn)行結(jié)果:
可以看到系統(tǒng)開(kāi)辟了許多線(xiàn)程來(lái)處理請(qǐng)求,jemeter的聚合報(bào)告如下:
(2)請(qǐng)求合并運(yùn)行結(jié)果:
可以看到多條線(xiàn)程被合并成了一條線(xiàn)程來(lái)處理,jemeter的聚合報(bào)告如下:
通過(guò)以上數(shù)據(jù),我們可以看到,請(qǐng)求合并比單查詢(xún)的吞吐量要大,并且在運(yùn)行過(guò)程中,流量浮動(dòng)的范圍比較小。
至此,我們就完成了利用請(qǐng)求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢(shì)。
心得
本文我利用了LinkedBlockingDeque阻塞隊(duì)列、ScheduledExecutorService定時(shí)執(zhí)行線(xiàn)程池和CompletableFuture線(xiàn)程通信來(lái)完成了請(qǐng)求合并的demo。
并且通過(guò)實(shí)驗(yàn)證明高并發(fā)下批量處理比單個(gè)處理更有優(yōu)勢(shì)。
當(dāng)然,我的demo并不完善。而且請(qǐng)求合并也有一些弊端,比如如果定時(shí)線(xiàn)程池的間隔時(shí)間比較長(zhǎng),反而會(huì)造成請(qǐng)求堆積時(shí)間太長(zhǎng),用戶(hù)不能快速得到響應(yīng)。
同時(shí)在請(qǐng)求數(shù)量比較小時(shí),請(qǐng)求合并的場(chǎng)景也是沒(méi)有必要的。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringMVC MVC架構(gòu)與Servlet使用詳解
MVC設(shè)計(jì)模式一般指 MVC 框架,M(Model)指數(shù)據(jù)模型層,V(View)指視圖層,C(Controller)指控制層。使用 MVC 的目的是將 M 和 V 的實(shí)現(xiàn)代碼分離,使同一個(gè)程序可以有不同的表現(xiàn)形式。其中,View 的定義比較清晰,就是用戶(hù)界面2022-10-10詳解SpringSecurity處理會(huì)話(huà)管理和注銷(xiāo)功能
本文介紹了詳解SpringSecurity處理會(huì)話(huà)管理和注銷(xiāo)功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2025-01-01java實(shí)現(xiàn)從網(wǎng)上下載圖片到本地的方法
這篇文章主要介紹了java實(shí)現(xiàn)從網(wǎng)上下載圖片到本地的方法,涉及java針對(duì)文件操作的相關(guān)技巧,非常簡(jiǎn)單實(shí)用,需要的朋友可以參考下2015-07-07Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器
Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器...2006-12-12java synchronized實(shí)現(xiàn)可見(jiàn)性過(guò)程解析
這篇文章主要介紹了java synchronized實(shí)現(xiàn)可見(jiàn)性過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-09-09SpringBoot中AOP的動(dòng)態(tài)匹配和靜態(tài)匹配詳解
這篇文章主要介紹了SpringBoot中AOP的動(dòng)態(tài)匹配和靜態(tài)匹配詳解,在創(chuàng)建代理的時(shí)候?qū)δ繕?biāo)類(lèi)的每個(gè)連接點(diǎn)使用靜態(tài)切點(diǎn)檢查,如果僅通過(guò)靜態(tài)切點(diǎn)檢查就可以知道連接點(diǎn)是不匹配的,則在運(yùn)行時(shí)就不再進(jìn)行動(dòng)態(tài)檢查了,需要的朋友可以參考下2023-09-09Java基于中介者模式實(shí)現(xiàn)多人聊天室功能示例
這篇文章主要介紹了Java基于中介者模式實(shí)現(xiàn)多人聊天室功能,詳細(xì)分析了中介者模式的概念、原理以及使用中介模式實(shí)現(xiàn)多人聊天的步驟、操作技巧與注意事項(xiàng),需要的朋友可以參考下2018-05-05Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn)
這篇文章主要介紹了Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01