SpringCloud整合Netty集群實現(xiàn)WebSocket的示例代碼
引言
在分布式系統(tǒng)中,Spring Cloud 為微服務(wù)架構(gòu)提供了豐富的功能,而 Netty 是一個高性能的網(wǎng)絡(luò)通信框架。將二者結(jié)合實現(xiàn) Socket 集群,可以滿足大規(guī)模、高性能網(wǎng)絡(luò)通信的需求,實現(xiàn)前后端間高效穩(wěn)定的通信。
1. 服務(wù)注冊和發(fā)現(xiàn)中心
這里服務(wù)注冊和發(fā)行中心使用nacos為例(需要啟動一個nacos服務(wù)器)。
微服務(wù)注冊: 在每一個微服務(wù)項目中,添加Nacos客戶端連接,并在配置文件中指定服務(wù)名稱和端口。例如:
# Tomcat
server:
port: 9201
netty:
port: 10201
application:
name: yhy-netty-server
# Spring
spring:
application:
# 應(yīng)用名稱
name: soc-dmoasp-system
profiles:
# 環(huán)境配置
active: dev
cloud:
nacos:
discovery:
# 服務(wù)注冊地址
server-addr: nacos-registry:8858
config:
# 配置中心地址
server-addr: nacos-registry:8858
file-extension: yml
# 共享配置
shared-configs:
- data-id: application.${spring.cloud.nacos.config.file-extension}
refresh: true
- data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension}
- data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}
這是一個基本的服務(wù)配置。里面關(guān)于netty的applicaiton.name和port可以通過Nacos的NamingService類手動注冊。
1.1. Netty服務(wù)器搭建
- 添加Netty依賴:在具體微服務(wù)中的
pom.xml中添加Netty依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-reactor-netty</artifactId> </dependency>
- Netty啟動類:創(chuàng)建一個NettyServer類,用于啟動Netty,示例如下:
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.soc.dmoasp.system.server.handler.WebSocketIdleStateHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.net.InetAddress;
import java.util.Properties;
/**
* Netty服務(wù)
* @author dsw
* @date 2024/11/18 17:34
*/
@Component
public class NettyServer implements CommandLineRunner {
Logger log = LoggerFactory.getLogger(NettyServer.class);
@Autowired
private NacosDiscoveryProperties nacosDiscoveryProperties;
@Value("${server.netty.port}")
private Integer nettyPort;
@Value("${server.netty.application.name}")
private String nettyName;
EventLoopGroup bossGroup;
EventLoopGroup workGroup;
@Override
public void run(String... args) throws Exception {
log.info("初始化netty配置開始");
//netty 服務(wù)端啟動的端口不可和Springboot啟動類的端口號重復(fù)
this.start();
//關(guān)閉服務(wù)器的時候同時關(guān)閉Netty服務(wù)
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
this.destroy();
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}));
}
@Async
public void start() throws InterruptedException {
try {
bossGroup = new NioEventLoopGroup(1);
workGroup = new NioEventLoopGroup(10);
ServerBootstrap bootstrap = new ServerBootstrap();
// bossGroup輔助客戶端的tcp連接請求, workGroup負(fù)責(zé)與客戶端之前的讀寫操作
bootstrap.group(bossGroup, workGroup)
// 指定Channel
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline
//HTTP解碼
.addLast(new HttpServerCodec())
.addLast(new ChunkedWriteHandler())
//HTTP段聚合
.addLast(new HttpObjectAggregator(1024*1024))
//將HTTP協(xié)議轉(zhuǎn)成ws協(xié)議
.addLast(new WebSocketServerProtocolHandler("/socket"))
;
}
});
registerNamingService(nettyName,nettyPort);
// 配置完成,開始綁定server,通過調(diào)用sync同步方法阻塞直到綁定成功
ChannelFuture future = bootstrap.bind(nettyPort).sync();
if (future.isSuccess()) {
log.info("Server started and listen on:{}", future.channel().localAddress());
log.info("啟動 Netty Server");
}
} catch (InterruptedException e) {
log.error("netty異常:{}", e.getMessage());
}
}
/**
* 將Netty服務(wù)注冊進(jìn)Nacos
*
* @param nettyName 服務(wù)名稱
* @param nettyPort 服務(wù)端口號
*/
private void registerNamingService(String nettyName, Integer nettyPort) {
try {
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
properties.setProperty(PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername());
properties.setProperty(PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword());
NamingService namingService = NamingFactory.createNamingService(properties);
InetAddress address = InetAddress.getLocalHost();
// 定義服務(wù)實例信息
Instance instance = new Instance();
instance.setIp(address.getHostAddress());
instance.setPort(nettyPort);
instance.setWeight(1.0);
instance.setHealthy(true);
namingService.registerInstance(nettyName, nacosDiscoveryProperties.getGroup(), instance);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 釋放資源
*/
@PreDestroy
public void destroy() throws InterruptedException {
if (bossGroup != null) {
bossGroup.shutdownGracefully().sync();
}
if (workGroup != null) {
workGroup.shutdownGracefully().sync();
}
log.info("關(guān)閉Netty");
}
}
使用CommandLineRunner接口實現(xiàn)run方法在啟動項目的時候把Netty服務(wù)帶起。
bossGroup 和 workGroup 的角色區(qū)別
bossGroup(老板組)
主要職責(zé)是負(fù)責(zé)監(jiān)聽服務(wù)器端的端口,等待新的客戶端連接請求到來。它就像是公司里負(fù)責(zé)接待新客戶的前臺人員,當(dāng)有新客戶(客戶端)想要連接到服務(wù)器時,
bossGroup中的EventLoop會接收到這個連接請求。一般情況下,
bossGroup只需要配置較少數(shù)量的EventLoop就可以滿足需求,因為它主要處理的是連接建立的初期階段,即接受新連接這個相對不那么頻繁的操作(相比于后續(xù)處理大量數(shù)據(jù)傳輸?shù)炔僮鳎MǔO(shè)置為 1 個EventLoop或者根據(jù)服務(wù)器的具體性能和預(yù)期的連接請求頻率適當(dāng)增加數(shù)量,但總體數(shù)量相對較少。workGroup(工作組)
一旦
bossGroup接受了新的客戶端連接,就會把這個新連接交給workGroup來進(jìn)一步處理后續(xù)的所有與該連接相關(guān)的操作,比如讀取客戶端發(fā)送的數(shù)據(jù)、向客戶端發(fā)送響應(yīng)數(shù)據(jù)等。它就像是公司里負(fù)責(zé)具體為客戶辦理業(yè)務(wù)的工作人員。workGroup需要處理大量的實際業(yè)務(wù)數(shù)據(jù)傳輸和交互工作,所以通常會根據(jù)服務(wù)器的性能和預(yù)期要處理的并發(fā)連接數(shù)量等因素,配置相對較多數(shù)量的EventLoop。例如,在處理高并發(fā)場景時,可能會配置幾十甚至上百個EventLoop來確保能夠高效地處理眾多客戶端連接的各種業(yè)務(wù)操作。
registerNamingService方法
這時候可以看到我們Nacos配置中配置了server.netty.port和server.netty.application.name這兩個參數(shù)分別對應(yīng)netty的端口和netty的微服務(wù)應(yīng)用名。
registerNamingService方法用于往Nacos中注冊服務(wù),這里通過NamingService類的registerInstance方法將netty服務(wù)注冊進(jìn)Nacos中。
1.2. Gateway網(wǎng)關(guān)轉(zhuǎn)發(fā)
微服務(wù)中所有的請求都是由網(wǎng)關(guān)轉(zhuǎn)發(fā),這里使用Gateway轉(zhuǎn)發(fā)。
# spring配置
spring:
cloud:
gateway:
discovery:
locator:
lowerCaseServiceId: true
enabled: true
routes:
# 系統(tǒng)模塊
- id: soc-dmoasp-system
uri: lb://soc-dmoasp-system
predicates:
- Path=/system/**
filters:
- StripPrefix=1
# netty服務(wù)
- id: netty-server
uri: lb:ws://soc-netty-server
predicates:
- Path=/netty-server/**
filters:
- StripPrefix=1
#不需要進(jìn)行權(quán)限校驗的uri
security:
ignore:
whites:
- /auth/logout
- /auth/login
- /auth/register
- /*/v2/api-docs
- /csrf
#netty連接地址
- /netty-server/**
配置文件中添加Netty路由,在鑒權(quán)網(wǎng)關(guān)中需要將socket地址放行,不進(jìn)行權(quán)限驗證。例如:
@Component
@RefreshScope
public class AuthFilter implements GlobalFilter, Ordered
{
private static final Logger log = LoggerFactory.getLogger(AuthFilter.class);
// 排除過濾的 uri 地址,nacos自行添加
@Autowired
private IgnoreWhiteProperties ignoreWhite;
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain)
{
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequest.Builder mutate = request.mutate();
String url = request.getURI().getPath();
// 跳過不需要驗證的路徑
if (StringUtils.matches(url, ignoreWhite.getWhites()))
{
return chain.filter(exchange);
}
......
}
}
啟動Gateway和System模塊


啟動完成后System模塊會打印NettyServer輸出的啟動日志,Nacos中也會有手動注冊的Netty服務(wù)。
通過ws://127.0.0.1:8080/netty-server/socket就可以直接連接上Netty服務(wù)器(8080為Gateway的端口)。

2. 鑒權(quán)、心跳、客戶端與服務(wù)端之間的通信
2.1. 鑒權(quán)
創(chuàng)建AuthHandler類,繼承SimpleChannelInboundHandler類重寫channelRead0方法,channelRead0中可以監(jiān)聽到客戶端往服務(wù)端發(fā)送的消息。 例如:
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.constant.TokenConstants;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.common.core.utils.JwtUtils;
import com.soc.dmoasp.common.core.utils.StringUtils;
import com.soc.dmoasp.common.redis.service.RedisService;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.jsonwebtoken.Claims;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
/**
* netty鑒權(quán)處理
* @author dsw
* @date 2024/11/18 17:55
*/
public class AuthHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
Logger log = LoggerFactory.getLogger(AuthHandler.class);
private final RedisTemplate<String, Object> redisTemplate;
private final RedisService redisService;
public AuthHandler(RedisTemplate<String, Object> stringObjectRedisTemplate, RedisService redisService) {
redisTemplate = stringObjectRedisTemplate;
this.redisService = redisService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
try {
JSONObject clientMessage = JSON.parseObject(textWebSocketFrame.text());
//獲取code 只判斷code為鑒權(quán)的消息類型
Integer code = clientMessage.getInteger("code");
if (NettyMsgEnum.AUTH_MESSAGE.getCode().equals(code)) {
//獲取token
String token = clientMessage.getString("token");
if (StringUtils.isEmpty(token))
{
ctx.channel().writeAndFlush(NettyResult.authFail("令牌不能為空"));
ctx.close();
}
// 如果前端設(shè)置了令牌前綴,則裁剪掉前綴
if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX))
{
token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);
}
//JWT校驗
Claims claims = JwtUtils.parseToken(token);
if (claims == null)
{
ctx.channel().writeAndFlush(NettyResult.authFail("令牌已過期或驗證不正確"));
ctx.close();
}
String userkey = JwtUtils.getUserKey(claims);
//從Redis中查看是否有這個Token沒有則不能登錄
boolean islogin = redisService.hasKey(getTokenKey(userkey));
if (!islogin)
{
ctx.channel().writeAndFlush(NettyResult.authFail("登錄狀態(tài)已過期"));
ctx.close();
}
//獲取用戶保存至Socket連接會話中
String userId = JwtUtils.getUserId(claims);
AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
ctx.channel().attr(userIdKey).setIfAbsent(userId);
JSONObject jsonObject = new JSONObject();
jsonObject.put("userId",userId);
log.info("有新的Socket客戶端鏈接 userId :{}", userId);
//將連接信息保存至Redis中key為userId value為ctx.channel().id()
redisTemplate.opsForHash().put(CacheConstants.getUserChannelKey(),userId,ctx.channel().id());
ctx.channel().writeAndFlush(NettyResult.success(NettyMsgEnum.AUTH_MESSAGE.getCode(), "鑒權(quán)成功", jsonObject));
//鑒權(quán)完成后移除AuthHandler消息監(jiān)聽
ctx.pipeline().remove(AuthHandler.class);
} else {
ctx.channel().writeAndFlush(NettyResult.authFail("請先鑒權(quán),在發(fā)送其他類型請求!"));
ctx.close();
}
} catch (Exception e) {
log.error(e.getMessage());
ctx.channel().writeAndFlush(NettyResult.authFail("鑒權(quán)失敗"));
ctx.close();
}
}
/**
* 獲取緩存key
*/
private String getTokenKey(String token)
{
return CacheConstants.LOGIN_TOKEN_KEY + token;
}
}
泛型TextWebSocketFrame表示接收文本類型的消息。
其中連接的用戶信息保存到Redis中,RedisTemplate<String, Object> redisTemplate對象是用來保存Netty連接信息的,序列化使用的是String(用戶信息用String存儲,使用JSON序列化會反序列化失?。?,對應(yīng)接收的JSON串如下:
{"code":1001,token:"Bearer XXXX"}
code取NettyMsgEnum中的code,token則是登錄時生成的token令牌。
鑒權(quán)后將AuthHandler移除,會話后續(xù)的消息交互不在進(jìn)AuthHandler。
NettyMsgEnum如下:
/**
* netty消息類型枚舉
* @author dsw
* @date 2024/11/18 17:58
*/
public enum NettyMsgEnum {
AUTH_MESSAGE(1001, "鑒權(quán)消息","Auth-Netty"),
//{'code':1003,'data':{'unreadCount':0}}
NOTICE_MESSAGE(1003, "公告通知消息","Notice-Netty"),
HEART_MESSAGE(1006, "心跳消息","Heart-Netty"),
ERROR_MESSAGE(-1, "錯誤",null);
private final Integer code;
private final String info;
private final String strategyName;
NettyMsgEnum(Integer code, String info, String strategyName){
this.code = code;
this.info = info;
this.strategyName = strategyName;
}
public static NettyMsgEnum getByCode(Integer code) {
for (NettyMsgEnum msgEnum : values()) {
if (msgEnum.getCode().equals(code)) {
return msgEnum;
}
}
return ERROR_MESSAGE;
}
public Integer getCode() {
return code;
}
public String getInfo() {
return info;
}
public String getStrategyName() {
return strategyName;
}
}
NettyResult如下:
import com.alibaba.fastjson2.JSON;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import java.io.Serializable;
/**
* netty響應(yīng)實體
* @author dsw
* @date 2024/11/18 18:02
*/
public class NettyResult implements Serializable {
private static final long serialVersionUID = 1L;
private Integer code;
private String message;
private Object data;
public NettyResult(Integer code, String message, Object data) {
this.code = code;
this.message = message;
this.data = data;
}
public static TextWebSocketFrame fail(String message) {
return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-1, message, null)));
}
public static TextWebSocketFrame authFail(String message) {
return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-2, message, null)));
}
public static TextWebSocketFrame success( String message) {
return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(200, message, null)));
}
public static TextWebSocketFrame success(Integer code, Object data) {
return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,null, data)));
}
public static TextWebSocketFrame success(Integer code, String message, Object data) {
return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,message, data)));
}
public Integer getCode() {
return code;
}
public String getMessage() {
return message;
}
public Object getData() {
return data;
}
}
最后到NettyServer中的ChannelInitializer加入AuthHandler:

