欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot SSE服務(wù)端主動推送事件的實現(xiàn)

 更新時間:2023年06月26日 11:19:51   作者:Charge8  
本文主要介紹了SpringBoot SSE服務(wù)端主動推送事件的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

一、SSE概述

1、SSE簡介

SSE(Server Sent Event),直譯為服務(wù)器發(fā)送事件,也就是服務(wù)器主動發(fā)送事件,客戶端可以獲取到服務(wù)器發(fā)送的事件。

我們常見的 http 交互方式是客戶端發(fā)起請求,服務(wù)端響應(yīng),然后一次請求完畢。但是在SSE的使用場景下,客戶端發(fā)起請求,然后建立SEE連接一直保持,服務(wù)端就可以返回數(shù)據(jù)給客戶端。

SSE簡單來說就是服務(wù)器主動向前端推送數(shù)據(jù)的一種技術(shù),它是單向的。SSE適用于消息推送,監(jiān)控等只需要服務(wù)器推送數(shù)據(jù)的場景中。比如:文件下載時,后端可以推送下載進度條信息。

2、特點

SSE (Server Send Event)服務(wù)端主動推送:

  • html5新標準,用來從服務(wù)端實時推送數(shù)據(jù)到瀏覽器端
  • 直接建立在當前http連接上,本質(zhì)上是保持一個http長連接,輕量協(xié)議

簡單的服務(wù)器數(shù)據(jù)推送的場景,使用服務(wù)器推送事件還是很方便的。

3、SSE和WebScoket的區(qū)別

SSE 是單通道,只能服務(wù)端向客戶端發(fā)消息;而WebScoket 是雙通道。

SEWebScoket
http 協(xié)議獨立的 websocket 協(xié)議
輕量,使用簡單相對復(fù)雜
默認支持斷線重連需要自己實現(xiàn)斷線重連
文本傳輸二進制傳輸
支持自定義發(fā)送的消息類型-

4、SSE規(guī)范

在 html5 的定義中,服務(wù)端SSE,一般需要遵循以下要求:

1)請求頭開啟長連接 + 流方式傳遞:

Content-Type: text/event-stream;charset=UTF-8
Cache-Control: no-cache
Connection: keep-alive

2)數(shù)據(jù)格式服務(wù)端發(fā)送的消息,由 message 組成,其格式如下:

field:value

其中 field 有五種可能:

  • 空:即以:開頭,表示注釋,可以理解為服務(wù)端向客戶端發(fā)送的心跳,確保連接不中斷
  • data:數(shù)據(jù)
  • event:事件,默認值
  • id:數(shù)據(jù)標識符用 id 字段表示,相當于每一條數(shù)據(jù)的編號
  • retry:重連時間

二、SSE實戰(zhàn)

使用 SpringBoot簡單實現(xiàn)一個SSE服務(wù)端主動推送數(shù)據(jù)為前端,前端頁面接受后展示進度條。

1、SseEmitter類簡介

SpringBoot 利用 SseEmitter 來支持SSE,并對SSE規(guī)范做了一些封裝,使用起來非常簡單。我們操作SseEmitter對象,關(guān)注消息文本即可。

SseEmitter類的幾個方法:

  • send():發(fā)送數(shù)據(jù),如果傳入的是一個非SseEventBuilder對象,那么傳遞參數(shù)會被封裝到 data 中。
  • complete():表示執(zhí)行完畢,會斷開連接。
  • onTimeout():連接超時時回調(diào)觸發(fā)。
  • onCompletion():結(jié)束之后的回調(diào)觸發(fā)。
  • onError():報錯時的回調(diào)觸發(fā)。

2、示例實戰(zhàn)

創(chuàng)建 SpringBoot項目,引入依賴:

        <!-- 里面包含了 spring-webmvc-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.12</version>
        </dependency>

2.1 創(chuàng)建 SseServer

我們創(chuàng)建一個 SseServer來簡單封裝一下業(yè)務(wù)操作SSE的方法。

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
/**
 * SseServer業(yè)務(wù)封裝類來操作SEE
 */
