欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringCloud整合Netty集群實現(xiàn)WebSocket的示例代碼

 更新時間:2024年11月19日 08:53:43   作者:黑加菲  
文章主要介紹了SpringCloud整合Netty集群實現(xiàn)WebSocket的相關內(nèi)容,包括服務注冊和發(fā)現(xiàn)中心的配置,如使用Nacos、CommandLineRunner啟動Netty服務等,還介紹了通過Redis實現(xiàn)消息發(fā)布訂閱的機制,需要的朋友可以參考下

引言

在分布式系統(tǒng)中,Spring Cloud 為微服務架構提供了豐富的功能,而 Netty 是一個高性能的網(wǎng)絡通信框架。將二者結合實現(xiàn) Socket 集群,可以滿足大規(guī)模、高性能網(wǎng)絡通信的需求,實現(xiàn)前后端間高效穩(wěn)定的通信。

1. 服務注冊和發(fā)現(xiàn)中心

這里服務注冊和發(fā)行中心使用nacos為例(需要啟動一個nacos服務器)。

微服務注冊: 在每一個微服務項目中,添加Nacos客戶端連接,并在配置文件中指定服務名稱和端口。例如:

# Tomcat
server:
  port: 9201
  netty:
    port: 10201
    application:
      name: yhy-netty-server

# Spring
spring:
  application:
    # 應用名稱
    name: soc-dmoasp-system
  profiles:
    # 環(huán)境配置
    active: dev
  cloud:
    nacos:
      discovery:
        # 服務注冊地址
        server-addr: nacos-registry:8858
      config:
        # 配置中心地址
        server-addr: nacos-registry:8858
        file-extension: yml
        # 共享配置
        shared-configs:
          - data-id: application.${spring.cloud.nacos.config.file-extension}
            refresh: true
          - data-id: soc-dmoasp-redission.${spring.cloud.nacos.config.file-extension}
          - data-id: soc-dmoasp-druid.${spring.cloud.nacos.config.file-extension}

這是一個基本的服務配置。里面關于netty的applicaiton.name和port可以通過Nacos的NamingService類手動注冊。

1.1. Netty服務器搭建

  • 添加Netty依賴:在具體微服務中的pom.xml中添加Netty依賴:
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-reactor-netty</artifactId>
</dependency>
  • Netty啟動類:創(chuàng)建一個NettyServer類,用于啟動Netty,示例如下:
import com.alibaba.cloud.nacos.NacosDiscoveryProperties;
import com.alibaba.nacos.api.PropertyKeyConst;
import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.soc.dmoasp.system.server.handler.WebSocketIdleStateHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.net.InetAddress;
import java.util.Properties;

/**
 * Netty服務
 * @author dsw
 * @date 2024/11/18 17:34
 */
@Component
public class NettyServer implements CommandLineRunner {

    Logger log = LoggerFactory.getLogger(NettyServer.class);

    @Autowired
    private NacosDiscoveryProperties nacosDiscoveryProperties;


    @Value("${server.netty.port}")
    private Integer nettyPort;
    @Value("${server.netty.application.name}")
    private String nettyName;

    EventLoopGroup bossGroup;

    EventLoopGroup workGroup;

