springboot整合SSE的項目實踐
SSE簡介
SSE(Server Sent Event),是一種可以主動從服務(wù)端推送消息的技術(shù)。SSE的本質(zhì)其實就是一個HTTP的長連接,只不過它給客戶端發(fā)送的不是一次性的數(shù)據(jù)包,而是一個stream流,格式為text/event-stream。所以客戶端不會關(guān)閉連接,會一直等著服務(wù)器發(fā)過來的新的數(shù)據(jù)流。
SSE服務(wù)端代碼
springboot中封裝了sse代碼,不需要額外的依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
public class SseEmitterServer { private static final Logger LOGGER = LoggerFactory.getLogger(SseEmitterServer.class); /** * 當(dāng)前連接數(shù) */ private static AtomicInteger count = new AtomicInteger(0); private static Map<Integer, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); public static SseEmitter connect(Integer userId){ // 設(shè)置超時日期,0表示不過期 SseEmitter sseEmitter = new SseEmitter(0L); // 注冊回調(diào) sseEmitter.onCompletion(completionCallBack(userId)); sseEmitter.onError(errorCallBack(userId)); sseEmitter.onTimeout(timeoutCallBack(userId)); sseEmitterMap.put(userId,sseEmitter); count.getAndIncrement(); LOGGER.info("創(chuàng)建新SSE連接,連接用戶編號:{}",userId); LOGGER.info("現(xiàn)有連接用戶:"+sseEmitterMap.keySet()); return sseEmitter; } /** * 給指定用戶發(fā)信息 */ public static void sendMessage(Integer userId,String message){ if (!sseEmitterMap.containsKey(userId)) { connect(userId); } try { sseEmitterMap.get(userId).send(message); LOGGER.info("給" + userId + "號發(fā)送消息:" + message); } catch (IOException e) { LOGGER.error("userId:{},發(fā)送信息出錯:{}", userId, e.getMessage()); e.printStackTrace(); } } /** * 群發(fā)消息 */ public static void batchSendMessage(String message){ if (sseEmitterMap != null&&!sseEmitterMap.isEmpty()) { sseEmitterMap.forEach((k,v)->{ try { v.send(message, MediaType.APPLICATION_JSON); } catch (IOException e) { LOGGER.error("userId:{},發(fā)送信息出錯:{}",k,e.getMessage()); e.printStackTrace(); } }); } } public static void batchSendMessage(Set<Integer> userIds,String message){ userIds.forEach(userId->sendMessage(userId,message)); } /** * 移出用戶 */ public static void removeUser(Integer userId){ sseEmitterMap.remove(userId); count.getAndDecrement(); LOGGER.info("remove user id:{}",userId); LOGGER.info("remain user id:"+sseEmitterMap.keySet()); } public static List<Integer> getIds(){ return new ArrayList<>(sseEmitterMap.keySet()); } public static int getUserCount(){ return count.intValue(); } private static Runnable completionCallBack(Integer userId){ return ()->{ LOGGER.info("結(jié)束連接,{}",userId); removeUser(userId); }; } private static Runnable timeoutCallBack(Integer userId){ return ()->{ LOGGER.info("連接超時,{}",userId); removeUser(userId); }; } private static Consumer<Throwable> errorCallBack(Integer userId){ return throwable -> { LOGGER.error("連接異常,{}",userId); removeUser(userId); }; } }
@RestController @CrossOrigin(maxAge = 3600) public class SseController { @RequestMapping(value = "/sse/connect/{id}",method = RequestMethod.GET) public SseEmitter connect(@PathVariable Integer id){ SseEmitter sseEmitter = SseEmitterServer.connect(id); return sseEmitter; } /** * 向指定用戶發(fā)送消息 */ @RequestMapping(value = "/sse/send/{id}", method = RequestMethod.GET) public EiInfo sendMsg(@PathVariable Integer id,@RequestParam("message") String message) { EiInfo eiInfo = new EiInfo(); SseEmitterServer.sendMessage(id,message); eiInfo.sysSetMsg("向"+id+"號用戶發(fā)送信息,"+message+",消息發(fā)送成功"); return eiInfo; } /** * 向所有用戶發(fā)送消息 */ @RequestMapping(value = "/sse/send/all", method = RequestMethod.GET) public EiInfo sendMsg2AllUser(@RequestParam("message") String message) { EiInfo eiInfo = new EiInfo(); SseEmitterServer.batchSendMessage(message); eiInfo.sysSetMsg("向所有用戶發(fā)送信息,"+message+",消息發(fā)送成功"); return eiInfo; } /** * 關(guān)閉用戶連接 */ @RequestMapping(value = "/sse/close/{id}", method = RequestMethod.GET) public EiInfo closeSse(@PathVariable Integer id) { EiInfo eiInfo = new EiInfo(); SseEmitterServer.removeUser(id); eiInfo.sysSetMsg("關(guān)閉"+id+"號連接。當(dāng)前連接用戶有:"+SseEmitterServer.getIds()); return eiInfo; } }
SSE測試
連接服務(wù)端
創(chuàng)建三個用戶,分別連接服務(wù)端
給指定用戶發(fā)送消息
使用接口測試工具,只給1號用戶發(fā)送消息
瀏覽器中,1號用戶接收到消息,2號3號未接收到消息
群發(fā)消息
關(guān)閉連接
當(dāng)SSE客戶端連接后,如果長時間不斷開,它會保持連接狀態(tài)。SSE(Server-Sent Events)是一種基于HTTP的推送技術(shù),允許服務(wù)器實時向客戶端發(fā)送數(shù)據(jù)。當(dāng)客戶端連接到服務(wù)器的SSE端點時,它會創(chuàng)建一個EventSource對象,該對象將與服務(wù)器進(jìn)行長期連接,并接收服務(wù)器發(fā)送的事件。只有在客戶端手動關(guān)閉連接或連接發(fā)生錯誤時,才會斷開SSE連接。
請注意,由于SSE是基于HTTP的技術(shù),因此它可能受到瀏覽器或服務(wù)器的超時設(shè)置的影響。如果在長時間沒有收到服務(wù)器的數(shù)據(jù)時,連接可能會斷開。如果您希望在長時間不斷開連接的情況下保持SSE連接,請確保服務(wù)器發(fā)送數(shù)據(jù)以保持連接活躍,或者調(diào)整相關(guān)超時設(shè)置。
在使用SSE的客戶端連接中,如果長時間不斷開連接,可能會出現(xiàn)以下情況:
- 連接超時。如果服務(wù)器端沒有發(fā)送新的事件數(shù)據(jù),而客戶端也沒有重新建立連接,可能會超過服務(wù)器的連接超時時間。這可能導(dǎo)致服務(wù)器關(guān)閉連接,客戶端需要重新建立連接才能接收新的事件數(shù)據(jù)。
- 網(wǎng)絡(luò)異常。長時間不斷開連接可能會導(dǎo)致網(wǎng)絡(luò)異常,例如連接中斷、丟包等問題。在這種情況下,客戶端可能需要重新建立連接,以恢復(fù)和服務(wù)器的通信。為了避免長時間不斷開連接的問題,建議在合適的時機(jī)關(guān)閉連接。
到此這篇關(guān)于springboot整合SSE的項目實踐的文章就介紹到這了,更多相關(guān)springboot整合SSE內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
解決BufferedReader.readLine()遇見的坑
這篇文章主要介紹了解決BufferedReader.readLine()遇見的坑,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12Spring activiti如何實現(xiàn)指定任務(wù)處理者
這篇文章主要介紹了Spring activiti如何實現(xiàn)指定任務(wù)處理者,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-11-11Spring boot如何通過@Scheduled實現(xiàn)定時任務(wù)及多線程配置
這篇文章主要介紹了Spring boot如何通過@Scheduled實現(xiàn)定時任務(wù)及多線程配置,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12SpringBoot+MyBatis實現(xiàn)MD5加密數(shù)據(jù)庫用戶密碼的方法
MD5技術(shù)主要用于對用戶密碼加密,增加賬戶的安全性,他具有不可逆的特性,不會被輕易解密,這篇文章給大家介紹SpringBoot+MyBatis實現(xiàn)MD5加密數(shù)據(jù)庫用戶密碼的方法,感興趣的朋友跟隨小編一起看看吧2024-03-03