Springboot 集成 SocketIO的示例代碼
1 前言
1.1 什么是 SocketIO ?
Socket.IO 是一個可以在瀏覽器與服務(wù)器之間實現(xiàn)實時、雙向、基于事件的通信的工具庫。 Socket.IO 能夠在任何平臺、瀏覽器或設(shè)備上運行,可靠性和速度同樣出色。其本質(zhì)上是將 webSocket、Ajax 和其他通信方式再封裝了一層,更強大,適應(yīng)性和兼容性更好。
(這句話怎么理解呢?簡單的來說,就是客戶端可以給服務(wù)端發(fā)消息,服務(wù)端也可以給客戶端發(fā)消息,而鏈接它們之間的消息紐帶,就是“事件監(jiān)聽”。)
1.2 webSocket 的優(yōu)點
webSocket 和 socket.io 區(qū)別?
- webSocketa:一種讓客戶端和服務(wù)器之間能進(jìn)行雙向?qū)崟r通信的技術(shù)
b:使用時,雖然主流瀏覽器都已經(jīng)支持,但仍然可能有不兼容的情況
c:適合用于client和基于node搭建的服務(wù)端使用 - socket.ioa:將 webSocket、Ajax 和其它的通信方式全部封裝成了統(tǒng)一的通信接口
b:使用時,不用擔(dān)心兼容問題,底層會自動選用最佳的通信方式
c:適合進(jìn)行服務(wù)端和客戶端雙向數(shù)據(jù)通信
d:Socket.IO中文網(wǎng)地址:https://socket.nodejs.cn/docs/v4/
1.3 應(yīng)用及版本
- spring-boot:2.5.14
- socketio:2.0.3
- jdk:java8
- 本文是基于《若依前后端分離》版本的基礎(chǔ)上進(jìn)行代碼編寫和演示的
2 物料準(zhǔn)備(均為后端代碼)
2.1 添加 Socket 依賴包
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>2.0.3</version>
</dependency>2.2 創(chuàng)建頻道常量類:SocketEventContants
我這個常量類是為了統(tǒng)一頻道所建,你們不一定需要這個類
package com.mss.common.constant;
/**
* @Description: Socket 自定義事件名稱
* @Author: zhanleai
*/
public class SocketEventContants {
/**
* 用戶頻道
**/
public static final String CHANNEL_USER = "channel_user";
/**
* 系統(tǒng)頻道
**/
public static final String CHANNEL_SYSTEM = "channel_system";
}
2.3 創(chuàng)建 Socket 連接類:SocketHandler
- 用來監(jiān)聽 socket 客戶端上下線,以及服務(wù)端自動關(guān)閉;
- 有些博主把這個類的內(nèi)容跟工具類里監(jiān)聽事件方法放在一起,個人認(rèn)為需要解耦,特別是在分布式的項目中;
package com.mss.framework.handle;
import com.corundumstudio.socketio.SocketIOServer;
import com.mss.common.utils.socket.SocketUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.stereotype.Component;
/**
* @Author: zhanleai
* @Description: 客戶端自動連接和斷開、服務(wù)端關(guān)閉
*/
@Component
@Slf4j
public class SocketHandler {
@Autowired
private SocketIOServer socketIoServer;
/**
* 容器銷毀前,自動調(diào)用此方法,關(guān)閉 socketIo 服務(wù)端
*
* @Param []
* @return
**/
@PreDestroy
private void destroy(){
try {
log.debug("關(guān)閉 socket 服務(wù)端");
socketIoServer.stop();
}catch (Exception e){
e.printStackTrace();
}
}
@PostConstruct
public void init() {
log.debug("SocketEventListener initialized");
//添加監(jiān)聽,客戶端自動連接到 socket 服務(wù)端
socketIoServer.addConnectListener(client -> {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
SocketUtil.connectMap.put(userId, client);
log.debug("客戶端userId: "+ userId+ "已連接,客戶端ID為:" + client.getSessionId());
});
//添加監(jiān)聽,客戶端跟 socket 服務(wù)端自動斷開
socketIoServer.addDisconnectListener(client -> {
String userId = client.getHandshakeData().getSingleUrlParam("userId");
SocketUtil.connectMap.remove(userId, client);
log.debug("客戶端userId:" + userId + "斷開連接,客戶端ID為:" + client.getSessionId());
});
}
// // 注釋說明:以下 onConnect和 onDisconnect 方法在某些場景下會失效,不建議使用,所以注釋掉
// /**
// * 客戶端自動連接到 socket 服務(wù)端
// *
// * @Param [client]
// * @return
// **/
// @OnConnect
// public void onConnect(SocketIOClient client) {
// String userId = client.getHandshakeData().getSingleUrlParam("userId");
// SocketUtil.connectMap.put(userId, client);
// log.debug("客戶端userId: "+ userId+ "已連接,客戶端ID為:" + client.getSessionId());
// }
//
// /**
// * 客戶端跟 socket 服務(wù)端自動斷開
// *
// * @Param [client]
// * @return
// **/
// @OnDisconnect
// public void onDisconnect(SocketIOClient client) {
// String userId = client.getHandshakeData().getSingleUrlParam("userId");
// log.debug("客戶端userId:" + userId + "斷開連接,客戶端ID為:" + client.getSessionId());
// SocketUtil.connectMap.remove(userId, client);
// }
}
2.4 Socket 配置文件和配置類
用來定義 socket 的一些配置
2.4.1 yml 配置
socketio:
host: 127.0.0.1 //主機名,默認(rèn)是 0.0.0.0 (這個設(shè)不設(shè)置無所謂,因為后面的 SocketConfig 類一般不用設(shè)置這個)
port: 33000 //監(jiān)聽端口
maxFramePayloadLength: 1048576
maxHttpContentLength: 1048576
bossCount: 1
workCount: 100
allowCustomRequests: true
upgradeTimeout: 1000000 //協(xié)議升級超時時間(毫秒),默認(rèn)10000。HTTP握手升級為ws協(xié)議超時時間
pingTimeout: 6000000 //Ping消息超時時間(毫秒),默認(rèn)60000,這個時間間隔內(nèi)沒有接收到心跳消息就會發(fā)送超時事件
pingInterval: 25000 //Ping消息間隔(毫秒),默認(rèn)25000。客戶端向服務(wù)器發(fā)送一條心跳消息間隔
2.4.2 配置類:SocketConfig
package com.mss.framework.config;
import com.corundumstudio.socketio.SocketIOServer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class SocketConfig {
@Value("${socketio.host}")
private String host;
@Value("${socketio.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.allowCustomRequests}")
private boolean allowCustomRequests;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Bean
public SocketIOServer socketIOServer() {
com.corundumstudio.socketio.Configuration configuration = new com.corundumstudio.socketio.Configuration();
configuration.setPort(port);
com.corundumstudio.socketio.SocketConfig socketConfig=new com.corundumstudio.socketio.SocketConfig();
socketConfig.setReuseAddress(true);
configuration.setSocketConfig(socketConfig);
configuration.setOrigin(null);
configuration.setBossThreads(bossCount);
configuration.setWorkerThreads(workCount);
configuration.setAllowCustomRequests(allowCustomRequests);
configuration.setUpgradeTimeout(upgradeTimeout);
configuration.setPingTimeout(pingTimeout);
configuration.setPingInterval(pingInterval);
//設(shè)置 sessionId 隨機
configuration.setRandomSession(true);
// configuration.setKeyStorePassword("pi0yo93pqgrs");
// configuration.setKeyStore(this.getClass().getResourceAsStream("www.ibms.club.jks"));
// configuration.setAuthorizationListener(data -> {
// String token = data.getSingleUrlParam("token");
// return StrUtil.isNotBlank(token);
// });
//初始化 Socket 服務(wù)端配置
return new SocketIOServer(configuration);
}
/**
* Spring加載 SocketIOServer
*
* @Param [server]
* @return
**/
@Bean
public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketIOServer ) {
return new SpringAnnotationScanner(socketIOServer );
}
}
2.5 Socket 服務(wù)啟動類:ServerRunner
實現(xiàn) CommandLineRunner 接口類,項目啟動時自動執(zhí)行 socketIOServer.start() 方法
package com.mss.framework.run;
import com.corundumstudio.socketio.SocketIOServer;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@AllArgsConstructor
public class ServerRunner implements CommandLineRunner {
private final SocketIOServer socketIOServer;
/**
* 項目啟動時,自動啟動 socket 服務(wù),服務(wù)端開始工作
*
* @Param [args]
* @return
**/
@Override
public void run(String... args) {
socketIOServer.start();
log.info("socket.io server started !");
}
}
2.6 Socket 工具類:SocketUtil
下列實例代碼中,是使用 userId 來當(dāng)做客戶端唯一標(biāo)識,這個每個人可以根據(jù)自己項目里自行設(shè)置;
下列實例代碼的應(yīng)用場景,只有服務(wù)端向客戶端發(fā)送消息的需求,所以實際這個工具類只有 sendToOne() 方法是實際起作用的,其余的代碼都是為了本文額外寫的方法;
package com.mss.common.utils.socket;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.annotation.OnEvent;
import com.mss.common.constant.SocketEventContants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* @Author: zhanleai
* @Description:
*/
@Component
@Slf4j
public class SocketUtil {
//暫且把用戶&客戶端信息存在緩存
public static ConcurrentMap<String, SocketIOClient> connectMap = new ConcurrentHashMap<>();
/**
* 單發(fā)消息(以 userId 為標(biāo)識符,給用戶發(fā)送消息)
*
* @Param [userId, message]
* @return
**/
public static void sendToOne(String userId, Object message) {
//拿出某個客戶端信息
SocketIOClient socketClient = getSocketClient(userId);
if (Objects.nonNull(socketClient) ){
//單獨給他發(fā)消息
socketClient.sendEvent(SocketEventContants.CHANNEL_USER,message);
}else{
log.info(userId + "已下線,暫不發(fā)送消息。");
}
}
/**
* 群發(fā)消息
*
* @Param
* @return
**/
public static void sendToAll(Object message) {
if (connectMap.isEmpty()){
return;
}
//給在這個頻道的每個客戶端發(fā)消息
for (Map.Entry<String, SocketIOClient> entry : connectMap.entrySet()) {
entry.getValue().sendEvent(SocketEventContants.CHANNEL_SYSTEM, message);
}
}
/**
* 根據(jù) userId 識別出 socket 客戶端
* @param userId
* @return
*/
public static SocketIOClient getSocketClient(String userId){
SocketIOClient client = null;
if (StringUtils.hasLength(userId) && !connectMap.isEmpty()){
for (String key : connectMap.keySet()) {
if (userId.equals(key)){
client = connectMap.get(key);
}
}
}
return client;
}
/**
* 1)使用事件注解,服務(wù)端監(jiān)聽獲取客戶端消息;
* 2)拿到客戶端發(fā)過來的消息之后,可以再根據(jù)業(yè)務(wù)邏輯發(fā)送給想要得到這個消息的人;
* 3)channel_system 之所以會向全體客戶端發(fā)消息,是因為我跟前端約定好了,你們也可以自定定義;
*
* @Param message
* @return
**/
@OnEvent(value = SocketEventContants.CHANNEL_SYSTEM)
public void channelSystemListener(String message) {
if (!StringUtils.hasLength(message)){
return;
}
this.sendToAll(message);
}
}
3 Socket 調(diào)用
3.1 實際項目的應(yīng)用場景:在需要發(fā)送消息通知的業(yè)務(wù)代碼中調(diào)用
這個方法里有幾個類:Message、DateUtils、IMessageService、MessageMapper,均為根據(jù)自身業(yè)務(wù)場景自定義的類,你們自己建吧。有需要再私信我要;
后端代碼寫到這里,實際上已經(jīng)寫完了。從 3.2 開始均為測試代碼;
package com.mss.message.service.impl;
import com.mss.common.utils.DateUtils;
import com.mss.common.utils.socket.SocketUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.mss.message.mapper.MessageMapper;
import com.mss.message.domain.entity.Message;
import com.mss.message.service.IMessageService;
/**
* 消息Service業(yè)務(wù)層處理
*
* @author zhanleai
*/
@Service
@Slf4j
public class MessageServiceImpl implements IMessageService {
@Autowired
private MessageMapper messageMapper;
/**
* 新增消息
*
* @param message 消息
* @return 結(jié)果
*/
@Override
public int insertMessage(Message message) {
message.setSendTime(DateUtils.getNowDate());
// 消息入庫,消息持久化
int i = messageMapper.insertMessage(message);
if(i > 0){
// 新增消息之后,再向前端推送 Socket 消息
SocketUtil.sendToOne(message.getSendUserId().toString(),message);
}
return i;
}
}
3.2 測試Controller
下文均為測試的代碼
package com.mss.message.controller;
import com.mss.common.utils.socket.SocketUtil;
import io.swagger.annotations.Api;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.mss.common.core.controller.BaseController;
import com.mss.common.core.domain.AjaxResult;
/**
* 消息Controller
*
* @author zhanleai
*/
@RestController
@Api(tags="消息")
@RequestMapping("/message")
public class MessageController extends BaseController {
/**
* 給指定客戶端發(fā)送消息
*
* @Param [userId, message]
* @return
**/
@GetMapping("/sendToOne")
public AjaxResult sendToOne(String userId , String message){
SocketUtil.sendToOne(userId,message);
return AjaxResult.success("單獨發(fā)送消息成功。");
}
}
4 前端調(diào)用代碼
- 前端代碼監(jiān)聽了 channel_user 和 channel_system 兩個頻道,一個做了三個動作:
- 1)連接上服務(wù)端;
2)監(jiān)聽并接收 channel_user 頻道的消息;
3)給服務(wù)端發(fā)送一條消息,并廣播到所有客戶端; - postman 只做了一個動作,給后端指定的 userId 發(fā)送一條 channel_user 頻道的消息,并被指定客戶端捕獲;
4.1 html 測試代碼以及說明
詳細(xì)的 html 測試代碼
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
<title>TestConnect</title>
<base>
<script src="https://cdn.bootcss.com/jquery/3.4.0/jquery.min.js"></script>
<script src="https://cdn.bootcss.com/socket.io/2.0.3/socket.io.js"></script>
<style>
body {
padding: 20px;
}
#console {
height: 450px;
overflow: auto;
}
.msg-color {
color: green;
}
</style>
</head>
<body>
<div id="console" class="well"></div>
</body>
<script type="text/javascript">
var socket;
connect();
function connect() {
var userId = 'zhanleai';
var opts = {
query: 'userId=' + userId
};
socket = io.connect('http://127.0.0.1:33000', opts);
socket.on('connect', function () {
console.log("連接成功");
output('當(dāng)前用戶是:' + userId );
output('<span class="msg-color">連接成功了。</span>');
});
socket.on('disconnect', function () {
output('<span class="msg-color">下線了。 </span>');
});
socket.on('channel_user', function (data) {
let msg= JSON.stringify(data)
output('收到 channel_user 頻道消息了:' + msg );
console.log(data);
});
}
function output(message) {
var element = $("<div>" + message + "</div>");
$('#console').prepend(element);
}
</script>
</html>
4.2 瀏覽器打開 html 文件,然后查看后端服務(wù)日志
(socket 服務(wù)端啟動,端口號為 33000,客戶端 zhanleai 連接上來了)
瀏覽器截圖

