欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot?2.x?接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)的實戰(zhàn)解決方案

 更新時間:2025年02月25日 08:51:25   作者:fengzeng  
本文介紹了在SpringBoot2.7.3環(huán)境中接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)的實戰(zhàn)解決方案,通過自定義實現(xiàn),解決了大模型返回數(shù)據(jù)格式不符合標(biāo)準(zhǔn)SSE規(guī)范的問題,關(guān)鍵步驟包括引入Gradle依賴、配置WebClient、處理粘包、格式兼容和雙重過濾機(jī)制,感興趣的朋友跟隨小編一起看看吧

近期DeepSeek等國產(chǎn)大模型熱度持續(xù)攀升,其關(guān)注度甚至超過了OpenAI(被戲稱為CloseAI)。在SpringBoot3.x環(huán)境中,可以使用官方的Spring AI輕松接入,但對于仍在使用JDK8SpringBoot2.7.3的企業(yè)級應(yīng)用來說,往往需要自定義實現(xiàn)。特別是當(dāng)大模型團(tuán)隊返回的數(shù)據(jù)格式不符合標(biāo)準(zhǔn)SSE規(guī)范時,更需要靈活處理。本文將分享我們的實戰(zhàn)解決方案。

?? 引入Gradle依賴

核心依賴說明:

  • spring-boot-starter-web:基礎(chǔ)Web支持
  • spring-boot-starter-webflux:響應(yīng)式編程支持(WebClient所在模塊)
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

?? WebClient配置要點

初始化時特別注意Header配置:

@Bean
public WebClient init() {
    return WebClient.builder()
            .baseUrl(baseUrl)
            .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAi)
            // ?? 必須設(shè)置為JSON格式
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .build();
}

?? 關(guān)鍵踩坑點:初始設(shè)置MediaType.TEXT_EVENT_STREAM_VALUE會導(dǎo)致請求失敗,必須使用APPLICATION_JSON_VALUE

?? 核心處理邏輯

流式請求入口

@GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
    // 請求體構(gòu)建
    String requestBody = String.format("""
        {
            "model": "%s",
            "messages": [{"role": "user", "content": "%s"}],
            "stream": true
        }
        """, model, prompt);
    return webClient.post()
            // 請求配置
            .uri("/v1/chat/completions")
            .bodyValue(requestBody)
            .accept(MediaType.TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(DataBuffer.class)  // ?? 關(guān)鍵配置點
            .transform(this::processStream)
            // 重試和超時配置
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .timeout(Duration.ofSeconds(180));
            // 錯誤處理
            .doOnError(e -> log.error("Stream error", e))
            .doFinally(signal -> log.info("Stream completed: {}", signal));
}

技術(shù)原理說明

當(dāng)使用bodyToFlux(DataBuffer.class)時:

  • ? 獲得原始字節(jié)流控制權(quán)
  • ? 避免自動SSE格式解析(適用于非標(biāo)準(zhǔn)響應(yīng))
  • ?? 動態(tài)數(shù)據(jù)流處理:類似Java Stream,但數(shù)據(jù)持續(xù)追加

?? 非標(biāo)準(zhǔn)SSE數(shù)據(jù)處理

核心處理流程

private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
    return dataBufferFlux
            .transform(DataBufferUtils::join)          // 字節(jié)流合并
            .map(buffer -> {                          // 字節(jié)轉(zhuǎn)字符串
                String content = buffer.toString(StandardCharsets.UTF_8);
                DataBufferUtils.release(buffer);
                return content;
            })
            .flatMap(content ->                       // 處理粘包問題
                Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
            .filter(event -> !event.trim().isEmpty()) // 過濾空事件
            .map(event -> {                           // 格式標(biāo)準(zhǔn)化處理
                String trimmed = event.trim();
                if (trimmed.startsWith("data:")) {
                    String substring = trimmed.substring(5);
                    return substring.startsWith(" ") ? substring.substring(1) : substring;
                }
                return trimmed;
            })
            .filter(event -> !event.startsWith("data:")); // 二次過濾
}

三大關(guān)鍵技術(shù)點

  • 粘包處理通過split("\\r?\\n\\r?\\n")解決網(wǎng)絡(luò)傳輸中的消息邊界問題,示例原始數(shù)據(jù):

    data:{response1}\n\ndata:{response2}\n\n
  • 格式兼容處理自動去除服務(wù)端可能返回的data:前綴,同時保留Spring自動添加SSE前綴的能力

  • 雙重過濾機(jī)制確保最終輸出不包含任何殘留的SSE格式標(biāo)識

?? 特別注意

當(dāng)接口設(shè)置produces = MediaType.TEXT_EVENT_STREAM_VALUE時:

  • Spring WebFlux會自動添加data: 前綴

  • 前端收到的格式示例:

    data: {實際內(nèi)容}
  • 若手動添加

    data: 

    前綴會導(dǎo)致重復(fù):

    data: data: {錯誤內(nèi)容}  // ? 錯誤格式
    

??? 完整實現(xiàn)代碼

