欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Spring SseEmitter推送消息及常用方法

 更新時間:2024年07月16日 10:54:43   作者:way_more  
SseEmitter 是 Spring Framework 提供的用于支持 Server-Sent Events(SSE)的類,它允許服務(wù)器端向客戶端推送事件流,實(shí)現(xiàn)服務(wù)器到客戶端的單向通信,這篇文章主要介紹了Spring SseEmitter推送消息,需要的朋友可以參考下

SseEmitter

SseEmitter 是 Spring Framework 提供的用于支持 Server-Sent Events(SSE)的類,它允許服務(wù)器端向客戶端推送事件流,實(shí)現(xiàn)服務(wù)器到客戶端的單向通信。

下面我們來看一下SSE的介紹

SSE(Server-Sent Events,服務(wù)器推送事件)是一種用于在客戶端和服務(wù)器之間建立單向通信的技術(shù)。它允許服務(wù)器端發(fā)送異步消息給客戶端,而無需客戶端明確地請求數(shù)據(jù)。SSE 基于 HTTP 協(xié)議,使用簡單的格式來傳輸文本數(shù)據(jù),通常被用于實(shí)時通知、實(shí)時更新等場景。

我們知道推送消息還有另一種方式是webscoket,那么SSE和webscoket有什么區(qū)別?

  • 協(xié)議:SSE 基于 HTTP 協(xié)議,而 WebSocket 則是一種獨(dú)立的協(xié)議。SSE 使用簡單的文本格式傳輸數(shù)據(jù),而 WebSocket 使用二進(jìn)制幀進(jìn)行通信。
  • 雙向通信:WebSocket 支持雙向通信,客戶端和服務(wù)器可以同時發(fā)送和接收數(shù)據(jù)。而 SSE 是單向通信,只允許服務(wù)器向客戶端推送數(shù)據(jù),客戶端無法發(fā)送數(shù)據(jù)給服務(wù)器。
  • 連接狀態(tài):WebSocket 使用長連接,保持持久連接,可以實(shí)現(xiàn)實(shí)時雙向通信。而 SSE 是基于短連接,每次請求結(jié)束后需要重新建立連接。
  • 兼容性:WebSocket 在大多數(shù)現(xiàn)代瀏覽器中都有良好的支持,但在一些舊版本的瀏覽器中可能會存在兼容性問題。SSE 在較新的瀏覽器中也有良好的支持,但在舊版本瀏覽器中可能不被支持。
  • 使用場景:WebSocket 適用于需要實(shí)時雙向通信的場景,如聊天應(yīng)用、實(shí)時游戲等。SSE 更適合服務(wù)器向客戶端單向推送實(shí)時數(shù)據(jù)的場景,如股票報(bào)價、實(shí)時通知等。

綜上所述,SSE 和 WebSocket 都是實(shí)現(xiàn)實(shí)時通信的技術(shù),但在協(xié)議、雙向通信、連接狀態(tài)、兼容性和使用場景等方面存在一些區(qū)別。選擇使用哪種技術(shù)取決于具體的需求和應(yīng)用場景。

SseEmitter 常用方法

下面是 SseEmitter 類的一些常用方法

SseEmitter():構(gòu)造方法,用于創(chuàng)建一個新的 SseEmitter 對象。

SseEmitter(Long timeout):構(gòu)造方法,用于創(chuàng)建一個設(shè)置了超時時間新的 SseEmitter 對象。

send(Object data):直接發(fā)送一個帶有默認(rèn)事件名稱的 SSE 事件給客戶端,數(shù)據(jù)由參數(shù)指定。

send(Object data, MediaType mediaType):發(fā)送一個帶有指定媒體類型的 SSE 事件給客戶端。

onTimeout(Runnable callback):設(shè)置連接超時時的回調(diào)函數(shù)。

onCompletion(Runnable callback):設(shè)置連接完成時的回調(diào)函數(shù)。

onError(Consumer callback):設(shè)置發(fā)送錯誤時的回調(diào)函數(shù)。

complete():表示 SSE 事件流結(jié)束,通知客戶端不再有新的事件發(fā)送。

SseEmitter推送消息工具類

下面,我就用SseEmitter來編寫一個推送消息給所有在線用戶和某個用戶的工具類

