SpringBoot SSE服務(wù)端主動推送事件的實現(xiàn)
一、SSE概述
1、SSE簡介
SSE(Server Sent Event),直譯為服務(wù)器發(fā)送事件,也就是服務(wù)器主動發(fā)送事件,客戶端可以獲取到服務(wù)器發(fā)送的事件。
我們常見的 http 交互方式是客戶端發(fā)起請求,服務(wù)端響應(yīng),然后一次請求完畢。但是在SSE的使用場景下,客戶端發(fā)起請求,然后建立SEE連接一直保持,服務(wù)端就可以返回數(shù)據(jù)給客戶端。
SSE簡單來說就是服務(wù)器主動向前端推送數(shù)據(jù)的一種技術(shù),它是單向的。SSE適用于消息推送,監(jiān)控等只需要服務(wù)器推送數(shù)據(jù)的場景中。比如:文件下載時,后端可以推送下載進度條信息。
2、特點
SSE (Server Send Event)服務(wù)端主動推送:
- html5新標準,用來從服務(wù)端實時推送數(shù)據(jù)到瀏覽器端
- 直接建立在當前http連接上,本質(zhì)上是保持一個http長連接,輕量協(xié)議
簡單的服務(wù)器數(shù)據(jù)推送的場景,使用服務(wù)器推送事件還是很方便的。
3、SSE和WebScoket的區(qū)別
SSE 是單通道,只能服務(wù)端向客戶端發(fā)消息;而WebScoket 是雙通道。
SE | WebScoket |
---|---|
http 協(xié)議 | 獨立的 websocket 協(xié)議 |
輕量,使用簡單 | 相對復(fù)雜 |
默認支持斷線重連 | 需要自己實現(xiàn)斷線重連 |
文本傳輸 | 二進制傳輸 |
支持自定義發(fā)送的消息類型 | - |
4、SSE規(guī)范
在 html5 的定義中,服務(wù)端SSE,一般需要遵循以下要求:
1)請求頭開啟長連接 + 流方式傳遞:
Content-Type: text/event-stream;charset=UTF-8 Cache-Control: no-cache Connection: keep-alive
2)數(shù)據(jù)格式服務(wù)端發(fā)送的消息,由 message 組成,其格式如下:
field:value
其中 field 有五種可能:
- 空:即以:開頭,表示注釋,可以理解為服務(wù)端向客戶端發(fā)送的心跳,確保連接不中斷
- data:數(shù)據(jù)
- event:事件,默認值
- id:數(shù)據(jù)標識符用 id 字段表示,相當于每一條數(shù)據(jù)的編號
- retry:重連時間
二、SSE實戰(zhàn)
使用 SpringBoot簡單實現(xiàn)一個SSE服務(wù)端主動推送數(shù)據(jù)為前端,前端頁面接受后展示進度條。
1、SseEmitter類簡介
SpringBoot 利用 SseEmitter 來支持SSE,并對SSE規(guī)范做了一些封裝,使用起來非常簡單。
我們操作SseEmitter對象,關(guān)注消息文本即可。
SseEmitter類的幾個方法:
- send():發(fā)送數(shù)據(jù),如果傳入的是一個非SseEventBuilder對象,那么傳遞參數(shù)會被封裝到 data 中。
- complete():表示執(zhí)行完畢,會斷開連接。
- onTimeout():連接超時時回調(diào)觸發(fā)。
- onCompletion():結(jié)束之后的回調(diào)觸發(fā)。
- onError():報錯時的回調(diào)觸發(fā)。
2、示例實戰(zhàn)
創(chuàng)建 SpringBoot項目,引入依賴:
<!-- 里面包含了 spring-webmvc--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.7.12</version> </dependency>
2.1 創(chuàng)建 SseServer
我們創(chuàng)建一個 SseServer來簡單封裝一下業(yè)務(wù)操作SSE的方法。
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; /** * SseServer業(yè)務(wù)封裝類來操作SEE */ @Slf4j public class SseServer { /** * 當前連接總數(shù) */ private static AtomicInteger currentConnectTotal = new AtomicInteger(0); /** * messageId的 SseEmitter對象映射集 */ private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); /** * 創(chuàng)建sse連接 * * @param messageId - 消息id(唯一) * @return */ public static SseEmitter createConnect(String messageId) { /** * 設(shè)置連接超時時間。0表示不過期,默認是30秒,超過時間未完成會拋出異常 */ SseEmitter sseEmitter = new SseEmitter(0L); /* // 超時時間設(shè)置為3s,設(shè)置前端的重試時間為1s。重連時,注意總數(shù)的統(tǒng)計 SseEmitter sseEmitter = new SseEmitter(3_000L); try { sseEmitter.send( SseEmitter.event() .reconnectTime(1000L) //.data("前端重連成功") // 重連成功的提示信息 ); } catch (IOException e) { log.error("前端重連異常 ==> messageId={}, 異常信息:", messageId, e.getMessage()); e.printStackTrace(); }*/ // 注冊回調(diào) sseEmitter.onCompletion(completionCallBack(messageId)); sseEmitter.onTimeout(timeOutCallBack(messageId)); sseEmitter.onError(errorCallBack(messageId)); sseEmitterMap.put(messageId, sseEmitter); //記錄一下連接總數(shù)。數(shù)量+1 int count = currentConnectTotal.incrementAndGet(); log.info("創(chuàng)建sse連接成功 ==> 當前連接總數(shù)={}, messageId={}", count, messageId); return sseEmitter; } /** * 給指定 messageId發(fā)消息 * * @param messageId - 消息id(唯一) * @param message - 消息文本 */ public static void sendMessage(String messageId, String message) { if (sseEmitterMap.containsKey(messageId)) { try { sseEmitterMap.get(messageId).send(message); } catch (IOException e) { log.error("發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage()); e.printStackTrace(); } } else { throw new RuntimeException("連接不存在或者超時, messageId=" + messageId); } } /** * 給所有 messageId廣播發(fā)送消息 * * @param message */ public static void batchAllSendMessage(String message) { sseEmitterMap.forEach((messageId, sseEmitter) -> { try { sseEmitter.send(message, MediaType.APPLICATION_JSON); } catch (IOException e) { log.error("廣播發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage()); removeMessageId(messageId); } }); } /** * 給指定 messageId集合群發(fā)消息 * * @param messageIds * @param message */ public static void batchSendMessage(List<String> messageIds, String message) { if (CollectionUtils.isEmpty(messageIds)) { return; } // 去重 messageIds = messageIds.stream().distinct().collect(Collectors.toList()); messageIds.forEach(userId -> sendMessage(userId, message)); } /** * 給指定組群發(fā)消息(即組播,我們讓 messageId滿足我們的組命名確定即可) * * @param groupId * @param message */ public static void groupSendMessage(String groupId, String message) { if (MapUtils.isEmpty(sseEmitterMap)) { return; } sseEmitterMap.forEach((messageId, sseEmitter) -> { try { // 這里 groupId作為前綴 if (messageId.startsWith(groupId)) { sseEmitter.send(message, MediaType.APPLICATION_JSON); } } catch (IOException e) { log.error("組播發(fā)送消息異常 ==> groupId={}, 異常信息:", groupId, e.getMessage()); removeMessageId(messageId); } }); } /** * 移除 MessageId * * @param messageId */ public static void removeMessageId(String messageId) { sseEmitterMap.remove(messageId); //數(shù)量-1 currentConnectTotal.getAndDecrement(); log.info("remove messageId={}", messageId); } /** * 獲取所有的 MessageId集合 * * @return */ public static List<String> getMessageIds() { return new ArrayList<>(sseEmitterMap.keySet()); } /** * 獲取當前連接總數(shù) * * @return */ public static int getConnectTotal() { return currentConnectTotal.intValue(); } /** * 斷開SSE連接時的回調(diào) * * @param messageId * @return */ private static Runnable completionCallBack(String messageId) { return () -> { log.info("結(jié)束連接 ==> messageId={}", messageId); removeMessageId(messageId); }; } /** * 連接超時時回調(diào)觸發(fā) * * @param messageId * @return */ private static Runnable timeOutCallBack(String messageId) { return () -> { log.info("連接超時 ==> messageId={}", messageId); removeMessageId(messageId); }; } /** * 連接報錯時回調(diào)觸發(fā)。 * * @param messageId * @return */ private static Consumer<Throwable> errorCallBack(String messageId) { return throwable -> { log.error("連接異常 ==> messageId={}", messageId); removeMessageId(messageId); }; } }
2.2 業(yè)務(wù)controller
@RestController @CrossOrigin @RequestMapping("/sse") public class SseDemoController { /** * 用戶SSE連接 * 它返回一個SseEmitter實例,這時候連接就已經(jīng)創(chuàng)建了. * * @return */ @GetMapping("/userConnect") public SseEmitter connect() { /** * 一般取登錄用戶賬號作為 messageId。分組的話需要約定 messageId的格式。 * 這里模擬創(chuàng)建一個用戶連接 */ String userId = "userId-" + RandomUtils.nextInt(1, 10); return SseServer.createConnect(userId); } /** * 模擬實例:下載進度條顯示。 前端訪問下載接口之前,先建立用戶SSE連接,然后訪問下載接口,服務(wù)端推送消息。 * http://localhost:8080/sse/downLoad/userId-1 * * @throws InterruptedException */ @GetMapping("/downLoad/{userId}") public void pushOne(@PathVariable("userId") String userId) throws InterruptedException { for (int i = 0; i <= 100; i++) { if (i > 50 && i < 70) { Thread.sleep(500L); } else { Thread.sleep(100L); } System.out.println("sendMessage --> 消息=" + i); SseServer.sendMessage(userId, String.valueOf(i)); } System.out.println("下載成功"); } /** * 廣播發(fā)送。http://localhost:8080/sse/pushAllUser * * @throws InterruptedException */ @GetMapping("/pushAllUser") public void pushAllUser() throws InterruptedException { for (int i = 0; i <= 100; i++) { if (i > 50 && i < 70) { Thread.sleep(500L); } else { Thread.sleep(100L); } System.out.println("batchAllSendMessage --> 消息=" + i); SseServer.batchAllSendMessage(String.valueOf(i)); } } }
2.3 前端html
簡單寫一個html來演示效果。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Home</title> <script> var sseSource = new EventSource("http://localhost:8080/sse/userConnect"); <!-- 添加一個信息回調(diào) --> sseSource.onmessage = function(event){ console.log("test=>",event) document.getElementById("result").innerText = event.data+'%'; document.getElementById("my-progress").value = event.data; } // 使用vue交互事件,可以添加一些SSE的回調(diào) // sseSource.dispatchEvent(); // sseSource.close(); </script> </head> <body> <div id="result"></div> 下載進度條:<progress style="width: 300px" id="my-progress" value="0" max="100"></progress> </body> </html>
3、演示效果
參考文章:
HTML- server-sent-events:https://html.spec.whatwg.org/multipage/server-sent-events.html
到此這篇關(guān)于SpringBoot SSE服務(wù)端主動推送事件的實現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot SSE服務(wù)端主動推送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea中解決maven包沖突的問題(maven helper)
這篇文章主要介紹了idea中解決maven包沖突的問題(maven helper),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-12-12Spring實現(xiàn)控制反轉(zhuǎn)和依賴注入的示例詳解
這篇文章主要為大家詳細介紹IoC(控制反轉(zhuǎn))和DI(依賴注入)的概念,以及如何在Spring框架中實現(xiàn)它們,感興趣的小伙伴可以跟隨小編一起學習一下2023-08-08springboot 啟動如何修改application.properties的參數(shù)
這篇文章主要介紹了springboot 啟動如何修改application.properties的參數(shù)方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08Springboot使用sharedingjdbc實現(xiàn)分庫分表
這篇文章主要介紹了Springboot使用sharedingjdbc實現(xiàn)分庫分表,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-07-07SpringBoot創(chuàng)建線程池的六種方式小結(jié)
本文主要介紹了SpringBoot創(chuàng)建線程池的六種方式小結(jié),包括自定義線程池,固定長度線程池,單一線程池,共享線程池,定時線程池,SpringBoot中注入異步線程池,感興趣的可以了解一下2023-11-11springboot~nexus項目打包要注意的地方示例代碼詳解
這篇文章主要介紹了springboot~nexus項目打包要注意的地方,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07javafx tableview鼠標觸發(fā)更新屬性詳解
這篇文章主要為大家詳細介紹了javafx tableview鼠標觸發(fā)更新屬性的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-08-08Springboot 使用 JSR 303 對 Controller 控制層校驗及 Service 服務(wù)層 AOP 校驗
這篇文章主要介紹了Springboot 使用 JSR 303 對 Controller 控制層校驗及 Service 服務(wù)層 AOP 校驗 使用消息資源文件對消息國際化的相關(guān)知識,需要的朋友可以參考下2017-12-12