Java如何通過SSE實現(xiàn)消息推送詳解
1.什么是SSE?
SSE(Server-Sent Events)是一種用于實現(xiàn)服務(wù)器主動向客戶端推送數(shù)據(jù)的技術(shù),也被稱為“事件流”(Event Stream)。它基于 HTTP 協(xié)議,利用了其長連接特性,在客戶端與服務(wù)器之間建立一條持久化連接,并通過這條連接實現(xiàn)服務(wù)器向客戶端的實時數(shù)據(jù)推送。
2.SSE技術(shù)的基本原理
- 客戶端向服務(wù)器發(fā)送一個GET請求,帶有指定的header,表示可以接收事件流類型,并禁用任何的事件緩存。
- 服務(wù)器返回一個響應,帶有指定的header,表示事件的媒體類型和編碼,以及使用分塊傳輸編碼(chunked)來流式傳輸動態(tài)生成的內(nèi)容。
- 服務(wù)器在有數(shù)據(jù)更新時,向客戶端發(fā)送一個或多個名稱:值字段組成的事件,由單個換行符分隔。事件之間由兩個換行符分隔。服務(wù)器可以發(fā)送事件數(shù)據(jù)、事件類型、事件ID和重試時間等字段。
- 客戶端使用EventSource接口來創(chuàng)建一個對象,打開連接,并訂閱onopen、onmessage和onerror等事件處理程序來處理連接狀態(tài)和接收消息。
- 客戶端可以使用GET查詢參數(shù)來傳遞數(shù)據(jù)給服務(wù)器,也可以使用close方法來關(guān)閉連接。
3.SSE和Socket的區(qū)別
SSE(Server-Sent Events)和 WebSocket 都是實現(xiàn)服務(wù)器向客戶端實時推送數(shù)據(jù)的技術(shù),但它們在某些方面還是有一定的區(qū)別。
技術(shù)實現(xiàn)SSE 基于 HTTP 協(xié)議,利用了其長連接特性,通過瀏覽器向服務(wù)器發(fā)送一個 HTTP 請求,建立一條持久化的連接。而 WebSocket 則是通過特殊的升級協(xié)議(HTTP/1.1 Upgrade 或者 HTTP/2)建立新的 TCP 連接,與傳統(tǒng) HTTP 連接不同。
數(shù)據(jù)格式SSE 可以傳輸文本和二進制格式的數(shù)據(jù),但只支持單向數(shù)據(jù)流,即只能由服務(wù)器向客戶端推送數(shù)據(jù)。WebSocket 支持雙向數(shù)據(jù)流,客戶端和服務(wù)器可以互相發(fā)送消息,并且沒有消息大小限制。
連接狀態(tài)SSE 的連接狀態(tài)僅有三種==:已連接、連接中、已斷開==。連接狀態(tài)是由瀏覽器自動維護的,客戶端無法手動關(guān)閉或重新打開連接。而 WebSocket 連接的狀態(tài)更靈活,可以手動打開、關(guān)閉、重連等。
兼容性SSE 是標準的 Web API,可以在大部分現(xiàn)代瀏覽器和移動設(shè)備上使用。但如果需要兼容老版本的瀏覽器(如 IE6/7/8),則需要使用 polyfill 庫進行兼容。而 WebSocket 在一些老版本 Android 手機上可能存在兼容性問題,需要使用一些特殊的 API 進行處理。
安全性SSE 的實現(xiàn)比較簡單,都是基于 HTTP 協(xié)議的,與普通的 Web 應用沒有太大差異,因此風險相對較低。WebSocket 則需要通過額外的安全措施(如 SSL/TLS 加密)來確保數(shù)據(jù)傳輸?shù)陌踩裕苊獗桓`聽和篡改,否則可能會帶來安全隱患。
總體來說,SSE 和 WebSocket 都有各自的優(yōu)缺點,適用于不同的場景和需求。如果只需要服務(wù)器向客戶端單向推送數(shù)據(jù),并且應用在前端的瀏覽器環(huán)境中,則 SSE 是一個更加輕量級、易于實現(xiàn)和維護的選擇。而如果需要雙向傳輸數(shù)據(jù)、支持自定義協(xié)議、或者在更加復雜的網(wǎng)絡(luò)環(huán)境中應用,則 WebSocket 可能更加適合。
SSE適用于場景SSE適用場景是指服務(wù)器向客戶端實時推送數(shù)據(jù)的場景,例如:
- 股票價格更新:服務(wù)器可以根據(jù)股市的變化,實時地將股票價格推送給客戶端,讓客戶端能夠及時了解股票的走勢和行情。
- 新聞實時推送:服務(wù)器可以根據(jù)新聞的更新,實時地將新聞內(nèi)容或標題推送給客戶端,讓客戶端能夠及時了解最新的新聞動態(tài)和信息。
- 在線聊天:服務(wù)器可以根據(jù)用戶的發(fā)送,實時地將聊天消息推送給客戶端,讓客戶端能夠及時收到和回復消息。
- 實時監(jiān)控:服務(wù)器可以根據(jù)設(shè)備的狀態(tài),實時地將監(jiān)控數(shù)據(jù)或報警信息推送給客戶端,讓客戶端能夠及時了解設(shè)備的運行情況和異常情況。
SSE適用場景的特點是:
- 數(shù)據(jù)更新頻繁:服務(wù)器需要不斷地將最新的數(shù)據(jù)推送給客戶端,保持數(shù)據(jù)的實時性和準確性。
- 低延遲:服務(wù)器需要盡快地將數(shù)據(jù)推送給客戶端,避免數(shù)據(jù)的延遲和過期。
- 單向通信:服務(wù)器只需要向客戶端推送數(shù)據(jù),而不需要接收客戶端的數(shù)據(jù)。
4.編寫SSE服務(wù),來進行創(chuàng)建鏈接和發(fā)送消息
Service:
package com.zillion.aggregate.app.controller; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Service; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Slf4j @Service public class SSEService { private static final Map<String,SseEmitter> sseEmitterMap = new ConcurrentHashMap<>(); public SseEmitter crateSse(String uid) { SseEmitter sseEmitter = new SseEmitter(0L); sseEmitter.onCompletion(() -> { log.info("[{}]結(jié)束鏈接" , uid); sseEmitterMap.remove(uid); }); sseEmitter.onTimeout(() -> { log.info("[{}]鏈接超時",uid); }); sseEmitter.onError(throwable -> { try{ log.info("[{}]鏈接異常,{}",uid,throwable.toString()); sseEmitter.send(SseEmitter.event() .id(uid) .name("發(fā)生異常") .data("發(fā)生異常請重試") .reconnectTime(3000)); sseEmitterMap.put(uid,sseEmitter); }catch (IOException e){ e.printStackTrace(); } }); try{ sseEmitter.send(SseEmitter.event().reconnectTime(5000)); }catch (IOException e){ e.printStackTrace(); } sseEmitterMap.put(uid,sseEmitter); log.info("[{}]創(chuàng)建sse連接成功!",uid); return sseEmitter; } public boolean sendMessage(String uid,String messageId,String message){ if(StringUtils.isEmpty(message)){ log.info("[{}]參數(shù)異常,msg為空",uid); return false; } SseEmitter sseEmitter = sseEmitterMap.get(uid); if(sseEmitter == null){ log.info("[{}]sse連接不存在",uid); return false; } try{ sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(60000).data(message)); log.info("用戶{},消息ID:{},推送成功:{}",uid,messageId,message); return true; }catch (IOException e){ sseEmitterMap.remove(uid); log.info("用戶{},消息ID:{},消息推送失?。簕}",uid,messageId,message); sseEmitter.complete(); return false; } } public void closeSse(String uid){ if(sseEmitterMap.containsKey(uid)){ SseEmitter sseEmitter = sseEmitterMap.get(uid); sseEmitter.complete(); sseEmitterMap.remove(uid); }else { log.info("用戶{}連接已關(guān)閉",uid); } } }
Controller:
package com.zillion.aggregate.app.controller; import cn.hutool.core.util.IdUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.*; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Controller @RequestMapping("/aggregate/api/pay") public class TestController { private static final Map<String,Boolean> SEND_MAP = new ConcurrentHashMap<>(); @Autowired private SSEService sseService; @GetMapping("createSse") @CrossOrigin public SseEmitter createSse(String uid) { return sseService.crateSse(uid); } @GetMapping("/sendMsg") @ResponseBody @CrossOrigin public SseEmitter sendMsg(@RequestParam("uid") String uid) throws InterruptedException { SseEmitter sseEmitter = sseService.crateSse(uid); if (SEND_MAP.get(uid)==null || !SEND_MAP.get(uid)){ new Thread(()->{ int i=0; while (true){ try { i++; String message = "uid:"+uid+" number:"+i+" message:"+IdUtil.fastUUID().replace("-", ""); sseService.sendMessage(uid,"消息"+i,message); SEND_MAP.put(uid,true); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); closeSse(uid); } } }).start(); } return sseEmitter; } @GetMapping("closeSse") @CrossOrigin public void closeSse(String uid){ sseService.closeSse(uid); } }
5.前端實現(xiàn)消息監(jiān)聽
<!doctype html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1"> <title>SSE消息推送監(jiān)聽</title> </head> <body> <div id="conMsg"></div> <script> let uid = 1; let chat = document.getElementById("conMsg"); if(window.EventSource){ var eventSource = new EventSource(`http://localhost:9001/aggregate/aggregate/api/pay/sendMsg?interfaceId=CEDB297CECCC9DCBAD348204ACDD5BAD&uid=${uid}`); eventSource.onopen = ()=>{ console.log("鏈接成功"); } eventSource.onmessage = (ev)=>{ if(ev.data){ chat.innerHTML += ev.data+"<br>"; } } eventSource.onerror = ()=>{ console.log("sse鏈接失敗") } }else{ alert("當前瀏覽器不支持sse") } </script> </body> </html>
總結(jié)
到此這篇關(guān)于Java如何通過SSE實現(xiàn)消息推送的文章就介紹到這了,更多相關(guān)Java SSE消息推送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
@PreAuthorize、@PostAuthorize、@PreFilter、@PostFilter注解的用法詳解
這篇文章主要介紹了@PreAuthorize、@PostAuthorize、@PreFilter、@PostFilter注解的用法詳解,通過在方法上添加@PreAuthorize注解,可以指定需要滿足的權(quán)限條件,只有滿足條件的用戶才能執(zhí)行該方法,需要的朋友可以參考下2023-10-10java 實現(xiàn)微信服務(wù)器下載圖片到自己服務(wù)器
這篇文章主要介紹了 java 實現(xiàn)微信服務(wù)器下載圖片到自己服務(wù)器的相關(guān)資料,需要的朋友可以參考下2017-05-05IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline)
今天小編就為大家分享一篇關(guān)于IntelliJ IDEA中查看文件內(nèi)所有已聲明的方法(類似eclipse的outline),小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-10-10通過Spring Security魔幻山谷講解獲取認證機制核心原理
這篇文章主要介紹了通過Spring Security魔幻山谷講解獲取認證機制核心原理,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04RabbitMQ消息隊列實現(xiàn)延遲任務(wù)示例
這篇文章主要為大家介紹了RabbitMQ消息隊列實現(xiàn)延遲任務(wù)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2022-04-04java 中模擬TCP傳輸?shù)目蛻舳撕头?wù)端實例詳解
這篇文章主要介紹了java 中模擬TCP傳輸?shù)目蛻舳撕头?wù)端實例詳解的相關(guān)資料,需要的朋友可以參考下2017-03-03