    @Override
    public void run(String... args) throws Exception {
        log.info("初始化netty配置開始");
        //netty 服務端啟動的端口不可和Springboot啟動類的端口號重復
        this.start();
        //關閉服務器的時候同時關閉Netty服務
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.destroy();
            } catch (InterruptedException e) {
                log.error(e.getMessage());
            }
        }));
    }

    @Async
    public void start() throws InterruptedException {
        try {
            bossGroup = new NioEventLoopGroup(1);
            workGroup = new NioEventLoopGroup(10);
            ServerBootstrap bootstrap = new ServerBootstrap();
            // bossGroup輔助客戶端的tcp連接請求, workGroup負責與客戶端之前的讀寫操作
            bootstrap.group(bossGroup, workGroup)
            // 指定Channel
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline
                                    //HTTP解碼
                                    .addLast(new HttpServerCodec())
                                    .addLast(new ChunkedWriteHandler())
                                    //HTTP段聚合
                                    .addLast(new HttpObjectAggregator(1024*1024))
                                    //將HTTP協(xié)議轉成ws協(xié)議
                                    .addLast(new WebSocketServerProtocolHandler("/socket"))

                            ;

                        }
                    });
            registerNamingService(nettyName,nettyPort);
            // 配置完成,開始綁定server,通過調用sync同步方法阻塞直到綁定成功
            ChannelFuture future = bootstrap.bind(nettyPort).sync();
            if (future.isSuccess()) {
                log.info("Server started and listen on:{}", future.channel().localAddress());
                log.info("啟動 Netty Server");
            }
        } catch (InterruptedException e) {
            log.error("netty異常:{}", e.getMessage());
        }
    }
    /**
     * 將Netty服務注冊進Nacos
     *
     * @param nettyName 服務名稱
     * @param nettyPort 服務端口號
     */
    private void registerNamingService(String nettyName, Integer nettyPort) {
        try {
            Properties properties = new Properties();
            properties.setProperty(PropertyKeyConst.SERVER_ADDR, nacosDiscoveryProperties.getServerAddr());
            properties.setProperty(PropertyKeyConst.NAMESPACE, nacosDiscoveryProperties.getNamespace());
            properties.setProperty(PropertyKeyConst.USERNAME, nacosDiscoveryProperties.getUsername());
            properties.setProperty(PropertyKeyConst.PASSWORD, nacosDiscoveryProperties.getPassword());
            NamingService namingService = NamingFactory.createNamingService(properties);
            InetAddress address = InetAddress.getLocalHost();
            // 定義服務實例信息
            Instance instance = new Instance();
            instance.setIp(address.getHostAddress());
            instance.setPort(nettyPort);
            instance.setWeight(1.0);
            instance.setHealthy(true);
            namingService.registerInstance(nettyName, nacosDiscoveryProperties.getGroup(), instance);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 釋放資源
     */
    @PreDestroy
    public void destroy() throws InterruptedException {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully().sync();
        }
        if (workGroup != null) {
            workGroup.shutdownGracefully().sync();
        }
        log.info("關閉Netty");

    }

}

使用CommandLineRunner接口實現(xiàn)run方法在啟動項目的時候把Netty服務帶起。

bossGroup workGroup 的角色區(qū)別

  • bossGroup(老板組)

    主要職責是負責監(jiān)聽服務器端的端口,等待新的客戶端連接請求到來。它就像是公司里負責接待新客戶的前臺人員,當有新客戶(客戶端)想要連接到服務器時,bossGroup中的EventLoop會接收到這個連接請求。

    一般情況下,bossGroup只需要配置較少數(shù)量的EventLoop就可以滿足需求,因為它主要處理的是連接建立的初期階段,即接受新連接這個相對不那么頻繁的操作(相比于后續(xù)處理大量數(shù)據(jù)傳輸?shù)炔僮鳎?。通常會設置為 1 個EventLoop或者根據(jù)服務器的具體性能和預期的連接請求頻率適當增加數(shù)量,但總體數(shù)量相對較少。

  • workGroup(工作組)

    一旦bossGroup接受了新的客戶端連接,就會把這個新連接交給workGroup來進一步處理后續(xù)的所有與該連接相關的操作,比如讀取客戶端發(fā)送的數(shù)據(jù)、向客戶端發(fā)送響應數(shù)據(jù)等。它就像是公司里負責具體為客戶辦理業(yè)務的工作人員。

    workGroup需要處理大量的實際業(yè)務數(shù)據(jù)傳輸和交互工作,所以通常會根據(jù)服務器的性能和預期要處理的并發(fā)連接數(shù)量等因素,配置相對較多數(shù)量的EventLoop。例如,在處理高并發(fā)場景時,可能會配置幾十甚至上百個EventLoop來確保能夠高效地處理眾多客戶端連接的各種業(yè)務操作。

registerNamingService方法

這時候可以看到我們Nacos配置中配置了server.netty.portserver.netty.application.name這兩個參數(shù)分別對應netty的端口和netty的微服務應用名。

registerNamingService方法用于往Nacos中注冊服務,這里通過NamingService類的registerInstance方法將netty服務注冊進Nacos中。

1.2. Gateway網(wǎng)關轉發(fā)

微服務中所有的請求都是由網(wǎng)關轉發(fā),這里使用Gateway轉發(fā)。

