欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

SpringBoot整合WebSocket的客戶端和服務端的實現(xiàn)代碼

 更新時間:2022年07月08日 08:30:58   作者:憤青程序猿  
這篇文章主要介紹了SpringBoot整合WebSocket的客戶端和服務端的實現(xiàn),本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下

本文是項目中使用了websocket進行一些數(shù)據(jù)的推送,對比項目做了一個demo,ws的相關問題不做細數(shù),僅做一下記錄。

此demo針對ws的搭建主要邏輯背景是一個服務端B:通訊層 產(chǎn)生消息推送出去,另外一個項目A充當客戶端和服務端,A的客戶端:是接收通訊層去無差別接收這些消息,A的服務端:根據(jù)地址ip去訂閱。用戶通過訂閱A的ws,同時記錄下自己的信息,項目B推送的消息,項目A接收到之后通過當初訂閱的邏輯和一些權限過濾條件對項目B產(chǎn)生的消息進行過濾再推送到用戶客戶端上。

一、項目中服務端的創(chuàng)建

首先引入maven倉庫

 <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

websocket的服務端搭建

同時注意springboot要開啟ws服務

啟動類加上@EnableScheduling

簡要解讀demo

/webSocket/{id}:鏈接的id是業(yè)務上的一個id,這邊之前做過類似拍賣的,相當于一個服務端或者業(yè)務上的一個標識,是客戶端指明鏈接到哪一個拍賣間的標識

@ServerEndpoint:作為服務端的注解。

package com.ghh.myproject.websocket;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{id}")
@Component
public class WebSocket {
    private Logger log = LoggerFactory.getLogger(WebSocket.class);
    private static int onlineCount = 0;
    /** 創(chuàng)建一個map存放   產(chǎn)生的ws鏈接推送 */
    private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
    /** 創(chuàng)建一個map存放   當前接入的客戶端 */
    private static Map<String, String> idMap = new ConcurrentHashMap<>();
    
    private Session session;
    /** 鏈接進入的一個場景id */
    private String id;
    /** 每一個鏈接的一個唯一標識 */
    private String userNo;
    /**
    * @Description: 第三方文接入當前項目websocket后的記錄信息
    * @DateTime: 2021/7/5 10:02
    * @Author: GHH
    * @Params: [id, session]
    * @Return void
    */
    @OnOpen
    public void onOpen(@PathParam("id") String id, Session session) throws IOException {
        log.info("已連接到id:{}競拍場,當前競拍場人數(shù):{}", id, getUserNosById(id).size());
        this.id = id;
        this.session = session;
        // 生成一個隨機序列號來存儲一個id下的所有用戶
        this.userNo = UUID.fastUUID().toString();
        addOnlineCount();
        //根據(jù)隨機序列號存儲一個socket連接
        clients.put(userNo, this);
        idMap.put(userNo, id);
    }
    /**
    * @Description: 關閉連接
    * @DateTime: 2021/7/5 10:02
    * @Author: GHH
    * @Params: []
    * @Return void
    */
    @OnClose
    public void onClose() throws IOException {
        clients.remove(userNo);
        idMap.remove(userNo);
        subOnlineCount();
    }
    /**
    * @Description: 客戶端發(fā)送消息調(diào)用此方法
    * @DateTime: 2021/6/16 15:35
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    @OnMessage
    public void onMessage(String message) throws IOException {
//        JSONObject jsonTo = JSONObject.parseObject(message);
//        String mes = (String) jsonTo.get("message");
//        if (!("All").equals(jsonTo.get("To"))) {
//            sendMessageTo(mes, jsonTo.get("To").toString());
//        } else {
//            sendMessageAll(message);
//        }
        log.info("onMessage方法成功");
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("{}", error);
    }
    public static void sendMessageTo(String message, String userNo) throws IOException {
        // session.getBasicRemote().sendText(message);
        //session.getAsyncRemote().sendText(message);
        WebSocket webSocket = clients.get(userNo);
        if (webSocket != null && webSocket.session.isOpen()) {
            webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
        }
    }
    /**
    * @Description: 推送到指定的id值的記錄
    * @DateTime: 2021/6/15 17:11
    * @Author: GHH
    * @Params: [message, id]
    * @Return void
    */
    public static void sendMessageToById(String message, String id) {
        // session.getBasicRemote().sendText(message);
        //session.getAsyncRemote().sendText(message);
        //根據(jù)id獲取所有的userNo鏈接的用戶
        List<String> userNos = getUserNosById(id);
        for (WebSocket item : clients.values()) {
            //遍歷鏈接的value值,如果當前傳入的id中鏈接的用戶包含value值,則推送。
            if (userNos.contains(item.userNo)) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    }
    /**
    * @Description: 推送所有開啟的信息
    * @DateTime: 2021/6/15 17:13
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    public static void sendMessageAll(String message){
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }
    public static synchronized Map<String, WebSocket> getClients() {
        return clients;
    }
    /**
    * @Description: 根據(jù)相應場景的一些邏輯處理
    * @DateTime: 2021/7/5 10:03
    * @Author: GHH
    * @Params: [id]
    * @Return java.util.List<java.lang.String>
    */
    public static List<String> getUserNosById(String id) {
        ArrayList<String> userNos = new ArrayList<>();
        for (Map.Entry<String, String> entry : idMap.entrySet()) {
            if (entry.getValue().equals(id)) {
                userNos.add(entry.getKey());
            }
        }
        return userNos;
    }
}

