欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java高并發(fā)下請(qǐng)求合并處理方式

 更新時(shí)間:2023年08月28日 15:08:21   作者:Soda_lw  
這篇文章主要介紹了Java高并發(fā)下請(qǐng)求合并處理方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教

Java高并發(fā)下請(qǐng)求合并處理

場(chǎng)景描述

在大并發(fā)量下每秒有一萬(wàn)個(gè)請(qǐng)求向后端查詢(xún)數(shù)據(jù),這樣我們就需要向后端請(qǐng)求一萬(wàn)次,甚至查詢(xún)一萬(wàn)次數(shù)據(jù)庫(kù)。

我們要做的請(qǐng)求合并就是每隔一段時(shí)間(10ms)將這段時(shí)間內(nèi)的請(qǐng)求合并到一起進(jìn)行批量查詢(xún),減少查詢(xún)數(shù)據(jù)庫(kù)的次數(shù)。

思考

1、如何存放一段時(shí)間內(nèi)的請(qǐng)求?這里我們可以用隊(duì)列。

2、如何每隔一段時(shí)間執(zhí)行任務(wù)?用定時(shí)任務(wù)線(xiàn)程池。

3、每個(gè)請(qǐng)求都是單獨(dú)的線(xiàn)程,如何保證各個(gè)請(qǐng)求能得到自己的查詢(xún)結(jié)果?這里我們使用callable返回查詢(xún)結(jié)果,在沒(méi)有查到結(jié)果前阻塞線(xiàn)程。

下面來(lái)看看具體實(shí)現(xiàn)的demo

package cn.codingxiaxw.combine;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Service
public class QueryService {
//用來(lái)存放請(qǐng)求的隊(duì)列,我們將請(qǐng)求封裝成了一個(gè)Request對(duì)象
 private LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>() ;
       //這個(gè)是我們的單個(gè)的查詢(xún)方法,假設(shè)每隔請(qǐng)求都根據(jù)唯一的code進(jìn)行查詢(xún)
  public Map<String,Object> query(String code){
        //這個(gè)request是我們自定義的內(nèi)部類(lèi)
        Request request = new Request();
        request.code = code;
        CompletableFuture<Map<String,Object>> future = new CompletableFuture<>();
        request.future = future;
        queue.add(request);
        //阻塞 直到返回結(jié)果
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
 //這個(gè)是個(gè)模擬批量查詢(xún)的方法
    public List<Map<String,Object>> batchQuery(List<String> codes){
        return null;
    }
    //封裝的請(qǐng)求
    class Request {
        String code;
        CompletableFuture<Map<String,Object>> future;
    }
    @PostConstruct
    public void init(){
    //在init方法中初始化一個(gè)定時(shí)任務(wù)線(xiàn)程,去定時(shí)執(zhí)行我們的查詢(xún)?nèi)蝿?wù).具體的任務(wù)實(shí)現(xiàn)是我們根據(jù)唯一code查詢(xún)出來(lái)的結(jié)果集,以code為key轉(zhuǎn)成map,然后我們隊(duì)列中的每個(gè)Request對(duì)象都有自己的唯一code,我們根據(jù)code一一對(duì)應(yīng),給相應(yīng)的future返回對(duì)應(yīng)的查詢(xún)結(jié)果。
        ScheduledExecutorService poolExecutor = new ScheduledThreadPoolExecutor(1);
        poolExecutor.scheduleAtFixedRate(()->{
            int size = queue.size();
            //如果沒(méi)有請(qǐng)求直接返回
            if(size==0)
                return ;
            List<Request> list = new ArrayList<>();
            for (int i = 0; i < size;i++){
                Request request = queue.poll();
                list.add(request);
            }
            System.out.println("批量處理:"+size);
            List<String> codes = list.stream().map(s->s.code).collect(Collectors.toList());
            //合并之后的結(jié)果集
            List<Map<String, Object>> batchResult = batchQuery(codes);
            Map<String,Map<String,Object>> responseMap = new HashMap<>();
            for (Map<String,Object> result : batchResult) {
                String code = result.get("code").toString();
                responseMap.put(code,result);
            }
            //返回對(duì)應(yīng)的請(qǐng)求結(jié)果
            for (Request request : list) {
                Map<String, Object> response = responseMap.get(request.code);
                request.future.complete(response);
            }
        },0,10,TimeUnit.MILLISECONDS);
    }
}

利用請(qǐng)求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢(shì)

需求分析

我們經(jīng)常會(huì)寫(xiě)一些查詢(xún)接口,假設(shè)現(xiàn)在我們需要寫(xiě)一個(gè)查詢(xún)用戶(hù)信息的接口,要求傳入用戶(hù)Id,返回用戶(hù)名稱(chēng)。

那么最簡(jiǎn)化的流程就是:用戶(hù)發(fā)送請(qǐng)求->controller層->service層->dao層->數(shù)據(jù)庫(kù)。

每次請(qǐng)求就相當(dāng)于請(qǐng)求一條用戶(hù)信息。

當(dāng)這個(gè)接口被用戶(hù)頻繁請(qǐng)求時(shí),此接口就不斷的在做“請(qǐng)求”到“返回”的操作,服務(wù)端同時(shí)會(huì)開(kāi)辟許多線(xiàn)程幫我們執(zhí)行這些操作,這么多的線(xiàn)程會(huì)消耗許多系統(tǒng)資源,服務(wù)端承受了巨大壓力。

//單查詢(xún)接口
@GetMapping("/getUser")
    public String getUser(Long key){
        long currentMillis = System.currentTimeMillis();
        //單查詢(xún)service,大量線(xiàn)程懟到這個(gè)service上去
        String userName = userService.getUser(key);
        System.out.printf("##############################################\n");
        System.out.printf("用戶(hù)名為:" + userName + "---線(xiàn)程名為:" + Thread.currentThread().getName() +
                "---執(zhí)行時(shí)間為:" + (System.currentTimeMillis() - currentMillis) + "\n");
        return userName;
    }

那么我們有什么方式可以?xún)?yōu)化這種操作呢?

