欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java如何通過SSE實(shí)現(xiàn)消息推送詳解

 更新時(shí)間:2025年04月21日 08:33:41   作者:野生java研究僧  
這篇文章主要介紹了Java如何通過SSE實(shí)現(xiàn)消息推送的相關(guān)資料,SSE是一種服務(wù)器向客戶端推送數(shù)據(jù)的技術(shù),基于HTTP協(xié)議,利用長(zhǎng)連接特性,它適用于單向數(shù)據(jù)流場(chǎng)景,如股票價(jià)格更新、新聞實(shí)時(shí)推送等,需要的朋友可以參考下

1.什么是SSE?

SSE(Server-Sent Events)是一種用于實(shí)現(xiàn)服務(wù)器主動(dòng)向客戶端推送數(shù)據(jù)的技術(shù),也被稱為“事件流”(Event Stream)。它基于 HTTP 協(xié)議,利用了其長(zhǎng)連接特性,在客戶端與服務(wù)器之間建立一條持久化連接,并通過這條連接實(shí)現(xiàn)服務(wù)器向客戶端的實(shí)時(shí)數(shù)據(jù)推送。

2.SSE技術(shù)的基本原理

  • 客戶端向服務(wù)器發(fā)送一個(gè)GET請(qǐng)求,帶有指定的header,表示可以接收事件流類型,并禁用任何的事件緩存。
  • 服務(wù)器返回一個(gè)響應(yīng),帶有指定的header,表示事件的媒體類型和編碼,以及使用分塊傳輸編碼(chunked)來流式傳輸動(dòng)態(tài)生成的內(nèi)容。
  • 服務(wù)器在有數(shù)據(jù)更新時(shí),向客戶端發(fā)送一個(gè)或多個(gè)名稱:值字段組成的事件,由單個(gè)換行符分隔。事件之間由兩個(gè)換行符分隔。服務(wù)器可以發(fā)送事件數(shù)據(jù)、事件類型、事件ID和重試時(shí)間等字段。
  • 客戶端使用EventSource接口來創(chuàng)建一個(gè)對(duì)象,打開連接,并訂閱onopen、onmessage和onerror等事件處理程序來處理連接狀態(tài)和接收消息。
  • 客戶端可以使用GET查詢參數(shù)來傳遞數(shù)據(jù)給服務(wù)器,也可以使用close方法來關(guān)閉連接。

3.SSE和Socket的區(qū)別

SSE(Server-Sent Events)和 WebSocket 都是實(shí)現(xiàn)服務(wù)器向客戶端實(shí)時(shí)推送數(shù)據(jù)的技術(shù),但它們?cè)谀承┓矫孢€是有一定的區(qū)別。

技術(shù)實(shí)現(xiàn)SSE 基于 HTTP 協(xié)議,利用了其長(zhǎng)連接特性,通過瀏覽器向服務(wù)器發(fā)送一個(gè) HTTP 請(qǐng)求,建立一條持久化的連接。而 WebSocket 則是通過特殊的升級(jí)協(xié)議(HTTP/1.1 Upgrade 或者 HTTP/2)建立新的 TCP 連接,與傳統(tǒng) HTTP 連接不同。

數(shù)據(jù)格式SSE 可以傳輸文本和二進(jìn)制格式的數(shù)據(jù),但只支持單向數(shù)據(jù)流,即只能由服務(wù)器向客戶端推送數(shù)據(jù)。WebSocket 支持雙向數(shù)據(jù)流,客戶端和服務(wù)器可以互相發(fā)送消息,并且沒有消息大小限制。

連接狀態(tài)SSE 的連接狀態(tài)僅有三種==:已連接、連接中、已斷開==。連接狀態(tài)是由瀏覽器自動(dòng)維護(hù)的,客戶端無法手動(dòng)關(guān)閉或重新打開連接。而 WebSocket 連接的狀態(tài)更靈活,可以手動(dòng)打開、關(guān)閉、重連等。

兼容性SSE 是標(biāo)準(zhǔn)的 Web API,可以在大部分現(xiàn)代瀏覽器和移動(dòng)設(shè)備上使用。但如果需要兼容老版本的瀏覽器(如 IE6/7/8),則需要使用 polyfill 庫進(jìn)行兼容。而 WebSocket 在一些老版本 Android 手機(jī)上可能存在兼容性問題,需要使用一些特殊的 API 進(jìn)行處理。

