欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java高并發(fā)下請求合并處理方式

 更新時間:2023年08月28日 15:08:21   作者:Soda_lw  
這篇文章主要介紹了Java高并發(fā)下請求合并處理方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教

Java高并發(fā)下請求合并處理

場景描述

在大并發(fā)量下每秒有一萬個請求向后端查詢數(shù)據(jù),這樣我們就需要向后端請求一萬次,甚至查詢一萬次數(shù)據(jù)庫。

我們要做的請求合并就是每隔一段時間(10ms)將這段時間內(nèi)的請求合并到一起進行批量查詢,減少查詢數(shù)據(jù)庫的次數(shù)。

思考

1、如何存放一段時間內(nèi)的請求?這里我們可以用隊列。

2、如何每隔一段時間執(zhí)行任務(wù)?用定時任務(wù)線程池。

3、每個請求都是單獨的線程,如何保證各個請求能得到自己的查詢結(jié)果?這里我們使用callable返回查詢結(jié)果,在沒有查到結(jié)果前阻塞線程。

下面來看看具體實現(xiàn)的demo

package cn.codingxiaxw.combine;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Service
public class QueryService {
//用來存放請求的隊列,我們將請求封裝成了一個Request對象
 private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>() ;
       //這個是我們的單個的查詢方法,假設(shè)每隔請求都根據(jù)唯一的code進行查詢
  public Map<String,Object> query(String code){
        //這個request是我們自定義的內(nèi)部類
        Request request = new Request();
        request.code = code;
        CompletableFuture<Map<String,Object>> future = new CompletableFuture<>();
        request.future = future;
        queue.add(request);
        //阻塞 直到返回結(jié)果
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
 //這個是個模擬批量查詢的方法
    public List<Map<String,Object>> batchQuery(List<String> codes){
        return null;
    }
    //封裝的請求
    class Request {
        String code;
        CompletableFuture<Map<String,Object>> future;
    }
    @PostConstruct
    public void init(){
    //在init方法中初始化一個定時任務(wù)線程,去定時執(zhí)行我們的查詢?nèi)蝿?wù).具體的任務(wù)實現(xiàn)是我們根據(jù)唯一code查詢出來的結(jié)果集,以code為key轉(zhuǎn)成map,然后我們隊列中的每個Request對象都有自己的唯一code,我們根據(jù)code一一對應(yīng),給相應(yīng)的future返回對應(yīng)的查詢結(jié)果。
        ScheduledExecutorService poolExecutor = new ScheduledThreadPoolExecutor(1);
        poolExecutor.scheduleAtFixedRate(()->{
            int size = queue.size();
            //如果沒有請求直接返回
            if(size==0)
                return ;
            List<Request> list = new ArrayList<>();
            for (int i = 0; i < size;i++){
                Request request = queue.poll();
                list.add(request);
            }
            System.out.println("批量處理:"+size);
            List<String> codes = list.stream().map(s->s.code).collect(Collectors.toList());
            //合并之后的結(jié)果集
            List<Map<String, Object>> batchResult = batchQuery(codes);
            Map<String,Map<String,Object>> responseMap = new HashMap<>();
            for (Map<String,Object> result : batchResult) {
                String code = result.get("code").toString();
                responseMap.put(code,result);
            }
            //返回對應(yīng)的請求結(jié)果
            for (Request request : list) {
                Map<String, Object> response = responseMap.get(request.code);
                request.future.complete(response);
            }
        },0,10,TimeUnit.MILLISECONDS);
    }
}

利用請求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢

需求分析

我們經(jīng)常會寫一些查詢接口,假設(shè)現(xiàn)在我們需要寫一個查詢用戶信息的接口,要求傳入用戶Id,返回用戶名稱。

那么最簡化的流程就是:用戶發(fā)送請求->controller層->service層->dao層->數(shù)據(jù)庫。

每次請求就相當(dāng)于請求一條用戶信息。

當(dāng)這個接口被用戶頻繁請求時,此接口就不斷的在做“請求”到“返回”的操作,服務(wù)端同時會開辟許多線程幫我們執(zhí)行這些操作,這么多的線程會消耗許多系統(tǒng)資源,服務(wù)端承受了巨大壓力。

//單查詢接口
@GetMapping("/getUser")
    public String getUser(Long key){
        long currentMillis = System.currentTimeMillis();
        //單查詢service,大量線程懟到這個service上去
        String userName = userService.getUser(key);
        System.out.printf("##############################################\n");
        System.out.printf("用戶名為:" + userName + "---線程名為:" + Thread.currentThread().getName() +
                "---執(zhí)行時間為:" + (System.currentTimeMillis() - currentMillis) + "\n");
        return userName;
    }

那么我們有什么方式可以優(yōu)化這種操作呢?

我目前能想到的就是利用緩存(緩存熱點數(shù)據(jù))、消息隊列(接收請求慢慢消費達到流量削峰)、多個服務(wù)實例(分散請求壓力提高計算能力)等方式應(yīng)對高并發(fā)場景。

在本文中,我利用另一種思路:把多個請求合并為一個請求,把單查詢變?yōu)榕坎樵?,這樣就能有效減少開辟線程的數(shù)量。

