Springboot SseEmitter流式輸出的實現(xiàn)代碼
Springboot SseEmitter流式輸出
前言
最近做AI類的開發(fā),看到各大AI模型的輸出方式都是采取的一種EventStream
的方式實現(xiàn)。
不是通常的等接口處理完成后,一次性返回。
而是片段式的處理完成一個分片,就立馬告知前端做出處理;后續(xù)處理出新的片段則再次發(fā)送給客戶端。
在Spring
框架中就有一個類似的方式實現(xiàn)。SseEmitter
。
SseEmitter 簡介
SseEmitter
是在Spring 4.2
開始引入的,使用的話需要注意版本,不過Springboot 2.X 是可以玩的。
測試demo
編寫一段代碼,循環(huán)返回給客戶端。如下所示:
package cn.xj.controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @RestController @RequestMapping("/sse/mitter") public class SseMitterController { @GetMapping(value = "/stream", produces = "text/event-stream") public SseEmitter stream() { // 設(shè)置默認超時時間 0L 表示無限 // 注意:這里的單位是 ms SseEmitter sseEmitter = new SseEmitter(30000L); // 最好不要阻塞主線程 Executors.newSingleThreadExecutor().execute(() -> { try { for (int i = 0; i < 10; i++) { sseEmitter.send("這只是一個流式輸出案例:" + i); TimeUnit.SECONDS.sleep(1); } // 通知客戶端消息發(fā)送完畢 sseEmitter.complete(); } catch (Exception e) { e.printStackTrace(); sseEmitter.completeWithError(e); } }); return sseEmitter; } }
瀏覽器請求,打開控制臺查看數(shù)據(jù)格式,如下所示:
注意點
異常一 ResponseBodyEmitter is already set complete
這種問題通常是 設(shè)置超時時間timeout
太小導(dǎo)致的。網(wǎng)上很多demo中說的這個單位是秒,但實際測試來看,單位應(yīng)該是毫秒 ms
。
補充:SpringBoot中SSE流式輸出中止的核心代碼
SpringBoot中SSE流式輸出中止的核心代碼
在大模型會話中,會有一個功能是停止生成功能。這個功能如果在前端實現(xiàn),既取消監(jiān)聽后端的流式返回事件,會導(dǎo)致后端日志中報錯連接中斷等錯誤。
由此引出的需求,我的接口A中使用了sse流式返回,需要做一個接口B,B的功能是中止第一個接口的流式返回,以下是核心代碼和思路:
方案一:需要借助redis,在輸出時循環(huán)判定來解決。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import javax.servlet.http.HttpServletRequest; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.util.concurrent.TimeUnit; @Controller public class MyController { @Autowired private RedisTemplate<String, Object> redisTemplate; @RequestMapping("/startStreaming") public SseEmitter startStreaming(HttpServletRequest request) throws IOException { String requestId = request.getId(); // 獲取請求的唯一標識符 String key = "shouldStopStreaming_" + requestId; // 生成唯一的key SseEmitter emitter = new SseEmitter(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(/*輸入流*/)); // SSE輸出邏輯 String line; while ((line = bufferedReader.readLine()) != null) { Boolean shouldStop = (Boolean) redisTemplate.opsForValue().get(key); if (shouldStop != null && shouldStop) { break; // 檢查shouldStopStreaming標志,若為true則中斷循環(huán) } // 發(fā)送數(shù)據(jù)給客戶端 emitter.send(line); } // 刪除key,確保不再需要該key時將其移除 redisTemplate.delete(key); return emitter; } @RequestMapping("/stopStreaming") @ResponseBody public String stopStreaming(HttpServletRequest request) { String requestId = request.getId(); // 獲取請求的唯一標識符 String key = "shouldStopStreaming_" + requestId; // 生成唯一的key // 設(shè)置shouldStopStreaming為true,終止流式輸出 redisTemplate.opsForValue().set(key, true, 1, TimeUnit.HOURS); // 設(shè)置過期時間為1小時(可根據(jù)需要調(diào)整) return "Streaming stopped"; } }
A接口定期從Redis中獲取shouldStopStreaming的值,并檢查是否應(yīng)該中止流式輸出。B接口使用RedisTemplate將shouldStopStreaming的值設(shè)置為true,以指示A接口中止輸出。由于Redis的操作是原子性的,并且RedisTemplate提供了線程安全的訪問,這樣可以確保多個線程之間的協(xié)調(diào)和線程安全性。
方案二:使用本地緩存,結(jié)合SseEmitter特性實現(xiàn)(實際使用的此種方案)
private final Map<String, SseEmitter> sseCache = new ConcurrentHashMap<>(10); ## 對話接口中put一下前端隨機生成的不唯一emitterId sseCache.put(emitterId, emitter); ## 停止回答接口 @Override public void stop(String emitterId) { if (sseCache.containsKey(emitterId)) { sseCache.get(emitterId).complete(); sseCache.remove(emitterId); } }
到此這篇關(guān)于Springboot SseEmitter流式輸出 的文章就介紹到這了,更多相關(guān)Springboot SseEmitter流式輸出 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java整數(shù)和字符串相互轉(zhuǎn)化實例詳解
這篇文章主要介紹了Java整數(shù)和字符串相互轉(zhuǎn)化實例詳解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-02-02SpringBoot校園綜合管理系統(tǒng)實現(xiàn)流程分步講解
這篇文章主要介紹了SpringBoot+Vue實現(xiàn)校園綜合管理系統(tǒng)流程分步講解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2022-09-09Java?代碼本地設(shè)置Hadoop用戶名密碼的方法
在Hadoop環(huán)境中,通常使用Kerberos進行身份驗證,這篇文章主要介紹了Java?代碼本地設(shè)置Hadoop用戶名密碼的方法,需要的朋友可以參考下2024-08-08詳解SpringMVC和MyBatis框架開發(fā)環(huán)境搭建和簡單實用
這篇文章主要介紹了詳解SpringMVC和MyBatis框架開發(fā)環(huán)境搭建和簡單實用,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-05-05