# spring配置
  spring:
    cloud:
      gateway:
        discovery:
          locator:
            lowerCaseServiceId: true
            enabled: true
        routes:
          # 系統(tǒng)模塊
          - id: soc-dmoasp-system
            uri: lb://soc-dmoasp-system
            predicates:
              - Path=/system/**
            filters:
              - StripPrefix=1
          # netty服務
          - id:  netty-server
            uri: lb:ws://soc-netty-server
            predicates:
              - Path=/netty-server/**
            filters:
              - StripPrefix=1
#不需要進行權限校驗的uri
security:
  ignore:
    whites:
      - /auth/logout
      - /auth/login
      - /auth/register
      - /*/v2/api-docs
      - /csrf
      #netty連接地址
      - /netty-server/**

配置文件中添加Netty路由,在鑒權網(wǎng)關中需要將socket地址放行,不進行權限驗證。例如:

@Component
@RefreshScope
public class AuthFilter implements GlobalFilter, Ordered
{
    private static final Logger log = LoggerFactory.getLogger(AuthFilter.class);

    // 排除過濾的 uri 地址,nacos自行添加
    @Autowired
    private IgnoreWhiteProperties ignoreWhite;

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain)
    {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpRequest.Builder mutate = request.mutate();

        String url = request.getURI().getPath();
        // 跳過不需要驗證的路徑
        if (StringUtils.matches(url, ignoreWhite.getWhites()))
        {
            return chain.filter(exchange);
        }
        ......
    }
}

啟動Gateway和System模塊

啟動完成后System模塊會打印NettyServer輸出的啟動日志,Nacos中也會有手動注冊的Netty服務。

通過ws://127.0.0.1:8080/netty-server/socket就可以直接連接上Netty服務器(8080為Gateway的端口)。

2. 鑒權、心跳、客戶端與服務端之間的通信

2.1. 鑒權

創(chuàng)建AuthHandler類,繼承SimpleChannelInboundHandler類重寫channelRead0方法,channelRead0中可以監(jiān)聽到客戶端往服務端發(fā)送的消息。 例如:

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.constant.TokenConstants;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.common.core.utils.JwtUtils;
import com.soc.dmoasp.common.core.utils.StringUtils;
import com.soc.dmoasp.common.redis.service.RedisService;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.jsonwebtoken.Claims;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * netty鑒權處理
 * @author dsw
 * @date 2024/11/18 17:55
 */
public class AuthHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    Logger log = LoggerFactory.getLogger(AuthHandler.class);

    private final RedisTemplate<String, Object> redisTemplate;

    private final RedisService redisService;

    public AuthHandler(RedisTemplate<String, Object> stringObjectRedisTemplate, RedisService redisService) {
        redisTemplate = stringObjectRedisTemplate;
        this.redisService = redisService;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
        try {
            JSONObject clientMessage = JSON.parseObject(textWebSocketFrame.text());
            //獲取code 只判斷code為鑒權的消息類型
            Integer code = clientMessage.getInteger("code");
            if (NettyMsgEnum.AUTH_MESSAGE.getCode().equals(code)) {
                //獲取token
                String token = clientMessage.getString("token");
                if (StringUtils.isEmpty(token))
                {
                    ctx.channel().writeAndFlush(NettyResult.authFail("令牌不能為空"));
                    ctx.close();
                }
                // 如果前端設置了令牌前綴,則裁剪掉前綴
                if (StringUtils.isNotEmpty(token) && token.startsWith(TokenConstants.PREFIX))
                {
                    token = token.replaceFirst(TokenConstants.PREFIX, StringUtils.EMPTY);
                }
                //JWT校驗
                Claims claims = JwtUtils.parseToken(token);
                if (claims == null)
                {
                    ctx.channel().writeAndFlush(NettyResult.authFail("令牌已過期或驗證不正確"));
                    ctx.close();
                }
                String userkey = JwtUtils.getUserKey(claims);
                //從Redis中查看是否有這個Token沒有則不能登錄
                boolean islogin = redisService.hasKey(getTokenKey(userkey));
                if (!islogin)
                {
                    ctx.channel().writeAndFlush(NettyResult.authFail("登錄狀態(tài)已過期"));
                    ctx.close();
                }
                //獲取用戶保存至Socket連接會話中
                String userId = JwtUtils.getUserId(claims);
                AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
                ctx.channel().attr(userIdKey).setIfAbsent(userId);
                JSONObject jsonObject = new JSONObject();
                jsonObject.put("userId",userId);
                log.info("有新的Socket客戶端鏈接 userId :{}", userId);
                //將連接信息保存至Redis中key為userId value為ctx.channel().id()
                redisTemplate.opsForHash().put(CacheConstants.getUserChannelKey(),userId,ctx.channel().id());
                ctx.channel().writeAndFlush(NettyResult.success(NettyMsgEnum.AUTH_MESSAGE.getCode(), "鑒權成功", jsonObject));
                //鑒權完成后移除AuthHandler消息監(jiān)聽
                ctx.pipeline().remove(AuthHandler.class);
            } else {
                ctx.channel().writeAndFlush(NettyResult.authFail("請先鑒權,在發(fā)送其他類型請求!"));
                ctx.close();
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            ctx.channel().writeAndFlush(NettyResult.authFail("鑒權失敗"));
            ctx.close();
        }
    }

    /**
     * 獲取緩存key
     */
    private String getTokenKey(String token)
    {
        return CacheConstants.LOGIN_TOKEN_KEY + token;
    }
}

