redis做websocket分布式消息推送服務的實現(xiàn)
應用場景說明:
由于redis并非專業(yè)的MQ中間件,消息的防丟失策略并不完整,存在丟失消息的可能。該方案為在再pc web管理平臺的右下角彈出,顯示新接收到的消息數(shù),哪怕沒有收到這個通知,也可以自己在消息中心看看。所以對可靠性要求不高。如果業(yè)務場景要求可靠性高,還是請使用專業(yè)的MQ中間件。該方案已在多個實際項目中運行。
流程架構(gòu):
websocket實現(xiàn)同一賬戶多點登錄、websocket服務多節(jié)點部署推送方案。
簡單架構(gòu)圖
假設用戶A在兩個地方登錄,連接到兩個websocketServer服務節(jié)點1和2,用戶B連接到2節(jié)點。
websocketServer將websocket session保存在各自的Map<String,Session>中,key為userid,value為websocket Session。節(jié)點1保存了用戶A的websocket session,節(jié)點2保存了用戶A、B的websocket session。
消息生產(chǎn)者發(fā)布消息的時候為json格式,如:[{"receive"="userid_a","msg"="您有1個未讀消息"},{"receive"="userid_b","msg"="您有3個未讀消息"}],將消息發(fā)到redis的一個Channel,如showNewestMsg。
websocketServer中訂閱redis的channel=showNewestMsg,收到消息后根據(jù)消息中receive沖map中找到對應的websocket session,發(fā)消息給客戶端。
核心代碼:
1.該項目為springboot項目,先引入jar包,由于是從實際項目中抽出來寫的記錄,可能還缺jar請自行導入。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--websocket--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 工具類 --> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.3.6</version> </dependency> <dependency> <groupId>net.sf.json-lib</groupId> <artifactId>json-lib</artifactId> <version>2.4</version> <classifier>jdk15</classifier> </dependency>
2.websocket配置
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * spring websocket組件初始化 * @author csf * */ //war包啟動tomcat7及以下版本要關閉@Configuration注解,否則將無法啟動websocket服務 @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
注意:war包啟動tomcat7及以下版本要關閉@Configuration注解,否則將無法啟動websocket服務。
3.websocket服務端實現(xiàn)
import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.PostConstruct; import javax.annotation.Resource; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import com.kingengine.plug.service.MessageService; import cn.hutool.core.util.StrUtil; import net.sf.json.JSONArray; import net.sf.json.JSONObject; /** * WebSocket服務類 * @author csf * @date 2020年8月10日 */ @ServerEndpoint("/websocket/{custId}") @Component public class WebSocketServer { @Resource private MessageService messageService; Logger log = LoggerFactory.getLogger(this.getClass()); // 當前在線連接數(shù) private static int onlineCount = 0; // 存放每個用戶對應的WebSocket連接對象,key為custId_HHmmss,確保一個登錄用戶只建立一個連接 private static Map<String, Session> webSocketSessionMap = new ConcurrentHashMap<String, Session>(); // 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù) private Session session; // 接收用戶id private String custId = ""; private static WebSocketServer webSocketServer; // 通過@PostConstruct實現(xiàn)初始化bean之前進行的操作 @PostConstruct public void init() { // 初使化時將已靜態(tài)化的webSocketServer實例化 webSocketServer = this; webSocketServer.messageService = this.messageService; } /** * 連接建立成功調(diào)用的方法 * @param session 連接會話,由框架創(chuàng)建 * @param custId 用戶id, 為處理用戶多點登錄都能收到消息,需傳該格式custId_HHmmss * @author csf * @date 2020年8月10日 */ @OnOpen public void onOpen(Session session, @PathParam("custId") String custId) { if (!webSocketSessionMap.containsKey(custId)) { this.session = session; webSocketSessionMap.put(custId, session); addOnlineCount(); // 在線數(shù)加1 log.info("有新連接[{}]接入,當前websocket連接數(shù)為:{}", custId, getOnlineCount()); } this.custId = custId; try { // 第一次建立連接,推送消息給客戶端,只會執(zhí)行一次。后續(xù)的新消息由com.kingengine.plug.redis.RedisReceiver接收到redis訂閱消息推送 // 獲取未讀消息數(shù) // 由于前端傳進來的custId是有時間后綴的,查詢時需要去掉后綴。 String qryCustId = custId.split("_")[0]; JSONObject unreadMsg = webSocketServer.messageService.getUnreadCount(qryCustId); // 獲取最新消息 /* JSONObject newMsg = webSocketServer.messageService.getNewestMsg(qryCustId); // 發(fā)送消息 JSONArray msgArr = new JSONArray(); if (newMsg!=null) { msgArr.add(newMsg); }*/ JSONArray msgArr = new JSONArray(); msgArr.add(unreadMsg); sendMessage(custId, msgArr.toString()); } catch (Exception e) { log.error("客戶端連接websocket服務異常"); e.printStackTrace(); } } /** * 連接關閉調(diào)用的方法 */ @OnClose public void onClose(@PathParam("custId") String sessionKey) { if (webSocketSessionMap.containsKey(sessionKey)) { try { webSocketSessionMap.get(sessionKey).close(); webSocketSessionMap.remove(sessionKey); } catch (IOException e) { log.error("連接[{}]關閉失敗。", sessionKey); e.printStackTrace(); } subOnlineCount(); log.info("連接[{}]關閉,當前websocket連接數(shù):{}", sessionKey, onlineCount); } } /** * 接收客戶端發(fā)送的消息 * @param message 客戶端發(fā)送過來的消息 * @param session websocket會話 */ @OnMessage public void onMessage(String message, Session session) { log.info("收到來自客戶端" + custId + "的信息:" + message); } /** * 連接錯誤時觸發(fā) * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { try { session.close(); } catch (IOException e) { log.error("發(fā)生錯誤,連接[{}]關閉失敗。"); e.printStackTrace(); } // log.error("websocket發(fā)生錯誤"); // error.printStackTrace(); } /** * 給指定的客戶端推送消息,可單發(fā)和群發(fā) * @param sessionKeys 發(fā)送消息給目標客戶端sessionKey,多個逗號“,”隔開1234,2345... * @param message * @throws IOException * @author csf * @date 2020年8月11日 */ public void sendMessage(String sessionKeys, String message) { if (StrUtil.isNotBlank(sessionKeys)) { String[] sessionKeyArr = sessionKeys.split(","); for (String key : sessionKeyArr) { try { // 可能存在一個賬號多點登錄 List<Session> sessionList = getLikeByMap(webSocketSessionMap, key); for (Session session : sessionList) { session.getBasicRemote().sendText(message); } } catch (IOException e) { e.printStackTrace(); continue;// 某個客戶端發(fā)送異常,不影響其他客戶端發(fā)送 } } } else { log.info("sessionKeys為空,沒有目標客戶端"); } } /** * 給當前客戶端推送消息,首次建立連接時調(diào)用 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); } /** * 檢查webSocket連接是否在線 * @param sesstionKey webSocketMap中維護的key * @return 是否在線 */ public static boolean checkOnline(String sesstionKey) { if (webSocketSessionMap.containsKey(sesstionKey)) { return true; } else { return false; } } /** * 獲取包含key的所有map值 * @param map * @param keyLike * @return * @author csf * @date 2020年8月13日 */ private List<Session> getLikeByMap(Map<String, Session> map, String keyLike) { List<Session> list = new ArrayList<>(); for (String key : map.keySet()) { if (key.contains(keyLike)) { list.add(map.get(key)); } } return list; } public static synchronized int getOnlineCount() { return onlineCount; } public static synchronized void addOnlineCount() { WebSocketServer.onlineCount++; } public static synchronized void subOnlineCount() { WebSocketServer.onlineCount--; } }
4.redis消息訂閱配置
import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; @Configuration @EnableCaching public class RedisCacheConfig { @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多個 messageListener,配置不同的交換機 container.addMessageListener(listenerAdapter, new PatternTopic("showNewestMsg"));// 訂閱最新消息頻道 return container; } @Bean MessageListenerAdapter listenerAdapter(RedisReceiver receiver) { // 消息監(jiān)聽適配器 return new MessageListenerAdapter(receiver, "onMessage"); } @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
5.redis配置,直接放在springboot項目application.properties或application.yml中
# 數(shù)據(jù)庫索引(默認為0) spring.redis.database=0 spring.redis.host=192.168.1.100 spring.redis.port=6379 spring.redis.password=123456 # 連接池最大連接數(shù)(使用負值表示沒有限制) spring.redis.pool.max-active=8 # 連接池最大阻塞等待時間(使用負值表示沒有限制) spring.redis.pool.max-wait=-1 # 連接池中的最大空閑連接 spring.redis.pool.max-idle=8 # 連接池中的最小空閑連接 spring.redis.pool.min-idle=0 # 連接超時時間(毫秒) spring.redis.timeout=5000
6.接收消息生產(chǎn)者發(fā)布的消息,推送給對應的客戶端
import java.io.UnsupportedEncodingException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import com.kingengine.plug.websocket.WebSocketServer; import cn.hutool.core.codec.Base64; import cn.hutool.core.util.StrUtil; import net.sf.json.JSONArray; import net.sf.json.JSONObject; /** * 消息監(jiān)聽對象,接收訂閱消息 * @author csf * @date 2020年8月13日 */ @Component public class RedisReceiver implements MessageListener { Logger log = LoggerFactory.getLogger(this.getClass()); @Autowired WebSocketServer webSocketServer; /** * 處理接收到的訂閱消息 */ @Override public void onMessage(Message message, byte[] pattern) { String channel = new String(message.getChannel());// 訂閱的頻道名稱 String msg = ""; try { msg = new String(message.getBody(), "GBK");//注意與發(fā)布消息編碼一致,否則會亂碼 if (StrUtil.isNotBlank(msg)){ if ("showNewestMsg".endsWith(channel))// 最新消息 { JSONObject json = JSONObject.fromObject(msg); webSocketServer.sendMessage(json.get("receive"),json.get("msg")); }else{ //TODO 其他訂閱的消息處理 } }else{ log.info("消息內(nèi)容為空,不處理。"); } } catch (Exception e) { log.error("處理消息異常:"+e.toString()) e.printStackTrace(); } } }
7.消息發(fā)布測試
import java.io.UnsupportedEncodingException; import java.util.HashMap; import java.util.Map; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import net.sf.json.JSONObject; @RequestMapping("redis") @RestController public class RedisTestController { @Autowired StringRedisTemplate template; /** * 發(fā)布消息測試 *@param userid * @param msg * @return */ @PostMapping("sendMessage") public String sendMessage(String userid,String msg) { try { String newMessge=new String(msg.getBytes("GBK"),"GBK"); Map<String,String> map = new HashMap<String, String>(); map.put("receive", userid); map.put("msg", newMessge); template.convertAndSend("showNewestMsg", JSONObject.fromObject(map).toString()); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return "消息發(fā)布成功!"; } }
8.客戶端代碼
<!DOCTYPE html> <html> <head> <title>WebSocket測試</title> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> </head> <body> <div> 來自服務端消息: <p id="message"></p> </div> </body> <script src="http://apps.bdimg.com/libs/jquery/1.6.4/jquery.min.js"></script> <script> let webSocketClient; if (window.WebSocket) { let custid="132456_" + Math.random();//該參數(shù)會作為websocketServer中存儲session的key,要保證唯一。 webSocketClient = new WebSocket("ws://127.0.0.1:8082/bootapp/websocket/" + custid); //連通之后的回調(diào)事件 webSocketClient.onopen = function () { webSocketClient.send("這里是地球,收到請回答。。。"); // webSocket.send('{"type":"1","data":"121"}'); }; //接收后臺服務端的消息 webSocketClient.onmessage = function (evt) { console.log("數(shù)據(jù)已接收:" + evt.data); showMessage("未讀消息:" + evt.data); }; //連接關閉的回調(diào)事件 webSocketClient.onclose = function () { alert("連接已關閉..."); }; }else{ alert("瀏覽器不支持websocket"); } function showMessage(message) { $("#message").html(message); } </script> </html>
核心代碼至此。
到此這篇關于redis做websocket分布式消息推送服務的實現(xiàn)的文章就介紹到這了,更多相關redis websocket分布式消息推送服務內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Satoken+Redis實現(xiàn)短信登錄、注冊、鑒權功能
這篇文章主要介紹了Satoken+Redis實現(xiàn)短信登錄、注冊、鑒權功能,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-01-01