Java高并發(fā)下請求合并處理方式
Java高并發(fā)下請求合并處理
場景描述
在大并發(fā)量下每秒有一萬個請求向后端查詢數(shù)據(jù),這樣我們就需要向后端請求一萬次,甚至查詢一萬次數(shù)據(jù)庫。
我們要做的請求合并就是每隔一段時間(10ms)將這段時間內(nèi)的請求合并到一起進行批量查詢,減少查詢數(shù)據(jù)庫的次數(shù)。
思考
1、如何存放一段時間內(nèi)的請求?這里我們可以用隊列。
2、如何每隔一段時間執(zhí)行任務(wù)?用定時任務(wù)線程池。
3、每個請求都是單獨的線程,如何保證各個請求能得到自己的查詢結(jié)果?這里我們使用callable返回查詢結(jié)果,在沒有查到結(jié)果前阻塞線程。
下面來看看具體實現(xiàn)的demo
package cn.codingxiaxw.combine;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Service
public class QueryService {
//用來存放請求的隊列,我們將請求封裝成了一個Request對象
private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>() ;
//這個是我們的單個的查詢方法,假設(shè)每隔請求都根據(jù)唯一的code進行查詢
public Map<String,Object> query(String code){
//這個request是我們自定義的內(nèi)部類
Request request = new Request();
request.code = code;
CompletableFuture<Map<String,Object>> future = new CompletableFuture<>();
request.future = future;
queue.add(request);
//阻塞 直到返回結(jié)果
try {
return future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return null;
}
//這個是個模擬批量查詢的方法
public List<Map<String,Object>> batchQuery(List<String> codes){
return null;
}
//封裝的請求
class Request {
String code;
CompletableFuture<Map<String,Object>> future;
}
@PostConstruct
public void init(){
//在init方法中初始化一個定時任務(wù)線程,去定時執(zhí)行我們的查詢?nèi)蝿?wù).具體的任務(wù)實現(xiàn)是我們根據(jù)唯一code查詢出來的結(jié)果集,以code為key轉(zhuǎn)成map,然后我們隊列中的每個Request對象都有自己的唯一code,我們根據(jù)code一一對應(yīng),給相應(yīng)的future返回對應(yīng)的查詢結(jié)果。
ScheduledExecutorService poolExecutor = new ScheduledThreadPoolExecutor(1);
poolExecutor.scheduleAtFixedRate(()->{
int size = queue.size();
//如果沒有請求直接返回
if(size==0)
return ;
List<Request> list = new ArrayList<>();
for (int i = 0; i < size;i++){
Request request = queue.poll();
list.add(request);
}
System.out.println("批量處理:"+size);
List<String> codes = list.stream().map(s->s.code).collect(Collectors.toList());
//合并之后的結(jié)果集
List<Map<String, Object>> batchResult = batchQuery(codes);
Map<String,Map<String,Object>> responseMap = new HashMap<>();
for (Map<String,Object> result : batchResult) {
String code = result.get("code").toString();
responseMap.put(code,result);
}
//返回對應(yīng)的請求結(jié)果
for (Request request : list) {
Map<String, Object> response = responseMap.get(request.code);
request.future.complete(response);
}
},0,10,TimeUnit.MILLISECONDS);
}
}利用請求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢
需求分析
我們經(jīng)常會寫一些查詢接口,假設(shè)現(xiàn)在我們需要寫一個查詢用戶信息的接口,要求傳入用戶Id,返回用戶名稱。
那么最簡化的流程就是:用戶發(fā)送請求->controller層->service層->dao層->數(shù)據(jù)庫。
每次請求就相當(dāng)于請求一條用戶信息。
當(dāng)這個接口被用戶頻繁請求時,此接口就不斷的在做“請求”到“返回”的操作,服務(wù)端同時會開辟許多線程幫我們執(zhí)行這些操作,這么多的線程會消耗許多系統(tǒng)資源,服務(wù)端承受了巨大壓力。
//單查詢接口
@GetMapping("/getUser")
public String getUser(Long key){
long currentMillis = System.currentTimeMillis();
//單查詢service,大量線程懟到這個service上去
String userName = userService.getUser(key);
System.out.printf("##############################################\n");
System.out.printf("用戶名為:" + userName + "---線程名為:" + Thread.currentThread().getName() +
"---執(zhí)行時間為:" + (System.currentTimeMillis() - currentMillis) + "\n");
return userName;
}那么我們有什么方式可以優(yōu)化這種操作呢?
我目前能想到的就是利用緩存(緩存熱點數(shù)據(jù))、消息隊列(接收請求慢慢消費達到流量削峰)、多個服務(wù)實例(分散請求壓力提高計算能力)等方式應(yīng)對高并發(fā)場景。
在本文中,我利用另一種思路:把多個請求合并為一個請求,把單查詢變?yōu)榕坎樵?,這樣就能有效減少開辟線程的數(shù)量。
具體實現(xiàn)
首先我們定義一個用戶請求類Request:
//用戶請求類
public class RequestTest {
//請求條件
private Long key;
//傳話人
private CompletableFuture<String> future;
public CompletableFuture<String> getFuture() {
return future;
}
public void setFuture(CompletableFuture<String> future) {
this.future = future;
}
public Long getKey() {
return key;
}
public void setKey(Long key) {
this.key = key;
}
}接著是請求合并的主要代碼:
//存放請求的隊列
LinkedBlockingDeque<RequestTest> queue = new LinkedBlockingDeque<>();
//初始化方法
@PostConstruct
public void init() {
//定時執(zhí)行的線程池,每隔5毫秒執(zhí)行一次(間隔時間可以由業(yè)務(wù)決定),把所有堆積的請求
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
executorService.scheduleAtFixedRate(() -> {
//在這里具體執(zhí)行批量查詢邏輯
int size = queue.size();
if (size == 0) {
//若沒有請求堆積,直接返回,等10毫秒再執(zhí)行一次
return;
}
//若有請求堆積把所有請求都拿出來
List<RequestTest> requestTests = new ArrayList<>();
for (int i = 0; i < size; i++) {
//把請求拿出來
RequestTest poll = queue.poll();
requestTests.add(poll);
}
//至此請求已經(jīng)被合并了
System.out.printf("##############################################\n");
System.out.printf("請求合并了" + requestTests.size() + "條!\n");
//組裝批量查詢條件
List<Long> keyList = new ArrayList<>();
for (RequestTest requestTest : requestTests) {
keyList.add(requestTest.getKey());
}
//進行批量查詢
List<User> nameList = userService.getUserList(keyList);
//把批查結(jié)果放入一個map
Map<Long,String> map = new HashMap<>();
for(User user:nameList){
map.put(user.getId(),user.getName());
}
for (RequestTest requestTest : requestTests){
//把放在map中的結(jié)果集放回給對應(yīng)的線程
//future是對應(yīng)每個請求的,因為是每個請求線程都傳了自己的future是對應(yīng)的過來
requestTest.getFuture().complete(map.get(requestTest.getKey()));
}
}, 0, 5, TimeUnit.MILLISECONDS);
}
//請求合并
@GetMapping("/requestMerge/getUser")
public String getUserRequestMerge(Long key) throws InterruptedException, ExecutionException {
long currentMillis = System.currentTimeMillis();
//CompletableFuture可以使一個線程執(zhí)行操作后,主動返回值給另一個線程
CompletableFuture<String> future = new CompletableFuture<>();
RequestTest requestTest = new RequestTest();
//把future(把future可以認為是線程間的"傳話人")放到等待隊列中去,讓定時調(diào)度的線程池執(zhí)行并返回值
requestTest.setFuture(future);
requestTest.setKey(key);
//把requestTest加入等待隊列(LinkedBlockingDeque)
queue.add(requestTest);
//future(傳話人)阻塞直到有值返回
String userName = future.get();
System.out.printf("用戶名為:" + userName + "---線程名為:"+Thread.currentThread().getName()+
"---執(zhí)行時間為:"+(System.currentTimeMillis() - currentMillis)+"\n");
return userName;
}到這里我們就完成了一個請求合并的demo,接著我們測試運行結(jié)果。
這里我用了jemeter(jemeter的用法可以網(wǎng)上找一找)進行測試,對單查詢接口和請求合并接口分別進行了20000次的請求,以下是結(jié)果對比:
(1)單查詢運行結(jié)果:

可以看到系統(tǒng)開辟了許多線程來處理請求,jemeter的聚合報告如下:

(2)請求合并運行結(jié)果:

可以看到多條線程被合并成了一條線程來處理,jemeter的聚合報告如下:

通過以上數(shù)據(jù),我們可以看到,請求合并比單查詢的吞吐量要大,并且在運行過程中,流量浮動的范圍比較小。
至此,我們就完成了利用請求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢。
心得
本文我利用了LinkedBlockingDeque阻塞隊列、ScheduledExecutorService定時執(zhí)行線程池和CompletableFuture線程通信來完成了請求合并的demo。
并且通過實驗證明高并發(fā)下批量處理比單個處理更有優(yōu)勢。
當(dāng)然,我的demo并不完善。而且請求合并也有一些弊端,比如如果定時線程池的間隔時間比較長,反而會造成請求堆積時間太長,用戶不能快速得到響應(yīng)。
同時在請求數(shù)量比較小時,請求合并的場景也是沒有必要的。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringMVC MVC架構(gòu)與Servlet使用詳解
MVC設(shè)計模式一般指 MVC 框架,M(Model)指數(shù)據(jù)模型層,V(View)指視圖層,C(Controller)指控制層。使用 MVC 的目的是將 M 和 V 的實現(xiàn)代碼分離,使同一個程序可以有不同的表現(xiàn)形式。其中,View 的定義比較清晰,就是用戶界面2022-10-10
java實現(xiàn)從網(wǎng)上下載圖片到本地的方法
這篇文章主要介紹了java實現(xiàn)從網(wǎng)上下載圖片到本地的方法,涉及java針對文件操作的相關(guān)技巧,非常簡單實用,需要的朋友可以參考下2015-07-07
Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器
Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器...2006-12-12
java synchronized實現(xiàn)可見性過程解析
這篇文章主要介紹了java synchronized實現(xiàn)可見性過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-09-09
SpringBoot中AOP的動態(tài)匹配和靜態(tài)匹配詳解
這篇文章主要介紹了SpringBoot中AOP的動態(tài)匹配和靜態(tài)匹配詳解,在創(chuàng)建代理的時候?qū)δ繕?biāo)類的每個連接點使用靜態(tài)切點檢查,如果僅通過靜態(tài)切點檢查就可以知道連接點是不匹配的,則在運行時就不再進行動態(tài)檢查了,需要的朋友可以參考下2023-09-09
Java并發(fā) CompletableFuture異步編程的實現(xiàn)
這篇文章主要介紹了Java并發(fā) CompletableFuture異步編程的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-01-01

