SpringBoot?2.x?接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)的實戰(zhàn)解決方案
近期DeepSeek等國產(chǎn)大模型熱度持續(xù)攀升,其關(guān)注度甚至超過了OpenAI(被戲稱為CloseAI)。在SpringBoot3.x
環(huán)境中,可以使用官方的Spring AI輕松接入,但對于仍在使用JDK8和SpringBoot2.7.3的企業(yè)級應(yīng)用來說,往往需要自定義實現(xiàn)。特別是當(dāng)大模型團(tuán)隊返回的數(shù)據(jù)格式不符合標(biāo)準(zhǔn)SSE規(guī)范時,更需要靈活處理。本文將分享我們的實戰(zhàn)解決方案。
?? 引入Gradle依賴
核心依賴說明:
spring-boot-starter-web
:基礎(chǔ)Web支持spring-boot-starter-webflux
:響應(yīng)式編程支持(WebClient所在模塊)
implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux'
?? WebClient配置要點
初始化時特別注意Header配置:
@Bean public WebClient init() { return WebClient.builder() .baseUrl(baseUrl) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi) // ?? 必須設(shè)置為JSON格式 .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .build(); }
?? 關(guān)鍵踩坑點:初始設(shè)置MediaType.TEXT_EVENT_STREAM_VALUE
會導(dǎo)致請求失敗,必須使用APPLICATION_JSON_VALUE
?? 核心處理邏輯
流式請求入口
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) { // 請求體構(gòu)建 String requestBody = String.format(""" { "model": "%s", "messages": [{"role": "user", "content": "%s"}], "stream": true } """, model, prompt); return webClient.post() // 請求配置 .uri("/v1/chat/completions") .bodyValue(requestBody) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(DataBuffer.class) // ?? 關(guān)鍵配置點 .transform(this::processStream) // 重試和超時配置 .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) .timeout(Duration.ofSeconds(180)); // 錯誤處理 .doOnError(e -> log.error("Stream error", e)) .doFinally(signal -> log.info("Stream completed: {}", signal)); }
技術(shù)原理說明
當(dāng)使用bodyToFlux(DataBuffer.class)
時:
- ? 獲得原始字節(jié)流控制權(quán)
- ? 避免自動SSE格式解析(適用于非標(biāo)準(zhǔn)響應(yīng))
- ?? 動態(tài)數(shù)據(jù)流處理:類似Java Stream,但數(shù)據(jù)持續(xù)追加
?? 非標(biāo)準(zhǔn)SSE數(shù)據(jù)處理
核心處理流程
private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) { return dataBufferFlux .transform(DataBufferUtils::join) // 字節(jié)流合并 .map(buffer -> { // 字節(jié)轉(zhuǎn)字符串 String content = buffer.toString(StandardCharsets.UTF_8); DataBufferUtils.release(buffer); return content; }) .flatMap(content -> // 處理粘包問題 Flux.fromArray(content.split("\\r?\\n\\r?\\n"))) .filter(event -> !event.trim().isEmpty()) // 過濾空事件 .map(event -> { // 格式標(biāo)準(zhǔn)化處理 String trimmed = event.trim(); if (trimmed.startsWith("data:")) { String substring = trimmed.substring(5); return substring.startsWith(" ") ? substring.substring(1) : substring; } return trimmed; }) .filter(event -> !event.startsWith("data:")); // 二次過濾 }
三大關(guān)鍵技術(shù)點
粘包處理通過
split("\\r?\\n\\r?\\n")
解決網(wǎng)絡(luò)傳輸中的消息邊界問題,示例原始數(shù)據(jù):data:{response1}\n\ndata:{response2}\n\n
格式兼容處理自動去除服務(wù)端可能返回的
data:
前綴,同時保留Spring自動添加SSE前綴的能力雙重過濾機(jī)制確保最終輸出不包含任何殘留的SSE格式標(biāo)識
?? 特別注意
當(dāng)接口設(shè)置produces = MediaType.TEXT_EVENT_STREAM_VALUE
時:
Spring WebFlux會自動添加
data:
前綴前端收到的格式示例:
data: {實際內(nèi)容}
若手動添加
data:
前綴會導(dǎo)致重復(fù):
data: data: {錯誤內(nèi)容} // ? 錯誤格式
??? 完整實現(xiàn)代碼
// 包聲明和導(dǎo)入... @Service @Slf4j public class OpenAiService { // 配置項和初始化 private String openAiApiKey = "sk-xxxxxx"; private String baseUrl = "https://openai.com/xxxx"; private String model = "gpt-4o"; private WebClient webClient; @PostConstruct public void init() { webClient = WebClient.builder() .baseUrl(baseUrl) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey) .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .build(); } @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) { // 構(gòu)建請求體 String requestBody = String.format(""" { "model": "gpt-4o-mini", "messages": [{"role": "user", "content": "%s"}], "stream": true } """, prompt); // 發(fā)送流式請求 return webClient.post() .uri("/v1/chat/completions") .bodyValue(requestBody) .retrieve() .onStatus(HttpStatusCode::isError, response -> response.bodyToMono(String.class) .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error))) ) .bodyToFlux(DataBuffer.class) .transform(this::processStream) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) .timeout(Duration.ofSeconds(180)) .doOnError(e -> log.error("Stream error", e)) .doFinally(signal -> log.info("Stream completed: {}", signal)); } private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) { return dataBufferFlux // 使用字節(jié)流處理 .transform(DataBufferUtils::join) .map(buffer -> { String content = buffer.toString(StandardCharsets.UTF_8); DataBufferUtils.release(buffer); return content; }) // 按 SSE 事件邊界,防止粘包的問題 .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n"))) // 過濾空事件 .filter(event -> !event.trim().isEmpty()) // 規(guī)范 SSE 事件格式 .map(event -> { String trimmed = event.trim(); // 由于webflux設(shè)置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE", // 所以在返回數(shù)據(jù)時會自動添加“data:”,因此如果返回的格式帶了“data:”需要手動去除 if (trimmed.startsWith("data:")) { trimmed = trimmed.replaceFirst("data:","").trim(); } return trimmed; }) .filter(event -> !event.startsWith("data:")); } }
到此這篇關(guān)于SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)實踐的文章就介紹到這了,更多相關(guān)SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JavaEE實現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能
這篇文章主要為大家詳細(xì)介紹了JavaEE實現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-05-05Java求10到100000之間的水仙花數(shù)算法示例
這篇文章主要介紹了Java求10到100000之間的水仙花數(shù)算法,結(jié)合實例形式分析了水仙花數(shù)的概念及相應(yīng)的java算法實現(xiàn)技巧,需要的朋友可以參考下2017-10-10Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼
本文主要介紹了Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼,代碼簡單易懂,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02