 demo中模擬的是定時器推送,第一個參數(shù)是消息內(nèi)容,第二個是推送到哪一個拍賣間或者其他業(yè)務上的內(nèi)容。方法的具體內(nèi)容上一段代碼有詳細解釋,有通過id,或者發(fā)送給全部ws鏈接的客戶端

WebSocket.sendMessageToById(""+count,2+"");
@Scheduled(cron = "*/5 * * * * ?")
    public void job1(){
        log.info("測試生成次數(shù):{}",count);
        redisTemplate.opsForValue().set("測試"+count, ""+count++);
        if (count%2==0){
            WebSocket.sendMessageToById(""+count,2+"");
        }else {
            WebSocket.sendMessageToById(""+count,1+"");
        }

        log.info("websocket發(fā)送"+count);
    }

二、java充當客戶端鏈接ws

上述是java作為ws服務端推送當前業(yè)務信息的一個demo。我們項目目前做的是一個通訊層的概念,只能夠推送數(shù)據(jù)內(nèi)容,卻無法根據(jù)用戶權限去推送不同的數(shù)據(jù)。

ws客戶端的搭建,首先鏈接ws服務端。首先是我們另外一個服務的ws配置信息,我這邊demo是模擬鏈接上面的ws服務

1、ws客戶端的配置

package com.ghh.websocketRecive.wsMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.net.URI;
/**
 * @author ghh
 * @date 2019-08-16 16:02
 */
@Component
@Slf4j
public class WSClient {
    public static Session session;
    public static void startWS() {
        try {
            if (WSClient.session != null) {
                WSClient.session.close();
            }
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            //設置消息大小最大為10M
            container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024);
            container.setDefaultMaxTextMessageBufferSize(10*1024*1024);
            // 客戶端,開啟服務端websocket。
            String uri = "ws://192.168.0.108:8082/webSocket/1";
            Session session = container.connectToServer(WSHandler.class, URI.create(uri));
            WSClient.session = session;
        } catch (Exception ex) {
            log.info(ex.getMessage());
        }
    }
}

2、配置信息需要在項目啟動的時候去啟用和鏈接ws服務

package com.ghh.websocketRecive;
import com.ghh.websocketRecive.wsMessage.WSClient;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
import javax.annotation.PostConstruct;
@Slf4j
@EnableScheduling
@SpringBootApplication
@MapperScan("com.ghh.websocketRecive.dao")
public class WebsocketReciveApplication {
    public static void main(String[] args) {
        SpringApplication.run(WebsocketReciveApplication.class, args);
    }
    @PostConstruct
    public void init(){
        log.info("初始化應用程序");     // 初始化ws,鏈接服務端
        WSClient.startWS();
    }
}

3、接收服務端推送的消息進行權限過濾demo

@ClientEndpoint:作為ws的客戶端注解,@OnMessage接收服務端推送的消息。