// 包聲明和導(dǎo)入...
@Service
@Slf4j
public class OpenAiService {
    // 配置項和初始化
    private String openAiApiKey = "sk-xxxxxx";
    private String baseUrl = "https://openai.com/xxxx";
    private String model = "gpt-4o";
    private WebClient webClient;
    @PostConstruct
    public void init() {
        webClient = WebClient.builder()
                .baseUrl(baseUrl)
                .defaultHeader(HttpHeaders.AUTHORIZATION, "Bearer " + openAiApiKey)
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }
    @GetMapping(value = "/stream/chat", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> streamChatEnhanced(@RequestParam("prompt") String prompt) {
        // 構(gòu)建請求體
        String requestBody = String.format("""
                {
                    "model": "gpt-4o-mini",
                    "messages": [{"role": "user", "content": "%s"}],
                    "stream": true
                }
                """, prompt);
        // 發(fā)送流式請求
        return webClient.post()
            .uri("/v1/chat/completions")
            .bodyValue(requestBody)
            .retrieve()
            .onStatus(HttpStatusCode::isError, response ->
                    response.bodyToMono(String.class)
                            .flatMap(error -> Mono.error(new RuntimeException("API Error: " + error)))
            )
            .bodyToFlux(DataBuffer.class)
            .transform(this::processStream)
            .retryWhen(Retry.backoff(3, Duration.ofSeconds(1)))
            .timeout(Duration.ofSeconds(180))
            .doOnError(e -> log.error("Stream error", e))
            .doFinally(signal -> log.info("Stream completed: {}", signal));
    }
    private Flux<String> processStream(Flux<DataBuffer> dataBufferFlux) {
        return dataBufferFlux
                // 使用字節(jié)流處理
                .transform(DataBufferUtils::join)
                .map(buffer -> {
                    String content = buffer.toString(StandardCharsets.UTF_8);
                    DataBufferUtils.release(buffer);
                    return content;
                })
                // 按 SSE 事件邊界,防止粘包的問題
                .flatMap(content -> Flux.fromArray(content.split("\\r?\\n\\r?\\n")))
                // 過濾空事件
                .filter(event -> !event.trim().isEmpty())
                // 規(guī)范 SSE 事件格式
                .map(event -> {
                    String trimmed = event.trim();
                    // 由于webflux設(shè)置了"produces = MediaType.TEXT_EVENT_STREAM_VALUE",
                    // 所以在返回數(shù)據(jù)時會自動添加“data:”,因此如果返回的格式帶了“data:”需要手動去除
                    if (trimmed.startsWith("data:")) {
                        trimmed = trimmed.replaceFirst("data:","").trim();
                    }
                    return trimmed;
                })
                .filter(event -> !event.startsWith("data:"));
    }
}

到此這篇關(guān)于SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)實踐的文章就介紹到這了,更多相關(guān)SpringBoot 2.x 接入非標(biāo)準(zhǔn)SSE格式大模型流式響應(yīng)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • JavaEE實現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能

    JavaEE實現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能

    這篇文章主要為大家詳細(xì)介紹了JavaEE實現(xiàn)基于SMTP協(xié)議的郵件發(fā)送功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2019-05-05
  • java實現(xiàn)一個掃描包的工具類實例代碼

    java實現(xiàn)一個掃描包的工具類實例代碼

    很多框架,比如springmvc,mybatis等使用注解,為了處理注解,必然要對包進(jìn)行掃描,所以下面這篇文章主要給大家分享介紹了關(guān)于利用java如何實現(xiàn)一個掃描包的工具類,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下。
    2017-10-10
  • Java求10到100000之間的水仙花數(shù)算法示例

    Java求10到100000之間的水仙花數(shù)算法示例

    這篇文章主要介紹了Java求10到100000之間的水仙花數(shù)算法,結(jié)合實例形式分析了水仙花數(shù)的概念及相應(yīng)的java算法實現(xiàn)技巧,需要的朋友可以參考下
    2017-10-10
  • Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼

    Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼

    本文主要介紹了Java實現(xiàn)Word/Pdf/TXT轉(zhuǎn)html的實例代碼,代碼簡單易懂,非常不錯,具有一定的參考借鑒價值,需要的朋友可以參考下
    2020-02-02
  • 解讀Spring?Bean的作用域

    解讀Spring?Bean的作用域

    這篇文章主要介紹了解讀Spring?Bean的作用域,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-10-10
  • Java中Scanner用法實例解析

    Java中Scanner用法實例解析

    Scanner?指的是java.util包下的Scanner類,可以接收控制臺輸入的數(shù)據(jù),下面這篇文章主要給大家介紹了關(guān)于Java中Scanner用法實例的相關(guān)資料,文中通過實例代碼以及圖文介紹的非常詳細(xì),需要的朋友可以參考下
    2022-11-11
  • Java SpringMVC異步處理詳解

    Java SpringMVC異步處理詳解

    這篇文章主要介紹了Java springmvc的處理異步,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2021-10-10
  • Spring?Boot面試必問之啟動流程知識點詳解

    Spring?Boot面試必問之啟動流程知識點詳解

    SpringBoot是Spring開源組織下的子項目,是Spring組件一站式解決方案,主要是簡化了使用Spring的難度,簡省了繁重的配置,提供了各種啟動器,開發(fā)者能快速上手,這篇文章主要給大家介紹了關(guān)于Spring?Boot面試必問之啟動流程知識點的相關(guān)資料,需要的朋友可以參考下
    2022-06-06
  • MyBatis-Plus流式查詢的實現(xiàn)示例

    MyBatis-Plus流式查詢的實現(xiàn)示例

    MyBatis-Plus 從 3.5.4 版本開始支持流式查詢,通過ResultHandler接口實現(xiàn)結(jié)果集的流式查詢,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-12-12
  • Java源碼解析之ClassLoader

    Java源碼解析之ClassLoader

    在看系統(tǒng)啟動的流程中看到了ClassLoader使用,重新溫故下ClassLoader流程和原理,文中有非常詳細(xì)的代碼示例,對正在學(xué)習(xí)java的小伙伴們很有幫助,需要的朋友可以參考下
    2021-05-05

最新評論