springboot整合SSE技術(shù)開發(fā)小結(jié)
一、開發(fā)背景
公司需要開發(fā)一個(gè)大屏界面,大屏頁(yè)面的數(shù)據(jù)是實(shí)時(shí)更新的,由后端主動(dòng)實(shí)時(shí)推送數(shù)據(jù)給大屏頁(yè)面。此時(shí)會(huì)立刻聯(lián)想到:websocket 技術(shù)。當(dāng)然使用websocket,確實(shí)可以解決這個(gè)場(chǎng)景。但是今天本文的主角是 :SSE,他和websocket略有不同,SSE只能由服務(wù)端主動(dòng)發(fā)消息,而websocket前后端都可以推送消息。
二、快速了解SSE
1、概念
SSE全稱 Server Sent Event,顧名思義,就是服務(wù)器發(fā)送事件,所以也就注定了他 只能由服務(wù)端發(fā)送信息。
2、特性
- 主動(dòng)從服務(wù)端推送消息的技術(shù)
- 本質(zhì)是一個(gè)HTTP的長(zhǎng)連接
- 發(fā)送的是一個(gè)stream流,格式為text/event-stream
三、開發(fā)思路
要實(shí)現(xiàn)后端的實(shí)時(shí)推送消息,前臺(tái)實(shí)時(shí)更新數(shù)據(jù),思路如下:
- 1、前后端需要建立連接
- 2、后端如何做到實(shí)時(shí)推送信息呢?可以采用定時(shí)調(diào)度
四、代碼演示
1、引入依賴
原則上是不需要引入的,因?yàn)閟pringboot底層已經(jīng)整合了SSE
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
2、服務(wù)端代碼
controller層
@RestController @CrossOrigin @RequestMapping("/sse") public class SseEmitterController extends BaseController { @Autowired private SseEmitterService sseEmitterService; /** * 創(chuàng)建SSE連接 * * @return */ @GetMapping("/connect/{type}") public SseEmitter connect(@PathVariable("type") String type) { return sseEmitterService.connect(type); } }
service層
public interface SseEmitterService { SseEmitter connect(String type); void volumeOverview(); void sysOperation(); void monitor(); ........ }
service實(shí)現(xiàn)層
@Service public class SseEmitterServiceImpl implements SseEmitterService { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private static Map<String, SseEmitterUTF8> sseCache = new ConcurrentHashMap<>(); /** * 創(chuàng)建連接sse * @param type * @return */ @Override public SseEmitter connect(String type) { final String clientId = UUID.randomUUID().toString().replace("-", ""); SseEmitterUTF8 sseEmitter = new SseEmitterUTF8(0L); try { sseEmitter.send(SseEmitter.event().comment("創(chuàng)建連接成功 !!!")); } catch (IOException e) { logger.error("創(chuàng)建連接失敗 , {} " , e.getMessage()); } sseEmitter.onCompletion(() -> { logger.info("connect onCompletion , {} 結(jié)束連接 ..." , clientId); removeClient(clientId); }); sseEmitter.onTimeout(() -> { logger.info("connect onTimeout , {} 連接超時(shí) ..." , clientId); removeClient(clientId); }); sseEmitter.onError((throwable) -> { logger.error("connect onError , {} 連接異常 ..." , clientId); removeClient(clientId); }); sseCache.put(clientId, sseEmitter); //立即推送 volumeOverview(); dealResp(); monitor(); if (type.equals(SseEmitterConstant.OVER_VIEW)){ sysOperation(); mileStone(); } logger.info("當(dāng)前用戶總連接數(shù) : {} " , sseCache.size()); return sseEmitter; } /** * 交易量概覽 */ @Override public void volumeOverview() { Map<String,Object> map = new HashMap<>(); map.put("latest_tps",440.3); map.put("total_cics_trans",341656001); map.put("total_zjcx_trans",391656001); map.put("zjcx_tps",23657); map.put("day10",48388352); map.put("history",105013985); SseEmitter.SseEventBuilder data = SseEmitter.event().name(SseEmitterConstant.VOLUME_OVERVIEW).data(map, MediaType.APPLICATION_JSON); for (Map.Entry<String, SseEmitterUTF8> entry : sseCache.entrySet()) { SseEmitterUTF8 sseEmitter = entry.getValue(); if (sseEmitter == null) { continue; } try { sseEmitter.send(data); } catch (IOException e) { String body = "SseEmitterServiceImpl[volumeOverview ]"; logger.error(body + ": 向客戶端 {} 推送消息失敗 , 嘗試進(jìn)行重推 : {}", entry.getKey() ,e.getMessage()); messageRepush(entry.getKey(),data,body); } } } private void messageRepush(String type, SseEmitter.SseEventBuilder data,String body){ for (int i = 0; i < 3; i++) { try { Thread.sleep(2000); SseEmitterUTF8 sseEmitter = sseCache.get(type); if (sseEmitter == null) { logger.error(body + " :向客戶端{(lán)} 第{}次消息重推失敗,未創(chuàng)建長(zhǎng)鏈接", type, i + 1); continue; } sseEmitter.send(data); } catch (Exception ex) { logger.error(body + " :向客戶端{(lán)} 第{}次消息重推失敗", type, i + 1, ex); continue; } logger.info(body + " :向客戶端{(lán)} 第{}次消息重推成功", type, i + 1); return; } }
常量類
public class SseEmitterConstant { /** * 創(chuàng)建連接的客戶端類型 */ public static final String OVER_VIEW = "overview"; /** * even 數(shù)據(jù)類型 */ public static final String VOLUME_OVERVIEW = "vw"; public SseEmitterConstant(){} }
3、后端定時(shí)任務(wù)代碼
采用注解的方式實(shí)現(xiàn):@Scheduled,使用該注解時(shí),需要增加這個(gè)注解@EnableScheduling,相當(dāng)于來開啟定時(shí)調(diào)度功能,如果不加@EnableScheduling注解,那么定時(shí)調(diào)度會(huì)不生效的。
啟動(dòng)類增加注解@EnableScheduling
package com.hidata; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; import org.springframework.context.annotation.ComponentScan; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication(exclude = { DataSourceAutoConfiguration.class }) @EnableScheduling public class HidataApplication { public static void main(String[] args) { SpringApplication.run(HidataApplication.class, args); System.out.println("[HiUrlShorter platform startup!]"); } }
創(chuàng)建 定時(shí)任務(wù)調(diào)度類,在該類上加上@Scheduled注解,
@Configuration public class SendMessageTask{ private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Autowired private SseEmitterService sseEmitterService; @Scheduled(cron = "0/40 * * * * ?}") public void volumeOverviewTask() { try { sseEmitterService.volumeOverview(); } catch (Exception e) { logger.error("SendMessageTask [volumeOverviewTask]: {} ",e.getMessage()); } } ....... }
4、解決亂碼的實(shí)體類
如果發(fā)送中文數(shù)據(jù)的時(shí)候,會(huì)出現(xiàn)亂碼的現(xiàn)象。此時(shí)需要做對(duì)應(yīng)的處理
package com.hidata.devops.lagrescreen.domain; import org.springframework.http.HttpHeaders; import org.springframework.http.MediaType; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.nio.charset.StandardCharsets; public class SseEmitterUTF8 extends SseEmitter { public SseEmitterUTF8(Long timeout) { super(timeout); } @Override protected void extendResponse(ServerHttpResponse outputMessage) { super.extendResponse(outputMessage); HttpHeaders headers = outputMessage.getHeaders(); headers.setContentType(new MediaType(MediaType.TEXT_EVENT_STREAM, StandardCharsets.UTF_8)); } }
5、前端代碼
// 連接服務(wù)器 var sseSource = new EventSource("http://localhost:8080/sse/connect"); // 連接打開 sseSource.onopen = function () { console.log("連接打開"); } // 連接錯(cuò)誤 sseSource.onerror = function (err) { console.log("連接錯(cuò)誤:", err); } //接收信息 eventSource.addEventListener("vw", function (event) { console.log(event.data); ..... });
五、核心代碼分析
先看代碼片段
SseEmitter.event().name("vw").data(map, MediaType.APPLICATION_JSON);
分析:
后端不會(huì)把所有數(shù)據(jù)一起發(fā)送給前端,而是會(huì)把頁(yè)面分成多個(gè)模塊,然后發(fā)給前端,此時(shí)前端需要區(qū)分哪一塊數(shù)據(jù)對(duì)應(yīng)哪一塊頁(yè)面。所以我們可以給各個(gè)模塊的數(shù)據(jù)起個(gè)名字。也就是上述的代碼
SseEmitter.event().name("vw")
這樣,前端就知道怎么渲染頁(yè)面了,類似于這樣
關(guān)于even()的屬性,可以查看源碼,
public interface SseEventBuilder { SseEmitter.SseEventBuilder id(String var1); SseEmitter.SseEventBuilder name(String var1); SseEmitter.SseEventBuilder reconnectTime(long var1); SseEmitter.SseEventBuilder comment(String var1); SseEmitter.SseEventBuilder data(Object var1); SseEmitter.SseEventBuilder data(Object var1, @Nullable MediaType var2); Set<DataWithMediaType> build(); }
到此這篇關(guān)于springboot整合SSE技術(shù)開發(fā)小結(jié)的文章就介紹到這了,更多相關(guān)springboot整合SSE內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java編程IP地址和數(shù)字相互轉(zhuǎn)換代碼示例
這篇文章主要介紹了Java編程IP地址和數(shù)字相互轉(zhuǎn)換代碼示例,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-11-11Mybatis-plus如何在xml中傳入自定義的SQL語(yǔ)句
這篇文章主要介紹了Mybatis-plus如何在xml中傳入自定義的SQL語(yǔ)句問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05Spring源碼之事件監(jiān)聽機(jī)制(實(shí)現(xiàn)EventListener接口方式)
這篇文章主要介紹了Spring源碼之事件監(jiān)聽機(jī)制(實(shí)現(xiàn)EventListener接口方式),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08SpringBoot實(shí)現(xiàn)API接口的完整代碼
這篇文章主要給大家介紹了關(guān)于SpringBoot實(shí)現(xiàn)API接口的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10springcloud?feign?接口指定接口服務(wù)ip方式
這篇文章主要介紹了springcloud?feign?接口指定接口服務(wù)ip方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03spring boot實(shí)戰(zhàn)之本地jar包引用示例
本篇文章主要介紹了spring boot實(shí)戰(zhàn)之本地jar包引用示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-10-10