SpringBoot?2.x?接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)的實戰(zhàn)解決方案
近期DeepSeek等國產(chǎn)大模型熱度持續(xù)攀升,其關(guān)注度甚至超過了OpenAI(被戲稱為CloseAI)。在SpringBoot3.x環(huán)境中,可以使用官方的Spring AI輕松接入,但對于仍在使用JDK8和SpringBoot2.7.3的企業(yè)級應(yīng)用來說,往往需要自定義實現(xiàn)。特別是當(dāng)大模型團隊返回的數(shù)據(jù)格式不符合標(biāo)準(zhǔn)SSE規(guī)范時,更需要靈活處理。本文將分享我們的實戰(zhàn)解決方案。
?? 引入Gradle依賴
核心依賴說明:
spring-boot-starter-web:基礎(chǔ)Web支持spring-boot-starter-webflux:響應(yīng)式編程支持(WebClient所在模塊)
implementation 'org.springframework.boot:spring-boot-starter-web' implementation 'org.springframework.boot:spring-boot-starter-webflux'
?? WebClient配置要點
初始化時特別注意Header配置:
@Bean
public WebClient init() {
return WebClient.builder()
.baseUrl(baseUrl)
.defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)
// ?? 必須設(shè)置為JSON格式
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}?? 關(guān)鍵踩坑點:初始設(shè)置MediaType.TEXT_EVENT_STREAM_VALUE會導(dǎo)致請求失敗,必須使用APPLICATION_JSON_VALUE
?? 核心處理邏輯
流式請求入口
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
// 請求體構(gòu)建
String requestBody = String.format("""
{
"model": "%s",
"messages": [{"role": "user", "content": "%s"}],
"stream": true
}
""", model, prompt);
return webClient.post()
// 請求配置
.uri("/v1/chat/completions")
.bodyValue(requestBody)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(DataBuffer.class) // ?? 關(guān)鍵配置點
.transform(this::processStream)
// 重試和超時配置
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.timeout(Duration.ofSeconds(180));
// 錯誤處理
.doOnError(e -> log.error("Stream error", e))
.doFinally(signal -> log.info("Stream completed: {}", signal));
}技術(shù)原理說明
當(dāng)使用bodyToFlux(DataBuffer.class)時:
- ? 獲得原始字節(jié)流控制權(quán)
- ? 避免自動SSE格式解析(適用于非標(biāo)準(zhǔn)響應(yīng))
- ?? 動態(tài)數(shù)據(jù)流處理:類似Java Stream,但數(shù)據(jù)持續(xù)追加
?? 非標(biāo)準(zhǔn)SSE數(shù)據(jù)處理
核心處理流程
private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
return dataBufferFlux
.transform(DataBufferUtils::join) // 字節(jié)流合并
.map(buffer -> { // 字節(jié)轉(zhuǎn)字符串
String content = buffer.toString(StandardCharsets.UTF_8);
DataBufferUtils.release(buffer);
return content;
})
.flatMap(content -> // 處理粘包問題
Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
.filter(event -> !event.trim().isEmpty()) // 過濾空事件
.map(event -> { // 格式標(biāo)準(zhǔn)化處理
String trimmed = event.trim();
if (trimmed.startsWith("data:")) {
String substring = trimmed.substring(5);
return substring.startsWith(" ") ? substring.substring(1) : substring;
}
return trimmed;
})
.filter(event -> !event.startsWith("data:")); // 二次過濾
}三大關(guān)鍵技術(shù)點
粘包處理通過
split("\\r?\\n\\r?\\n")解決網(wǎng)絡(luò)傳輸中的消息邊界問題,示例原始數(shù)據(jù):data:{response1}\n\ndata:{response2}\n\n格式兼容處理自動去除服務(wù)端可能返回的
data:前綴,同時保留Spring自動添加SSE前綴的能力雙重過濾機制確保最終輸出不包含任何殘留的SSE格式標(biāo)識
?? 特別注意
當(dāng)接口設(shè)置produces = MediaType.TEXT_EVENT_STREAM_VALUE時:
Spring WebFlux會自動添加
data:前綴前端收到的格式示例:
data: {實際內(nèi)容}若手動添加
data:
前綴會導(dǎo)致重復(fù):
data: data: {錯誤內(nèi)容} // ? 錯誤格式
??? 完整實現(xiàn)代碼
// 包聲明和導(dǎo)入...
@Service
@Slf4j
public class OpenAiService {
// 配置項和初始化
private String openAiApiKey = "sk-xxxxxx";
private String baseUrl = "https://openai.com/xxxx";
private String model = "gpt-4o";
private WebClient webClient;
@PostConstruct
public void init() {
webClient = WebClient.builder()
.baseUrl(baseUrl)
.defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
// 構(gòu)建請求體
String requestBody = String.format("""
{
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "%s"}],
"stream": true
}
""", prompt);
// 發(fā)送流式請求
return webClient.post()
.uri("/v1/chat/completions")
.bodyValue(requestBody)
.retrieve()
.onStatus(HttpStatusCode::isError, response ->
response.bodyToMono(String.class)
.flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))
)
.bodyToFlux(DataBuffer.class)
.transform(this::processStream)
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
.timeout(Duration.ofSeconds(180))
.doOnError(e -> log.error("Stream error", e))
.doFinally(signal -> log.info("Stream completed: {}", signal));
}
private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
return dataBufferFlux
// 使用字節(jié)流處理
.transform(DataBufferUtils::join)
.map(buffer -> {
String content = buffer.toString(StandardCharsets.UTF_8);
DataBufferUtils.release(buffer);
return content;
})
// 按 SSE 事件邊界,防止粘包的問題
.flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
// 過濾空事件
.filter(event -> !event.trim().isEmpty())
// 規(guī)范 SSE 事件格式
.map(event -> {
String trimmed = event.trim();
// 由于webflux設(shè)置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE",
// 所以在返回數(shù)據(jù)時會自動添加“data:”,因此如果返回的格式帶了“data:”需要手動去除
if (trimmed.startsWith("data:")) {
trimmed = trimmed.replaceFirst("data:","").trim();
}
return trimmed;
})
.filter(event -> !event.startsWith("data:"));
}
}到此這篇關(guān)于SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)實踐的文章就介紹到這了,更多相關(guān)SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
JavaEE實現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能
這篇文章主要為大家詳細(xì)介紹了JavaEE實現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下2019-05-05
Java求10到100000之間的水仙花數(shù)算法示例
這篇文章主要介紹了Java求10到100000之間的水仙花數(shù)算法,結(jié)合實例形式分析了水仙花數(shù)的概念及相應(yīng)的java算法實現(xiàn)技巧,需要的朋友可以參考下2017-10-10
Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼
本文主要介紹了Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼,代碼簡單易懂,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02