我們重新項目后連接socket查看結(jié)果

如果不鑒權(quán)直接發(fā)送消息,服務(wù)端會主動斷開連接,客戶端需要重連。

這就代表已經(jīng)連接成功了。
2.2. 空閑檢測
創(chuàng)建WebSocketIdleStateHandler類繼承IdleStateHandler類,重寫channelIdle方法。
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
* 空閑檢測
* @author dsw
* @date 2024/11/18 11:47
*/
public class WebSocketIdleStateHandler extends IdleStateHandler {
Logger log = LoggerFactory.getLogger(WebSocketIdleStateHandler.class);
/**
* 默認(rèn)的讀空閑時間
*/
private static final int DEFAULT_READER_IDLE_TIME = 10;
/**
* 默認(rèn)10秒讀空閑斷開客戶端
*/
public WebSocketIdleStateHandler() {
super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
}
/**
* 指定心跳時間(秒)
*
* @param readerIdleTimeSeconds 讀空閑時間
* @param writerIdleTimeSeconds 寫空閑時間
* @param allIdleTimeSeconds 讀寫空閑時間
*/
public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
}
/**
* 指定心跳時間及時間單位
*
* @param readerIdleTime 讀空閑時間
* @param writerIdleTime 寫空閑時間
* @param allIdleTime 讀寫空閑時間
* @param unit 時間單位
*/
public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
super(readerIdleTime, writerIdleTime, allIdleTime, unit);
}
/**
* 當(dāng)空閑事件觸發(fā)時執(zhí)行
*/
@Override
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
//如果是讀空閑
if (evt.state().equals(IdleState.READER_IDLE)) {
Channel channel = ctx.channel();
log.debug("服務(wù)端未檢測到客戶端【{}】的心跳包,強(qiáng)制關(guān)閉客戶端!", channel.id());
channel.close();
}
super.channelIdle(ctx,evt);
}
}
以上實現(xiàn)了父類的構(gòu)造函數(shù),可以指定具體的空閑時間。當(dāng)空閑時會觸發(fā)channelIdle方法,則服務(wù)端主動斷開連接。
最后到NettyServer中的ChannelInitializer加入WebSocketIdleStateHandler:

加到最前面。

示例設(shè)置了10秒斷開,需要使用中自行調(diào)整。
2.3. 消息通信
2.3.1. 接收客戶端的消息
創(chuàng)建WebSocketHandler類,繼承SimpleChannelInboundHandler類重寫channelRead0、handlerAdded、handlerRemoved、exceptionCaught方法。
channelRead0方法:監(jiān)聽客戶端發(fā)送過來的消息。handlerAdded方法:websocket連接后會調(diào)用,將連接信息添加到通道組handlerRemoved方法:斷開連接后會調(diào)用(服務(wù)端、客戶端斷開都會調(diào)用),用于用戶下線(刪除通道、刪除Redis中存儲的連接信息)exceptionCaught方法:發(fā)生異常后調(diào)用,發(fā)生異常后服務(wù)端通常會主動斷開連接。
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.system.server.config.NettyConfig;
import com.soc.dmoasp.system.server.strategy.NettyStrategy;
import com.soc.dmoasp.system.server.strategy.NettyStrategyFactory;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
/**
* webSocket處理
* @author dsw
* @date 2024/11/18 10:20
*/
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
Logger log = LoggerFactory.getLogger(WebSocketHandler.class);
private final RedisTemplate<String, Object> redisTemplate;
private final NettyStrategyFactory nettyStrategyFactory;
public WebSocketHandler(RedisTemplate<String, Object> redisTemplate, NettyStrategyFactory nettyStrategyFactory) {
this.redisTemplate = redisTemplate;
this.nettyStrategyFactory = nettyStrategyFactory;
}
/**
* webSocket連接創(chuàng)建后調(diào)用
*/
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 添加到channelGroup 通道組
NettyConfig.getChannelGroup().add(ctx.channel());
}
/**
* 讀取數(shù)據(jù)
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
String userId = ctx.channel().attr(userIdKey).get();
log.info("收到消息 userId:{} message:{}",userId,frame.text());
// 接收客戶端的消息
JSONObject pullMessage = JSON.parseObject(frame.text());
Integer code = pullMessage.getInteger("code");
// 獲取消息類型
NettyStrategy nettyStrategy = nettyStrategyFactory.getNettyStrategy(NettyMsgEnum.getByCode(code));
// 處理消息
TextWebSocketFrame pushMessage = nettyStrategy.execute(pullMessage);
// 返回處理結(jié)果給客戶端
ctx.channel().writeAndFlush(pushMessage);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
String userId = ctx.channel().attr(userIdKey).get();
log.info("用戶下線了 userId:{}",userId);
// 刪除通道
removeUserId(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.channel().writeAndFlush(NettyResult.fail("系統(tǒng)錯誤:" + cause.getMessage()));
}
/**
* 刪除用戶與channel的對應(yīng)關(guān)系
*/
private void removeUserId(ChannelHandlerContext ctx) {
AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
String userId = ctx.channel().attr(userIdKey).get();
if(StringUtils.isNotBlank(userId)){
redisTemplate.opsForHash().delete(CacheConstants.getUserChannelKey(),userId);
}
}
}
這里收到消息后通過一個策略模式進(jìn)入不同的策略,通過NettyMsgEnum里面定義的code指定不同的策略類。
Strategy和StrategyFactory:
import com.alibaba.fastjson2.JSONObject;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* Netty接收消息處理策略類
* @author dsw
* @date 2024/5/27 10:21
*/
public interface NettyStrategy {
/**
* 執(zhí)行添加數(shù)值
*
* @return
*/
TextWebSocketFrame execute(JSONObject message);
}
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* Netty策略工廠
* @author dsw
* @date 2024/11/18 10:20
*/
@Component
public class NettyStrategyFactory {
Logger log = org.slf4j.LoggerFactory.getLogger(NettyStrategyFactory.class);
/**
* 通過Spring容器的方式注入
*/
@Autowired
private Map<String, NettyStrategy> nettyStrategy;
/**
* 獲取對應(yīng)策略類
* @param
*/
public NettyStrategy getNettyStrategy(NettyMsgEnum nettyMsgEnum){
if(!nettyStrategy.containsKey(nettyMsgEnum.getStrategyName())){
log.warn("沒有對應(yīng)的消息策略");
throw new RuntimeException("沒有對應(yīng)的消息策略");
}
return nettyStrategy.get(nettyMsgEnum.getStrategyName());
}
}
我們實現(xiàn)一個心跳消息的策略:
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.system.server.strategy.NettyStrategy;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;
/**
* Netty心跳消息
* @author dsw
* @date 2024/5/27 10:25
*/
@Component("Heart-Netty")
public class HeartStrategyImpl implements NettyStrategy {
@Override
public TextWebSocketFrame execute(JSONObject message) {
String data = message.getString("data");
if ("ping".equals(data)) {
return NettyResult.success(NettyMsgEnum.HEART_MESSAGE.getCode(), null, "pong");
}
return NettyResult.fail("消息格式不正確");
}
}

