Springboot+Stomp協(xié)議實(shí)現(xiàn)聊天功能
前端代碼
這里我對(duì)Stomp.js進(jìn)行了一個(gè)簡(jiǎn)單的封裝,寫(xiě)在stomp-client.js里面
/**
* 對(duì) stomp 客戶端進(jìn)行封裝
*/
var client;
var subscribes = [];
var errorTimes = 0;
var endpoint = "/ws";
/**
* 建立websocket連接
* @param {Function} onConnecting 開(kāi)始連接時(shí)的回調(diào)
* @param {Function} onConnected 連接成功回調(diào)
* @param {Function} onError 連接異常或斷開(kāi)回調(diào)
*/
function connect(onConnecting, onConnected, onError) {
onConnecting instanceof Function && onConnecting();
var sock = new SockJS(endpoint);
client = Stomp.over(sock);
console.log("ws: start connect to " + endpoint);
client.connect({}, function (frame) {
errorTimes = 0;
console.log('connected: ' + frame);
// 連接成功后重新訂閱
subscribes.forEach(function (item) {
client.subscribe(item.destination, function (resp) {
console.debug("ws收到消息: ", resp);
item.cb(JSON.parse(resp.body));
});
});
onConnected instanceof Function && onConnected();
}, function (err) {
errorTimes = errorTimes > 8 ? 0 : errorTimes;
var nextTime = ++errorTimes * 3000;
console.warn("與服務(wù)器斷開(kāi)連接," + nextTime + " 秒后重新連接", err);
setTimeout(function () {
console.log("嘗試重連……");
connect(onConnecting, onConnected, onError);
}, nextTime);
onError instanceof Function && onError();
});
}
/**
* 訂閱消息,若當(dāng)前未連接,則會(huì)在連接成功后自動(dòng)訂閱
*
* 注意,為防止重連導(dǎo)致重復(fù)訂閱,請(qǐng)勿使用匿名函數(shù)做回調(diào)
*
* @param {String} destination 目標(biāo)
* @param {Function} cb 回調(diào)
*/
function subscribe(destination, cb) {
var exist = subscribes.filter(function (sub) {
return sub.destination === destination && sub.cb === cb
});
// 防止重復(fù)訂閱
if (exist && exist.length) {
return;
}
// 記錄所有訂閱,在連接成功時(shí)統(tǒng)一處理
subscribes.push({
destination: destination,
cb: cb
});
if (client && client.connected) {
client.subscribe(destination, function (resp) {
console.debug("ws收到消息: ", resp);
cb instanceof Function && cb(JSON.parse(resp.body));
});
} else {
console.warn("ws未連接,暫時(shí)無(wú)法訂閱:" + destination)
}
}
/**
* 發(fā)送消息
* @param {String} destination 目標(biāo)
* @param {Object} msg 消息體對(duì)象
*/
function send(destination, msg) {
if (!client) {
console.error("客戶端未連接,無(wú)法發(fā)送消息!")
}
client.send(destination, {}, JSON.stringify(msg));
}
window.onbeforeunload = function () {
// 當(dāng)窗口關(guān)閉時(shí)斷開(kāi)連接
if (client && client.connected) {
client.disconnect(function () {
console.log("websocket disconnected ");
});
}
};
前端的html頁(yè)面index.html如下:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>STOMP</title>
</head>
<body>
<h1 id="tip">Welcome!</h1>
<p>狀態(tài): <span id="status"></span></p>
<input type="text" id="content" placeholder="請(qǐng)輸入要發(fā)送的消息"> <br>
<button onclick="sendTextMsg()">發(fā)送</button>
<ul id="ul">
</ul>
<script th:src="@{lib/sockjs.min.js}"></script>
<script th:src="@{lib/stomp.min.js}"></script>
<script th:src="@{stomp-client.js}"></script>
<script>
connect(function () {
statusChange("連接中...");
}, function () {
statusChange("在線");
// 注意,為防止重連導(dǎo)致重復(fù)訂閱,請(qǐng)勿使用匿名函數(shù)做回調(diào)
subscribe("/user/topic/subNewMsg", onNewMsg);
}, function () {
statusChange("離線");
});
function onNewMsg(msg) {
var li = document.createElement("li");
li.innerText = msg.content;
document.getElementById("ul").appendChild(li);
}
function sendTextMsg() {
var content = document.getElementById("content").value;
var msg = {
msgType: 1,
content: content
};
send("/app/echo", msg);
}
function statusChange(status) {
document.getElementById("status").innerText = status;
}
</script>
</body>
</html>
后端代碼
依賴引入,主要引入下面的包,其它的包略過(guò)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置類
@Slf4j
@Setter
@Configuration
@EnableWebSocketMessageBroker
@ConfigurationProperties(prefix = "websocket")
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer, ApplicationListener<BrokerAvailabilityEvent> {
private final BrokerConfig brokerConfig;
private String[] allowOrigins;
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// 繼承DefaultHandshakeHandler并重寫(xiě)determineUser方法,可以自定義如何確定用戶
// 添加方法:registry.addEndpoint("/ws").setHandshakeHandler(handshakeHandler)
registry.addEndpoint("/ws")
.setAllowedOrigins(allowOrigins)
.withSockJS();
}
/**
* 配置消息代理
*/
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setApplicationDestinationPrefixes("/app");
if (brokerConfig.isUseSimpleBroker()) {
// 使用 SimpleBroker
// 配置前綴, 有這些前綴的消息會(huì)路由到broker
registry.enableSimpleBroker("/topic", "/queue")
//配置stomp協(xié)議里, server返回的心跳
.setHeartbeatValue(new long[]{10000L, 10000L})
//配置發(fā)送心跳的scheduler
.setTaskScheduler(new DefaultManagedTaskScheduler());
} else {
// 使用外部 Broker
// 指定前綴,有這些前綴的消息會(huì)路由到broker
registry.enableStompBrokerRelay("/topic", "/queue")
// 廣播用戶目標(biāo),如果要推送的用戶不在本地,則通過(guò) broker 廣播給集群的其他成員
.setUserDestinationBroadcast("/topic/log-unresolved-user")
// 用戶注冊(cè)廣播,一旦有用戶登錄,則廣播給集群中的其他成員
.setUserRegistryBroadcast("/topic/log-user-registry")
// 虛擬地址
.setVirtualHost(brokerConfig.getVirtualHost())
// 用戶密碼
.setSystemLogin(brokerConfig.getUsername())
.setSystemPasscode(brokerConfig.getPassword())
.setClientLogin(brokerConfig.getUsername())
.setClientPasscode(brokerConfig.getPassword())
// 心跳間隔
.setSystemHeartbeatSendInterval(10000)
.setSystemHeartbeatReceiveInterval(10000)
// 使用 setTcpClient 以配置多個(gè) broker 地址,setRelayHost/Port 只能配置一個(gè)
.setTcpClient(createTcpClient());
}
}
/**
* 創(chuàng)建 TcpClient 工廠,用于配置多個(gè) broker 地址
*/
private ReactorNettyTcpClient<byte[]> createTcpClient() {
return new ReactorNettyTcpClient<>(
// BrokerAddressSupplier 用于獲取中繼地址,一次只使用一個(gè),如果該中繼出錯(cuò),則會(huì)獲取下一個(gè)
client -> client.addressSupplier(brokerConfig.getBrokerAddressSupplier()),
new StompReactorNettyCodec());
}
@Override
public void onApplicationEvent(BrokerAvailabilityEvent event) {
if (!event.isBrokerAvailable()) {
log.warn("stomp broker is not available!!!!!!!!");
} else {
log.info("stomp broker is available");
}
}
}
消息處理
@Slf4j
@Controller
@RequiredArgsConstructor(onConstructor_ = {@Autowired})
public class StompController {
private final SimpMessageSendingOperations msgOperations;
private final SimpUserRegistry simpUserRegistry;
/**
* 回音消息,將用戶發(fā)來(lái)的消息內(nèi)容加上 Echo 前綴后推送回客戶端
*/
@MessageMapping("/echo")
public void echo(Principal principal, Msg msg) {
String username = principal.getName();
msg.setContent("Echo: " + msg.getContent());
msgOperations.convertAndSendToUser(username, "/topic/subNewMsg", msg);
int userCount = simpUserRegistry.getUserCount();
int sessionCount = simpUserRegistry.getUser(username).getSessions().size();
log.info("當(dāng)前本系統(tǒng)總在線人數(shù): {}, 當(dāng)前用戶: {}, 該用戶的客戶端連接數(shù): {}", userCount, username, sessionCount);
}
}
實(shí)現(xiàn)效果