安全性SSE 的實(shí)現(xiàn)比較簡(jiǎn)單,都是基于 HTTP 協(xié)議的,與普通的 Web 應(yīng)用沒有太大差異,因此風(fēng)險(xiǎn)相對(duì)較低。WebSocket 則需要通過額外的安全措施(如 SSL/TLS 加密)來確保數(shù)據(jù)傳輸?shù)陌踩?,避免被竊聽和篡改,否則可能會(huì)帶來安全隱患。

總體來說,SSE 和 WebSocket 都有各自的優(yōu)缺點(diǎn),適用于不同的場(chǎng)景和需求。如果只需要服務(wù)器向客戶端單向推送數(shù)據(jù),并且應(yīng)用在前端的瀏覽器環(huán)境中,則 SSE 是一個(gè)更加輕量級(jí)、易于實(shí)現(xiàn)和維護(hù)的選擇。而如果需要雙向傳輸數(shù)據(jù)、支持自定義協(xié)議、或者在更加復(fù)雜的網(wǎng)絡(luò)環(huán)境中應(yīng)用,則 WebSocket 可能更加適合。

SSE適用于場(chǎng)景SSE適用場(chǎng)景是指服務(wù)器向客戶端實(shí)時(shí)推送數(shù)據(jù)的場(chǎng)景,例如:

  • 股票價(jià)格更新:服務(wù)器可以根據(jù)股市的變化,實(shí)時(shí)地將股票價(jià)格推送給客戶端,讓客戶端能夠及時(shí)了解股票的走勢(shì)和行情。
  • 新聞實(shí)時(shí)推送:服務(wù)器可以根據(jù)新聞的更新,實(shí)時(shí)地將新聞內(nèi)容或標(biāo)題推送給客戶端,讓客戶端能夠及時(shí)了解最新的新聞動(dòng)態(tài)和信息。
  • 在線聊天:服務(wù)器可以根據(jù)用戶的發(fā)送,實(shí)時(shí)地將聊天消息推送給客戶端,讓客戶端能夠及時(shí)收到和回復(fù)消息。
  • 實(shí)時(shí)監(jiān)控:服務(wù)器可以根據(jù)設(shè)備的狀態(tài),實(shí)時(shí)地將監(jiān)控?cái)?shù)據(jù)或報(bào)警信息推送給客戶端,讓客戶端能夠及時(shí)了解設(shè)備的運(yùn)行情況和異常情況。

SSE適用場(chǎng)景的特點(diǎn)是:

  • 數(shù)據(jù)更新頻繁:服務(wù)器需要不斷地將最新的數(shù)據(jù)推送給客戶端,保持?jǐn)?shù)據(jù)的實(shí)時(shí)性和準(zhǔn)確性。
  • 低延遲:服務(wù)器需要盡快地將數(shù)據(jù)推送給客戶端,避免數(shù)據(jù)的延遲和過期。
  • 單向通信:服務(wù)器只需要向客戶端推送數(shù)據(jù),而不需要接收客戶端的數(shù)據(jù)。

4.編寫SSE服務(wù),來進(jìn)行創(chuàng)建鏈接和發(fā)送消息

Service:

package com.zillion.aggregate.app.controller;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Slf4j
@Service
public class SSEService {

    private static final Map<String,SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

    public SseEmitter crateSse(String uid) {
        SseEmitter sseEmitter = new SseEmitter(0L);
        sseEmitter.onCompletion(() -> {
            log.info("[{}]結(jié)束鏈接" , uid);
            sseEmitterMap.remove(uid);
        });
        sseEmitter.onTimeout(() -> {
            log.info("[{}]鏈接超時(shí)",uid);
        });
        sseEmitter.onError(throwable -> {
            try{
                log.info("[{}]鏈接異常,{}",uid,throwable.toString());
                sseEmitter.send(SseEmitter.event()
                        .id(uid)
                        .name("發(fā)生異常")
                        .data("發(fā)生異常請(qǐng)重試")
                        .reconnectTime(3000));
                sseEmitterMap.put(uid,sseEmitter);
            }catch (IOException e){
                e.printStackTrace();
            }
        });
        try{
            sseEmitter.send(SseEmitter.event().reconnectTime(5000));
        }catch (IOException e){
            e.printStackTrace();
        }
        sseEmitterMap.put(uid,sseEmitter);
        log.info("[{}]創(chuàng)建sse連接成功!",uid);
        return sseEmitter;
    }