添加至ChannelInitializer后重啟查看效果。

2.3.2. 發(fā)送消息給客戶端
集群下面的Netty配置
方案1:使用Redis的發(fā)布訂閱
方案2:使用MQ的發(fā)布訂閱
我們這里使用使用Redis的發(fā)布訂閱實現(xiàn)。
添加Redis訂閱器:
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.redis.configure.FastJson2JsonRedisSerializer;
import com.soc.dmoasp.system.server.receiver.PushMsgRedisReceiver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import java.util.Collections;
import java.util.List;
/**
* redisReceiver 配置
* @author dsw
* @date 2024/11/18 12:07
*/
@Configuration
public class RedisReceiverConfig {
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
List<PatternTopic> topics = Collections.singletonList(
PatternTopic.of(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC)
);
// 添加訂閱者監(jiān)聽類,數(shù)量不限.PatternTopic定義監(jiān)聽主題,這里監(jiān)聽test-topic主題
container.addMessageListener(listenerAdapter, topics);
return container;
}
@Bean
@SuppressWarnings(value = { "unchecked", "rawtypes" })
public MessageListenerAdapter listenerAdapter(PushMsgRedisReceiver pushMsgRedisReceiver) {
MessageListenerAdapter adapter = new MessageListenerAdapter(pushMsgRedisReceiver);
FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
adapter.setSerializer(serializer);
return adapter;
}
}
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.exception.Asserts;
import com.soc.dmoasp.common.redis.dto.NettyMessage;
import com.soc.dmoasp.system.server.config.NettyConfig;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* Netty發(fā)送消息接收者
* @author dsw
* @date 2024/11/18 12:07
*/
@Component
public class PushMsgRedisReceiver {
Logger log = LoggerFactory.getLogger(PushMsgRedisReceiver.class);
@Autowired
private RedisTemplate<String,Object> stringObjectRedisTemplate;
/**
* RedisTopic訂閱
* @param nettyMessage netty消息
* @param topic netty消息對應(yīng)的topic
*/
public void handleMessage(NettyMessage nettyMessage, String topic) {
Object channelId = stringObjectRedisTemplate.opsForHash().get(CacheConstants.getUserChannelKey(), nettyMessage.getUserId());
if (Objects.isNull(channelId)) {
log.warn("推送消息失敗,用戶不在線! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
Asserts.fail("推送消息失敗,用戶不在線!");
}
Channel channel = NettyConfig.getChannelGroup().find((ChannelId) channelId);
if(channel!=null){
channel.writeAndFlush(NettyResult.success(nettyMessage.getCode(),nettyMessage.getMessage()));
log.info("推送消息成功! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
}else {
log.warn("推送消息失敗,沒有找到Channel! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
}
}
}
發(fā)布訂閱的機(jī)制就是所有集群都會收到消息,收到消息后每個netty集群都去找對應(yīng)的消息會話通道,如果沒找到則說明連接不到當(dāng)前服務(wù)上,找到通道后則可以直接推送。 這里使用stringObjectRedisTemplate獲取用戶通道,避免序列化失敗。
RedisService中實現(xiàn)發(fā)布消息
/**
* Redis消息發(fā)布訂閱 發(fā)布消息
* @param channel 通道ID
* @param message 消息
*/
@Async
public void convertAndSend(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
發(fā)布消息的工具類:
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.redis.dto.NettyMessage;
import com.soc.dmoasp.common.redis.dto.PushSocketMsgDTO;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import java.util.Set;
/**
* Netty發(fā)送消息給服務(wù)端
* @author dsw
* @date 2024/11/18 11:47
*/
@Service
public class PushSocketMsgService {
Logger log = org.slf4j.LoggerFactory.getLogger(PushSocketMsgService.class);
@Autowired
private RedisTemplate<String, Object> stringObjectRedisTemplate;
@Autowired
private RedisService redisService;
/**
* 給所有用戶發(fā)送消息
* @param msgDTO
*/
public void pushMsgToAll( PushSocketMsgDTO msgDTO) {
Set<Object> keys = stringObjectRedisTemplate.opsForHash().keys(CacheConstants.getUserChannelKey());
keys.forEach(key -> this.pushMsgToUser(msgDTO.getCode(), key.toString(), msgDTO.getMessage()));
}
/**
* 給指定用戶發(fā)送消息
* @param msgDTO
*/
public void pushMsgToUserList(PushSocketMsgDTO msgDTO) {
for(Long userId : msgDTO.getUserIdList()){
this.pushMsgToUser(msgDTO.getCode(), userId.toString(), msgDTO.getMessage());
}
}
protected void pushMsgToUser(Integer code, String userId, JSONObject message) {
//推送到其他負(fù)載處理
NettyMessage nettyMessage = new NettyMessage();
nettyMessage.setUserId(userId);
nettyMessage.setCode(code);
nettyMessage.setMessage(message);
redisService.convertAndSend(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC, nettyMessage);
log.info("推送消息成功! userId:{},message:{}", userId, message);
}
}
NettyMessage和PushSocketMsgDTO:
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
/**
* Neety消息發(fā)布VO
* @author dsw
* @date 2024/11/18 13:49
*/
@Data
public class NettyMessage {
private Integer code;
private String userId;
private JSONObject message;
}
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;
/**
* 發(fā)送socket消息DTO
* @author dsw
* @date 2024/11/18 13:39
*/
@Data
public class PushSocketMsgDTO {
/**
* 消息類型
* 詳情看 NettyMsgEnum
*/
private Integer code;
/**
* 用戶ID
*/
private List<Long> userIdList;
/**
* 消息體
*/
private JSONObject message;
}
測試結(jié)果:



以上就是SpringCloud整合Netty集群實現(xiàn)WebSocket的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于SpringCloud Netty實現(xiàn)WebSocket的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java Scanner類用法及nextLine()產(chǎn)生的換行符問題實例分析
這篇文章主要介紹了Java Scanner類用法及nextLine()產(chǎn)生的換行符問題,結(jié)合實例形式分析了Scanner類功能、hasNextInt()和nextInt()方法使用及nextLine()產(chǎn)生的換行符問題解決方法,需要的朋友可以參考下2019-03-03
AbstractProcessor擴(kuò)展MapStruct自動生成實體映射工具類
這篇文章主要為大家介紹了AbstractProcessor擴(kuò)展MapStruct自動生成實體映射工具類實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01
Java?IO網(wǎng)絡(luò)模型實現(xiàn)解析
這篇文章主要為大家介紹了Java?IO網(wǎng)絡(luò)模型實現(xiàn)解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03
Mybatis-Plus或PageHelper多表分頁查詢總條數(shù)不對問題的解決方法
PageHelper 這個插件用了很多次了,今天使用的時候才遇到一個問題,這篇文章主要給大家介紹了關(guān)于Mybatis-Plus或PageHelper多表分頁查詢總條數(shù)不對問題的解決方法,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-08-08
SpringBoot整合Mybatis與druid實現(xiàn)流程詳解
這篇文章主要介紹了springboot整合mybatis plus與druid詳情,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價值,需要的下伙伴可以參考一下2022-10-10
MyBatis如何進(jìn)行雙重foreach循環(huán)
這篇文章主要介紹了MyBatis如何進(jìn)行雙重foreach循環(huán),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-02-02
SpringBoot+JPA?分頁查詢指定列并返回指定實體方式
這篇文章主要介紹了SpringBoot+JPA?分頁查詢指定列并返回指定實體方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12
springboot+jwt實現(xiàn)token登陸權(quán)限認(rèn)證的實現(xiàn)
這篇文章主要介紹了springboot+jwt實現(xiàn)token登陸權(quán)限認(rèn)證的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-06-06

