java實現(xiàn)請求緩沖合并的示例代碼
業(yè)務(wù)背景:
我們對外提供了一個rest接口給第三方業(yè)務(wù)進行調(diào)用,但是由于第三方框架限制,導(dǎo)致會發(fā)送大量相似無效請求,例如:接口入?yún)son包含兩個字段,createBy和receiverList,完整的入?yún)son示例如下:
{ "createBy": "aa", "receiverList": [ "bb", "cc" ] }
實際第三方業(yè)務(wù)會進行多次調(diào)用接口,每次傳遞的數(shù)據(jù)可能如下:
{ "createBy": "aa", "receiverList": [ "bb" ] } 或者 { "createBy": "aa", "receiverList": [ "cc" ] } 或者 { "createBy": "bb", "receiverList": [ "cc" ] } 或者 { "createBy": "aa", "receiverList": [ "bb", "cc" ] }
所有需要對第三方業(yè)務(wù)傳遞過來的數(shù)據(jù)進行緩沖合并處理,減輕真正的后臺服務(wù)的壓力。
代碼實現(xiàn)
package com.demo; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; /** * Description: 請求合并管理類 */ @Slf4j @Component public class RequestMerger { // 線程池核心線程數(shù) private final int corePoolSize = 200; // 任務(wù)執(zhí)行超時時間,單位:毫秒 private final int timeout = 5 * 60 * 1000; // 隊列,隊列長度為Integer.MAX_VALUE private final LinkedBlockingQueue<String> requestQueue = new LinkedBlockingQueue<>(); // 定時器,所有任務(wù)共用線程池 private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(corePoolSize, new CustomizableThreadFactory("schedule-executor-")); // 是否關(guān)閉標(biāo)志 private final AtomicBoolean isShutdown = new AtomicBoolean(false); /** * 構(gòu)造函數(shù),用于初始化請求合并器。 * * @param batchSize 每次合并的最大請求數(shù)量。 * @param delayMillis 合并請求的周期間隔,單位為毫秒。 */ public RequestMerger(int batchSize, long delayMillis) { // 啟動定時器,定期合并請求,延遲delayMillis后開始,之后每隔delayMillis執(zhí)行一次 scheduler.scheduleAtFixedRate(() -> { if (!isShutdown.get()) { List<String> batch = new ArrayList<>(batchSize); int drainedCount = requestQueue.drainTo(batch, batchSize); log.info("==>scheduler,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size()); if (!batch.isEmpty()) { // 異步執(zhí)行任務(wù),防止業(yè)務(wù)執(zhí)行時間過長導(dǎo)致業(yè)務(wù)整體延遲過大 scheduler.submit(() -> { sendRequestBatch(batch); }); } } }, delayMillis, delayMillis, TimeUnit.MILLISECONDS); } /** * 發(fā)送請求批次的方法。 * * @param batch 請求批次。 */ private void sendRequestBatch(List<String> batch) { Future<?> future = scheduler.submit(() -> { try { // 在這里實現(xiàn)你的請求發(fā)送邏輯 // 可以使用HTTP客戶端庫(如Apache HttpClient或OkHttp)來發(fā)送請求 // ... System.out.println("Sending batch of " + batch.size() + " requests"); } catch (Exception e) { // 異常處理邏輯 System.err.println("Error sending requests: " + e.getMessage()); } }); // 嘗試獲取任務(wù)結(jié)果,如果超過超時時間則拋出TimeoutException異常,進行取消任務(wù) try { // 超時時間,單位:毫秒 future.get(timeout, TimeUnit.MILLISECONDS); } catch (TimeoutException | ExecutionException e) { // 超時或執(zhí)行異常時取消任務(wù) future.cancel(true); } catch (Exception e) { log.error("==>任務(wù)執(zhí)行異常", e); // 任務(wù)執(zhí)行異常 future.cancel(true); } } /** * 在對象銷毀前執(zhí)行的關(guān)閉操作。 * 該方法從請求隊列中拉取所有未處理的請求,并將它們批量發(fā)送。 * 無參數(shù)和返回值。 */ @PreDestroy public void shutdown() { isShutdown.set(true); List<String> batch = new ArrayList<>(); // 獲取請求隊列中的剩余所有請求 int drainedCount = requestQueue.drainTo(batch); log.info("==>shutdown,drainedCount:{},nowQueueCount:{}", drainedCount, requestQueue.size()); // 批量發(fā)送收集到的剩余請求 sendRequestBatch(batch); // 關(guān)閉定時執(zhí)行器 scheduler.shutdown(); try { if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { log.error("Scheduler did not terminate gracefully within 60 seconds, force shutting down."); scheduler.shutdownNow(); } } catch (InterruptedException e) { log.warn("Interrupted during scheduler termination, force shutting down."); scheduler.shutdownNow(); Thread.currentThread().interrupt(); } } /** * 向請求隊列中添加一個請求。如果服務(wù)未關(guān)閉,則直接添加到請求隊列中; * 如果服務(wù)已關(guān)閉,則將該請求作為一批請求發(fā)送。 * * @param request 要添加的請求字符串。 */ public void addRequest(String request) throws InterruptedException { // 檢查服務(wù)是否已關(guān)閉 if (!isShutdown.get()) { // 未關(guān)閉,直接添加到請求隊列 requestQueue.put(request); } else { // 已關(guān)閉,將當(dāng)前請求作為一批發(fā)送 List<String> batch = new ArrayList<>(); batch.add(request); sendRequestBatch(batch); } } }
參考資料
注意:此代碼容易導(dǎo)致數(shù)據(jù)丟失。例如:調(diào)用add方法將10個元素放入隊列,但是真正獲取到9個元素。
造成原因:FlushThread#add()中使用offer方法將數(shù)據(jù)放入隊列,如果此時隊列已滿,返回值為false,實際數(shù)據(jù)未進入隊列,需要額外對數(shù)據(jù)進行處理。
修改建議:調(diào)大隊列長度,并且將offer方法改為put方法,保證數(shù)據(jù)最終進入隊列。
到此這篇關(guān)于java實現(xiàn)請求緩沖合并的文章就介紹到這了,更多相關(guān)java請求緩沖合并內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談Spring中@Transactional事務(wù)回滾及示例(附源碼)
本篇文章主要介紹了淺談Spring中@Transactional事務(wù)回滾及示例(附源碼),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12Java調(diào)用Python腳本傳遞數(shù)據(jù)并返回計算結(jié)果
實際工程項目中可能會用到Java和python兩種語言結(jié)合進行,這樣就會涉及到一個問題,Java如何調(diào)用Python腳本,感興趣的可以了解一下2021-05-05Java用數(shù)組實現(xiàn)循環(huán)隊列的示例
下面小編就為大家?guī)硪黄狫ava用數(shù)組實現(xiàn)循環(huán)隊列的示例。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-09-09基于HTML5+js+Java實現(xiàn)單文件文件上傳到服務(wù)器功能
應(yīng)公司要求,在HTML5頁面上實現(xiàn)上傳文件到服務(wù)器功能,對于我這樣的菜鳥,真是把我難住了,最后還是請教大神搞定的,下面小編把例子分享到腳本之家平臺,供大家參考2017-08-08最流行的java后臺框架spring quartz定時任務(wù)
近日項目開發(fā)中需要執(zhí)行一些定時任務(wù),比如需要在每天凌晨時候,分析一次前一天的日志信息,借此機會整理了一下定時任務(wù)的幾種實現(xiàn)方式,由于項目采用spring框架,所以我都將結(jié)合spring框架來介紹2015-12-12SpringBoot定制JSON響應(yīng)數(shù)據(jù)的實現(xiàn)
本文主要介紹了SpringBoot定制JSON響應(yīng)數(shù)據(jù)的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-02-02