報(bào)文分析
開(kāi)啟調(diào)試模式,我們根據(jù)報(bào)文來(lái)分析一下前后端互通的報(bào)文

握手
客戶端請(qǐng)求報(bào)文如下
GET ws://localhost:8025/ws/035/5hy4avgm/websocket HTTP/1.1 Host: localhost:8025 Connection: Upgrade Pragma: no-cache Cache-Control: no-cache User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.5735.289 Safari/537.36 Upgrade: websocket Origin: http://localhost:8025 Sec-WebSocket-Version: 13 Accept-Encoding: gzip, deflate, br Accept-Language: zh-CN,zh;q=0.9 Cookie: 略 Sec-WebSocket-Key: PlMHmdl2JRzDAVk3feOaeA== Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
服務(wù)端響應(yīng)握手請(qǐng)求
HTTP/1.1 101 Upgrade: websocket Connection: upgrade Sec-WebSocket-Accept: 9CKY8n1j/cHoKsWmpmX4pNlQuZg= Sec-WebSocket-Extensions: permessage-deflate;client_max_window_bits=15 X-Content-Type-Options: nosniff X-XSS-Protection: 1; mode=block Cache-Control: no-cache, no-store, max-age=0, must-revalidate Pragma: no-cache Expires: 0 X-Frame-Options: DENY Date: Thu, 08 Feb 2024 06:58:28 GMT
stomp報(bào)文分析
在瀏覽器消息一欄,我們可以看到長(zhǎng)連接過(guò)程中通信的報(bào)文