我目前能想到的就是利用緩存(緩存熱點(diǎn)數(shù)據(jù))、消息隊(duì)列(接收請(qǐng)求慢慢消費(fèi)達(dá)到流量削峰)、多個(gè)服務(wù)實(shí)例(分散請(qǐng)求壓力提高計(jì)算能力)等方式應(yīng)對(duì)高并發(fā)場(chǎng)景。

在本文中,我利用另一種思路:把多個(gè)請(qǐng)求合并為一個(gè)請(qǐng)求,把單查詢(xún)變?yōu)榕坎樵?xún),這樣就能有效減少開(kāi)辟線(xiàn)程的數(shù)量。

具體實(shí)現(xiàn)

首先我們定義一個(gè)用戶(hù)請(qǐng)求類(lèi)Request:

//用戶(hù)請(qǐng)求類(lèi)
public class RequestTest {
    //請(qǐng)求條件
    private Long key;
    //傳話(huà)人
    private CompletableFuture<String> future;
    public CompletableFuture<String> getFuture() {
        return future;
    }
    public void setFuture(CompletableFuture<String> future) {
        this.future = future;
    }
    public Long getKey() {
        return key;
    }
    public void setKey(Long key) {
        this.key = key;
    }
}

接著是請(qǐng)求合并的主要代碼:

//存放請(qǐng)求的隊(duì)列
    LinkedBlockingDeque<RequestTest> queue = new LinkedBlockingDeque<>();
    //初始化方法
    @PostConstruct
    public void init() {
        //定時(shí)執(zhí)行的線(xiàn)程池,每隔5毫秒執(zhí)行一次(間隔時(shí)間可以由業(yè)務(wù)決定),把所有堆積的請(qǐng)求
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);
        executorService.scheduleAtFixedRate(() -> {
            //在這里具體執(zhí)行批量查詢(xún)邏輯
            int size = queue.size();
            if (size == 0) {
                //若沒(méi)有請(qǐng)求堆積,直接返回,等10毫秒再執(zhí)行一次
                return;
            }
            //若有請(qǐng)求堆積把所有請(qǐng)求都拿出來(lái)
            List<RequestTest> requestTests = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                //把請(qǐng)求拿出來(lái)
                RequestTest poll = queue.poll();
                requestTests.add(poll);
            }
            //至此請(qǐng)求已經(jīng)被合并了
            System.out.printf("##############################################\n");
            System.out.printf("請(qǐng)求合并了" + requestTests.size() + "條!\n");
            //組裝批量查詢(xún)條件
            List<Long> keyList = new ArrayList<>();
            for (RequestTest requestTest : requestTests) {
                keyList.add(requestTest.getKey());
            }
            //進(jìn)行批量查詢(xún)
            List<User> nameList = userService.getUserList(keyList);
            //把批查結(jié)果放入一個(gè)map
            Map<Long,String> map = new HashMap<>();
            for(User user:nameList){
                map.put(user.getId(),user.getName());
            }
            for (RequestTest requestTest : requestTests){
                //把放在map中的結(jié)果集放回給對(duì)應(yīng)的線(xiàn)程
                //future是對(duì)應(yīng)每個(gè)請(qǐng)求的,因?yàn)槭敲總€(gè)請(qǐng)求線(xiàn)程都傳了自己的future是對(duì)應(yīng)的過(guò)來(lái)
                requestTest.getFuture().complete(map.get(requestTest.getKey()));
            }
        }, 0, 5, TimeUnit.MILLISECONDS);
    }
    //請(qǐng)求合并
    @GetMapping("/requestMerge/getUser")
    public String getUserRequestMerge(Long key) throws InterruptedException, ExecutionException {
        long currentMillis = System.currentTimeMillis();
        //CompletableFuture可以使一個(gè)線(xiàn)程執(zhí)行操作后,主動(dòng)返回值給另一個(gè)線(xiàn)程
        CompletableFuture<String> future = new CompletableFuture<>();
        RequestTest requestTest = new RequestTest();
        //把future(把future可以認(rèn)為是線(xiàn)程間的"傳話(huà)人")放到等待隊(duì)列中去,讓定時(shí)調(diào)度的線(xiàn)程池執(zhí)行并返回值
        requestTest.setFuture(future);
        requestTest.setKey(key);
        //把requestTest加入等待隊(duì)列(LinkedBlockingDeque)
        queue.add(requestTest);
        //future(傳話(huà)人)阻塞直到有值返回
        String userName = future.get();
        System.out.printf("用戶(hù)名為:" + userName + "---線(xiàn)程名為:"+Thread.currentThread().getName()+
                "---執(zhí)行時(shí)間為:"+(System.currentTimeMillis() - currentMillis)+"\n");
        return userName;
    }

到這里我們就完成了一個(gè)請(qǐng)求合并的demo,接著我們測(cè)試運(yùn)行結(jié)果。

這里我用了jemeter(jemeter的用法可以網(wǎng)上找一找)進(jìn)行測(cè)試,對(duì)單查詢(xún)接口和請(qǐng)求合并接口分別進(jìn)行了20000次的請(qǐng)求,以下是結(jié)果對(duì)比:

(1)單查詢(xún)運(yùn)行結(jié)果:

可以看到系統(tǒng)開(kāi)辟了許多線(xiàn)程來(lái)處理請(qǐng)求,jemeter的聚合報(bào)告如下:

(2)請(qǐng)求合并運(yùn)行結(jié)果:

可以看到多條線(xiàn)程被合并成了一條線(xiàn)程來(lái)處理,jemeter的聚合報(bào)告如下:

通過(guò)以上數(shù)據(jù),我們可以看到,請(qǐng)求合并比單查詢(xún)的吞吐量要大,并且在運(yùn)行過(guò)程中,流量浮動(dòng)的范圍比較小。

至此,我們就完成了利用請(qǐng)求合并發(fā)揮高并發(fā)下批量處理的優(yōu)勢(shì)。

心得

本文我利用了LinkedBlockingDeque阻塞隊(duì)列、ScheduledExecutorService定時(shí)執(zhí)行線(xiàn)程池和CompletableFuture線(xiàn)程通信來(lái)完成了請(qǐng)求合并的demo。

并且通過(guò)實(shí)驗(yàn)證明高并發(fā)下批量處理比單個(gè)處理更有優(yōu)勢(shì)。

當(dāng)然,我的demo并不完善。而且請(qǐng)求合并也有一些弊端,比如如果定時(shí)線(xiàn)程池的間隔時(shí)間比較長(zhǎng),反而會(huì)造成請(qǐng)求堆積時(shí)間太長(zhǎng),用戶(hù)不能快速得到響應(yīng)。