package com.ghh.websocketRecive.wsMessage;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ghh.websocketRecive.entity.Student;
import com.ghh.websocketRecive.service.UserService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import java.util.Objects;
import java.util.Set;
import static com.ghh.websocketRecive.wsMessage.WSClient.startWS;
@ClientEndpoint
@Slf4j
@Component
public class WSHandler {
    @Autowired
    RedisTemplate<String,String> redisTemplate;
    private static RedisTemplate<String,String> redisTemplateService;
    @PostConstruct
    public void init() {
        redisTemplateService=redisTemplate;
    }
    @OnOpen
    public void onOpen(Session session) {
        WSClient.session = session;
    }
    @OnMessage
    public void processMessage(String message) {
        log.info("websocketRecive接收推送消息"+message);
        int permission = Integer.parseInt(message)%5;
        //查詢所有訂閱的客戶端的ip。
        Set<String> keys = redisTemplateService.keys("ip:*");
        for (String key : keys) {
            // 根據(jù)登錄后存儲的客戶端ip,獲取權限地址
            String s = redisTemplateService.opsForValue().get(key);
            String[] split = s.split(",");
            for (String s1 : split) {
                //向含有推送過來的數(shù)據(jù)權限地址的客戶端推送告警數(shù)據(jù)。
                if (s1.equals(permission+"")){
                    WebSocket.sendMessageToByIp(message,key.split(":")[1]);
                }
            }
        }
    }
    @OnError
    public void processError(Throwable t) {
        WSClient.session = null;
        try {
            Thread.sleep(5000);
            startWS();
        } catch (InterruptedException e) {
            log.error("---websocket processError InterruptedException---", e);
        }
        log.error("---websocket processError error---", t);
    }
    @OnClose
    public void processClose(Session session, CloseReason closeReason) {
        log.error(session.getId() + closeReason.toString());
    }
    public void send(String sessionId, String message) {
        try {
            log.info("send Msg:" + message);
            if (Objects.nonNull(WSClient.session)) {
                WSClient.session.getBasicRemote().sendText(message);
            } else {
                log.info("---websocket error----");
            }
        } catch (Exception e) {
            log.error("---websocket send error---", e);
        }
    }
}

4、ws客戶端推送消息,推送消息和上面服務端類似。

這邊是根據(jù)ip

package com.ghh.websocketRecive.wsMessage;
import cn.hutool.core.lang.UUID;
import com.alibaba.fastjson.JSON;
import com.ghh.websocketRecive.service.UserService;
import lombok.Builder;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint("/webSocket/{ip}")
@Component
public class WebSocket {
    private Logger log = LoggerFactory.getLogger(WebSocket.class);
    private static int onlineCount = 0;
    private static Map<String, WebSocket> clients = new ConcurrentHashMap<>();
    private Session session;
    /** 當前連接服務端的客戶端ip */
    private String ip;
    @Autowired
    RedisTemplate<String,String> redisTemplate;
    private static RedisTemplate<String,String> redisTemplateService;
    @PostConstruct
    public void init() {
        redisTemplateService = redisTemplate;
    }
    @OnOpen
    public void onOpen(@PathParam("ip") String ip, Session session) throws IOException {
        log.info("ip:{}客戶端已連接:,當前客戶端數(shù)量:{}", ip, onlineCount+1);
        this.ip = ip;
        this.session = session;
        // 接入一個websocket則生成一個隨機序列號
        addOnlineCount();
        //根據(jù)隨機序列號存儲一個socket連接
        clients.put(ip, this);
    }
    @OnClose
    public void onClose() throws IOException {
        clients.remove(ip);
        onlineCount--;
        subOnlineCount();
    }
    /**
    * @Description: 客戶端發(fā)送消息調(diào)用此方法
    * @DateTime: 2021/6/16 15:35
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    @OnMessage
    public void onMessage(String message) throws IOException {
        log.info("客戶端發(fā)送消onMessage方法成功");
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("{}", error);
    }
    public static void sendMessageTo(String message, String userNo) throws IOException {
        WebSocket webSocket = clients.get(userNo);
        if (webSocket != null && webSocket.session.isOpen()) {
            webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message));
        }
    }
    /**
    * @Description: 推送到指定的ip值的記錄
    * @DateTime: 2021/6/15 17:11
    * @Author: GHH
    * @Params: [message, id]
    * @Return void
    */
    public static void sendMessageToByIp(String message, String ip) {
        for (WebSocket item : clients.values()) {
            //遍歷鏈接的value值,如果當前傳入的ip中鏈接的用戶包含value值,則推送。
            if (item.ip.equals(ip)) {
                item.session.getAsyncRemote().sendText(message);
            }
        }
    }
    /**
    * @Description: 推送所有開啟的信息
    * @DateTime: 2021/6/15 17:13
    * @Author: GHH
    * @Params: [message]
    * @Return void
    */
    public static void sendMessageAll(String message){
        for (WebSocket item : clients.values()) {
            item.session.getAsyncRemote().sendText(message);
        }
    }
    public static synchronized int getOnlineCount() {
        return onlineCount;
    }
    public static synchronized void addOnlineCount() {
        WebSocket.onlineCount++;
    }
    public static synchronized void subOnlineCount() {
        WebSocket.onlineCount--;
    }
    public static synchronized Map<String, WebSocket> getClients() {
        return clients;
    }
}