@Slf4j
public class SseServer {
    /**
     * 當前連接總數(shù)
     */
    private static AtomicInteger currentConnectTotal = new AtomicInteger(0);
    /**
     * messageId的 SseEmitter對象映射集
     */
    private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
    /**
     * 創(chuàng)建sse連接
     *
     * @param messageId - 消息id(唯一)
     * @return
     */
    public static SseEmitter createConnect(String messageId) {
        /**
         * 設(shè)置連接超時時間。0表示不過期,默認是30秒,超過時間未完成會拋出異常
         */
        SseEmitter sseEmitter = new SseEmitter(0L);
        /*
        // 超時時間設(shè)置為3s,設(shè)置前端的重試時間為1s。重連時,注意總數(shù)的統(tǒng)計
        SseEmitter sseEmitter = new SseEmitter(3_000L);
        try {
            sseEmitter.send(
                    SseEmitter.event()
                    .reconnectTime(1000L)
                    //.data("前端重連成功") // 重連成功的提示信息
            );
        } catch (IOException e) {
            log.error("前端重連異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
            e.printStackTrace();
        }*/
        // 注冊回調(diào)
        sseEmitter.onCompletion(completionCallBack(messageId));
        sseEmitter.onTimeout(timeOutCallBack(messageId));
        sseEmitter.onError(errorCallBack(messageId));
        sseEmitterMap.put(messageId, sseEmitter);
        //記錄一下連接總數(shù)。數(shù)量+1
        int count = currentConnectTotal.incrementAndGet();
        log.info("創(chuàng)建sse連接成功 ==> 當前連接總數(shù)={}, messageId={}", count, messageId);
        return sseEmitter;
    }
    /**
     * 給指定 messageId發(fā)消息
     *
     * @param messageId - 消息id(唯一)
     * @param message   - 消息文本
     */
    public static void sendMessage(String messageId, String message) {
        if (sseEmitterMap.containsKey(messageId)) {
            try {
                sseEmitterMap.get(messageId).send(message);
            } catch (IOException e) {
                log.error("發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
                e.printStackTrace();
            }
        } else {
            throw new RuntimeException("連接不存在或者超時, messageId=" + messageId);
        }
    }
    /**
     * 給所有 messageId廣播發(fā)送消息
     *
     * @param message
     */
    public static void batchAllSendMessage(String message) {
        sseEmitterMap.forEach((messageId, sseEmitter) -> {
            try {
                sseEmitter.send(message, MediaType.APPLICATION_JSON);
            } catch (IOException e) {
                log.error("廣播發(fā)送消息異常 ==> messageId={}, 異常信息:", messageId, e.getMessage());
                removeMessageId(messageId);
            }
        });
    }
    /**
     * 給指定 messageId集合群發(fā)消息
     *
     * @param messageIds
     * @param message
     */
    public static void batchSendMessage(List<String> messageIds, String message) {
        if (CollectionUtils.isEmpty(messageIds)) {
            return;
        }
        // 去重
        messageIds = messageIds.stream().distinct().collect(Collectors.toList());
        messageIds.forEach(userId -> sendMessage(userId, message));
    }
    /**
     * 給指定組群發(fā)消息(即組播,我們讓 messageId滿足我們的組命名確定即可)
     *
     * @param groupId
     * @param message
     */
    public static void groupSendMessage(String groupId, String message) {
        if (MapUtils.isEmpty(sseEmitterMap)) {
            return;
        }
        sseEmitterMap.forEach((messageId, sseEmitter) -> {
            try {
                // 這里 groupId作為前綴
                if (messageId.startsWith(groupId)) {
                    sseEmitter.send(message, MediaType.APPLICATION_JSON);
                }
            } catch (IOException e) {
                log.error("組播發(fā)送消息異常 ==> groupId={}, 異常信息:", groupId, e.getMessage());
                removeMessageId(messageId);
            }
        });
    }
    /**
     * 移除 MessageId
     *
     * @param messageId
     */
    public static void removeMessageId(String messageId) {
        sseEmitterMap.remove(messageId);
        //數(shù)量-1
        currentConnectTotal.getAndDecrement();
        log.info("remove messageId={}", messageId);
    }
    /**
     * 獲取所有的 MessageId集合
     *
     * @return
     */
    public static List<String> getMessageIds() {
        return new ArrayList<>(sseEmitterMap.keySet());
    }
    /**
     * 獲取當前連接總數(shù)
     *
     * @return
     */
    public static int getConnectTotal() {
        return currentConnectTotal.intValue();
    }
    /**
     * 斷開SSE連接時的回調(diào)
     *
     * @param messageId
     * @return
     */
    private static Runnable completionCallBack(String messageId) {
        return () -> {
            log.info("結(jié)束連接 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
    /**
     * 連接超時時回調(diào)觸發(fā)
     *
     * @param messageId
     * @return
     */
    private static Runnable timeOutCallBack(String messageId) {
        return () -> {
            log.info("連接超時 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
    /**
     * 連接報錯時回調(diào)觸發(fā)。
     *
     * @param messageId
     * @return
     */
    private static Consumer<Throwable> errorCallBack(String messageId) {
        return throwable -> {
            log.error("連接異常 ==> messageId={}", messageId);
            removeMessageId(messageId);
        };
    }
}

2.2 業(yè)務(wù)controller

@RestController
@CrossOrigin
@RequestMapping("/sse")
public class SseDemoController {
    /**
     * 用戶SSE連接
     * 它返回一個SseEmitter實例,這時候連接就已經(jīng)創(chuàng)建了.
     *
     * @return
     */
    @GetMapping("/userConnect")
    public SseEmitter connect() {
        /**
         * 一般取登錄用戶賬號作為 messageId。分組的話需要約定 messageId的格式。
         * 這里模擬創(chuàng)建一個用戶連接
         */
        String userId = "userId-" + RandomUtils.nextInt(1, 10);
        return SseServer.createConnect(userId);
    }
    /**
     * 模擬實例:下載進度條顯示。 前端訪問下載接口之前,先建立用戶SSE連接,然后訪問下載接口,服務(wù)端推送消息。
     * http://localhost:8080/sse/downLoad/userId-1
     *
     * @throws InterruptedException
     */
    @GetMapping("/downLoad/{userId}")
    public void pushOne(@PathVariable("userId") String userId) throws InterruptedException {
        for (int i = 0; i <= 100; i++) {
            if (i > 50 && i < 70) {
                Thread.sleep(500L);
            } else {
                Thread.sleep(100L);
            }
            System.out.println("sendMessage --> 消息=" + i);
            SseServer.sendMessage(userId, String.valueOf(i));
        }
        System.out.println("下載成功");
    }
    /**
     * 廣播發(fā)送。http://localhost:8080/sse/pushAllUser
     *
     * @throws InterruptedException
     */
    @GetMapping("/pushAllUser")
    public void pushAllUser() throws InterruptedException {
        for (int i = 0; i <= 100; i++) {
            if (i > 50 && i < 70) {
                Thread.sleep(500L);
            } else {
                Thread.sleep(100L);
            }
            System.out.println("batchAllSendMessage --> 消息=" + i);
            SseServer.batchAllSendMessage(String.valueOf(i));
        }
    }
}

2.3 前端html

簡單寫一個html來演示效果。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Home</title>
    <script>
        var sseSource = new EventSource("http://localhost:8080/sse/userConnect");
        <!-- 添加一個信息回調(diào) -->
        sseSource.onmessage = function(event){
            console.log("test=>",event)
            document.getElementById("result").innerText = event.data+'%';
            document.getElementById("my-progress").value = event.data;
        }
        // 使用vue交互事件,可以添加一些SSE的回調(diào)
        // sseSource.dispatchEvent();
        // sseSource.close();
    </script>
</head>
<body>
    <div id="result"></div>
        下載進度條:<progress style="width: 300px" id="my-progress" value="0" max="100"></progress>
    </body>
</html>

3、演示效果

參考文章:

HTML- server-sent-events:https://html.spec.whatwg.org/multipage/server-sent-events.html

到此這篇關(guān)于SpringBoot SSE服務(wù)端主動推送事件的實現(xiàn)的文章就介紹到這了,更多相關(guān)SpringBoot SSE服務(wù)端主動推送內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論