欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn)

 更新時(shí)間:2023年06月26日 11:19:51   作者:Charge8  
本文主要介紹了SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

一、SSE概述

1、SSE簡(jiǎn)介

SSE(Server Sent Event),直譯為服務(wù)器發(fā)送事件,也就是服務(wù)器主動(dòng)發(fā)送事件,客戶端可以獲取到服務(wù)器發(fā)送的事件。

我們常見的 http 交互方式是客戶端發(fā)起請(qǐng)求,服務(wù)端響應(yīng),然后一次請(qǐng)求完畢。但是在SSE的使用場(chǎng)景下,客戶端發(fā)起請(qǐng)求,然后建立SEE連接一直保持,服務(wù)端就可以返回?cái)?shù)據(jù)給客戶端。

SSE簡(jiǎn)單來(lái)說(shuō)就是服務(wù)器主動(dòng)向前端推送數(shù)據(jù)的一種技術(shù),它是單向的。SSE適用于消息推送,監(jiān)控等只需要服務(wù)器推送數(shù)據(jù)的場(chǎng)景中。比如:文件下載時(shí),后端可以推送下載進(jìn)度條信息。

2、特點(diǎn)

SSE (Server Send Event)服務(wù)端主動(dòng)推送:

  • html5新標(biāo)準(zhǔn),用來(lái)從服務(wù)端實(shí)時(shí)推送數(shù)據(jù)到瀏覽器端
  • 直接建立在當(dāng)前http連接上,本質(zhì)上是保持一個(gè)http長(zhǎng)連接,輕量協(xié)議

簡(jiǎn)單的服務(wù)器數(shù)據(jù)推送的場(chǎng)景,使用服務(wù)器推送事件還是很方便的。

3、SSE和WebScoket的區(qū)別

SSE 是單通道,只能服務(wù)端向客戶端發(fā)消息;而WebScoket 是雙通道。

SEWebScoket
http 協(xié)議獨(dú)立的 websocket 協(xié)議
輕量,使用簡(jiǎn)單相對(duì)復(fù)雜
默認(rèn)支持?jǐn)嗑€重連需要自己實(shí)現(xiàn)斷線重連
文本傳輸二進(jìn)制傳輸
支持自定義發(fā)送的消息類型-

4、SSE規(guī)范

在 html5 的定義中,服務(wù)端SSE,一般需要遵循以下要求:

1)請(qǐng)求頭開啟長(zhǎng)連接 + 流方式傳遞:

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

2)數(shù)據(jù)格式服務(wù)端發(fā)送的消息,由 message 組成,其格式如下:

field:value

其中 field 有五種可能:

  • 空:即以:開頭,表示注釋,可以理解為服務(wù)端向客戶端發(fā)送的心跳,確保連接不中斷
  • data:數(shù)據(jù)
  • event:事件,默認(rèn)值
  • id:數(shù)據(jù)標(biāo)識(shí)符用 id 字段表示,相當(dāng)于每一條數(shù)據(jù)的編號(hào)
  • retry:重連時(shí)間

二、SSE實(shí)戰(zhàn)

使用 SpringBoot簡(jiǎn)單實(shí)現(xiàn)一個(gè)SSE服務(wù)端主動(dòng)推送數(shù)據(jù)為前端,前端頁(yè)面接受后展示進(jìn)度條。

1、SseEmitter類簡(jiǎn)介

SpringBoot 利用 SseEmitter 來(lái)支持SSE,并對(duì)SSE規(guī)范做了一些封裝,使用起來(lái)非常簡(jiǎn)單。我們操作SseEmitter對(duì)象,關(guān)注消息文本即可。

SseEmitter類的幾個(gè)方法:

  • send():發(fā)送數(shù)據(jù),如果傳入的是一個(gè)非SseEventBuilder對(duì)象,那么傳遞參數(shù)會(huì)被封裝到 data 中。
  • complete():表示執(zhí)行完畢,會(huì)斷開連接。
  • onTimeout():連接超時(shí)時(shí)回調(diào)觸發(fā)。
  • onCompletion():結(jié)束之后的回調(diào)觸發(fā)。
  • onError():報(bào)錯(cuò)時(shí)的回調(diào)觸發(fā)。

2、示例實(shí)戰(zhàn)