概述:

至此,簡易的demo搭建完成,項目gitee網(wǎng)址:https://gitee.com/ghhNB/study.git

到此這篇關于SpringBoot整合WebSocket的客戶端和服務端的實現(xiàn)的文章就介紹到這了,更多相關SpringBoot整合WebSocket內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Java之String、StringBuffer、StringBuilder的區(qū)別分析

    Java之String、StringBuffer、StringBuilder的區(qū)別分析

    今天搞安卓在看書的時候遇到了StringBuilder這個類型的東東,有點小迷,不知道它跟string、stringbuffer的關系式怎么樣的,趕快查閱相關資料,了解了個大概,拿出來分享一下
    2012-11-11
  • Java基于二維數(shù)組實現(xiàn)的數(shù)獨問題示例

    Java基于二維數(shù)組實現(xiàn)的數(shù)獨問題示例

    這篇文章主要介紹了Java基于二維數(shù)組實現(xiàn)的數(shù)獨問題,涉及java針對數(shù)組的遍歷、計算、轉(zhuǎn)換等相關操作技巧,需要的朋友可以參考下
    2018-01-01
  • Springboot配置security basic path無效解決方案

    Springboot配置security basic path無效解決方案

    這篇文章主要介紹了Springboot配置security basic path無效解決方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下
    2020-09-09
  • JAVA 枚舉相關知識匯總

    JAVA 枚舉相關知識匯總

    這篇文章主要介紹了JAVA 枚舉相關知識,文中講解的非常詳細,代碼幫助大家更好的參考和學習,感興趣的朋友可以了解下
    2020-06-06
  • SpringBoot構造器注入循環(huán)依賴及解決方案

    SpringBoot構造器注入循環(huán)依賴及解決方案

    這篇文章主要介紹了SpringBoot構造器注入循環(huán)依賴及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-03-03
  • jdbc連接oracle數(shù)據(jù)庫功能示例

    jdbc連接oracle數(shù)據(jù)庫功能示例

    這篇文章主要介紹了jdbc連接oracle數(shù)據(jù)庫功能,結合實例形式詳細分析了java基于jdbc連接Oracle數(shù)據(jù)庫的具體操作步驟與相關實現(xiàn)技巧,需要的朋友可以參考下
    2017-01-01
  • Java 多線程Synchronized和Lock的區(qū)別

    Java 多線程Synchronized和Lock的區(qū)別

    這篇文章主要介紹了Java 多線程Synchronized和Lock的區(qū)別,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下
    2021-01-01
  • SpringBoot如何接收Post請求Body里面的參數(shù)

    SpringBoot如何接收Post請求Body里面的參數(shù)

    這篇文章主要介紹了SpringBoot如何接收Post請求Body里面的參數(shù),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-03-03
  • Spring如何通過注解存儲和讀取對象詳解

    Spring如何通過注解存儲和讀取對象詳解

    在Spring中,要想更簡單的存儲和讀取對象的核心是使用注解,這篇文章主要給大家介紹了關于Spring如何通過注解存儲和讀取對象的相關資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-07-07
  • Mybatis-Plus批量添加或修改數(shù)據(jù)的3種方式總結

    Mybatis-Plus批量添加或修改數(shù)據(jù)的3種方式總結

    使用Mybatis-plus可以很方便的實現(xiàn)批量新增和批量修改,不僅比自己寫foreach遍歷方便很多,而且性能也更加優(yōu)秀,下面這篇文章主要給大家介紹了關于Mybatis-Plus批量添加或修改數(shù)據(jù)的3種方式,需要的朋友可以參考下
    2023-05-05

最新評論