后端服務(wù)日志截圖

4.3 postman 工具測試
postman 截圖

瀏覽器收到消息截圖

到此這篇關(guān)于Springboot 集成 SocketIO的示例代碼的文章就介紹到這了,更多相關(guān)Springboot 集成 SocketIO內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java 文件目錄讀寫刪除操作詳細(xì)實現(xiàn)代碼
這篇文章主要介紹了java 文件讀寫刪操作詳細(xì)實現(xiàn)代碼,需要的朋友可以參考下2017-09-09
使用Spring AOP做接口權(quán)限校驗和日志記錄
本文介紹了面向切面編程(AOP)的基本概念、應(yīng)用場景及其在Spring中的實現(xiàn)原理,通過AOP,可以方便地在不修改原有代碼的情況下,實現(xiàn)日志記錄、權(quán)限校驗等功能,以學(xué)生身份證號查詢接口為例,展示了如何定義權(quán)限注解、切面類以及權(quán)限驗證服務(wù),感興趣的朋友一起看看吧2025-01-01
Java面試重點中的重點之Elasticsearch核心原理
ElasticSearch是一個基于Lucene的搜索引擎,是用Java語言開發(fā)的,能夠達(dá)到實時搜索,穩(wěn)定,可靠,快速,安裝使用方便,作為Apache許可條款下的開放源碼發(fā)布,是一種流行的企業(yè)級搜索引擎,是最受歡迎的企業(yè)搜索引擎2022-01-01