具體實現(xiàn)

首先我們定義一個用戶請求類Request:

//用戶請求類
public class RequestTest {
    //請求條件
    private Long key;
    //傳話人
    private CompletableFuture<String> future;
    public CompletableFuture<String> getFuture() {
        return future;
    }
    public void setFuture(CompletableFuture<String> future) {
        this.future = future;
    }
    public Long getKey() {
        return key;
    }
    public void setKey(Long key) {
        this.key = key;
    }
}

接著是請求合并的主要代碼:

//存放請求的隊列
    LinkedBlockingDeque<RequestTest> queue = new LinkedBlockingDeque<>();
    //初始化方法
    @PostConstruct
    public void init() {
        //定時執(zhí)行的線程池,每隔5毫秒執(zhí)行一次(間隔時間可以由業(yè)務(wù)決定),把所有堆積的請求
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(() -> {
            //在這里具體執(zhí)行批量查詢邏輯
            int size = queue.size();
            if (size == 0) {
                //若沒有請求堆積,直接返回,等10毫秒再執(zhí)行一次
                return;
            }
            //若有請求堆積把所有請求都拿出來
            List<RequestTest> requestTests = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                //把請求拿出來
                RequestTest poll = queue.poll();
                requestTests.add(poll);
            }
            //至此請求已經(jīng)被合并了
            System.out.printf("##############################################\n");
            System.out.printf("請求合并了" + requestTests.size() + "條!\n");
            //組裝批量查詢條件
            List<Long> keyList = new ArrayList<>();
            for (RequestTest requestTest : requestTests) {
                keyList.add(requestTest.getKey());
            }
            //進行批量查詢
            List<User> nameList = userService.getUserList(keyList);
            //把批查結(jié)果放入一個map
            Map<Long,String> map = new HashMap<>();
            for(User user:nameList){
                map.put(user.getId(),user.getName());
            }
            for (RequestTest requestTest : requestTests){
                //把放在map中的結(jié)果集放回給對應(yīng)的線程
                //future是對應(yīng)每個請求的,因為是每個請求線程都傳了自己的future是對應(yīng)的過來
                requestTest.getFuture().complete(map.get(requestTest.getKey()));
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
    }
    //請求合并
    @GetMapping("/requestMerge/getUser")
    public String getUserRequestMerge(Long key) throws InterruptedException, ExecutionException {
        long currentMillis = System.currentTimeMillis();
        //CompletableFuture可以使一個線程執(zhí)行操作后,主動返回值給另一個線程
        CompletableFuture<String> future = new CompletableFuture<>();
        RequestTest requestTest = new RequestTest();
        //把future(把future可以認為是線程間的"傳話人")放到等待隊列中去,讓定時調(diào)度的線程池執(zhí)行并返回值
        requestTest.setFuture(future);
        requestTest.setKey(key);
        //把requestTest加入等待隊列(LinkedBlockingDeque)
        queue.add(requestTest);
        //future(傳話人)阻塞直到有值返回
        String userName = future.get();
        System.out.printf("用戶名為:" + userName + "---線程名為:"+Thread.currentThread().getName()+
                "---執(zhí)行時間為:"+(System.currentTimeMillis() - currentMillis)+"\n");
        return userName;
    }

到這里我們就完成了一個請求合并的demo,接著我們測試運行結(jié)果。

這里我用了jemeter(jemeter的用法可以網(wǎng)上找一找)進行測試,對單查詢接口和請求合并接口分別進行了20000次的請求,以下是結(jié)果對比:

(1)單查詢運行結(jié)果:

可以看到系統(tǒng)開辟了許多線程來處理請求,jemeter的聚合報告如下:

(2)請求合并運行結(jié)果:

可以看到多條線程被合并成了一條線程來處理,jemeter的聚合報告如下:

通過以上數(shù)據(jù),我們可以看到,請求合并比單查詢的吞吐量要大,并且在運行過程中,流量浮動的范圍比較小。

至此,我們就完成了利用請求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢。

心得

本文我利用了LinkedBlockingDeque阻塞隊列、ScheduledExecutorService定時執(zhí)行線程池和CompletableFuture線程通信來完成了請求合并的demo。

并且通過實驗證明高并發(fā)下批量處理比單個處理更有優(yōu)勢。

當(dāng)然,我的demo并不完善。而且請求合并也有一些弊端,比如如果定時線程池的間隔時間比較長,反而會造成請求堆積時間太長,用戶不能快速得到響應(yīng)。

同時在請求數(shù)量比較小時,請求合并的場景也是沒有必要的。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • SpringMVC MVC架構(gòu)與Servlet使用詳解

    SpringMVC MVC架構(gòu)與Servlet使用詳解

    MVC設(shè)計模式一般指 MVC 框架,M(Model)指數(shù)據(jù)模型層,V(View)指視圖層,C(Controller)指控制層。使用 MVC 的目的是將 M 和 V 的實現(xiàn)代碼分離,使同一個程序可以有不同的表現(xiàn)形式。其中,View 的定義比較清晰,就是用戶界面
    2022-10-10
  • 詳解SpringSecurity處理會話管理和注銷功能

    詳解SpringSecurity處理會話管理和注銷功能

    本文介紹了詳解SpringSecurity處理會話管理和注銷功能,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2025-01-01
  • java實現(xiàn)從網(wǎng)上下載圖片到本地的方法

    java實現(xiàn)從網(wǎng)上下載圖片到本地的方法

    這篇文章主要介紹了java實現(xiàn)從網(wǎng)上下載圖片到本地的方法,涉及java針對文件操作的相關(guān)技巧,非常簡單實用,需要的朋友可以參考下
    2015-07-07
  • Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器

    Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器

    Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器...
    2006-12-12
  • java synchronized實現(xiàn)可見性過程解析

    java synchronized實現(xiàn)可見性過程解析

    這篇文章主要介紹了java synchronized實現(xiàn)可見性過程解析,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下
    2019-09-09
  • Mybatis 自定義類型處理器示例詳解

    Mybatis 自定義類型處理器示例詳解

    在某些情況下我們需要對類型做處理,例如數(shù)據(jù)存儲的是Long,程序里是BigDecimal,那么我們出庫入庫都需要做處理,此時就可以使用類型處理器,本文通過示例給大家介紹Mybatis 自定義類型處理器的相關(guān)知識,感興趣的朋友跟隨小編一起看看吧
    2023-10-10
  • SpringBoot中AOP的動態(tài)匹配和靜態(tài)匹配詳解

    SpringBoot中AOP的動態(tài)匹配和靜態(tài)匹配詳解

    這篇文章主要介紹了SpringBoot中AOP的動態(tài)匹配和靜態(tài)匹配詳解,在創(chuàng)建代理的時候?qū)δ繕?biāo)類的每個連接點使用靜態(tài)切點檢查,如果僅通過靜態(tài)切點檢查就可以知道連接點是不匹配的,則在運行時就不再進行動態(tài)檢查了,需要的朋友可以參考下
    2023-09-09
  • Java基于中介者模式實現(xiàn)多人聊天室功能示例

    Java基于中介者模式實現(xiàn)多人聊天室功能示例

    這篇文章主要介紹了Java基于中介者模式實現(xiàn)多人聊天室功能,詳細分析了中介者模式的概念、原理以及使用中介模式實現(xiàn)多人聊天的步驟、操作技巧與注意事項,需要的朋友可以參考下
    2018-05-05
  • 詳解commons-pool2池化技術(shù)

    詳解commons-pool2池化技術(shù)

    本文主要是分析commons-pool2池化技術(shù)的實現(xiàn)方案,希望通過本文能讓讀者對commons-pool2的實現(xiàn)原理一個更全面的了解
    2021-06-06
  • Java并發(fā) CompletableFuture異步編程的實現(xiàn)

    Java并發(fā) CompletableFuture異步編程的實現(xiàn)

    這篇文章主要介紹了Java并發(fā) CompletableFuture異步編程的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-01-01

最新評論