Spring WebFlux 流式數(shù)據(jù)拉取與推送的實(shí)現(xiàn)
本文介紹了使用Spring WebFlux實(shí)現(xiàn)流式數(shù)據(jù)拉取與推送的方案。文章首先展示了流式返回?cái)?shù)據(jù)的格式(類似DeepSeek大模型的推送模式),然后詳細(xì)講解了三個(gè)核心實(shí)現(xiàn)部分:1)通過(guò)Flux.create實(shí)現(xiàn)流式響應(yīng)數(shù)據(jù)的橋接轉(zhuǎn)發(fā);2)配置OkHttpClient的HTTP客戶端參數(shù)(特別是readTimeout和callTimeout設(shè)為0以支持流式傳輸);3)核心數(shù)據(jù)獲取方法queryDifficultFaultMessage的實(shí)現(xiàn),包括異步請(qǐng)求處理、錯(cuò)誤處理和取消訂閱機(jī)制。該方案實(shí)現(xiàn)了后端對(duì)原始數(shù)
前言
1,流式返回?cái)?shù)據(jù)類型如下,是不斷的推送數(shù)據(jù),類似于主流DeepSeek大模型模式,推送數(shù)據(jù)一點(diǎn)點(diǎn)推送,直至推送結(jié)束或者主動(dòng)點(diǎn)擊停止
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""}
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":""}
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"故障"}
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"根"}
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":"因"}
....
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"agent_message","answer":":\n\n"}
data:{"conversationId":"eef6023a-6ace-420f-ad46-324aa53decf8","event":"message_end","answer":"一"}```2,流式接口又稱基于響應(yīng)式編程(Reactive Programming)的服務(wù)器端到服務(wù)器端(Service-to-Service)的流式數(shù)據(jù)拉取與推送的實(shí)現(xiàn)。
3,它的核心作用是:從一個(gè)流式API(SSE)消費(fèi)數(shù)據(jù),并立即將其作為流式響應(yīng)轉(zhuǎn)發(fā)給客戶端
4,功能要求,后段不做數(shù)據(jù)處理,大模型接口返回什么數(shù)據(jù),直接回傳給前端,分阻塞返回,【當(dāng)時(shí)做的效果是:大模型返回?cái)?shù)據(jù),后端自動(dòng)拼接結(jié)束后返回給前端,這種效果很不友好,導(dǎo)致請(qǐng)求的時(shí)間比較長(zhǎng)】
功能實(shí)現(xiàn)
1.流式響應(yīng)數(shù)據(jù)
public Flux<DifficultFaultMessageVo> streamingQueryDifficultFault(DifficultFaultMessageDto paramDto) {
// 這個(gè) Flux 代表了從下游服務(wù)獲取的原始數(shù)據(jù)流。
Flux<DifficultFaultMessageVo> faults = queryDifficultFaultMessage(paramDto);
return Flux.create(sink -> {
// faults.subscribeOn(Schedulers.boundedElastic()): 這行代碼至關(guān)重要。它告訴上游的 faults 流在 boundedElastic 調(diào)度器上執(zhí)行其訂閱操作(即執(zhí)行網(wǎng)絡(luò)請(qǐng)求和處理響應(yīng))。
faults.subscribeOn(Schedulers.boundedElastic())
// 這里創(chuàng)建了一個(gè)新的 Flux。create 方法允許我們手動(dòng)控制如何向這個(gè)流中發(fā)射數(shù)據(jù)。我們傳入一個(gè) Consumer,它接收一個(gè) FluxSink 對(duì)象(這里的參數(shù)名為 sink)作為參數(shù)。Sink(匯)就是數(shù)據(jù)流的出口,我們可以通過(guò)它發(fā)射數(shù)據(jù) (next)、錯(cuò)誤 (error) 或完成信號(hào) (complete)。
.subscribe(sink::next, sink::error, sink::complete);
});
}
代碼解析:
- subscribe(sink::next, sink::error, sink::complete): 這里訂閱了從 queryDifficultFaults 返回的原始流。
- 當(dāng)原始流 (faults) 產(chǎn)生一個(gè)數(shù)據(jù) (DifficultFaultMessageVo 對(duì)象) 時(shí),就通過(guò) sink::next 將它轉(zhuǎn)發(fā)給我們新創(chuàng)建的流的 Sink。
- 當(dāng)原始流發(fā)生錯(cuò)誤時(shí),通過(guò) sink::error 將錯(cuò)誤轉(zhuǎn)發(fā)給新流的 Sink。
- 當(dāng)原始流結(jié)束時(shí),通過(guò) sink::complete 結(jié)束新流的 Sink。
總結(jié):這個(gè)方法的作用可以理解為 “流的橋接” 。它將在一個(gè)彈性線程上執(zhí)行的、可能阻塞的原始數(shù)據(jù)流,橋接成一個(gè)適合在WebFlux等響應(yīng)式Web框架中返回的響應(yīng)式流。外部調(diào)用者(如Controller)只需返回這個(gè)方法的返回值,框架就會(huì)自動(dòng)處理流的訂閱和HTTP響應(yīng)體的流式寫(xiě)入。
2.HTTP客戶端配置 okhttpclient
private final OkHttpClient httpClient = new OkHttpClient.Builder()
.connectTimeout(120, TimeUnit.SECONDS) // 連接超時(shí)2分鐘
.readTimeout(0, TimeUnit.SECONDS) // 讀取超時(shí):0(無(wú)限等待,對(duì)于流式響應(yīng)關(guān)鍵?。?
.writeTimeout(120, TimeUnit.SECONDS) // 寫(xiě)入超時(shí)2分鐘
.callTimeout(0, TimeUnit.SECONDS) // 整個(gè)調(diào)用超時(shí):0(無(wú)限等待)
.retryOnConnectionFailure(true) // 自動(dòng)重試連接失敗
.connectionPool(new ConnectionPool(20, 5, TimeUnit.MINUTES)) // 連接池(20個(gè)空閑連接,存活5分鐘)
.build();
代碼解讀:
這個(gè)配置是流式HTTP客戶端的靈魂,每一項(xiàng)都針對(duì)長(zhǎng)連接和流式傳輸進(jìn)行了優(yōu)化:
- readTimeout(0): 這是最關(guān)鍵的配置。普通的HTTP請(qǐng)求需要設(shè)置讀取超時(shí),但流式響應(yīng)是一個(gè)長(zhǎng)時(shí)間存在的連接,數(shù)據(jù)會(huì)分塊持續(xù)發(fā)送。設(shè)置為 0 表示永不超時(shí),客戶端會(huì)一直等待服務(wù)器發(fā)送更多數(shù)據(jù),直到連接被服務(wù)器或自己主動(dòng)關(guān)閉。
- callTimeout(0): 同理,整個(gè)調(diào)用的總時(shí)間也不應(yīng)設(shè)限。
- connectTimeout 和 writeTimeout: 這兩個(gè)仍然需要設(shè)置一個(gè)合理的值,分別控制建立TCP連接的時(shí)間和發(fā)送請(qǐng)求體的時(shí)間,這些操作不應(yīng)該無(wú)限等待。
- connectionPool: 使用連接池可以復(fù)用TCP連接,避免為每個(gè)請(qǐng)求都進(jìn)行三次握手,極大提升性能。這里配置了最多保持20個(gè)空閑連接,每個(gè)空閑連接最多存活5分鐘。
- retryOnConnectionFailure: 網(wǎng)絡(luò)抖動(dòng)時(shí)自動(dòng)重試,提高魯棒性。
3.核心數(shù)據(jù)獲取方法
private Flux<DifficultFaultMessageVo> queryDifficultFaultMessage(DifficultFaultMessageDto paramDto) {
return Flux.create(emitter -> { // 這個(gè)emitter是內(nèi)部Flux的Sink
Request request = buildRequest(paramDto);
Call call = httpClient.newCall(request);
call.enqueue(new Callback() { // 異步執(zhí)行HTTP請(qǐng)求
@Override
public void onFailure(...) {
if (!emitter.isCancelled()) {
emitter.error(...); // 網(wǎng)絡(luò)失敗,向上游發(fā)射錯(cuò)誤
}
}
@Override
public void onResponse(...) throws IOException {
if (!response.isSuccessful()) {
if (!emitter.isCancelled()) {
emitter.error(...); // HTTP狀態(tài)碼非2xx,向上游發(fā)射錯(cuò)誤
}
return;
}
// 成功響應(yīng),開(kāi)始處理流式響應(yīng)體
processResponseStream(response, emitter);
}
});
// 重要:注冊(cè)取消回調(diào)
emitter.onCancel(() -> {
if (!call.isCanceled()) {
call.cancel(); // 如果下游取消訂閱(如客戶端斷開(kāi)),則取消OkHttp請(qǐng)求
}
});
});
}
構(gòu)建請(qǐng)求
private Request buildRequest(DifficultFaultMessageDto paramDto) {
// 2.組裝請(qǐng)求頭信息
MediaType parse = MediaType.parse("application/json;charset=UTF-8");
// 3.組裝請(qǐng)求體信息
JSONObject requestBody = new JSONObject();
requestBody.put("x x x", paramDto.getxxxType());
requestBody.put("apiKey", paramDto.getApiKey());
// 省略業(yè)務(wù)代碼
...
log.info("API請(qǐng)求體: {}", requestBody);
return new Request.Builder().url("http://xxx.x.xx.xx:8000/servicexxx/r/postApi").post(body)
.addHeader("X-APP-ID", "xx09xx3xx0d7").addHeader("X-APP-KEY", "9jfksjfjkxxkkssdc")
.addHeader("Content-Type", "application/json").build();
}
代碼解讀:
- 目的:創(chuàng)建一個(gè) Flux,用于封裝對(duì)下游服務(wù)的異步HTTP調(diào)用和流式響應(yīng)處理。
執(zhí)行流程:
- 構(gòu)建請(qǐng)求 (buildRequest) 和調(diào)用對(duì)象 (Call)。
- 異步執(zhí)行 (call.enqueue) HTTP請(qǐng)求。
在回調(diào)中:
- 失敗 (onFailure): 檢查內(nèi)部的 FluxSink (emitter) 是否還未被取消(即下游是否還在關(guān)心結(jié)果),如果是,則發(fā)射一個(gè)錯(cuò)誤信號(hào)。
- 成功 (onResponse): 檢查HTTP狀態(tài)碼,如果不成功則發(fā)射錯(cuò)誤;如果成功,則調(diào)用 processResponseStream 開(kāi)始處理響應(yīng)體流。
- emitter.onCancel(…): 這是響應(yīng)式編程中資源清理的關(guān)鍵。它注冊(cè)了一個(gè)回調(diào),當(dāng)這個(gè) Flux 的下游訂閱者取消訂閱時(shí)(例如,前端用戶關(guān)閉了瀏覽器標(biāo)簽頁(yè)),這個(gè)回調(diào)會(huì)被觸發(fā)?;卣{(diào)里會(huì)取消底層的OkHttp Call 對(duì)象,從而立即關(guān)閉網(wǎng)絡(luò)連接,避免資源泄漏。這是一種“背壓”(Backpressure)傳播,體現(xiàn)了響應(yīng)式的優(yōu)點(diǎn)。
4.流式響應(yīng)體處理 【最核心部分】
private void processResponseStream(Response response, FluxSink<DifficultFaultsVo> emitter) {
try (ResponseBody responseBody = response.body()) { // 使用try-with-resources確保資源關(guān)閉
...
BufferedSource source = responseBody.source(); // 獲取緩沖數(shù)據(jù)源
AtomicBoolean isComplete = new AtomicBoolean(false); // 標(biāo)志位,是否收到結(jié)束事件
try {
while (!emitter.isCancelled()) { // 循環(huán),只要下游沒(méi)有取消就繼續(xù)讀
String line = source.readUtf8Line(); // 讀取一行UTF-8文本
if (line == null) break; // 讀到null表示流自然結(jié)束(服務(wù)器關(guān)閉連接)
if (StringUtils.isBlank(line)) continue; // 忽略空行
// SSE協(xié)議格式:每段數(shù)據(jù)以"data: "開(kāi)頭
if (line.startsWith("data:")) {
String jsonData = line.substring(6); // 截取"data: "后面的JSON字符串
// 過(guò)濾:只處理包含"answer"或"message_end"的數(shù)據(jù)行
if (!jsonData.contains("answer") && !jsonData.contains("message_end")) {
continue;
}
DifficultFaultMessageVo vo = processModelResponse(jsonData); // 解析JSON為值對(duì)象
if (vo != null) {
emitter.next(vo); // 解析成功,立即發(fā)射給下游(最終到前端)
// 如果遇到結(jié)束事件,標(biāo)記完成并結(jié)束循環(huán)
if ("message_end".equals(vo.getEvent())) {
isComplete.set(true);
emitter.complete();
break;
}
}
}
}
} catch (Exception e) {
// 處理讀取和解析過(guò)程中的異常
if (!emitter.isCancelled()){
emitter.error(e);
}
} finally {
// 確保流最終完成
if (!isComplete.get() && !emitter.isCancelled()){
emitter.complete();
}
}
...
} catch (Exception e) {
...
} finally {
response.close(); // 最終確保HTTP響應(yīng)被關(guān)閉
}
}
private DifficultFaultMessageVo processModelResponse(String jsonData) {
try {
// 1. 解析JSON
DifficultFaultMessageVo rawResponse = JSONUtil.toBean(
jsonData,
DifficultFaultMessageVo.class,
false
);
// 2. 過(guò)濾非消息事件
String event = rawResponse.getEvent();
// 3. 創(chuàng)建前端響應(yīng)對(duì)象
DifficultFaultMessageVo result = new DifficultFaultMessageVo();
result.setConversationId(rawResponse.getConversationId());
// 4. 處理結(jié)束事件
if ("message_end".equals(event)) {
result.setEvent("message_end");
result.setAnswer("");
return result;
}
result.setAnswer(rawResponse.getAnswer());
result.setEvent(rawResponse.getEvent());
return result;
} catch (Exception e) {
log.error("JSON解析錯(cuò)誤: {} - {}", jsonData, e.getMessage());
return null;
}
}
代碼解讀:
- 核心任務(wù):從 ResponseBody 的流中逐行讀取并解析SSE格式。
- SSE格式簡(jiǎn)介:通常為 data: {json}\n\n。代碼只關(guān)心以 data: 開(kāi)頭的行。
關(guān)鍵點(diǎn):
- 逐行讀取: source.readUtf8Line() 是阻塞方法,這就是為什么必須在 boundedElastic 線程上執(zhí)行的原因。
- 過(guò)濾: 并非所有 data: 行都需要處理,這里通過(guò)檢查內(nèi)容來(lái)過(guò)濾。
- JSON解析: processModelResponse(jsonData) 方法(代碼未給出)負(fù)責(zé)將JSON字符串解析為 DifficultFaultsVo 對(duì)象。
- 實(shí)時(shí)發(fā)射: 一旦解析成功,立即通過(guò) emitter.next(vo) 將數(shù)據(jù)推送給下游,實(shí)現(xiàn)了數(shù)據(jù)塊的零延遲轉(zhuǎn)發(fā)。
結(jié)束條件:
- 顯式結(jié)束: 收到 “message_end” 事件,調(diào)用 emitter.complete()。
- 隱式結(jié)束: 服務(wù)器關(guān)閉連接(readUtf8Line() 返回 null),跳出循環(huán)。
- 異常結(jié)束: 捕獲到任何異常,調(diào)用 emitter.error(e)。
- 健壯性保證: 大量的 if (!emitter.isCancelled()) 檢查確保了在下游已經(jīng)不感興趣的情況下,不會(huì)進(jìn)行無(wú)效的操作(發(fā)射數(shù)據(jù)、錯(cuò)誤或完成信號(hào))。finally 塊確保了在任何情況下流最終都會(huì)被關(guān)閉,防止資源泄漏。
總結(jié):
這段代碼實(shí)現(xiàn)了一個(gè)高效、健壯的雙重流式處理管道:
- 下游流 (OkHttp -> 本服務(wù)):使用配置了長(zhǎng)超時(shí)的OkHttp客戶端,異步調(diào)用外部流式API,并在獨(dú)立的彈性線程上阻塞地、逐行讀取SSE響應(yīng)。
- 上游流 (本服務(wù) -> 客戶端):通過(guò)Project Reactor的 Flux 和 Sink,將下游獲取到的數(shù)據(jù)塊立即、實(shí)時(shí)地轉(zhuǎn)發(fā)給最終的客戶端(如Web瀏覽器)。
關(guān)鍵特性:
- 非阻塞IO: 通過(guò)將阻塞操作卸載到專用線程池,保護(hù)了Web容器的核心線程。
- 背壓傳播: 下游的取消訂閱會(huì)向上傳播,最終取消OkHttp請(qǐng)求,及時(shí)釋放資源。
- 全面錯(cuò)誤處理: 對(duì)網(wǎng)絡(luò)錯(cuò)誤、HTTP錯(cuò)誤、解析錯(cuò)誤、連接意外關(guān)閉等都有處理。
- 資源安全: 廣泛使用 try-with-resources 和 finally 塊確保網(wǎng)絡(luò)連接和響應(yīng)體被正確關(guān)閉。
這是一種在Spring WebFlux等響應(yīng)式框架中集成傳統(tǒng)阻塞式HTTP客戶端以消費(fèi)流式服務(wù)的標(biāo)準(zhǔn)且優(yōu)雅的模式。
到此這篇關(guān)于Spring WebFlux 流式數(shù)據(jù)拉取與推送的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Spring WebFlux 流式拉取與推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Jmeter命令行執(zhí)行腳本如何設(shè)置動(dòng)態(tài)參數(shù)
這篇文章主要介紹了Jmeter命令行執(zhí)行腳本如何設(shè)置動(dòng)態(tài)參數(shù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08
JavaFX實(shí)現(xiàn)界面跳轉(zhuǎn)
這篇文章主要為大家詳細(xì)介紹了JavaFX實(shí)現(xiàn)界面跳轉(zhuǎn),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-06-06
Spring Boot整合消息隊(duì)列RabbitMQ的實(shí)現(xiàn)示例
本文主要介紹了Spring Boot整合消息隊(duì)列RabbitMQ的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2025-03-03
spring使用xml方式整合Druid數(shù)據(jù)源連接池
傳統(tǒng)的JDBC數(shù)據(jù)庫(kù)連接方式,每次連接都需加載Connection到內(nèi)存并驗(yàn)證,使用后再放回,從而重復(fù)利用數(shù)據(jù)庫(kù)連接資源,這不僅降低了系統(tǒng)資源消耗,還避免了頻繁連接導(dǎo)致的服務(wù)器崩潰和內(nèi)存泄漏風(fēng)險(xiǎn),數(shù)據(jù)庫(kù)連接池在初始化時(shí)創(chuàng)建并保持最小數(shù)量的數(shù)據(jù)庫(kù)連接2024-10-10
解決IDEA插件市場(chǎng)Plugins無(wú)法加載的問(wèn)題
這篇文章主要介紹了解決IDEA插件市場(chǎng)Plugins無(wú)法加載的問(wèn)題,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
Java實(shí)現(xiàn)List轉(zhuǎn)換為Map的方法小結(jié)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)List轉(zhuǎn)換為Map的一些常見(jiàn)的方法,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,有需要的小伙伴可以參考一下2024-03-03
如何理解Java中基類子對(duì)象的構(gòu)建過(guò)程從"基類向外"進(jìn)行擴(kuò)散的?
今天小編就為大家分享一篇關(guān)于如何理解Java中基類子對(duì)象的構(gòu)建過(guò)程從"基類向外"進(jìn)行擴(kuò)散的?,小編覺(jué)得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來(lái)看看吧2019-04-04
SpringBoot集成ElaticJob定時(shí)器的實(shí)現(xiàn)代碼
這篇文章主要介紹了SpringBoot集成ElaticJob定時(shí)器的實(shí)現(xiàn)代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-06-06
Docker?DockerFile部署java?jar項(xiàng)目包及Mysql和Redis的詳細(xì)過(guò)程
Dockerfile是一種用于構(gòu)建Docker鏡像的文件格式,可以通過(guò)Dockerfile部署Java項(xiàng)目,這篇文章主要給大家介紹了關(guān)于Docker?DockerFile部署java?jar項(xiàng)目包及Mysql和Redis的詳細(xì)過(guò)程,需要的朋友可以參考下2023-12-12
springboot的java配置方式(實(shí)例講解)
下面小編就為大家分享一篇實(shí)例講解springboot的java配置方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2017-11-11