下面來(lái)簡(jiǎn)單分析一下stomp的報(bào)文

客戶端請(qǐng)求連接
其中\(zhòng)n表示換行
[ "CONNECT\naccept-version:1.1,1.0\nheart-beat:10000,10000\n\n\u0000" ]
可以看到請(qǐng)求連接的命令是CONNECT,連接報(bào)文里面還包含了心跳的信息
服務(wù)端返回連接成功
[ "CONNECTED\nversion:1.1\nheart-beat:10000,10000\nuser-name:admin\n\n\u0000" ]
CONNECTED是服務(wù)端連接成功的命令,報(bào)文中也包含了心跳的信息
客戶端訂閱
訂閱的目的地是:/user/topic/subNewMsg
["SUBSCRIBE\nid:sub-0\ndestination:/user/topic/subNewMsg\n\n\u0000"]
客戶端發(fā)送消息
發(fā)送的目的地是:/app/echo
[
"SEND\ndestination:/app/echo\ncontent-length:35\n\n{\"msgType\":1,\"content\":\"你好啊\"}\u0000"
]
服務(wù)端響應(yīng)消息
響應(yīng)的目的地是:/user/topic/subNewMsg,當(dāng)訂閱了這個(gè)目的地的,方法,將會(huì)被回調(diào)
[
"MESSAGE\ndestination:/user/topic/subNewMsg\ncontent-type:application/json;charset=UTF-8\nsubscription:sub-0\nmessage-id:5hy4avgm-1\ncontent-length:41\n\n{\"content\":\"Echo: 你好啊\",\"msgType\":1}\u0000"
]
心跳報(bào)文
可以看到,約每隔10S,客戶端和服務(wù)端都有一次心跳報(bào)文,發(fā)送的報(bào)文內(nèi)容為一個(gè)回車。
[\n]

項(xiàng)目鏈接:https://gitee.com/syk1234/stomp-demo.git
以上就是Springboot+Stomp協(xié)議實(shí)現(xiàn)聊天功能的詳細(xì)內(nèi)容,更多關(guān)于Springboot+Stomp聊天的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解SpringBoot項(xiàng)目整合Vue做一個(gè)完整的用戶注冊(cè)功能
本文主要介紹了SpringBoot項(xiàng)目整合Vue做一個(gè)完整的用戶注冊(cè)功能,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
Mybatis-plus foreach拼接字符串查詢無(wú)數(shù)據(jù)返回問(wèn)題
這篇文章主要介紹了Mybatis-plus foreach拼接字符串查詢無(wú)數(shù)據(jù)返回問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
Netty + ZooKeeper 實(shí)現(xiàn)簡(jiǎn)單的服務(wù)注冊(cè)與發(fā)現(xiàn)
服務(wù)注冊(cè)和發(fā)現(xiàn)一直是分布式的核心組件。本文介紹了借助 ZooKeeper 做注冊(cè)中心,如何實(shí)現(xiàn)一個(gè)簡(jiǎn)單的服務(wù)注冊(cè)和發(fā)現(xiàn)。,需要的朋友可以參考下2019-06-06
Java異或運(yùn)算應(yīng)用場(chǎng)景詳解
這篇文章主要給大家介紹了關(guān)于Java異或運(yùn)算應(yīng)用場(chǎng)景的相關(guān)資料,異或運(yùn)算會(huì)應(yīng)用在很多算法題中,這里整理了幾個(gè)最常見(jiàn)的應(yīng)用場(chǎng)景,文中通過(guò)代碼示例介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07
SpringBoot實(shí)現(xiàn)多租戶架構(gòu)
在SpringBoot中可以通過(guò)多數(shù)據(jù)源和動(dòng)態(tài)路由來(lái)實(shí)現(xiàn)多租戶機(jī)制,本文主要介紹了SpringBoot實(shí)現(xiàn)多租戶架構(gòu),具有一定的參考價(jià)值,感興趣的可以里哦啊接一下2024-03-03
springboot集成Feign的實(shí)現(xiàn)示例
Feign是聲明式HTTP客戶端,用于簡(jiǎn)化微服務(wù)之間的REST調(diào)用,本文就來(lái)介紹一下springboot集成Feign的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-11-11