泛型TextWebSocketFrame表示接收文本類型的消息。

其中連接的用戶信息保存到Redis中,RedisTemplate<String, Object> redisTemplate對象是用來保存Netty連接信息的,序列化使用的是String(用戶信息用String存儲,使用JSON序列化會反序列化失敗),對應接收的JSON串如下:

{"code":1001,token:"Bearer XXXX"}

code取NettyMsgEnum中的code,token則是登錄時生成的token令牌。

鑒權后將AuthHandler移除,會話后續(xù)的消息交互不在進AuthHandler。

NettyMsgEnum如下:

/**
 * netty消息類型枚舉
 * @author dsw
 * @date 2024/11/18 17:58
 */
public enum NettyMsgEnum {

    AUTH_MESSAGE(1001, "鑒權消息","Auth-Netty"),
    //{'code':1003,'data':{'unreadCount':0}}
    NOTICE_MESSAGE(1003, "公告通知消息","Notice-Netty"),
    HEART_MESSAGE(1006, "心跳消息","Heart-Netty"),
    ERROR_MESSAGE(-1, "錯誤",null);

    private final Integer code;

    private final String info;

    private final String strategyName;

    NettyMsgEnum(Integer code, String info, String strategyName){
        this.code = code;
        this.info = info;
        this.strategyName = strategyName;
    }

    public static NettyMsgEnum getByCode(Integer code) {
        for (NettyMsgEnum msgEnum : values()) {
            if (msgEnum.getCode().equals(code)) {
                return msgEnum;
            }
        }
        return ERROR_MESSAGE;
    }

    public Integer getCode() {
        return code;
    }

    public String getInfo() {
        return info;
    }

    public String getStrategyName() {
        return strategyName;
    }
}

NettyResult如下:

import com.alibaba.fastjson2.JSON;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

import java.io.Serializable;

/**
 * netty響應實體
 * @author dsw
 * @date 2024/11/18 18:02
 */
public class NettyResult  implements Serializable {

    private static final long serialVersionUID = 1L;

    private Integer code;

    private String message;

    private Object data;

    public NettyResult(Integer code, String message, Object data) {
        this.code = code;
        this.message = message;
        this.data = data;
    }


    public static TextWebSocketFrame fail(String message) {
        return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-1, message, null)));
    }

    public static TextWebSocketFrame authFail(String message) {
        return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(-2, message, null)));
    }

    public static TextWebSocketFrame success( String message) {
        return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(200, message, null)));
    }

    public static TextWebSocketFrame success(Integer code, Object data) {
        return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,null, data)));
    }

    public static TextWebSocketFrame success(Integer code, String message, Object data) {
        return new TextWebSocketFrame(JSON.toJSONString(new NettyResult(code,message, data)));
    }

    public Integer getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }

    public Object getData() {
        return data;
    }
}

最后到NettyServer中的ChannelInitializer加入AuthHandler:

我們重新項目后連接socket查看結果

如果不鑒權直接發(fā)送消息,服務端會主動斷開連接,客戶端需要重連。

這就代表已經(jīng)連接成功了。

2.2. 空閑檢測

