SpringBoot整合WebSocket的客戶端和服務端的實現(xiàn)代碼
本文是項目中使用了websocket進行一些數(shù)據(jù)的推送,對比項目做了一個demo,ws的相關問題不做細數(shù),僅做一下記錄。
此demo針對ws的搭建主要邏輯背景是一個服務端B:通訊層 產(chǎn)生消息推送出去,另外一個項目A充當客戶端和服務端,A的客戶端:是接收通訊層去無差別接收這些消息,A的服務端:根據(jù)地址ip去訂閱。用戶通過訂閱A的ws,同時記錄下自己的信息,項目B推送的消息,項目A接收到之后通過當初訂閱的邏輯和一些權限過濾條件對項目B產(chǎn)生的消息進行過濾再推送到用戶客戶端上。
一、項目中服務端的創(chuàng)建
首先引入maven倉庫
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
websocket的服務端搭建
同時注意springboot要開啟ws服務
啟動類加上@EnableScheduling
簡要解讀demo
/webSocket/{id}:鏈接的id是業(yè)務上的一個id,這邊之前做過類似拍賣的,相當于一個服務端或者業(yè)務上的一個標識,是客戶端指明鏈接到哪一個拍賣間的標識
@ServerEndpoint:作為服務端的注解。
package com.ghh.myproject.websocket; import cn.hutool.core.lang.UUID; import com.alibaba.fastjson.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/webSocket/{id}") @Component public class WebSocket { private Logger log = LoggerFactory.getLogger(WebSocket.class); private static int onlineCount = 0; /** 創(chuàng)建一個map存放 產(chǎn)生的ws鏈接推送 */ private static Map<String, WebSocket> clients = new ConcurrentHashMap<>(); /** 創(chuàng)建一個map存放 當前接入的客戶端 */ private static Map<String, String> idMap = new ConcurrentHashMap<>(); private Session session; /** 鏈接進入的一個場景id */ private String id; /** 每一個鏈接的一個唯一標識 */ private String userNo; /** * @Description: 第三方文接入當前項目websocket后的記錄信息 * @DateTime: 2021/7/5 10:02 * @Author: GHH * @Params: [id, session] * @Return void */ @OnOpen public void onOpen(@PathParam("id") String id, Session session) throws IOException { log.info("已連接到id:{}競拍場,當前競拍場人數(shù):{}", id, getUserNosById(id).size()); this.id = id; this.session = session; // 生成一個隨機序列號來存儲一個id下的所有用戶 this.userNo = UUID.fastUUID().toString(); addOnlineCount(); //根據(jù)隨機序列號存儲一個socket連接 clients.put(userNo, this); idMap.put(userNo, id); } /** * @Description: 關閉連接 * @DateTime: 2021/7/5 10:02 * @Author: GHH * @Params: [] * @Return void */ @OnClose public void onClose() throws IOException { clients.remove(userNo); idMap.remove(userNo); subOnlineCount(); } /** * @Description: 客戶端發(fā)送消息調(diào)用此方法 * @DateTime: 2021/6/16 15:35 * @Author: GHH * @Params: [message] * @Return void */ @OnMessage public void onMessage(String message) throws IOException { // JSONObject jsonTo = JSONObject.parseObject(message); // String mes = (String) jsonTo.get("message"); // if (!("All").equals(jsonTo.get("To"))) { // sendMessageTo(mes, jsonTo.get("To").toString()); // } else { // sendMessageAll(message); // } log.info("onMessage方法成功"); } @OnError public void onError(Session session, Throwable error) { log.error("{}", error); } public static void sendMessageTo(String message, String userNo) throws IOException { // session.getBasicRemote().sendText(message); //session.getAsyncRemote().sendText(message); WebSocket webSocket = clients.get(userNo); if (webSocket != null && webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message)); } } /** * @Description: 推送到指定的id值的記錄 * @DateTime: 2021/6/15 17:11 * @Author: GHH * @Params: [message, id] * @Return void */ public static void sendMessageToById(String message, String id) { // session.getBasicRemote().sendText(message); //session.getAsyncRemote().sendText(message); //根據(jù)id獲取所有的userNo鏈接的用戶 List<String> userNos = getUserNosById(id); for (WebSocket item : clients.values()) { //遍歷鏈接的value值,如果當前傳入的id中鏈接的用戶包含value值,則推送。 if (userNos.contains(item.userNo)) { item.session.getAsyncRemote().sendText(message); } } } /** * @Description: 推送所有開啟的信息 * @DateTime: 2021/6/15 17:13 * @Author: GHH * @Params: [message] * @Return void */ public static void sendMessageAll(String message){ for (WebSocket item : clients.values()) { item.session.getAsyncRemote().sendText(message); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { WebSocket.onlineCount--; } public static synchronized Map<String, WebSocket> getClients() { return clients; } /** * @Description: 根據(jù)相應場景的一些邏輯處理 * @DateTime: 2021/7/5 10:03 * @Author: GHH * @Params: [id] * @Return java.util.List<java.lang.String> */ public static List<String> getUserNosById(String id) { ArrayList<String> userNos = new ArrayList<>(); for (Map.Entry<String, String> entry : idMap.entrySet()) { if (entry.getValue().equals(id)) { userNos.add(entry.getKey()); } } return userNos; } }
demo中模擬的是定時器推送,第一個參數(shù)是消息內(nèi)容,第二個是推送到哪一個拍賣間或者其他業(yè)務上的內(nèi)容。方法的具體內(nèi)容上一段代碼有詳細解釋,有通過id,或者發(fā)送給全部ws鏈接的客戶端
WebSocket.sendMessageToById(""+count,2+"");
@Scheduled(cron = "*/5 * * * * ?") public void job1(){ log.info("測試生成次數(shù):{}",count); redisTemplate.opsForValue().set("測試"+count, ""+count++); if (count%2==0){ WebSocket.sendMessageToById(""+count,2+""); }else { WebSocket.sendMessageToById(""+count,1+""); } log.info("websocket發(fā)送"+count); }
二、java充當客戶端鏈接ws
上述是java作為ws服務端推送當前業(yè)務信息的一個demo。我們項目目前做的是一個通訊層的概念,只能夠推送數(shù)據(jù)內(nèi)容,卻無法根據(jù)用戶權限去推送不同的數(shù)據(jù)。
ws客戶端的搭建,首先鏈接ws服務端。首先是我們另外一個服務的ws配置信息,我這邊demo是模擬鏈接上面的ws服務
1、ws客戶端的配置
package com.ghh.websocketRecive.wsMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.websocket.ContainerProvider; import javax.websocket.Session; import javax.websocket.WebSocketContainer; import java.net.URI; /** * @author ghh * @date 2019-08-16 16:02 */ @Component @Slf4j public class WSClient { public static Session session; public static void startWS() { try { if (WSClient.session != null) { WSClient.session.close(); } WebSocketContainer container = ContainerProvider.getWebSocketContainer(); //設置消息大小最大為10M container.setDefaultMaxBinaryMessageBufferSize(10*1024*1024); container.setDefaultMaxTextMessageBufferSize(10*1024*1024); // 客戶端,開啟服務端websocket。 String uri = "ws://192.168.0.108:8082/webSocket/1"; Session session = container.connectToServer(WSHandler.class, URI.create(uri)); WSClient.session = session; } catch (Exception ex) { log.info(ex.getMessage()); } } }
2、配置信息需要在項目啟動的時候去啟用和鏈接ws服務
package com.ghh.websocketRecive; import com.ghh.websocketRecive.wsMessage.WSClient; import lombok.extern.slf4j.Slf4j; import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; import javax.annotation.PostConstruct; @Slf4j @EnableScheduling @SpringBootApplication @MapperScan("com.ghh.websocketRecive.dao") public class WebsocketReciveApplication { public static void main(String[] args) { SpringApplication.run(WebsocketReciveApplication.class, args); } @PostConstruct public void init(){ log.info("初始化應用程序"); // 初始化ws,鏈接服務端 WSClient.startWS(); } }
3、接收服務端推送的消息進行權限過濾demo
@ClientEndpoint:作為ws的客戶端注解,@OnMessage接收服務端推送的消息。
package com.ghh.websocketRecive.wsMessage; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.ghh.websocketRecive.entity.Student; import com.ghh.websocketRecive.service.UserService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.websocket.*; import java.util.Objects; import java.util.Set; import static com.ghh.websocketRecive.wsMessage.WSClient.startWS; @ClientEndpoint @Slf4j @Component public class WSHandler { @Autowired RedisTemplate<String,String> redisTemplate; private static RedisTemplate<String,String> redisTemplateService; @PostConstruct public void init() { redisTemplateService=redisTemplate; } @OnOpen public void onOpen(Session session) { WSClient.session = session; } @OnMessage public void processMessage(String message) { log.info("websocketRecive接收推送消息"+message); int permission = Integer.parseInt(message)%5; //查詢所有訂閱的客戶端的ip。 Set<String> keys = redisTemplateService.keys("ip:*"); for (String key : keys) { // 根據(jù)登錄后存儲的客戶端ip,獲取權限地址 String s = redisTemplateService.opsForValue().get(key); String[] split = s.split(","); for (String s1 : split) { //向含有推送過來的數(shù)據(jù)權限地址的客戶端推送告警數(shù)據(jù)。 if (s1.equals(permission+"")){ WebSocket.sendMessageToByIp(message,key.split(":")[1]); } } } } @OnError public void processError(Throwable t) { WSClient.session = null; try { Thread.sleep(5000); startWS(); } catch (InterruptedException e) { log.error("---websocket processError InterruptedException---", e); } log.error("---websocket processError error---", t); } @OnClose public void processClose(Session session, CloseReason closeReason) { log.error(session.getId() + closeReason.toString()); } public void send(String sessionId, String message) { try { log.info("send Msg:" + message); if (Objects.nonNull(WSClient.session)) { WSClient.session.getBasicRemote().sendText(message); } else { log.info("---websocket error----"); } } catch (Exception e) { log.error("---websocket send error---", e); } } }
4、ws客戶端推送消息,推送消息和上面服務端類似。
這邊是根據(jù)ip
package com.ghh.websocketRecive.wsMessage; import cn.hutool.core.lang.UUID; import com.alibaba.fastjson.JSON; import com.ghh.websocketRecive.service.UserService; import lombok.Builder; import lombok.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @ServerEndpoint("/webSocket/{ip}") @Component public class WebSocket { private Logger log = LoggerFactory.getLogger(WebSocket.class); private static int onlineCount = 0; private static Map<String, WebSocket> clients = new ConcurrentHashMap<>(); private Session session; /** 當前連接服務端的客戶端ip */ private String ip; @Autowired RedisTemplate<String,String> redisTemplate; private static RedisTemplate<String,String> redisTemplateService; @PostConstruct public void init() { redisTemplateService = redisTemplate; } @OnOpen public void onOpen(@PathParam("ip") String ip, Session session) throws IOException { log.info("ip:{}客戶端已連接:,當前客戶端數(shù)量:{}", ip, onlineCount+1); this.ip = ip; this.session = session; // 接入一個websocket則生成一個隨機序列號 addOnlineCount(); //根據(jù)隨機序列號存儲一個socket連接 clients.put(ip, this); } @OnClose public void onClose() throws IOException { clients.remove(ip); onlineCount--; subOnlineCount(); } /** * @Description: 客戶端發(fā)送消息調(diào)用此方法 * @DateTime: 2021/6/16 15:35 * @Author: GHH * @Params: [message] * @Return void */ @OnMessage public void onMessage(String message) throws IOException { log.info("客戶端發(fā)送消onMessage方法成功"); } @OnError public void onError(Session session, Throwable error) { log.error("{}", error); } public static void sendMessageTo(String message, String userNo) throws IOException { WebSocket webSocket = clients.get(userNo); if (webSocket != null && webSocket.session.isOpen()) { webSocket.session.getAsyncRemote().sendText(JSON.toJSONString(message)); } } /** * @Description: 推送到指定的ip值的記錄 * @DateTime: 2021/6/15 17:11 * @Author: GHH * @Params: [message, id] * @Return void */ public static void sendMessageToByIp(String message, String ip) { for (WebSocket item : clients.values()) { //遍歷鏈接的value值,如果當前傳入的ip中鏈接的用戶包含value值,則推送。 if (item.ip.equals(ip)) { item.session.getAsyncRemote().sendText(message); } } } /** * @Description: 推送所有開啟的信息 * @DateTime: 2021/6/15 17:13 * @Author: GHH * @Params: [message] * @Return void */ public static void sendMessageAll(String message){ for (WebSocket item : clients.values()) { item.session.getAsyncRemote().sendText(message); } } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocket.onlineCount++; } public static synchronized void subOnlineCount() { WebSocket.onlineCount--; } public static synchronized Map<String, WebSocket> getClients() { return clients; } }
概述:
至此,簡易的demo搭建完成,項目gitee網(wǎng)址:https://gitee.com/ghhNB/study.git
到此這篇關于SpringBoot整合WebSocket的客戶端和服務端的實現(xiàn)的文章就介紹到這了,更多相關SpringBoot整合WebSocket內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Java之String、StringBuffer、StringBuilder的區(qū)別分析
今天搞安卓在看書的時候遇到了StringBuilder這個類型的東東,有點小迷,不知道它跟string、stringbuffer的關系式怎么樣的,趕快查閱相關資料,了解了個大概,拿出來分享一下2012-11-11Java基于二維數(shù)組實現(xiàn)的數(shù)獨問題示例
這篇文章主要介紹了Java基于二維數(shù)組實現(xiàn)的數(shù)獨問題,涉及java針對數(shù)組的遍歷、計算、轉(zhuǎn)換等相關操作技巧,需要的朋友可以參考下2018-01-01Springboot配置security basic path無效解決方案
這篇文章主要介紹了Springboot配置security basic path無效解決方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-09-09SpringBoot構造器注入循環(huán)依賴及解決方案
這篇文章主要介紹了SpringBoot構造器注入循環(huán)依賴及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-03-03Java 多線程Synchronized和Lock的區(qū)別
這篇文章主要介紹了Java 多線程Synchronized和Lock的區(qū)別,幫助大家更好的理解和使用Java,感興趣的朋友可以了解下2021-01-01SpringBoot如何接收Post請求Body里面的參數(shù)
這篇文章主要介紹了SpringBoot如何接收Post請求Body里面的參數(shù),具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03Mybatis-Plus批量添加或修改數(shù)據(jù)的3種方式總結
使用Mybatis-plus可以很方便的實現(xiàn)批量新增和批量修改,不僅比自己寫foreach遍歷方便很多,而且性能也更加優(yōu)秀,下面這篇文章主要給大家介紹了關于Mybatis-Plus批量添加或修改數(shù)據(jù)的3種方式,需要的朋友可以參考下2023-05-05