創(chuàng)建 SpringBoot項(xiàng)目,引入依賴:

        <!-- 里面包含了 spring-webmvc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.12</version>
        </dependency>

2.1 創(chuàng)建 SseServer

我們創(chuàng)建一個(gè) SseServer來(lái)簡(jiǎn)單封裝一下業(yè)務(wù)操作SSE的方法。

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
 * SseServer業(yè)務(wù)封裝類來(lái)操作SEE
 */
@Slf4j
public class SseServer {
    /**
     * 當(dāng)前連接總數(shù)
     */
    private static AtomicInteger currentConnectTotal = new AtomicInteger(0);
    /**
     * messageId的 SseEmitter對(duì)象映射集
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    /**
     * 創(chuàng)建sse連接
     *
     * @param messageId - 消息id(唯一)
     * @return
     */
    public static SseEmitter createConnect(String messageId) {
        /**
         * 設(shè)置連接超時(shí)時(shí)間。0表示不過(guò)期,默認(rèn)是30秒,超過(guò)時(shí)間未完成會(huì)拋出異常
         */
        SseEmitter sseEmitter = new SseEmitter(0L);
        /*
        // 超時(shí)時(shí)間設(shè)置為3s,設(shè)置前端的重試時(shí)間為1s。重連時(shí),注意總數(shù)的統(tǒng)計(jì)
        SseEmitter sseEmitter = new SseEmitter(3_000L);
        try {
            sseEmitter.send(
                    SseEmitter.event()
                    .reconnectTime(1000L)
                    //.data("前端重連成功") // 重連成功的提示信息
            );
        } catch (IOException e) {
            log.error("前端重連異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
            e.printStackTrace();
        }*/
        // 注冊(cè)回調(diào)
        sseEmitter.onCompletion(completionCallBack(messageId));
        sseEmitter.onTimeout(timeOutCallBack(messageId));
        sseEmitter.onError(errorCallBack(messageId));
        sseEmitterMap.put(messageId, sseEmitter);
        //記錄一下連接總數(shù)。數(shù)量+1
        int count = currentConnectTotal.incrementAndGet();
        log.info("創(chuàng)建sse連接成功 ==> 當(dāng)前連接總數(shù)={}, messageId={}", count, messageId);
        return sseEmitter;
    }
    /**
     * 給指定 messageId發(fā)消息
     *
     * @param messageId - 消息id(唯一)
     * @param message   - 消息文本
     */
    public static void sendMessage(String messageId, String message) {
        if (sseEmitterMap.containsKey(messageId)) {
            try {
                sseEmitterMap.get(messageId).send(message);
            } catch (IOException e) {
                log.error("發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
                e.printStackTrace();
            }
        } else {
            throw new RuntimeException("連接不存在或者超時(shí), messageId=" + messageId);
        }
    }
    /**
     * 給所有 messageId廣播發(fā)送消息
     *
     * @param message
     */
    public static void batchAllSendMessage(String message) {
        sseEmitterMap.forEach((messageId, sseEmitter) -> {
            try {
                sseEmitter.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("廣播發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
                removeMessageId(messageId);
            }
        });
    }
    /**
     * 給指定 messageId集合群發(fā)消息
     *
     * @param messageIds
     * @param message
     */
    public static void batchSendMessage(List<String> messageIds, String message) {
        if (CollectionUtils.isEmpty(messageIds)) {
            return;
        }
        // 去重
        messageIds = messageIds.stream().distinct().collect(Collectors.toList());
        messageIds.forEach(userId -> sendMessage(userId, message));
    }
    /**
     * 給指定組群發(fā)消息(即組播,我們讓 messageId滿足我們的組命名確定即可)
     *
     * @param groupId
     * @param message
     */
    public static void groupSendMessage(String groupId, String message) {
        if (MapUtils.isEmpty(sseEmitterMap)) {
            return;
        }
        sseEmitterMap.forEach((messageId, sseEmitter) -> {
            try {
                // 這里 groupId作為前綴
                if (messageId.startsWith(groupId)) {
                    sseEmitter.send(message, MediaType.APPLICATION_JSON);
                }
            } catch (IOException e) {
                log.error("組播發(fā)送消息異常 ==> groupId={}, 異常信息:", groupId, e.getMessage());
                removeMessageId(messageId);
            }
        });
    }
    /**
     * 移除 MessageId
     *
     * @param messageId
     */
    public static void removeMessageId(String messageId) {
        sseEmitterMap.remove(messageId);
        //數(shù)量-1
        currentConnectTotal.getAndDecrement();
        log.info("remove messageId={}", messageId);
    }
    /**
     * 獲取所有的 MessageId集合
     *
     * @return
     */
    public static List<String> getMessageIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    /**
     * 獲取當(dāng)前連接總數(shù)
     *
     * @return
     */
    public static int getConnectTotal() {
        return currentConnectTotal.intValue();
    }
    /**
     * 斷開SSE連接時(shí)的回調(diào)
     *
     * @param messageId
     * @return
     */
    private static Runnable completionCallBack(String messageId) {
        return () -> {
            log.info("結(jié)束連接 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
    /**
     * 連接超時(shí)時(shí)回調(diào)觸發(fā)
     *
     * @param messageId
     * @return
     */
    private static Runnable timeOutCallBack(String messageId) {
        return () -> {
            log.info("連接超時(shí) ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
    /**
     * 連接報(bào)錯(cuò)時(shí)回調(diào)觸發(fā)。
     *
     * @param messageId
     * @return
     */
    private static Consumer<Throwable> errorCallBack(String messageId) {
        return throwable -> {
            log.error("連接異常 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
}

2.2 業(yè)務(wù)controller

@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SseDemoController {
    /**
     * 用戶SSE連接
     * 它返回一個(gè)SseEmitter實(shí)例,這時(shí)候連接就已經(jīng)創(chuàng)建了.
     *
     * @return
     */
    @GetMapping("/userConnect")
    public SseEmitter connect() {
        /**
         * 一般取登錄用戶賬號(hào)作為 messageId。分組的話需要約定 messageId的格式。
         * 這里模擬創(chuàng)建一個(gè)用戶連接
         */
        String userId = "userId-" + RandomUtils.nextInt(1, 10);
        return SseServer.createConnect(userId);
    }
    /**
     * 模擬實(shí)例:下載進(jìn)度條顯示。 前端訪問(wèn)下載接口之前,先建立用戶SSE連接,然后訪問(wèn)下載接口,服務(wù)端推送消息。
     * http://localhost:8080/sse/downLoad/userId-1
     *
     * @throws InterruptedException
     */
    @GetMapping("/downLoad/{userId}")
    public void pushOne(@PathVariable("userId") String userId) throws InterruptedException {
        for (int i = 0; i <= 100; i++) {
            if (i > 50 && i < 70) {
                Thread.sleep(500L);
            } else {
                Thread.sleep(100L);
            }
            System.out.println("sendMessage --> 消息=" + i);
            SseServer.sendMessage(userId, String.valueOf(i));
        }
        System.out.println("下載成功");
    }
    /**
     * 廣播發(fā)送。http://localhost:8080/sse/pushAllUser
     *
     * @throws InterruptedException
     */
    @GetMapping("/pushAllUser")
    public void pushAllUser() throws InterruptedException {
        for (int i = 0; i <= 100; i++) {
            if (i > 50 && i < 70) {
                Thread.sleep(500L);
            } else {
                Thread.sleep(100L);
            }
            System.out.println("batchAllSendMessage --> 消息=" + i);
            SseServer.batchAllSendMessage(String.valueOf(i));
        }
    }
}

2.3 前端html

簡(jiǎn)單寫一個(gè)html來(lái)演示效果。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Home</title>
    <script>
        var sseSource = new EventSource("http://localhost:8080/sse/userConnect");
        <!-- 添加一個(gè)信息回調(diào) -->
        sseSource.onmessage = function(event){
            console.log("test=>",event)
            document.getElementById("result").innerText = event.data+'%';
            document.getElementById("my-progress").value = event.data;
        }
        // 使用vue交互事件,可以添加一些SSE的回調(diào)
        // sseSource.dispatchEvent();
        // sseSource.close();
    </script>
</head>
<body>
    <div id="result"></div>
        下載進(jìn)度條:<progress style="width: 300px" id="my-progress" value="0" max="100"></progress>
    </body>
</html>

3、演示效果

參考文章:

HTML- server-sent-events:https://html.spec.whatwg.org/multipage/server-sent-events.html

到此這篇關(guān)于SpringBoot SSE服務(wù)端主動(dòng)推送事件的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot SSE服務(wù)端主動(dòng)推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論