創(chuàng)建WebSocketIdleStateHandler類繼承IdleStateHandler類,重寫channelIdle方法。

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;

/**
 * 空閑檢測
 * @author dsw
 * @date 2024/11/18 11:47
 */
public class WebSocketIdleStateHandler extends IdleStateHandler {

    Logger log = LoggerFactory.getLogger(WebSocketIdleStateHandler.class);

    /**
     * 默認的讀空閑時間
     */
    private static final int DEFAULT_READER_IDLE_TIME = 10;

    /**
     * 默認10秒讀空閑斷開客戶端
     */
    public WebSocketIdleStateHandler() {
        super(DEFAULT_READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS);
    }

    /**
     * 指定心跳時間(秒)
     *
     * @param readerIdleTimeSeconds 讀空閑時間
     * @param writerIdleTimeSeconds 寫空閑時間
     * @param allIdleTimeSeconds    讀寫空閑時間
     */
    public WebSocketIdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
        super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
    }

    /**
     * 指定心跳時間及時間單位
     *
     * @param readerIdleTime 讀空閑時間
     * @param writerIdleTime 寫空閑時間
     * @param allIdleTime    讀寫空閑時間
     * @param unit           時間單位
     */
    public WebSocketIdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
        super(readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

    /**
     * 當空閑事件觸發(fā)時執(zhí)行
     */
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        //如果是讀空閑
        if (evt.state().equals(IdleState.READER_IDLE)) {
            Channel channel = ctx.channel();
            log.debug("服務端未檢測到客戶端【{}】的心跳包,強制關閉客戶端!", channel.id());
            channel.close();
        }
        super.channelIdle(ctx,evt);
    }
}

以上實現(xiàn)了父類的構造函數(shù),可以指定具體的空閑時間。當空閑時會觸發(fā)channelIdle方法,則服務端主動斷開連接。

最后到NettyServer中的ChannelInitializer加入WebSocketIdleStateHandler:

加到最前面。

示例設置了10秒斷開,需要使用中自行調整。

2.3. 消息通信

2.3.1. 接收客戶端的消息

創(chuàng)建WebSocketHandler類,繼承SimpleChannelInboundHandler類重寫channelRead0、handlerAdded、handlerRemoved、exceptionCaught方法。

  • channelRead0方法:監(jiān)聽客戶端發(fā)送過來的消息。
  • handlerAdded方法:websocket連接后會調用,將連接信息添加到通道組
  • handlerRemoved方法:斷開連接后會調用(服務端、客戶端斷開都會調用),用于用戶下線(刪除通道、刪除Redis中存儲的連接信息)
  • exceptionCaught方法:發(fā)生異常后調用,發(fā)生異常后服務端通常會主動斷開連接。
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.system.server.config.NettyConfig;
import com.soc.dmoasp.system.server.strategy.NettyStrategy;
import com.soc.dmoasp.system.server.strategy.NettyStrategyFactory;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.AttributeKey;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;


/**
 * webSocket處理
 * @author dsw
 * @date 2024/11/18 10:20
 */
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    Logger log = LoggerFactory.getLogger(WebSocketHandler.class);

    private final RedisTemplate<String, Object> redisTemplate;
    
    private final NettyStrategyFactory nettyStrategyFactory;

    public WebSocketHandler(RedisTemplate<String, Object> redisTemplate, NettyStrategyFactory nettyStrategyFactory) {
        this.redisTemplate = redisTemplate;
        this.nettyStrategyFactory = nettyStrategyFactory;
    }

    /**
    * webSocket連接創(chuàng)建后調用
    */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 添加到channelGroup 通道組
        NettyConfig.getChannelGroup().add(ctx.channel());
    }
    /**
    * 讀取數(shù)據(jù)
    */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame frame) throws Exception {
        AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(userIdKey).get();
        log.info("收到消息 userId:{} message:{}",userId,frame.text());
        // 接收客戶端的消息
        JSONObject pullMessage = JSON.parseObject(frame.text());
        Integer code = pullMessage.getInteger("code");
        // 獲取消息類型
        NettyStrategy nettyStrategy = nettyStrategyFactory.getNettyStrategy(NettyMsgEnum.getByCode(code));
        // 處理消息
        TextWebSocketFrame pushMessage = nettyStrategy.execute(pullMessage);
        // 返回處理結果給客戶端
        ctx.channel().writeAndFlush(pushMessage);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(userIdKey).get();
        log.info("用戶下線了 userId:{}",userId);
        // 刪除通道
        removeUserId(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().writeAndFlush(NettyResult.fail("系統(tǒng)錯誤:" + cause.getMessage()));
    }

    /**
    * 刪除用戶與channel的對應關系
    */
    private void removeUserId(ChannelHandlerContext ctx) {
        AttributeKey<String> userIdKey = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(userIdKey).get();
      if(StringUtils.isNotBlank(userId)){
         redisTemplate.opsForHash().delete(CacheConstants.getUserChannelKey(),userId);
      }
   }
}