@Slf4j
@Component
public class SseEmitterUtil {
    //保存客戶連接
    public static Map<String, SseEmitter> userSseEmitters = new HashMap<>();
    /**
     * 用戶上線,開啟一個SSE連接
     * @param userId
     * @return
     */
    public SseEmitter subscribe(String userId){
        //創(chuàng)建一個超時時間為30秒的SseEmitter對象
        SseEmitter sseEmitter = new SseEmitter(30*1000L);
        //設(shè)置回調(diào)函數(shù)
        sseEmitter.onCompletion(completionCallBack(userId));
        sseEmitter.onError(errorCallBack(userId));
        sseEmitter.onTimeout(timeoutCallBack(userId));
        //緩存起來
        userSseEmitters.put(userId,sseEmitter);
        return sseEmitter;
    }
    /**
     * 推送消息給某個客戶端
     * @param userId  用戶ID
     * @param content 消息體
     */
    public static void push(String userId,String content){
        SseEmitter sseEmitter = userSseEmitters.get(userId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(content);
            }catch (Exception e){
                log.error("用戶-{} 推送消息異常:{}",userId,e);
            }
        }else {
            log.info("用戶-{} 連接不存在",userId);
        }
    }
    /**
     * 推送消息給所有客戶端
     * @param content 消息體
     */
    public static void pushAllUser(String content){
        for (SseEmitter emitter : userSseEmitters.values()) {
            try {
                emitter.send(SseEmitter.event().name("全局消息").data(content));
            } catch (Exception e) {
                log.error("推送消息異常:{}",e);
            }
        }
    }
    /**
     * 刪除某個客戶連接
     * @param userId
     */
    public static void removeEmitter(String userId) {
        SseEmitter sseEmitter = userSseEmitters.get(userId);
        if (sseEmitter != null) {
            sseEmitter.complete();
            userSseEmitters.remove(userId);
        }else {
            log.info("用戶-{} 連接不存在",userId);
        }
    }
    private Runnable completionCallBack(String userId) {
        return () -> {
            log.info("用戶-{} 連接成功", userId);
        };
    }
    /**
     * 出現(xiàn)超時,將當(dāng)前用戶緩存刪除
     * @param userId
     * @return
     */
    private Runnable timeoutCallBack(String userId) {
        return () -> {
            log.info("用戶-{} 連接超時", userId);
            userSseEmitters.remove(userId);
        };
    }
    /**
     * 出現(xiàn)異常,將當(dāng)前用戶緩存刪除
     * @param userId
     * @return
     */
    private Consumer<Throwable> errorCallBack(String userId) {
        return throwable -> {
            log.info("用戶-{} 連接異常", userId);
            userSseEmitters.remove(userId);
        };
    }
}

我們在登錄的時候就可以調(diào)用subscribe來建立連接,然后發(fā)在線用戶公告就可以使用pushAllUser,某個用戶通知就使用push

SseEmitter搭配監(jiān)聽器

我們還可以搭配監(jiān)聽器來使用,定義某個事件的監(jiān)聽器,當(dāng)事件發(fā)生,就使用SseService 來推送消息

如下,我們定義一個監(jiān)聽器,用于監(jiān)聽用戶登錄的事件。在監(jiān)聽器中,使用 SseService 向所有在線用戶發(fā)送一條上線通知。

@Component
public class UserLoginListener {
   private final SseEmitterUtil sseService;
    @Autowired
    public UserLoginListener(SseEmitterUtil sseService) {
        this.sseService = sseService;
    }
    @EventListener
    public void onUserLogin(UserLoginEvent event) {
        String message = "用戶 " + event.getUserId() + " 上線了";
        sseService.pushAllUser(message);
    }
}

當(dāng)用戶登錄時,發(fā)布一個 UserLoginEvent 事件,觸發(fā) UserLoginListener 中的 onUserLogin 方法,向所有在線用戶發(fā)送上線通知。

@Service
public class UserService {
    private final ApplicationEventPublisher publisher;
    @Autowired
    public UserService(ApplicationEventPublisher publisher) {
        this.publisher = publisher;
    }
    public void login(String userId) {
        // 登錄邏輯
        publisher.publishEvent(new UserLoginEvent(userId));
    }
}

事件監(jiān)聽除了可以使用spring的監(jiān)聽機(jī)制,還可以使用redis的事件監(jiān)聽,這個之后講解

到此這篇關(guān)于Spring SseEmitter推送消息的文章就介紹到這了,更多相關(guān)Spring SseEmitter推送消息內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評論