同時(shí)在請(qǐng)求數(shù)量比較小時(shí),請(qǐng)求合并的場(chǎng)景也是沒(méi)有必要的。

總結(jié)

以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • SpringMVC MVC架構(gòu)與Servlet使用詳解

    SpringMVC MVC架構(gòu)與Servlet使用詳解

    MVC設(shè)計(jì)模式一般指 MVC 框架,M(Model)指數(shù)據(jù)模型層,V(View)指視圖層,C(Controller)指控制層。使用 MVC 的目的是將 M 和 V 的實(shí)現(xiàn)代碼分離,使同一個(gè)程序可以有不同的表現(xiàn)形式。其中,View 的定義比較清晰,就是用戶(hù)界面
    2022-10-10
  • 詳解SpringSecurity處理會(huì)話(huà)管理和注銷(xiāo)功能

    詳解SpringSecurity處理會(huì)話(huà)管理和注銷(xiāo)功能

    本文介紹了詳解SpringSecurity處理會(huì)話(huà)管理和注銷(xiāo)功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2025-01-01
  • java實(shí)現(xiàn)從網(wǎng)上下載圖片到本地的方法

    java實(shí)現(xiàn)從網(wǎng)上下載圖片到本地的方法

    這篇文章主要介紹了java實(shí)現(xiàn)從網(wǎng)上下載圖片到本地的方法,涉及java針對(duì)文件操作的相關(guān)技巧,非常簡(jiǎn)單實(shí)用,需要的朋友可以參考下
    2015-07-07
  • Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器

    Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器

    Java Socket編程(四) 重復(fù)和并發(fā)服務(wù)器...
    2006-12-12
  • java synchronized實(shí)現(xiàn)可見(jiàn)性過(guò)程解析

    java synchronized實(shí)現(xiàn)可見(jiàn)性過(guò)程解析

    這篇文章主要介紹了java synchronized實(shí)現(xiàn)可見(jiàn)性過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下
    2019-09-09
  • Mybatis 自定義類(lèi)型處理器示例詳解

    Mybatis 自定義類(lèi)型處理器示例詳解

    在某些情況下我們需要對(duì)類(lèi)型做處理,例如數(shù)據(jù)存儲(chǔ)的是Long,程序里是BigDecimal,那么我們出庫(kù)入庫(kù)都需要做處理,此時(shí)就可以使用類(lèi)型處理器,本文通過(guò)示例給大家介紹Mybatis 自定義類(lèi)型處理器的相關(guān)知識(shí),感興趣的朋友跟隨小編一起看看吧
    2023-10-10
  • SpringBoot中AOP的動(dòng)態(tài)匹配和靜態(tài)匹配詳解

    SpringBoot中AOP的動(dòng)態(tài)匹配和靜態(tài)匹配詳解

    這篇文章主要介紹了SpringBoot中AOP的動(dòng)態(tài)匹配和靜態(tài)匹配詳解,在創(chuàng)建代理的時(shí)候?qū)δ繕?biāo)類(lèi)的每個(gè)連接點(diǎn)使用靜態(tài)切點(diǎn)檢查,如果僅通過(guò)靜態(tài)切點(diǎn)檢查就可以知道連接點(diǎn)是不匹配的,則在運(yùn)行時(shí)就不再進(jìn)行動(dòng)態(tài)檢查了,需要的朋友可以參考下
    2023-09-09
  • Java基于中介者模式實(shí)現(xiàn)多人聊天室功能示例

    Java基于中介者模式實(shí)現(xiàn)多人聊天室功能示例

    這篇文章主要介紹了Java基于中介者模式實(shí)現(xiàn)多人聊天室功能,詳細(xì)分析了中介者模式的概念、原理以及使用中介模式實(shí)現(xiàn)多人聊天的步驟、操作技巧與注意事項(xiàng),需要的朋友可以參考下
    2018-05-05
  • 詳解commons-pool2池化技術(shù)

    詳解commons-pool2池化技術(shù)

    本文主要是分析commons-pool2池化技術(shù)的實(shí)現(xiàn)方案,希望通過(guò)本文能讓讀者對(duì)commons-pool2的實(shí)現(xiàn)原理一個(gè)更全面的了解
    2021-06-06
  • Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn)

    Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn)

    這篇文章主要介紹了Java并發(fā) CompletableFuture異步編程的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-01-01

最新評(píng)論