Spring Boot集成WebSocket項(xiàng)目實(shí)戰(zhàn)的示例代碼
WebSocket是一種在單個(gè)TCP連接上進(jìn)行全雙工通信的協(xié)議,它使得客戶端和服務(wù)器之間的數(shù)據(jù)交換變得更加簡單,允許服務(wù)端主動(dòng)向客戶端推送數(shù)據(jù)。下面我將詳細(xì)介紹如何在Spring Boot項(xiàng)目中集成WebSocket,并提供完整的實(shí)戰(zhàn)代碼。
一、項(xiàng)目配置與依賴
1. 添加依賴
首先在pom.xml中添加必要依賴:
<dependencies>
<!-- WebSocket支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- Web支持 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Lombok簡化代碼 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- JSON處理 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
2. WebSocket配置類
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
二、WebSocket服務(wù)端實(shí)現(xiàn)
1. 消息實(shí)體類
import lombok.Data;
@Data
public class WebSocketMessage {
private String type; // 消息類型:heartbeat/unicast/broadcast
private String from; // 發(fā)送者ID
private String to; // 接收者ID(單播時(shí)使用)
private String content; // 消息內(nèi)容
private Long timestamp; // 時(shí)間戳
}
2. WebSocket服務(wù)核心類
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@Component
@ServerEndpoint("/ws/{userId}")
public class WebSocketServer {
// 在線連接數(shù)
private static final AtomicInteger onlineCount = new AtomicInteger(0);
// 存放每個(gè)客戶端對應(yīng)的WebSocketServer對象
private static final ConcurrentHashMap<String, WebSocketServer> webSocketMap = new ConcurrentHashMap<>();
// 與某個(gè)客戶端的連接會(huì)話
private Session session;
// 接收的用戶ID
private String userId;
// 最后心跳時(shí)間
private long lastHeartbeatTime = System.currentTimeMillis();
/**
* 連接建立成功調(diào)用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
this.session = session;
this.userId = userId;
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
webSocketMap.put(userId, this);
} else {
webSocketMap.put(userId, this);
addOnlineCount();
}
log.info("用戶連接:{},當(dāng)前在線人數(shù):{}", userId, getOnlineCount());
// 啟動(dòng)心跳檢測線程
new HeartbeatThread().start();
}
/**
* 連接關(guān)閉調(diào)用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(userId)) {
webSocketMap.remove(userId);
subOnlineCount();
}
log.info("用戶退出:{},當(dāng)前在線人數(shù):{}", userId, getOnlineCount());
}
/**
* 收到客戶端消息后調(diào)用的方法
*/
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到用戶{}的消息:{}", userId, message);
WebSocketMessage msg = JSON.parseObject(message, WebSocketMessage.class);
// 心跳檢測
if ("heartbeat".equals(msg.getType())) {
this.lastHeartbeatTime = System.currentTimeMillis();
return;
}
// 處理業(yè)務(wù)消息
switch (msg.getType()) {
case "unicast":
sendToUser(msg.getTo(), message);
break;
case "broadcast":
broadcast(message);
break;
default:
log.warn("未知的消息類型:{}", msg.getType());
}
}
/**
* 發(fā)生錯(cuò)誤時(shí)調(diào)用
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用戶{}的連接發(fā)生錯(cuò)誤:{}", userId, error.getMessage());
error.printStackTrace();
}
/**
* 單播消息
*/
public static void sendToUser(String toUserId, String message) {
if (webSocketMap.containsKey(toUserId)) {
webSocketMap.get(toUserId).sendMessage(message);
} else {
log.warn("用戶{}不在線,消息發(fā)送失敗", toUserId);
}
}
/**
* 廣播消息
*/
public static void broadcast(String message) {
webSocketMap.forEach((userId, server) -> {
if (server.session.isOpen()) {
server.sendMessage(message);
}
});
}
/**
* 服務(wù)器主動(dòng)推送
*/
public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("發(fā)送消息失敗:{}", e.getMessage());
}
}
public static synchronized int getOnlineCount() {
return onlineCount.get();
}
public static synchronized void addOnlineCount() {
onlineCount.incrementAndGet();
}
public static synchronized void subOnlineCount() {
onlineCount.decrementAndGet();
}
/**
* 心跳檢測線程
*/
private class HeartbeatThread extends Thread {
@Override
public void run() {
while (session.isOpen()) {
try {
// 每30秒檢測一次
Thread.sleep(30000);
// 超過60秒未收到心跳,關(guān)閉連接
if (System.currentTimeMillis() - lastHeartbeatTime > 60000) {
session.close();
break;
}
} catch (Exception e) {
log.error("心跳檢測異常:{}", e.getMessage());
break;
}
}
}
}
}
3. WebSocket控制器(可選)
import org.springframework.web.bind.annotation.*;
@RestController
@RequestMapping("/api/ws")
public class WebSocketController {
@PostMapping("/sendToUser")
public String sendToUser(@RequestParam String toUserId, @RequestParam String message) {
WebSocketServer.sendToUser(toUserId, message);
return "消息已發(fā)送";
}
@PostMapping("/broadcast")
public String broadcast(@RequestParam String message) {
WebSocketServer.broadcast(message);
return "廣播已發(fā)送";
}
}
三、前端實(shí)現(xiàn)
1. HTML頁面
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>WebSocket測試</title>
<script src="https://cdn.bootcss.com/jquery/3.4.1/jquery.min.js"></script>
</head>
<body>
<div>
<h2>WebSocket測試</h2>
<div>
<label>用戶ID:</label>
<input type="text" id="userId" value="user1">
<button onclick="connect()">連接</button>
<button onclick="disconnect()" disabled id="disconnectBtn">斷開</button>
</div>
<div>
<label>目標(biāo)用戶:</label>
<input type="text" id="toUserId" value="user2">
<label>消息內(nèi)容:</label>
<input type="text" id="message" value="Hello WebSocket">
<button onclick="sendUnicast()">單播</button>
<button onclick="sendBroadcast()">廣播</button>
</div>
<div>
<h3>消息日志:</h3>
<div id="messageLog" style="height:300px;overflow-y:scroll;border:1px solid #ccc;"></div>
</div>
</div>
<script>
var websocket = null;
var heartbeatInterval = null;
// 連接WebSocket
function connect() {
var userId = $('#userId').val();
if (!userId) {
alert('請輸入用戶ID');
return;
}
if ('WebSocket' in window) {
websocket = new WebSocket('ws://' + window.location.host + '/ws/' + userId);
} else {
alert('當(dāng)前瀏覽器不支持WebSocket');
return;
}
websocket.onopen = function() {
logMessage('連接已建立');
$('#disconnectBtn').prop('disabled', false);
// 啟動(dòng)心跳檢測
startHeartbeat();
};
websocket.onmessage = function(event) {
logMessage('收到消息: ' + event.data);
};
websocket.onclose = function() {
logMessage('連接已關(guān)閉');
$('#disconnectBtn').prop('disabled', true);
// 停止心跳檢測
stopHeartbeat();
};
websocket.onerror = function(error) {
logMessage('發(fā)生錯(cuò)誤: ' + error.data);
};
}
// 斷開連接
function disconnect() {
if (websocket != null) {
websocket.close();
}
}
// 發(fā)送單播消息
function sendUnicast() {
if (websocket == null || websocket.readyState != WebSocket.OPEN) {
alert('請先建立連接');
return;
}
var message = {
type: 'unicast',
from: $('#userId').val(),
to: $('#toUserId').val(),
content: $('#message').val(),
timestamp: new Date().getTime()
};
websocket.send(JSON.stringify(message));
logMessage('已發(fā)送單播消息: ' + JSON.stringify(message));
}
// 發(fā)送廣播消息
function sendBroadcast() {
if (websocket == null || websocket.readyState != WebSocket.OPEN) {
alert('請先建立連接');
return;
}
var message = {
type: 'broadcast',
from: $('#userId').val(),
content: $('#message').val(),
timestamp: new Date().getTime()
};
websocket.send(JSON.stringify(message));
logMessage('已發(fā)送廣播消息: ' + JSON.stringify(message));
}
// 啟動(dòng)心跳檢測
function startHeartbeat() {
// 每20秒發(fā)送一次心跳
heartbeatInterval = setInterval(function() {
if (websocket.readyState == WebSocket.OPEN) {
var heartbeat = {
type: 'heartbeat',
from: $('#userId').val(),
timestamp: new Date().getTime()
};
websocket.send(JSON.stringify(heartbeat));
logMessage('已發(fā)送心跳');
}
}, 20000);
}
// 停止心跳檢測
function stopHeartbeat() {
if (heartbeatInterval != null) {
clearInterval(heartbeatInterval);
heartbeatInterval = null;
}
}
// 記錄日志
function logMessage(message) {
var logDiv = $('#messageLog');
logDiv.append('<p>' + new Date().toLocaleString() + ' - ' + message + '</p>');
logDiv.scrollTop(logDiv[0].scrollHeight);
}
</script>
</body>
</html>
四、功能測試
?連接測試?:
- 啟動(dòng)Spring Boot應(yīng)用
- 打開兩個(gè)瀏覽器窗口,分別輸入不同用戶ID連接
- 觀察控制臺(tái)日志,確認(rèn)連接建立
?單播測試?:
- 在窗口A輸入目標(biāo)用戶ID(窗口B的用戶ID)和消息內(nèi)容
- 點(diǎn)擊"單播"按鈕,確認(rèn)窗口B收到消息
?廣播測試?:
- 在任意窗口點(diǎn)擊"廣播"按鈕
- 確認(rèn)所有連接的窗口都收到消息
?心跳檢測測試?:
- 觀察控制臺(tái)日志,確認(rèn)每20秒發(fā)送一次心跳
- 斷開網(wǎng)絡(luò)連接60秒以上,確認(rèn)自動(dòng)斷開WebSocket連接
五、高級(jí)功能擴(kuò)展
1. 消息持久化
// 在WebSocketServer類中添加
private void saveMessage(WebSocketMessage message) {
// 實(shí)現(xiàn)消息存儲(chǔ)邏輯,可存入數(shù)據(jù)庫或Redis
log.info("存儲(chǔ)消息: {}", JSON.toJSONString(message));
}
2. 斷線重連機(jī)制(前端)
// 修改前端connect函數(shù)
var reconnectAttempts = 0;
var maxReconnectAttempts = 5;
var reconnectInterval = 5000; // 5秒
function connect() {
// ...原有代碼...
websocket.onclose = function() {
logMessage('連接已關(guān)閉');
$('#disconnectBtn').prop('disabled', true);
stopHeartbeat();
// 斷線重連邏輯
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++;
logMessage('嘗試重新連接(' + reconnectAttempts + '/' + maxReconnectAttempts + ')');
setTimeout(connect, reconnectInterval);
}
};
// 連接成功后重置重試計(jì)數(shù)
websocket.onopen = function() {
reconnectAttempts = 0;
// ...原有代碼...
};
}
3. 使用STOMP協(xié)議(可選)
如果需要更復(fù)雜的消息路由和訂閱/發(fā)布模式,可以考慮使用STOMP協(xié)議:
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws-stomp")
.setAllowedOriginPatterns("*")
.withSockJS();
}
}
六、優(yōu)化
實(shí)際項(xiàng)目中,還可以根據(jù)需求進(jìn)一步優(yōu)化:
- 添加JWT認(rèn)證機(jī)制
- 實(shí)現(xiàn)消息歷史記錄查詢
- 增加用戶在線狀態(tài)管理
對于高并發(fā)場景,可以考慮以下優(yōu)化:
- 使用
session.getAsyncRemote().sendText()替代同步發(fā)送 - 增加消息緩沖區(qū)
- 使用Redis等中間件實(shí)現(xiàn)分布式WebSocket
到此這篇關(guān)于Spring Boot集成WebSocket項(xiàng)目實(shí)戰(zhàn)的示例代碼的文章就介紹到這了,更多相關(guān)Spring Boot集成WebSocket內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot分布式WebSocket的實(shí)現(xiàn)指南
- SpringBoot實(shí)現(xiàn)WebSocket通信過程解讀
- 深入淺出SpringBoot WebSocket構(gòu)建實(shí)時(shí)應(yīng)用全面指南
- 利用SpringBoot與WebSocket實(shí)現(xiàn)實(shí)時(shí)雙向通信功能
- Springboot整合WebSocket 實(shí)現(xiàn)聊天室功能
- vue+springboot+webtrc+websocket實(shí)現(xiàn)雙人音視頻通話會(huì)議(最新推薦)
- Springboot使用Websocket的時(shí)候調(diào)取IOC管理的Bean報(bào)空指針異常問題
- Java?springBoot初步使用websocket的代碼示例
- SpringBoot3整合WebSocket詳細(xì)指南
- SpringBoot實(shí)現(xiàn)WebSocket的示例代碼
相關(guān)文章
Spring boot中使用Spring-data-jpa方便快捷的訪問數(shù)據(jù)庫(推薦)
Spring Data JPA 是 Spring 基于 ORM 框架、JPA 規(guī)范的基礎(chǔ)上封裝的一套JPA應(yīng)用框架,可使開發(fā)者用極簡的代碼即可實(shí)現(xiàn)對數(shù)據(jù)的訪問和操作。這篇文章主要介紹了Spring-boot中使用Spring-data-jpa方便快捷的訪問數(shù)據(jù)庫,需要的朋友可以參考下2018-05-05
java 數(shù)據(jù)結(jié)構(gòu)中棧和隊(duì)列的實(shí)例詳解
這篇文章主要介紹了java 數(shù)據(jù)結(jié)構(gòu)中棧和隊(duì)列的實(shí)例詳解的相關(guān)資料,主要使用數(shù)組與線性表的方法來實(shí)現(xiàn),需要的朋友可以參考下2017-09-09
SpringCloud與Dubbo集成Nacos時(shí)服務(wù)重復(fù)注冊問題的分析與解決
Nacos作為阿里巴巴開源的服務(wù)注冊與發(fā)現(xiàn)工具,廣泛應(yīng)用于Spring Cloud和Dubbo等微服務(wù)框架中,然而,在實(shí)際開發(fā)中,我們可能會(huì)遇到服務(wù)重復(fù)注冊的問題,下面我們就來詳細(xì)分析一下這一問題2025-03-03
SpringBoot FreeWorker模板技術(shù)解析
這篇文章主要介紹了SpringBoot FreeWorker模板技術(shù)解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11