這里收到消息后通過一個策略模式進入不同的策略,通過NettyMsgEnum里面定義的code指定不同的策略類。

Strategy和StrategyFactory:

import com.alibaba.fastjson2.JSONObject;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * Netty接收消息處理策略類
 * @author dsw
 * @date 2024/5/27 10:21
 */
public interface NettyStrategy {

    /**
     * 執(zhí)行添加數(shù)值
     *
     * @return
     */
    TextWebSocketFrame execute(JSONObject message);
}
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * Netty策略工廠
 * @author dsw
 * @date 2024/11/18 10:20
 */
@Component
public class NettyStrategyFactory {

    Logger log = org.slf4j.LoggerFactory.getLogger(NettyStrategyFactory.class);

    /**
     * 通過Spring容器的方式注入
     */
    @Autowired
    private Map<String, NettyStrategy> nettyStrategy;

    /**
     * 獲取對應策略類
     * @param
     */
    public NettyStrategy getNettyStrategy(NettyMsgEnum nettyMsgEnum){
        if(!nettyStrategy.containsKey(nettyMsgEnum.getStrategyName())){
            log.warn("沒有對應的消息策略");
            throw new RuntimeException("沒有對應的消息策略");
        }
        return nettyStrategy.get(nettyMsgEnum.getStrategyName());
    }
}

我們實現(xiàn)一個心跳消息的策略:

import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.enums.NettyMsgEnum;
import com.soc.dmoasp.system.server.strategy.NettyStrategy;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springframework.stereotype.Component;

/**
 * Netty心跳消息
 * @author dsw
 * @date 2024/5/27 10:25
 */
@Component("Heart-Netty")
public class HeartStrategyImpl implements NettyStrategy {

    @Override
    public TextWebSocketFrame execute(JSONObject message) {
        String data = message.getString("data");
        if ("ping".equals(data)) {
            return NettyResult.success(NettyMsgEnum.HEART_MESSAGE.getCode(), null, "pong");
        }
        return NettyResult.fail("消息格式不正確");
    }
}

添加至ChannelInitializer后重啟查看效果。

2.3.2. 發(fā)送消息給客戶端

集群下面的Netty配置

方案1:使用Redis的發(fā)布訂閱

方案2:使用MQ的發(fā)布訂閱

我們這里使用使用Redis的發(fā)布訂閱實現(xiàn)。

添加Redis訂閱器:

import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.redis.configure.FastJson2JsonRedisSerializer;
import com.soc.dmoasp.system.server.receiver.PushMsgRedisReceiver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import java.util.Collections;
import java.util.List;

/**
 * redisReceiver 配置
 * @author dsw
 * @date 2024/11/18 12:07
 */
@Configuration
public class RedisReceiverConfig {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
                                                                       MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        List<PatternTopic> topics = Collections.singletonList(
            PatternTopic.of(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC)
        );
        // 添加訂閱者監(jiān)聽類,數(shù)量不限.PatternTopic定義監(jiān)聽主題,這里監(jiān)聽test-topic主題
        container.addMessageListener(listenerAdapter, topics);
        return container;
    }

    @Bean
    @SuppressWarnings(value = { "unchecked", "rawtypes" })
    public MessageListenerAdapter listenerAdapter(PushMsgRedisReceiver pushMsgRedisReceiver) {
        MessageListenerAdapter adapter = new MessageListenerAdapter(pushMsgRedisReceiver);
        FastJson2JsonRedisSerializer serializer = new FastJson2JsonRedisSerializer(Object.class);
        adapter.setSerializer(serializer);
        return adapter;
    }
}
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.core.exception.Asserts;
import com.soc.dmoasp.common.redis.dto.NettyMessage;
import com.soc.dmoasp.system.server.config.NettyConfig;
import com.soc.dmoasp.system.server.vo.NettyResult;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.Objects;

