SpringBoot?2.x?接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)的實(shí)戰(zhàn)解決方案
近期DeepSeek等國產(chǎn)大模型熱度持續(xù)攀升,其關(guān)注度甚至超過了OpenAI(被戲稱為CloseAI)。在SpringBoot3.x
環(huán)境中,可以使用官方的Spring AI輕松接入,但對于仍在使用JDK8和SpringBoot2.7.3的企業(yè)級應(yīng)用來說,往往需要自定義實(shí)現(xiàn)。特別是當(dāng)大模型團(tuán)隊(duì)返回的數(shù)據(jù)格式不符合標(biāo)準(zhǔn)SSE規(guī)范時(shí),更需要靈活處理。本文將分享我們的實(shí)戰(zhàn)解決方案。
?? 引入Gradle依賴
核心依賴說明:
spring-boot-starter-web
:基礎(chǔ)Web支持spring-boot-starter-webflux
:響應(yīng)式編程支持(WebClient所在模塊)
implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux'
?? WebClient配置要點(diǎn)
初始化時(shí)特別注意Header配置:
@Bean public WebClient init() { return WebClient.builder() .baseUrl(baseUrl) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi) // ?? 必須設(shè)置為JSON格式 .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .build(); }
?? 關(guān)鍵踩坑點(diǎn):初始設(shè)置MediaType.TEXT_EVENT_STREAM_VALUE
會(huì)導(dǎo)致請求失敗,必須使用APPLICATION_JSON_VALUE
?? 核心處理邏輯
流式請求入口
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) { // 請求體構(gòu)建 String requestBody = String.format(""" { "model": "%s", "messages": [{"role": "user", "content": "%s"}], "stream": true } """, model, prompt); return webClient.post() // 請求配置 .uri("/v1/chat/completions") .bodyValue(requestBody) .accept(MediaType.TEXT_EVENT_STREAM) .retrieve() .bodyToFlux(DataBuffer.class) // ?? 關(guān)鍵配置點(diǎn) .transform(this::processStream) // 重試和超時(shí)配置 .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) .timeout(Duration.ofSeconds(180)); // 錯(cuò)誤處理 .doOnError(e -> log.error("Stream error", e)) .doFinally(signal -> log.info("Stream completed: {}", signal)); }
技術(shù)原理說明
當(dāng)使用bodyToFlux(DataBuffer.class)
時(shí):
- ? 獲得原始字節(jié)流控制權(quán)
- ? 避免自動(dòng)SSE格式解析(適用于非標(biāo)準(zhǔn)響應(yīng))
- ?? 動(dòng)態(tài)數(shù)據(jù)流處理:類似Java Stream,但數(shù)據(jù)持續(xù)追加
?? 非標(biāo)準(zhǔn)SSE數(shù)據(jù)處理
核心處理流程
private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) { return dataBufferFlux .transform(DataBufferUtils::join) // 字節(jié)流合并 .map(buffer -> { // 字節(jié)轉(zhuǎn)字符串 String content = buffer.toString(StandardCharsets.UTF_8); DataBufferUtils.release(buffer); return content; }) .flatMap(content -> // 處理粘包問題 Flux.fromArray(content.split("\\r?\\n\\r?\\n"))) .filter(event -> !event.trim().isEmpty()) // 過濾空事件 .map(event -> { // 格式標(biāo)準(zhǔn)化處理 String trimmed = event.trim(); if (trimmed.startsWith("data:")) { String substring = trimmed.substring(5); return substring.startsWith(" ") ? substring.substring(1) : substring; } return trimmed; }) .filter(event -> !event.startsWith("data:")); // 二次過濾 }
三大關(guān)鍵技術(shù)點(diǎn)
粘包處理通過
split("\\r?\\n\\r?\\n")
解決網(wǎng)絡(luò)傳輸中的消息邊界問題,示例原始數(shù)據(jù):data:{response1}\n\ndata:{response2}\n\n
格式兼容處理自動(dòng)去除服務(wù)端可能返回的
data:
前綴,同時(shí)保留Spring自動(dòng)添加SSE前綴的能力雙重過濾機(jī)制確保最終輸出不包含任何殘留的SSE格式標(biāo)識(shí)
?? 特別注意
當(dāng)接口設(shè)置produces = MediaType.TEXT_EVENT_STREAM_VALUE
時(shí):
Spring WebFlux會(huì)自動(dòng)添加
data:
前綴前端收到的格式示例:
data: {實(shí)際內(nèi)容}
若手動(dòng)添加
data:
前綴會(huì)導(dǎo)致重復(fù):
data: data: {錯(cuò)誤內(nèi)容} // ? 錯(cuò)誤格式
??? 完整實(shí)現(xiàn)代碼
// 包聲明和導(dǎo)入... @Service @Slf4j public class OpenAiService { // 配置項(xiàng)和初始化 private String openAiApiKey = "sk-xxxxxx"; private String baseUrl = "https://openai.com/xxxx"; private String model = "gpt-4o"; private WebClient webClient; @PostConstruct public void init() { webClient = WebClient.builder() .baseUrl(baseUrl) .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey) .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) .build(); } @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) { // 構(gòu)建請求體 String requestBody = String.format(""" { "model": "gpt-4o-mini", "messages": [{"role": "user", "content": "%s"}], "stream": true } """, prompt); // 發(fā)送流式請求 return webClient.post() .uri("/v1/chat/completions") .bodyValue(requestBody) .retrieve() .onStatus(HttpStatusCode::isError, response -> response.bodyToMono(String.class) .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error))) ) .bodyToFlux(DataBuffer.class) .transform(this::processStream) .retryWhen(Retry.backoff(3, Duration.ofSeconds(1))) .timeout(Duration.ofSeconds(180)) .doOnError(e -> log.error("Stream error", e)) .doFinally(signal -> log.info("Stream completed: {}", signal)); } private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) { return dataBufferFlux // 使用字節(jié)流處理 .transform(DataBufferUtils::join) .map(buffer -> { String content = buffer.toString(StandardCharsets.UTF_8); DataBufferUtils.release(buffer); return content; }) // 按 SSE 事件邊界,防止粘包的問題 .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n"))) // 過濾空事件 .filter(event -> !event.trim().isEmpty()) // 規(guī)范 SSE 事件格式 .map(event -> { String trimmed = event.trim(); // 由于webflux設(shè)置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE", // 所以在返回?cái)?shù)據(jù)時(shí)會(huì)自動(dòng)添加“data:”,因此如果返回的格式帶了“data:”需要手動(dòng)去除 if (trimmed.startsWith("data:")) { trimmed = trimmed.replaceFirst("data:","").trim(); } return trimmed; }) .filter(event -> !event.startsWith("data:")); } }
到此這篇關(guān)于SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)實(shí)踐的文章就介紹到這了,更多相關(guān)SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JavaEE實(shí)現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能
這篇文章主要為大家詳細(xì)介紹了JavaEE實(shí)現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2019-05-05java實(shí)現(xiàn)一個(gè)掃描包的工具類實(shí)例代碼
很多框架,比如springmvc,mybatis等使用注解,為了處理注解,必然要對包進(jìn)行掃描,所以下面這篇文章主要給大家分享介紹了關(guān)于利用java如何實(shí)現(xiàn)一個(gè)掃描包的工具類,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下。2017-10-10Java求10到100000之間的水仙花數(shù)算法示例
這篇文章主要介紹了Java求10到100000之間的水仙花數(shù)算法,結(jié)合實(shí)例形式分析了水仙花數(shù)的概念及相應(yīng)的java算法實(shí)現(xiàn)技巧,需要的朋友可以參考下2017-10-10Java實(shí)現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實(shí)例代碼
本文主要介紹了Java實(shí)現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實(shí)例代碼,代碼簡單易懂,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02Spring?Boot面試必問之啟動(dòng)流程知識(shí)點(diǎn)詳解
SpringBoot是Spring開源組織下的子項(xiàng)目,是Spring組件一站式解決方案,主要是簡化了使用Spring的難度,簡省了繁重的配置,提供了各種啟動(dòng)器,開發(fā)者能快速上手,這篇文章主要給大家介紹了關(guān)于Spring?Boot面試必問之啟動(dòng)流程知識(shí)點(diǎn)的相關(guān)資料,需要的朋友可以參考下2022-06-06MyBatis-Plus流式查詢的實(shí)現(xiàn)示例
MyBatis-Plus 從 3.5.4 版本開始支持流式查詢,通過ResultHandler接口實(shí)現(xiàn)結(jié)果集的流式查詢,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-12-12