    public boolean sendMessage(String uid,String messageId,String message){
        if(StringUtils.isEmpty(message)){
            log.info("[{}]參數(shù)異常,msg為空",uid);
            return false;
        }
        SseEmitter sseEmitter = sseEmitterMap.get(uid);
        if(sseEmitter == null){
            log.info("[{}]sse連接不存在",uid);
            return  false;
        }
        try{
            sseEmitter.send(SseEmitter.event().id(messageId).reconnectTime(60000).data(message));
            log.info("用戶{},消息ID:{},推送成功:{}",uid,messageId,message);
            return true;
        }catch (IOException e){
            sseEmitterMap.remove(uid);
            log.info("用戶{},消息ID:{},消息推送失?。簕}",uid,messageId,message);
            sseEmitter.complete();
            return false;
        }
    }

    public void closeSse(String uid){
        if(sseEmitterMap.containsKey(uid)){
            SseEmitter sseEmitter = sseEmitterMap.get(uid);
            sseEmitter.complete();
            sseEmitterMap.remove(uid);
        }else {
            log.info("用戶{}連接已關(guān)閉",uid);
        }
    }
}

Controller:

package com.zillion.aggregate.app.controller;

import cn.hutool.core.util.IdUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Controller
@RequestMapping("/aggregate/api/pay")
public class TestController {

    private static final Map<String,Boolean> SEND_MAP = new ConcurrentHashMap<>();

    @Autowired
    private SSEService sseService;
    @GetMapping("createSse")
    @CrossOrigin
    public SseEmitter createSse(String uid)
    {
        return sseService.crateSse(uid);
    }

    @GetMapping("/sendMsg")
    @ResponseBody
    @CrossOrigin
    public SseEmitter sendMsg(@RequestParam("uid") String uid) throws InterruptedException {
        SseEmitter sseEmitter = sseService.crateSse(uid);
        if (SEND_MAP.get(uid)==null ||  !SEND_MAP.get(uid)){
             new Thread(()->{
                 int i=0;
                 while (true){
                     try {
                         i++;
                         String message = "uid:"+uid+" number:"+i+" message:"+IdUtil.fastUUID().replace("-", "");
                         sseService.sendMessage(uid,"消息"+i,message);
                         SEND_MAP.put(uid,true);
                         Thread.sleep(1000);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                         closeSse(uid);
                     }
                 }
             }).start();
         }

        return sseEmitter;
    }

    @GetMapping("closeSse")
    @CrossOrigin
    public void closeSse(String uid){
        sseService.closeSse(uid);
    }
}

5.前端實(shí)現(xiàn)消息監(jiān)聽

<!doctype html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1">
    <title>SSE消息推送監(jiān)聽</title>
</head>
<body>
    <div id="conMsg"></div>
<script>
    let uid = 1;
    let chat = document.getElementById("conMsg");
    if(window.EventSource){
        var eventSource = new EventSource(`http://localhost:9001/aggregate/aggregate/api/pay/sendMsg?interfaceId=CEDB297CECCC9DCBAD348204ACDD5BAD&uid=${uid}`);
        eventSource.onopen = ()=>{
            console.log("鏈接成功");
        }
        eventSource.onmessage = (ev)=>{
            if(ev.data){
                chat.innerHTML += ev.data+"<br>";
            }
        }
        eventSource.onerror = ()=>{
            console.log("sse鏈接失敗")
        }
    }else{
        alert("當(dāng)前瀏覽器不支持sse")
    }
</script>
</body>
</html>

總結(jié) 

到此這篇關(guān)于Java如何通過SSE實(shí)現(xiàn)消息推送的文章就介紹到這了,更多相關(guān)Java SSE消息推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:

相關(guān)文章

最新評(píng)論