/**
 * Netty發(fā)送消息接收者
 * @author dsw
 * @date 2024/11/18 12:07
 */
@Component
public class PushMsgRedisReceiver {
    Logger log = LoggerFactory.getLogger(PushMsgRedisReceiver.class);

    @Autowired
    private RedisTemplate<String,Object> stringObjectRedisTemplate;

    /**
     * RedisTopic訂閱
     * @param nettyMessage netty消息
     * @param topic netty消息對應的topic
     */
    public void handleMessage(NettyMessage nettyMessage, String topic) {

        Object channelId = stringObjectRedisTemplate.opsForHash().get(CacheConstants.getUserChannelKey(), nettyMessage.getUserId());
        if (Objects.isNull(channelId)) {
            log.warn("推送消息失敗,用戶不在線! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
            Asserts.fail("推送消息失敗,用戶不在線!");
        }
        Channel channel = NettyConfig.getChannelGroup().find((ChannelId) channelId);
        if(channel!=null){
            channel.writeAndFlush(NettyResult.success(nettyMessage.getCode(),nettyMessage.getMessage()));
            log.info("推送消息成功! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
        }else {
            log.warn("推送消息失敗,沒有找到Channel! userId:{},msg:{}",nettyMessage.getUserId(),nettyMessage.getMessage());
        }
    }
}

發(fā)布訂閱的機制就是所有集群都會收到消息,收到消息后每個netty集群都去找對應的消息會話通道,如果沒找到則說明連接不到當前服務上,找到通道后則可以直接推送。 這里使用stringObjectRedisTemplate獲取用戶通道,避免序列化失敗。

RedisService中實現(xiàn)發(fā)布消息

/**
 * Redis消息發(fā)布訂閱 發(fā)布消息
 * @param channel 通道ID
 * @param message 消息
 */
@Async
public void convertAndSend(String channel, Object message) {
    redisTemplate.convertAndSend(channel, message);
}

發(fā)布消息的工具類:

import com.alibaba.fastjson2.JSONObject;
import com.soc.dmoasp.common.core.constant.CacheConstants;
import com.soc.dmoasp.common.redis.dto.NettyMessage;
import com.soc.dmoasp.common.redis.dto.PushSocketMsgDTO;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.Set;

/**
 * Netty發(fā)送消息給服務端
 * @author dsw
 * @date 2024/11/18 11:47
 */
@Service
public class PushSocketMsgService {

    Logger log = org.slf4j.LoggerFactory.getLogger(PushSocketMsgService.class);

    @Autowired
    private RedisTemplate<String, Object> stringObjectRedisTemplate;

    @Autowired
    private RedisService redisService;

    /**
     * 給所有用戶發(fā)送消息
     * @param msgDTO
     */
    public void pushMsgToAll( PushSocketMsgDTO msgDTO) {
        Set<Object> keys = stringObjectRedisTemplate.opsForHash().keys(CacheConstants.getUserChannelKey());
        keys.forEach(key -> this.pushMsgToUser(msgDTO.getCode(), key.toString(), msgDTO.getMessage()));
    }

    /**
     * 給指定用戶發(fā)送消息
     * @param msgDTO
     */
    public void pushMsgToUserList(PushSocketMsgDTO msgDTO) {
        for(Long userId : msgDTO.getUserIdList()){
            this.pushMsgToUser(msgDTO.getCode(), userId.toString(), msgDTO.getMessage());
        }
    }

    protected void pushMsgToUser(Integer code, String userId, JSONObject message) {
        //推送到其他負載處理
        NettyMessage nettyMessage = new NettyMessage();
        nettyMessage.setUserId(userId);
        nettyMessage.setCode(code);
        nettyMessage.setMessage(message);
        redisService.convertAndSend(CacheConstants.Topic.SYS_SOCKET_PUSH_TOPIC, nettyMessage);
        log.info("推送消息成功! userId:{},message:{}", userId, message);
    }
}

NettyMessage和PushSocketMsgDTO:

import com.alibaba.fastjson2.JSONObject;
import lombok.Data;

/**
 * Neety消息發(fā)布VO
 * @author dsw
 * @date 2024/11/18 13:49
 */
@Data
public class NettyMessage {

    private Integer code;

    private String userId;

    private JSONObject message;

}
import com.alibaba.fastjson2.JSONObject;
import lombok.Data;

/**
 * 發(fā)送socket消息DTO
 * @author dsw
 * @date 2024/11/18 13:39
 */
@Data
public class PushSocketMsgDTO {
    /**
     * 消息類型
     * 詳情看 NettyMsgEnum
     */
    private Integer code;
    /**
     * 用戶ID
     */
    private List<Long> userIdList;
    /**
     * 消息體
     */
    private JSONObject message;
}

測試結果:

以上就是SpringCloud整合Netty集群實現(xiàn)WebSocket的示例代碼的詳細內(nèi)容,更多關于SpringCloud Netty實現(xiàn)WebSocket的資料請關注腳本之家其它相關文章!

相關文章

  • Golang Protocol Buffer案例詳解

    Golang Protocol Buffer案例詳解

    這篇文章主要介紹了Golang Protocol Buffer案例詳解,本篇文章通過簡要的案例,講解了該項技術的了解與使用,以下就是詳細內(nèi)容,需要的朋友可以參考下
    2021-08-08
  • Java Scanner類用法及nextLine()產(chǎn)生的換行符問題實例分析

    Java Scanner類用法及nextLine()產(chǎn)生的換行符問題實例分析

    這篇文章主要介紹了Java Scanner類用法及nextLine()產(chǎn)生的換行符問題,結合實例形式分析了Scanner類功能、hasNextInt()和nextInt()方法使用及nextLine()產(chǎn)生的換行符問題解決方法,需要的朋友可以參考下
    2019-03-03
  • AbstractProcessor擴展MapStruct自動生成實體映射工具類

    AbstractProcessor擴展MapStruct自動生成實體映射工具類

    這篇文章主要為大家介紹了AbstractProcessor擴展MapStruct自動生成實體映射工具類實現(xiàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-01-01
  • 一文帶你看懂SpringBoot中的全局配置文件

    一文帶你看懂SpringBoot中的全局配置文件

    這篇文章主要介紹了一文帶你看懂SpringBoot中的全局配置文件,全局配置文件能夠對一些默認配置值進行修改,Spring Boot使用一個application.properties或者application.yaml的文件作為全局配置文件,需要的朋友可以參考下
    2023-08-08
  • Java?IO網(wǎng)絡模型實現(xiàn)解析

    Java?IO網(wǎng)絡模型實現(xiàn)解析

    這篇文章主要為大家介紹了Java?IO網(wǎng)絡模型實現(xiàn)解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-03-03
  • Mybatis-Plus或PageHelper多表分頁查詢總條數(shù)不對問題的解決方法

    Mybatis-Plus或PageHelper多表分頁查詢總條數(shù)不對問題的解決方法

    PageHelper 這個插件用了很多次了,今天使用的時候才遇到一個問題,這篇文章主要給大家介紹了關于Mybatis-Plus或PageHelper多表分頁查詢總條數(shù)不對問題的解決方法,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-08-08
  • SpringBoot整合Mybatis與druid實現(xiàn)流程詳解

    SpringBoot整合Mybatis與druid實現(xiàn)流程詳解

    這篇文章主要介紹了springboot整合mybatis plus與druid詳情,文章圍繞主題展開詳細的內(nèi)容介紹,具有一定的參考價值,需要的下伙伴可以參考一下
    2022-10-10
  • MyBatis如何進行雙重foreach循環(huán)

    MyBatis如何進行雙重foreach循環(huán)

    這篇文章主要介紹了MyBatis如何進行雙重foreach循環(huán),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-02-02
  • SpringBoot+JPA?分頁查詢指定列并返回指定實體方式

    SpringBoot+JPA?分頁查詢指定列并返回指定實體方式

    這篇文章主要介紹了SpringBoot+JPA?分頁查詢指定列并返回指定實體方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-12-12
  • springboot+jwt實現(xiàn)token登陸權限認證的實現(xiàn)

    springboot+jwt實現(xiàn)token登陸權限認證的實現(xiàn)

    這篇文章主要介紹了springboot+jwt實現(xiàn)token登陸權限認證的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-06-06

最新評論