SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn)
一、SSE概述
1、SSE簡(jiǎn)介
SSE(Server Sent Event),直譯為服務(wù)器發(fā)送事件,也就是服務(wù)器主動(dòng)發(fā)送事件,客戶端可以獲取到服務(wù)器發(fā)送的事件。
我們常見的 http 交互方式是客戶端發(fā)起請(qǐng)求,服務(wù)端響應(yīng),然后一次請(qǐng)求完畢。但是在SSE的使用場(chǎng)景下,客戶端發(fā)起請(qǐng)求,然后建立SEE連接一直保持,服務(wù)端就可以返回?cái)?shù)據(jù)給客戶端。
SSE簡(jiǎn)單來(lái)說(shuō)就是服務(wù)器主動(dòng)向前端推送數(shù)據(jù)的一種技術(shù),它是單向的。SSE適用于消息推送,監(jiān)控等只需要服務(wù)器推送數(shù)據(jù)的場(chǎng)景中。比如:文件下載時(shí),后端可以推送下載進(jìn)度條信息。
2、特點(diǎn)
SSE (Server Send Event)服務(wù)端主動(dòng)推送:
- html5新標(biāo)準(zhǔn),用來(lái)從服務(wù)端實(shí)時(shí)推送數(shù)據(jù)到瀏覽器端
- 直接建立在當(dāng)前http連接上,本質(zhì)上是保持一個(gè)http長(zhǎng)連接,輕量協(xié)議
簡(jiǎn)單的服務(wù)器數(shù)據(jù)推送的場(chǎng)景,使用服務(wù)器推送事件還是很方便的。
3、SSE和WebScoket的區(qū)別
SSE 是單通道,只能服務(wù)端向客戶端發(fā)消息;而WebScoket 是雙通道。
SE | WebScoket |
---|---|
http 協(xié)議 | 獨(dú)立的 websocket 協(xié)議 |
輕量,使用簡(jiǎn)單 | 相對(duì)復(fù)雜 |
默認(rèn)支持?jǐn)嗑€重連 | 需要自己實(shí)現(xiàn)斷線重連 |
文本傳輸 | 二進(jìn)制傳輸 |
支持自定義發(fā)送的消息類型 | - |
4、SSE規(guī)范
在 html5 的定義中,服務(wù)端SSE,一般需要遵循以下要求:
1)請(qǐng)求頭開啟長(zhǎng)連接 + 流方式傳遞:
Content-Type: text/event-stream;charset=UTF-8 Cache-Control: no-cache Connection: keep-alive
2)數(shù)據(jù)格式服務(wù)端發(fā)送的消息,由 message 組成,其格式如下:
field:value
其中 field 有五種可能:
- 空:即以:開頭,表示注釋,可以理解為服務(wù)端向客戶端發(fā)送的心跳,確保連接不中斷
- data:數(shù)據(jù)
- event:事件,默認(rèn)值
- id:數(shù)據(jù)標(biāo)識(shí)符用 id 字段表示,相當(dāng)于每一條數(shù)據(jù)的編號(hào)
- retry:重連時(shí)間
二、SSE實(shí)戰(zhàn)
使用 SpringBoot簡(jiǎn)單實(shí)現(xiàn)一個(gè)SSE服務(wù)端主動(dòng)推送數(shù)據(jù)為前端,前端頁(yè)面接受后展示進(jìn)度條。
1、SseEmitter類簡(jiǎn)介
SpringBoot 利用 SseEmitter 來(lái)支持SSE,并對(duì)SSE規(guī)范做了一些封裝,使用起來(lái)非常簡(jiǎn)單。
我們操作SseEmitter對(duì)象,關(guān)注消息文本即可。
SseEmitter類的幾個(gè)方法:
- send():發(fā)送數(shù)據(jù),如果傳入的是一個(gè)非SseEventBuilder對(duì)象,那么傳遞參數(shù)會(huì)被封裝到 data 中。
- complete():表示執(zhí)行完畢,會(huì)斷開連接。
- onTimeout():連接超時(shí)時(shí)回調(diào)觸發(fā)。
- onCompletion():結(jié)束之后的回調(diào)觸發(fā)。
- onError():報(bào)錯(cuò)時(shí)的回調(diào)觸發(fā)。
2、示例實(shí)戰(zhàn)
創(chuàng)建 SpringBoot項(xiàng)目,引入依賴:
<!-- 里面包含了 spring-webmvc--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>2.7.12</version> </dependency>
2.1 創(chuàng)建 SseServer
我們創(chuàng)建一個(gè) SseServer來(lái)簡(jiǎn)單封裝一下業(yè)務(wù)操作SSE的方法。
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; /** * SseServer業(yè)務(wù)封裝類來(lái)操作SEE */ @Slf4j public class SseServer { /** * 當(dāng)前連接總數(shù) */ private static AtomicInteger currentConnectTotal = new AtomicInteger(0); /** * messageId的 SseEmitter對(duì)象映射集 */ private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); /** * 創(chuàng)建sse連接 * * @param messageId - 消息id(唯一) * @return */ public static SseEmitter createConnect(String messageId) { /** * 設(shè)置連接超時(shí)時(shí)間。0表示不過(guò)期,默認(rèn)是30秒,超過(guò)時(shí)間未完成會(huì)拋出異常 */ SseEmitter sseEmitter = new SseEmitter(0L); /* // 超時(shí)時(shí)間設(shè)置為3s,設(shè)置前端的重試時(shí)間為1s。重連時(shí),注意總數(shù)的統(tǒng)計(jì) SseEmitter sseEmitter = new SseEmitter(3_000L); try { sseEmitter.send( SseEmitter.event() .reconnectTime(1000L) //.data("前端重連成功") // 重連成功的提示信息 ); } catch (IOException e) { log.error("前端重連異常 ==> messageId={}, 異常信息:", messageId, e.getMessage()); e.printStackTrace(); }*/ // 注冊(cè)回調(diào) sseEmitter.onCompletion(completionCallBack(messageId)); sseEmitter.onTimeout(timeOutCallBack(messageId)); sseEmitter.onError(errorCallBack(messageId)); sseEmitterMap.put(messageId, sseEmitter); //記錄一下連接總數(shù)。數(shù)量+1 int count = currentConnectTotal.incrementAndGet(); log.info("創(chuàng)建sse連接成功 ==> 當(dāng)前連接總數(shù)={}, messageId={}", count, messageId); return sseEmitter; } /** * 給指定 messageId發(fā)消息 * * @param messageId - 消息id(唯一) * @param message - 消息文本 */ public static void sendMessage(String messageId, String message) { if (sseEmitterMap.containsKey(messageId)) { try { sseEmitterMap.get(messageId).send(message); } catch (IOException e) { log.error("發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage()); e.printStackTrace(); } } else { throw new RuntimeException("連接不存在或者超時(shí), messageId=" + messageId); } } /** * 給所有 messageId廣播發(fā)送消息 * * @param message */ public static void batchAllSendMessage(String message) { sseEmitterMap.forEach((messageId, sseEmitter) -> { try { sseEmitter.send(message, MediaType.APPLICATION_JSON); } catch (IOException e) { log.error("廣播發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage()); removeMessageId(messageId); } }); } /** * 給指定 messageId集合群發(fā)消息 * * @param messageIds * @param message */ public static void batchSendMessage(List<String> messageIds, String message) { if (CollectionUtils.isEmpty(messageIds)) { return; } // 去重 messageIds = messageIds.stream().distinct().collect(Collectors.toList()); messageIds.forEach(userId -> sendMessage(userId, message)); } /** * 給指定組群發(fā)消息(即組播,我們讓 messageId滿足我們的組命名確定即可) * * @param groupId * @param message */ public static void groupSendMessage(String groupId, String message) { if (MapUtils.isEmpty(sseEmitterMap)) { return; } sseEmitterMap.forEach((messageId, sseEmitter) -> { try { // 這里 groupId作為前綴 if (messageId.startsWith(groupId)) { sseEmitter.send(message, MediaType.APPLICATION_JSON); } } catch (IOException e) { log.error("組播發(fā)送消息異常 ==> groupId={}, 異常信息:", groupId, e.getMessage()); removeMessageId(messageId); } }); } /** * 移除 MessageId * * @param messageId */ public static void removeMessageId(String messageId) { sseEmitterMap.remove(messageId); //數(shù)量-1 currentConnectTotal.getAndDecrement(); log.info("remove messageId={}", messageId); } /** * 獲取所有的 MessageId集合 * * @return */ public static List<String> getMessageIds() { return new ArrayList<>(sseEmitterMap.keySet()); } /** * 獲取當(dāng)前連接總數(shù) * * @return */ public static int getConnectTotal() { return currentConnectTotal.intValue(); } /** * 斷開SSE連接時(shí)的回調(diào) * * @param messageId * @return */ private static Runnable completionCallBack(String messageId) { return () -> { log.info("結(jié)束連接 ==> messageId={}", messageId); removeMessageId(messageId); }; } /** * 連接超時(shí)時(shí)回調(diào)觸發(fā) * * @param messageId * @return */ private static Runnable timeOutCallBack(String messageId) { return () -> { log.info("連接超時(shí) ==> messageId={}", messageId); removeMessageId(messageId); }; } /** * 連接報(bào)錯(cuò)時(shí)回調(diào)觸發(fā)。 * * @param messageId * @return */ private static Consumer<Throwable> errorCallBack(String messageId) { return throwable -> { log.error("連接異常 ==> messageId={}", messageId); removeMessageId(messageId); }; } }
2.2 業(yè)務(wù)controller
@RestController @CrossOrigin @RequestMapping("/sse") public class SseDemoController { /** * 用戶SSE連接 * 它返回一個(gè)SseEmitter實(shí)例,這時(shí)候連接就已經(jīng)創(chuàng)建了. * * @return */ @GetMapping("/userConnect") public SseEmitter connect() { /** * 一般取登錄用戶賬號(hào)作為 messageId。分組的話需要約定 messageId的格式。 * 這里模擬創(chuàng)建一個(gè)用戶連接 */ String userId = "userId-" + RandomUtils.nextInt(1, 10); return SseServer.createConnect(userId); } /** * 模擬實(shí)例:下載進(jìn)度條顯示。 前端訪問(wèn)下載接口之前,先建立用戶SSE連接,然后訪問(wèn)下載接口,服務(wù)端推送消息。 * http://localhost:8080/sse/downLoad/userId-1 * * @throws InterruptedException */ @GetMapping("/downLoad/{userId}") public void pushOne(@PathVariable("userId") String userId) throws InterruptedException { for (int i = 0; i <= 100; i++) { if (i > 50 && i < 70) { Thread.sleep(500L); } else { Thread.sleep(100L); } System.out.println("sendMessage --> 消息=" + i); SseServer.sendMessage(userId, String.valueOf(i)); } System.out.println("下載成功"); } /** * 廣播發(fā)送。http://localhost:8080/sse/pushAllUser * * @throws InterruptedException */ @GetMapping("/pushAllUser") public void pushAllUser() throws InterruptedException { for (int i = 0; i <= 100; i++) { if (i > 50 && i < 70) { Thread.sleep(500L); } else { Thread.sleep(100L); } System.out.println("batchAllSendMessage --> 消息=" + i); SseServer.batchAllSendMessage(String.valueOf(i)); } } }
2.3 前端html
簡(jiǎn)單寫一個(gè)html來(lái)演示效果。
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Home</title> <script> var sseSource = new EventSource("http://localhost:8080/sse/userConnect"); <!-- 添加一個(gè)信息回調(diào) --> sseSource.onmessage = function(event){ console.log("test=>",event) document.getElementById("result").innerText = event.data+'%'; document.getElementById("my-progress").value = event.data; } // 使用vue交互事件,可以添加一些SSE的回調(diào) // sseSource.dispatchEvent(); // sseSource.close(); </script> </head> <body> <div id="result"></div> 下載進(jìn)度條:<progress style="width: 300px" id="my-progress" value="0" max="100"></progress> </body> </html>
3、演示效果
參考文章:
HTML- server-sent-events:https://html.spec.whatwg.org/multipage/server-sent-events.html
到此這篇關(guān)于SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot SSE服務(wù)端主動(dòng)推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
idea中解決maven包沖突的問(wèn)題(maven helper)
這篇文章主要介紹了idea中解決maven包沖突的問(wèn)題(maven helper),小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-12-12Spring實(shí)現(xiàn)控制反轉(zhuǎn)和依賴注入的示例詳解
這篇文章主要為大家詳細(xì)介紹IoC(控制反轉(zhuǎn))和DI(依賴注入)的概念,以及如何在Spring框架中實(shí)現(xiàn)它們,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-08-08springboot 啟動(dòng)如何修改application.properties的參數(shù)
這篇文章主要介紹了springboot 啟動(dòng)如何修改application.properties的參數(shù)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08Springboot使用sharedingjdbc實(shí)現(xiàn)分庫(kù)分表
這篇文章主要介紹了Springboot使用sharedingjdbc實(shí)現(xiàn)分庫(kù)分表,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-07-07SpringBoot創(chuàng)建線程池的六種方式小結(jié)
本文主要介紹了SpringBoot創(chuàng)建線程池的六種方式小結(jié),包括自定義線程池,固定長(zhǎng)度線程池,單一線程池,共享線程池,定時(shí)線程池,SpringBoot中注入異步線程池,感興趣的可以了解一下2023-11-11springboot~nexus項(xiàng)目打包要注意的地方示例代碼詳解
這篇文章主要介紹了springboot~nexus項(xiàng)目打包要注意的地方,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07springboot中的@value取不到正確的值問(wèn)題
這篇文章主要介紹了springboot中的@value取不到正確的值問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12javafx tableview鼠標(biāo)觸發(fā)更新屬性詳解
這篇文章主要為大家詳細(xì)介紹了javafx tableview鼠標(biāo)觸發(fā)更新屬性的相關(guān)資料,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-08-08Springboot 使用 JSR 303 對(duì) Controller 控制層校驗(yàn)及 Service 服務(wù)層 AOP 校驗(yàn)
這篇文章主要介紹了Springboot 使用 JSR 303 對(duì) Controller 控制層校驗(yàn)及 Service 服務(wù)層 AOP 校驗(yàn) 使用消息資源文件對(duì)消息國(guó)際化的相關(guān)知識(shí),需要的朋友可以參考下2017-12-12