SpringCloud整合Netty集群實現(xiàn)WebSocket的示例代碼
引言
在分布式系統(tǒng)中,Spring Cloud 為微服務架構提供了豐富的功能,而 Netty 是一個高性能的網(wǎng)絡通信框架。將二者結合實現(xiàn) Socket 集群,可以滿足大規(guī)模、高性能網(wǎng)絡通信的需求,實現(xiàn)前后端間高效穩(wěn)定的通信。
1. 服務注冊和發(fā)現(xiàn)中心
這里服務注冊和發(fā)行中心使用nacos為例(需要啟動一個nacos服務器)。
微服務注冊: 在每一個微服務項目中,添加Nacos客戶端連接,并在配置文件中指定服務名稱和端口。例如:
# Tomcat server: port: 9201 netty: port: 10201 application: name: yhy-netty-server # Spring spring: application: # 應用名稱 name: soc-dmoasp-system profiles: # 環(huán)境配置 active: dev cloud: nacos: discovery: # 服務注冊地址 server-addr: nacos-registry:8858 config: # 配置中心地址 server-addr: nacos-registry:8858 file-extension: yml # 共享配置 shared-configs: - data-id: application.${spring.cloud.nacos.config.file-extension} refresh: true - data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension} - data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}
這是一個基本的服務配置。里面關于netty的applicaiton.name和port可以通過Nacos的NamingService類手動注冊。
1.1. Netty服務器搭建
- 添加Netty依賴:在具體微服務中的
pom.xm
l中添加Netty依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-reactor-netty</artifactId> </dependency>
- Netty啟動類:創(chuàng)建一個NettyServer類,用于啟動Netty,示例如下:
import com.alibaba.cloud.nacos.NacosDiscoveryProperties; import com.alibaba.nacos.api.PropertyKeyConst; import com.alibaba.nacos.api.naming.NamingFactory; import com.alibaba.nacos.api.naming.NamingService; import com.alibaba.nacos.api.naming.pojo.Instance; import com.soc.dmoasp.system.server.handler.WebSocketIdleStateHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.PreDestroy; import java.net.InetAddress; import java.util.Properties; /** * Netty服務 * @author dsw * @date 2024/11/18 17:34 */ @Component public class NettyServer implements CommandLineRunner { Logger log = LoggerFactory.getLogger(NettyServer.class); @Autowired private NacosDiscoveryProperties nacosDiscoveryProperties; @Value("${server.netty.port}") private Integer nettyPort; @Value("${server.netty.application.name}") private String nettyName; EventLoopGroup bossGroup; EventLoopGroup workGroup; @Override public void run(String... args) throws Exception { log.info("初始化netty配置開始"); //netty 服務端啟動的端口不可和Springboot啟動類的端口號重復 this.start(); //關閉服務器的時候同時關閉Netty服務 Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { this.destroy(); } catch (InterruptedException e) { log.error(e.getMessage()); } })); } @Async public void start() throws InterruptedException { try { bossGroup = new NioEventLoopGroup(1); workGroup = new NioEventLoopGroup(10); ServerBootstrap bootstrap = new ServerBootstrap(); // bossGroup輔助客戶端的tcp連接請求, workGroup負責與客戶端之前的讀寫操作 bootstrap.group(bossGroup, workGroup) // 指定Channel .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline //HTTP解碼 .addLast(new HttpServerCodec()) .addLast(new ChunkedWriteHandler()) //HTTP段聚合 .addLast(new HttpObjectAggregator(1024*1024)) //將HTTP協(xié)議轉成ws協(xié)議 .addLast(new WebSocketServerProtocolHandler("/socket")) ; } }); registerNamingService(nettyName,nettyPort); // 配置完成,開始綁定server,通過調用sync同步方法阻塞直到綁定成功 ChannelFuture future = bootstrap.bind(nettyPort).sync(); if (future.isSuccess()) { log.info("Server started and listen on:{}", future.channel().localAddress()); log.info("啟動 Netty Server"); } } catch (InterruptedException e) { log.error("netty異常:{}", e.getMessage()); } } /** * 將Netty服務注冊進Nacos * * @param nettyName 服務名稱 * @param nettyPort 服務端口號 */ private void registerNamingService(String nettyName, Integer nettyPort) { try { Properties properties = new Properties(); properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr()); properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace()); properties.setProperty(PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername()); properties.setProperty(PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword()); NamingService namingService = NamingFactory.createNamingService(properties); InetAddress address = InetAddress.getLocalHost(); // 定義服務實例信息 Instance instance = new Instance(); instance.setIp(address.getHostAddress()); instance.setPort(nettyPort); instance.setWeight(1.0); instance.setHealthy(true); namingService.registerInstance(nettyName, nacosDiscoveryProperties.getGroup(), instance); } catch (Exception e) { throw new RuntimeException(e); } } /** * 釋放資源 */ @PreDestroy public void destroy() throws InterruptedException { if (bossGroup != null) { bossGroup.shutdownGracefully().sync(); } if (workGroup != null) { workGroup.shutdownGracefully().sync(); } log.info("關閉Netty"); } }
使用CommandLineRunner接口實現(xiàn)run方法在啟動項目的時候把Netty服務帶起。
bossGroup
和 workGroup
的角色區(qū)別
bossGroup(老板組)
主要職責是負責監(jiān)聽服務器端的端口,等待新的客戶端連接請求到來。它就像是公司里負責接待新客戶的前臺人員,當有新客戶(客戶端)想要連接到服務器時,
bossGroup
中的EventLoop
會接收到這個連接請求。一般情況下,
bossGroup
只需要配置較少數(shù)量的EventLoop
就可以滿足需求,因為它主要處理的是連接建立的初期階段,即接受新連接這個相對不那么頻繁的操作(相比于后續(xù)處理大量數(shù)據(jù)傳輸?shù)炔僮鳎?。通常會設置為 1 個EventLoop
或者根據(jù)服務器的具體性能和預期的連接請求頻率適當增加數(shù)量,但總體數(shù)量相對較少。workGroup(工作組)
一旦
bossGroup
接受了新的客戶端連接,就會把這個新連接交給workGroup
來進一步處理后續(xù)的所有與該連接相關的操作,比如讀取客戶端發(fā)送的數(shù)據(jù)、向客戶端發(fā)送響應數(shù)據(jù)等。它就像是公司里負責具體為客戶辦理業(yè)務的工作人員。workGroup
需要處理大量的實際業(yè)務數(shù)據(jù)傳輸和交互工作,所以通常會根據(jù)服務器的性能和預期要處理的并發(fā)連接數(shù)量等因素,配置相對較多數(shù)量的EventLoop
。例如,在處理高并發(fā)場景時,可能會配置幾十甚至上百個EventLoop
來確保能夠高效地處理眾多客戶端連接的各種業(yè)務操作。
registerNamingService方法
這時候可以看到我們Nacos配置中配置了server.netty.port
和server.netty.application.name
這兩個參數(shù)分別對應netty的端口和netty的微服務應用名。
registerNamingService方法用于往Nacos中注冊服務,這里通過NamingService
類的registerInstance方法
將netty服務注冊進Nacos中。
1.2. Gateway網(wǎng)關轉發(fā)
微服務中所有的請求都是由網(wǎng)關轉發(fā),這里使用Gateway轉發(fā)。
# spring配置 spring: cloud: gateway: discovery: locator: lowerCaseServiceId: true enabled: true routes: # 系統(tǒng)模塊 - id: soc-dmoasp-system uri: lb://soc-dmoasp-system predicates: - Path=/system/** filters: - StripPrefix=1 # netty服務 - id: netty-server uri: lb:ws://soc-netty-server predicates: - Path=/netty-server/** filters: - StripPrefix=1 #不需要進行權限校驗的uri security: ignore: whites: - /auth/logout - /auth/login - /auth/register - /*/v2/api-docs - /csrf #netty連接地址 - /netty-server/**
配置文件中添加Netty路由,在鑒權網(wǎng)關中需要將socket地址放行,不進行權限驗證。例如:
@Component @RefreshScope public class AuthFilter implements GlobalFilter, Ordered { private static final Logger log = LoggerFactory.getLogger(AuthFilter.class); // 排除過濾的 uri 地址,nacos自行添加 @Autowired private IgnoreWhiteProperties ignoreWhite; @Override public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ServerHttpRequest request = exchange.getRequest(); ServerHttpRequest.Builder mutate = request.mutate(); String url = request.getURI().getPath(); // 跳過不需要驗證的路徑 if (StringUtils.matches(url, ignoreWhite.getWhites())) { return chain.filter(exchange); } ...... } }
啟動Gateway和System模塊
啟動完成后System模塊會打印NettyServer輸出的啟動日志,Nacos中也會有手動注冊的Netty服務。
通過ws://127.0.0.1:8080/netty-server/socket就可以直接連接上Netty服務器(8080為Gateway的端口)。
2. 鑒權、心跳、客戶端與服務端之間的通信
2.1. 鑒權
創(chuàng)建AuthHandler類,繼承SimpleChannelInboundHandler
類重寫channelRead0
方法,channelRead0
中可以監(jiān)聽到客戶端往服務端發(fā)送的消息。 例如:
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.core.constant.TokenConstants; import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import com.soc.dmoasp.common.core.utils.JwtUtils; import com.soc.dmoasp.common.core.utils.StringUtils; import com.soc.dmoasp.common.redis.service.RedisService; import com.soc.dmoasp.system.server.vo.NettyResult; import io.jsonwebtoken.Claims; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.AttributeKey; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; /** * netty鑒權處理 * @author dsw * @date 2024/11/18 17:55 */ public class AuthHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { Logger log = LoggerFactory.getLogger(AuthHandler.class); private final RedisTemplate<String, Object> redisTemplate; private final RedisService redisService; public AuthHandler(RedisTemplate<String, Object> stringObjectRedisTemplate, RedisService redisService) { redisTemplate = stringObjectRedisTemplate; this.redisService = redisService; } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { try { JSONObject clientMessage = JSON.parseObject(textWebSocketFrame.text()); //獲取code 只判斷code為鑒權的消息類型 Integer code = clientMessage.getInteger("code"); if (NettyMsgEnum.AUTH_MESSAGE.getCode().equals(code)) { //獲取token String token = clientMessage.getString("token"); if (StringUtils.isEmpty(token)) { ctx.channel().writeAndFlush(NettyResult.authFail("令牌不能為空")); ctx.close(); } // 如果前端設置了令牌前綴,則裁剪掉前綴 if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX)) { token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY); } //JWT校驗 Claims claims = JwtUtils.parseToken(token); if (claims == null) { ctx.channel().writeAndFlush(NettyResult.authFail("令牌已過期或驗證不正確")); ctx.close(); } String userkey = JwtUtils.getUserKey(claims); //從Redis中查看是否有這個Token沒有則不能登錄 boolean islogin = redisService.hasKey(getTokenKey(userkey)); if (!islogin) { ctx.channel().writeAndFlush(NettyResult.authFail("登錄狀態(tài)已過期")); ctx.close(); } //獲取用戶保存至Socket連接會話中 String userId = JwtUtils.getUserId(claims); AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); ctx.channel().attr(userIdKey).setIfAbsent(userId); JSONObject jsonObject = new JSONObject(); jsonObject.put("userId",userId); log.info("有新的Socket客戶端鏈接 userId :{}", userId); //將連接信息保存至Redis中key為userId value為ctx.channel().id() redisTemplate.opsForHash().put(CacheConstants.getUserChannelKey(),userId,ctx.channel().id()); ctx.channel().writeAndFlush(NettyResult.success(NettyMsgEnum.AUTH_MESSAGE.getCode(), "鑒權成功", jsonObject)); //鑒權完成后移除AuthHandler消息監(jiān)聽 ctx.pipeline().remove(AuthHandler.class); } else { ctx.channel().writeAndFlush(NettyResult.authFail("請先鑒權,在發(fā)送其他類型請求!")); ctx.close(); } } catch (Exception e) { log.error(e.getMessage()); ctx.channel().writeAndFlush(NettyResult.authFail("鑒權失敗")); ctx.close(); } } /** * 獲取緩存key */ private String getTokenKey(String token) { return CacheConstants.LOGIN_TOKEN_KEY + token; } }
泛型TextWebSocketFrame
表示接收文本類型的消息。
其中連接的用戶信息保存到Redis中,RedisTemplate<String, Object> redisTemplate
對象是用來保存Netty連接信息的,序列化使用的是String(用戶信息用String存儲,使用JSON序列化會反序列化失敗),對應接收的JSON串如下:
{"code":1001,token:"Bearer XXXX"}
code取NettyMsgEnum中的code,token則是登錄時生成的token令牌。
鑒權后將AuthHandler移除,會話后續(xù)的消息交互不在進AuthHandler。
NettyMsgEnum如下:
/** * netty消息類型枚舉 * @author dsw * @date 2024/11/18 17:58 */ public enum NettyMsgEnum { AUTH_MESSAGE(1001, "鑒權消息","Auth-Netty"), //{'code':1003,'data':{'unreadCount':0}} NOTICE_MESSAGE(1003, "公告通知消息","Notice-Netty"), HEART_MESSAGE(1006, "心跳消息","Heart-Netty"), ERROR_MESSAGE(-1, "錯誤",null); private final Integer code; private final String info; private final String strategyName; NettyMsgEnum(Integer code, String info, String strategyName){ this.code = code; this.info = info; this.strategyName = strategyName; } public static NettyMsgEnum getByCode(Integer code) { for (NettyMsgEnum msgEnum : values()) { if (msgEnum.getCode().equals(code)) { return msgEnum; } } return ERROR_MESSAGE; } public Integer getCode() { return code; } public String getInfo() { return info; } public String getStrategyName() { return strategyName; } }
NettyResult如下:
import com.alibaba.fastjson2.JSON; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import java.io.Serializable; /** * netty響應實體 * @author dsw * @date 2024/11/18 18:02 */ public class NettyResult implements Serializable { private static final long serialVersionUID = 1L; private Integer code; private String message; private Object data; public NettyResult(Integer code, String message, Object data) { this.code = code; this.message = message; this.data = data; } public static TextWebSocketFrame fail(String message) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-1, message, null))); } public static TextWebSocketFrame authFail(String message) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-2, message, null))); } public static TextWebSocketFrame success( String message) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(200, message, null))); } public static TextWebSocketFrame success(Integer code, Object data) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,null, data))); } public static TextWebSocketFrame success(Integer code, String message, Object data) { return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,message, data))); } public Integer getCode() { return code; } public String getMessage() { return message; } public Object getData() { return data; } }
最后到NettyServer中的ChannelInitializer加入AuthHandler:
我們重新項目后連接socket查看結果
如果不鑒權直接發(fā)送消息,服務端會主動斷開連接,客戶端需要重連。
這就代表已經(jīng)連接成功了。
2.2. 空閑檢測
創(chuàng)建WebSocketIdleStateHandler
類繼承IdleStateHandler
類,重寫channelIdle
方法。
import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; /** * 空閑檢測 * @author dsw * @date 2024/11/18 11:47 */ public class WebSocketIdleStateHandler extends IdleStateHandler { Logger log = LoggerFactory.getLogger(WebSocketIdleStateHandler.class); /** * 默認的讀空閑時間 */ private static final int DEFAULT_READER_IDLE_TIME = 10; /** * 默認10秒讀空閑斷開客戶端 */ public WebSocketIdleStateHandler() { super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS); } /** * 指定心跳時間(秒) * * @param readerIdleTimeSeconds 讀空閑時間 * @param writerIdleTimeSeconds 寫空閑時間 * @param allIdleTimeSeconds 讀寫空閑時間 */ public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) { super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS); } /** * 指定心跳時間及時間單位 * * @param readerIdleTime 讀空閑時間 * @param writerIdleTime 寫空閑時間 * @param allIdleTime 讀寫空閑時間 * @param unit 時間單位 */ public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) { super(readerIdleTime, writerIdleTime, allIdleTime, unit); } /** * 當空閑事件觸發(fā)時執(zhí)行 */ @Override protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception { //如果是讀空閑 if (evt.state().equals(IdleState.READER_IDLE)) { Channel channel = ctx.channel(); log.debug("服務端未檢測到客戶端【{}】的心跳包,強制關閉客戶端!", channel.id()); channel.close(); } super.channelIdle(ctx,evt); } }
以上實現(xiàn)了父類的構造函數(shù),可以指定具體的空閑時間。當空閑時會觸發(fā)channelIdle
方法,則服務端主動斷開連接。
最后到NettyServer中的ChannelInitializer加入WebSocketIdleStateHandler:
加到最前面。
示例設置了10秒斷開,需要使用中自行調整。
2.3. 消息通信
2.3.1. 接收客戶端的消息
創(chuàng)建WebSocketHandler
類,繼承SimpleChannelInboundHandler
類重寫channelRead0、handlerAdded、handlerRemoved、exceptionCaught
方法。
channelRead0
方法:監(jiān)聽客戶端發(fā)送過來的消息。handlerAdded
方法:websocket連接后會調用,將連接信息添加到通道組handlerRemoved
方法:斷開連接后會調用(服務端、客戶端斷開都會調用),用于用戶下線(刪除通道、刪除Redis中存儲的連接信息)exceptionCaught
方法:發(fā)生異常后調用,發(fā)生異常后服務端通常會主動斷開連接。
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import com.soc.dmoasp.system.server.config.NettyConfig; import com.soc.dmoasp.system.server.strategy.NettyStrategy; import com.soc.dmoasp.system.server.strategy.NettyStrategyFactory; import com.soc.dmoasp.system.server.vo.NettyResult; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.AttributeKey; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; /** * webSocket處理 * @author dsw * @date 2024/11/18 10:20 */ public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { Logger log = LoggerFactory.getLogger(WebSocketHandler.class); private final RedisTemplate<String, Object> redisTemplate; private final NettyStrategyFactory nettyStrategyFactory; public WebSocketHandler(RedisTemplate<String, Object> redisTemplate, NettyStrategyFactory nettyStrategyFactory) { this.redisTemplate = redisTemplate; this.nettyStrategyFactory = nettyStrategyFactory; } /** * webSocket連接創(chuàng)建后調用 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 添加到channelGroup 通道組 NettyConfig.getChannelGroup().add(ctx.channel()); } /** * 讀取數(shù)據(jù) */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception { AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(userIdKey).get(); log.info("收到消息 userId:{} message:{}",userId,frame.text()); // 接收客戶端的消息 JSONObject pullMessage = JSON.parseObject(frame.text()); Integer code = pullMessage.getInteger("code"); // 獲取消息類型 NettyStrategy nettyStrategy = nettyStrategyFactory.getNettyStrategy(NettyMsgEnum.getByCode(code)); // 處理消息 TextWebSocketFrame pushMessage = nettyStrategy.execute(pullMessage); // 返回處理結果給客戶端 ctx.channel().writeAndFlush(pushMessage); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(userIdKey).get(); log.info("用戶下線了 userId:{}",userId); // 刪除通道 removeUserId(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.channel().writeAndFlush(NettyResult.fail("系統(tǒng)錯誤:" + cause.getMessage())); } /** * 刪除用戶與channel的對應關系 */ private void removeUserId(ChannelHandlerContext ctx) { AttributeKey<String> userIdKey = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(userIdKey).get(); if(StringUtils.isNotBlank(userId)){ redisTemplate.opsForHash().delete(CacheConstants.getUserChannelKey(),userId); } } }
這里收到消息后通過一個策略模式進入不同的策略,通過NettyMsgEnum
里面定義的code指定不同的策略類。
Strategy和StrategyFactory:
import com.alibaba.fastjson2.JSONObject; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /** * Netty接收消息處理策略類 * @author dsw * @date 2024/5/27 10:21 */ public interface NettyStrategy { /** * 執(zhí)行添加數(shù)值 * * @return */ TextWebSocketFrame execute(JSONObject message); }
import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.Map; /** * Netty策略工廠 * @author dsw * @date 2024/11/18 10:20 */ @Component public class NettyStrategyFactory { Logger log = org.slf4j.LoggerFactory.getLogger(NettyStrategyFactory.class); /** * 通過Spring容器的方式注入 */ @Autowired private Map<String, NettyStrategy> nettyStrategy; /** * 獲取對應策略類 * @param */ public NettyStrategy getNettyStrategy(NettyMsgEnum nettyMsgEnum){ if(!nettyStrategy.containsKey(nettyMsgEnum.getStrategyName())){ log.warn("沒有對應的消息策略"); throw new RuntimeException("沒有對應的消息策略"); } return nettyStrategy.get(nettyMsgEnum.getStrategyName()); } }
我們實現(xiàn)一個心跳消息的策略:
import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.enums.NettyMsgEnum; import com.soc.dmoasp.system.server.strategy.NettyStrategy; import com.soc.dmoasp.system.server.vo.NettyResult; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springframework.stereotype.Component; /** * Netty心跳消息 * @author dsw * @date 2024/5/27 10:25 */ @Component("Heart-Netty") public class HeartStrategyImpl implements NettyStrategy { @Override public TextWebSocketFrame execute(JSONObject message) { String data = message.getString("data"); if ("ping".equals(data)) { return NettyResult.success(NettyMsgEnum.HEART_MESSAGE.getCode(), null, "pong"); } return NettyResult.fail("消息格式不正確"); } }
添加至ChannelInitializer
后重啟查看效果。
2.3.2. 發(fā)送消息給客戶端
集群下面的Netty配置
方案1:使用Redis的發(fā)布訂閱
方案2:使用MQ的發(fā)布訂閱
我們這里使用使用Redis的發(fā)布訂閱實現(xiàn)。
添加Redis訂閱器:
import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.redis.configure.FastJson2JsonRedisSerializer; import com.soc.dmoasp.system.server.receiver.PushMsgRedisReceiver; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import java.util.Collections; import java.util.List; /** * redisReceiver 配置 * @author dsw * @date 2024/11/18 12:07 */ @Configuration public class RedisReceiverConfig { @Bean public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); List<PatternTopic> topics = Collections.singletonList( PatternTopic.of(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC) ); // 添加訂閱者監(jiān)聽類,數(shù)量不限.PatternTopic定義監(jiān)聽主題,這里監(jiān)聽test-topic主題 container.addMessageListener(listenerAdapter, topics); return container; } @Bean @SuppressWarnings(value = { "unchecked", "rawtypes" }) public MessageListenerAdapter listenerAdapter(PushMsgRedisReceiver pushMsgRedisReceiver) { MessageListenerAdapter adapter = new MessageListenerAdapter(pushMsgRedisReceiver); FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class); adapter.setSerializer(serializer); return adapter; } }
import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.core.exception.Asserts; import com.soc.dmoasp.common.redis.dto.NettyMessage; import com.soc.dmoasp.system.server.config.NettyConfig; import com.soc.dmoasp.system.server.vo.NettyResult; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.Objects; /** * Netty發(fā)送消息接收者 * @author dsw * @date 2024/11/18 12:07 */ @Component public class PushMsgRedisReceiver { Logger log = LoggerFactory.getLogger(PushMsgRedisReceiver.class); @Autowired private RedisTemplate<String,Object> stringObjectRedisTemplate; /** * RedisTopic訂閱 * @param nettyMessage netty消息 * @param topic netty消息對應的topic */ public void handleMessage(NettyMessage nettyMessage, String topic) { Object channelId = stringObjectRedisTemplate.opsForHash().get(CacheConstants.getUserChannelKey(), nettyMessage.getUserId()); if (Objects.isNull(channelId)) { log.warn("推送消息失敗,用戶不在線! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage()); Asserts.fail("推送消息失敗,用戶不在線!"); } Channel channel = NettyConfig.getChannelGroup().find((ChannelId) channelId); if(channel!=null){ channel.writeAndFlush(NettyResult.success(nettyMessage.getCode(),nettyMessage.getMessage())); log.info("推送消息成功! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage()); }else { log.warn("推送消息失敗,沒有找到Channel! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage()); } } }
發(fā)布訂閱的機制就是所有集群都會收到消息,收到消息后每個netty集群都去找對應的消息會話通道,如果沒找到則說明連接不到當前服務上,找到通道后則可以直接推送。 這里使用stringObjectRedisTemplate
獲取用戶通道,避免序列化失敗。
RedisService中實現(xiàn)發(fā)布消息
/** * Redis消息發(fā)布訂閱 發(fā)布消息 * @param channel 通道ID * @param message 消息 */ @Async public void convertAndSend(String channel, Object message) { redisTemplate.convertAndSend(channel, message); }
發(fā)布消息的工具類:
import com.alibaba.fastjson2.JSONObject; import com.soc.dmoasp.common.core.constant.CacheConstants; import com.soc.dmoasp.common.redis.dto.NettyMessage; import com.soc.dmoasp.common.redis.dto.PushSocketMsgDTO; import org.slf4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.util.Set; /** * Netty發(fā)送消息給服務端 * @author dsw * @date 2024/11/18 11:47 */ @Service public class PushSocketMsgService { Logger log = org.slf4j.LoggerFactory.getLogger(PushSocketMsgService.class); @Autowired private RedisTemplate<String, Object> stringObjectRedisTemplate; @Autowired private RedisService redisService; /** * 給所有用戶發(fā)送消息 * @param msgDTO */ public void pushMsgToAll( PushSocketMsgDTO msgDTO) { Set<Object> keys = stringObjectRedisTemplate.opsForHash().keys(CacheConstants.getUserChannelKey()); keys.forEach(key -> this.pushMsgToUser(msgDTO.getCode(), key.toString(), msgDTO.getMessage())); } /** * 給指定用戶發(fā)送消息 * @param msgDTO */ public void pushMsgToUserList(PushSocketMsgDTO msgDTO) { for(Long userId : msgDTO.getUserIdList()){ this.pushMsgToUser(msgDTO.getCode(), userId.toString(), msgDTO.getMessage()); } } protected void pushMsgToUser(Integer code, String userId, JSONObject message) { //推送到其他負載處理 NettyMessage nettyMessage = new NettyMessage(); nettyMessage.setUserId(userId); nettyMessage.setCode(code); nettyMessage.setMessage(message); redisService.convertAndSend(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC, nettyMessage); log.info("推送消息成功! userId:{},message:{}", userId, message); } }
NettyMessage和PushSocketMsgDTO:
import com.alibaba.fastjson2.JSONObject; import lombok.Data; /** * Neety消息發(fā)布VO * @author dsw * @date 2024/11/18 13:49 */ @Data public class NettyMessage { private Integer code; private String userId; private JSONObject message; }
import com.alibaba.fastjson2.JSONObject; import lombok.Data; /** * 發(fā)送socket消息DTO * @author dsw * @date 2024/11/18 13:39 */ @Data public class PushSocketMsgDTO { /** * 消息類型 * 詳情看 NettyMsgEnum */ private Integer code; /** * 用戶ID */ private List<Long> userIdList; /** * 消息體 */ private JSONObject message; }
測試結果:
以上就是SpringCloud整合Netty集群實現(xiàn)WebSocket的示例代碼的詳細內(nèi)容,更多關于SpringCloud Netty實現(xiàn)WebSocket的資料請關注腳本之家其它相關文章!
相關文章
Java Scanner類用法及nextLine()產(chǎn)生的換行符問題實例分析
這篇文章主要介紹了Java Scanner類用法及nextLine()產(chǎn)生的換行符問題,結合實例形式分析了Scanner類功能、hasNextInt()和nextInt()方法使用及nextLine()產(chǎn)生的換行符問題解決方法,需要的朋友可以參考下2019-03-03AbstractProcessor擴展MapStruct自動生成實體映射工具類
這篇文章主要為大家介紹了AbstractProcessor擴展MapStruct自動生成實體映射工具類實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-01-01Mybatis-Plus或PageHelper多表分頁查詢總條數(shù)不對問題的解決方法
PageHelper 這個插件用了很多次了,今天使用的時候才遇到一個問題,這篇文章主要給大家介紹了關于Mybatis-Plus或PageHelper多表分頁查詢總條數(shù)不對問題的解決方法,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下2022-08-08SpringBoot整合Mybatis與druid實現(xiàn)流程詳解
這篇文章主要介紹了springboot整合mybatis plus與druid詳情,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的下伙伴可以參考一下2022-10-10SpringBoot+JPA?分頁查詢指定列并返回指定實體方式
這篇文章主要介紹了SpringBoot+JPA?分頁查詢指定列并返回指定實體方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-12-12springboot+jwt實現(xiàn)token登陸權限認證的實現(xiàn)
這篇文章主要介紹了springboot+jwt實現(xiàn)token登陸權限認證的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-06-06