Springboot+Stomp協(xié)議實現(xiàn)聊天功能
前端代碼
這里我對Stomp.js進行了一個簡單的封裝,寫在stomp-client.js里面
/** * 對 stomp 客戶端進行封裝 */ var client; var subscribes = []; var errorTimes = 0; var endpoint = "/ws"; /** * 建立websocket連接 * @param {Function} onConnecting 開始連接時的回調(diào) * @param {Function} onConnected 連接成功回調(diào) * @param {Function} onError 連接異常或斷開回調(diào) */ function connect(onConnecting, onConnected, onError) { onConnecting instanceof Function && onConnecting(); var sock = new SockJS(endpoint); client = Stomp.over(sock); console.log("ws: start connect to " + endpoint); client.connect({}, function (frame) { errorTimes = 0; console.log('connected: ' + frame); // 連接成功后重新訂閱 subscribes.forEach(function (item) { client.subscribe(item.destination, function (resp) { console.debug("ws收到消息: ", resp); item.cb(JSON.parse(resp.body)); }); }); onConnected instanceof Function && onConnected(); }, function (err) { errorTimes = errorTimes > 8 ? 0 : errorTimes; var nextTime = ++errorTimes * 3000; console.warn("與服務(wù)器斷開連接," + nextTime + " 秒后重新連接", err); setTimeout(function () { console.log("嘗試重連……"); connect(onConnecting, onConnected, onError); }, nextTime); onError instanceof Function && onError(); }); } /** * 訂閱消息,若當(dāng)前未連接,則會在連接成功后自動訂閱 * * 注意,為防止重連導(dǎo)致重復(fù)訂閱,請勿使用匿名函數(shù)做回調(diào) * * @param {String} destination 目標(biāo) * @param {Function} cb 回調(diào) */ function subscribe(destination, cb) { var exist = subscribes.filter(function (sub) { return sub.destination === destination && sub.cb === cb }); // 防止重復(fù)訂閱 if (exist && exist.length) { return; } // 記錄所有訂閱,在連接成功時統(tǒng)一處理 subscribes.push({ destination: destination, cb: cb }); if (client && client.connected) { client.subscribe(destination, function (resp) { console.debug("ws收到消息: ", resp); cb instanceof Function && cb(JSON.parse(resp.body)); }); } else { console.warn("ws未連接,暫時無法訂閱:" + destination) } } /** * 發(fā)送消息 * @param {String} destination 目標(biāo) * @param {Object} msg 消息體對象 */ function send(destination, msg) { if (!client) { console.error("客戶端未連接,無法發(fā)送消息!") } client.send(destination, {}, JSON.stringify(msg)); } window.onbeforeunload = function () { // 當(dāng)窗口關(guān)閉時斷開連接 if (client && client.connected) { client.disconnect(function () { console.log("websocket disconnected "); }); } };
前端的html頁面index.html如下:
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>STOMP</title> </head> <body> <h1 id="tip">Welcome!</h1> <p>狀態(tài): <span id="status"></span></p> <input type="text" id="content" placeholder="請輸入要發(fā)送的消息"> <br> <button onclick="sendTextMsg()">發(fā)送</button> <ul id="ul"> </ul> <script th:src="@{lib/sockjs.min.js}"></script> <script th:src="@{lib/stomp.min.js}"></script> <script th:src="@{stomp-client.js}"></script> <script> connect(function () { statusChange("連接中..."); }, function () { statusChange("在線"); // 注意,為防止重連導(dǎo)致重復(fù)訂閱,請勿使用匿名函數(shù)做回調(diào) subscribe("/user/topic/subNewMsg", onNewMsg); }, function () { statusChange("離線"); }); function onNewMsg(msg) { var li = document.createElement("li"); li.innerText = msg.content; document.getElementById("ul").appendChild(li); } function sendTextMsg() { var content = document.getElementById("content").value; var msg = { msgType: 1, content: content }; send("/app/echo", msg); } function statusChange(status) { document.getElementById("status").innerText = status; } </script> </body> </html>
后端代碼
依賴引入,主要引入下面的包,其它的包略過
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
配置類
@Slf4j @Setter @Configuration @EnableWebSocketMessageBroker @ConfigurationProperties(prefix = "websocket") @RequiredArgsConstructor(onConstructor_ = {@Autowired}) public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, ApplicationListener<BrokerAvailabilityEvent> { private final BrokerConfig brokerConfig; private String[] allowOrigins; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 繼承DefaultHandshakeHandler并重寫determineUser方法,可以自定義如何確定用戶 // 添加方法:registry.addEndpoint("/ws").setHandshakeHandler(handshakeHandler) registry.addEndpoint("/ws") .setAllowedOrigins(allowOrigins) .withSockJS(); } /** * 配置消息代理 */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.setApplicationDestinationPrefixes("/app"); if (brokerConfig.isUseSimpleBroker()) { // 使用 SimpleBroker // 配置前綴, 有這些前綴的消息會路由到broker registry.enableSimpleBroker("/topic", "/queue") //配置stomp協(xié)議里, server返回的心跳 .setHeartbeatValue(new long[]{10000L, 10000L}) //配置發(fā)送心跳的scheduler .setTaskScheduler(new DefaultManagedTaskScheduler()); } else { // 使用外部 Broker // 指定前綴,有這些前綴的消息會路由到broker registry.enableStompBrokerRelay("/topic", "/queue") // 廣播用戶目標(biāo),如果要推送的用戶不在本地,則通過 broker 廣播給集群的其他成員 .setUserDestinationBroadcast("/topic/log-unresolved-user") // 用戶注冊廣播,一旦有用戶登錄,則廣播給集群中的其他成員 .setUserRegistryBroadcast("/topic/log-user-registry") // 虛擬地址 .setVirtualHost(brokerConfig.getVirtualHost()) // 用戶密碼 .setSystemLogin(brokerConfig.getUsername()) .setSystemPasscode(brokerConfig.getPassword()) .setClientLogin(brokerConfig.getUsername()) .setClientPasscode(brokerConfig.getPassword()) // 心跳間隔 .setSystemHeartbeatSendInterval(10000) .setSystemHeartbeatReceiveInterval(10000) // 使用 setTcpClient 以配置多個 broker 地址,setRelayHost/Port 只能配置一個 .setTcpClient(createTcpClient()); } } /** * 創(chuàng)建 TcpClient 工廠,用于配置多個 broker 地址 */ private ReactorNettyTcpClient<byte[]> createTcpClient() { return new ReactorNettyTcpClient<>( // BrokerAddressSupplier 用于獲取中繼地址,一次只使用一個,如果該中繼出錯,則會獲取下一個 client -> client.addressSupplier(brokerConfig.getBrokerAddressSupplier()), new StompReactorNettyCodec()); } @Override public void onApplicationEvent(BrokerAvailabilityEvent event) { if (!event.isBrokerAvailable()) { log.warn("stomp broker is not available!!!!!!!!"); } else { log.info("stomp broker is available"); } } }
消息處理
@Slf4j @Controller @RequiredArgsConstructor(onConstructor_ = {@Autowired}) public class StompController { private final SimpMessageSendingOperations msgOperations; private final SimpUserRegistry simpUserRegistry; /** * 回音消息,將用戶發(fā)來的消息內(nèi)容加上 Echo 前綴后推送回客戶端 */ @MessageMapping("/echo") public void echo(Principal principal, Msg msg) { String username = principal.getName(); msg.setContent("Echo: " + msg.getContent()); msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg); int userCount = simpUserRegistry.getUserCount(); int sessionCount = simpUserRegistry.getUser(username).getSessions().size(); log.info("當(dāng)前本系統(tǒng)總在線人數(shù): {}, 當(dāng)前用戶: {}, 該用戶的客戶端連接數(shù): {}", userCount, username, sessionCount); } }
實現(xiàn)效果
報文分析
開啟調(diào)試模式,我們根據(jù)報文來分析一下前后端互通的報文
握手
客戶端請求報文如下
GET ws://localhost:8025/ws/035/5hy4avgm/websocket HTTP/1.1 Host: localhost:8025 Connection: Upgrade Pragma: no-cache Cache-Control: no-cache User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36 Upgrade: websocket Origin: http://localhost:8025 Sec-WebSocket-Version: 13 Accept-Encoding: gzip, deflate, br Accept-Language: zh-CN,zh;q=0.9 Cookie: 略 Sec-WebSocket-Key: PlMHmdl2JRzDAVk3feOaeA== Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
服務(wù)端響應(yīng)握手請求
HTTP/1.1 101 Upgrade: websocket Connection: upgrade Sec-WebSocket-Accept: 9CKY8n1j/cHoKsWmpmX4pNlQuZg= Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15 X-Content-Type-Options: nosniff X-XSS-Protection: 1; mode=block Cache-Control: no-cache, no-store, max-age=0, must-revalidate Pragma: no-cache Expires: 0 X-Frame-Options: DENY Date: Thu, 08 Feb 2024 06:58:28 GMT
stomp報文分析
在瀏覽器消息一欄,我們可以看到長連接過程中通信的報文
下面來簡單分析一下stomp的報文
客戶端請求連接
其中\(zhòng)n表示換行
[ "CONNECT\naccept-version:1.1,1.0\nheart-beat:10000,10000\n\n\u0000" ]
可以看到請求連接的命令是CONNECT,連接報文里面還包含了心跳的信息
服務(wù)端返回連接成功
[ "CONNECTED\nversion:1.1\nheart-beat:10000,10000\nuser-name:admin\n\n\u0000" ]
CONNECTED是服務(wù)端連接成功的命令,報文中也包含了心跳的信息
客戶端訂閱
訂閱的目的地是:/user/topic/subNewMsg
["SUBSCRIBE\nid:sub-0\ndestination:/user/topic/subNewMsg\n\n\u0000"]
客戶端發(fā)送消息
發(fā)送的目的地是:/app/echo
[ "SEND\ndestination:/app/echo\ncontent-length:35\n\n{\"msgType\":1,\"content\":\"你好啊\"}\u0000" ]
服務(wù)端響應(yīng)消息
響應(yīng)的目的地是:/user/topic/subNewMsg,當(dāng)訂閱了這個目的地的,方法,將會被回調(diào)
[ "MESSAGE\ndestination:/user/topic/subNewMsg\ncontent-type:application/json;charset=UTF-8\nsubscription:sub-0\nmessage-id:5hy4avgm-1\ncontent-length:41\n\n{\"content\":\"Echo: 你好啊\",\"msgType\":1}\u0000" ]
心跳報文
可以看到,約每隔10S,客戶端和服務(wù)端都有一次心跳報文,發(fā)送的報文內(nèi)容為一個回車。
[\n]
項目鏈接:https://gitee.com/syk1234/stomp-demo.git
以上就是Springboot+Stomp協(xié)議實現(xiàn)聊天功能的詳細內(nèi)容,更多關(guān)于Springboot+Stomp聊天的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解SpringBoot項目整合Vue做一個完整的用戶注冊功能
本文主要介紹了SpringBoot項目整合Vue做一個完整的用戶注冊功能,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07Mybatis-plus foreach拼接字符串查詢無數(shù)據(jù)返回問題
這篇文章主要介紹了Mybatis-plus foreach拼接字符串查詢無數(shù)據(jù)返回問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03Netty + ZooKeeper 實現(xiàn)簡單的服務(wù)注冊與發(fā)現(xiàn)
服務(wù)注冊和發(fā)現(xiàn)一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊中心,如何實現(xiàn)一個簡單的服務(wù)注冊和發(fā)現(xiàn)。,需要的朋友可以參考下2019-06-06