websocket+redis動(dòng)態(tài)訂閱和動(dòng)態(tài)取消訂閱的實(shí)現(xiàn)示例
原理
websocket的訂閱就是在前后端建立ws連接之后,前端通過發(fā)送一定格式的消息,后端解析出來(lái)去訂閱或者取消訂閱redis頻道。
訂閱頻道消息格式:
{ "cmd":"subscribe", "topic":[ "topic_name" ] }
模糊訂閱格式
{ "cmd":"psubscribe", "topic":[ "topic_name" ] }
取消訂閱格式
{ "cmd":"unsubscribe", "topic":[ "topic_name" ] }
兩個(gè)核心類,一個(gè)是redis的訂閱監(jiān)聽類,一個(gè)是websocket的發(fā)布訂閱類。
redis訂閱監(jiān)聽類
package com.curtain.core; import com.curtain.config.GetBeanUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; import java.util.Arrays; /** ?* @Author Curtain ?* @Date 2021/6/7 14:27 ?* @Description ?*/ @Component @Slf4j public class RedisPubSub extends JedisPubSub { ? ? private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class); ? ? private Jedis jedis; ? ? //訂閱 ? ? public void subscribe(String... channels) { ? ? ? ? jedis = jedisPool.getResource(); ? ? ? ? try { ? ? ? ? ? ? jedis.subscribe(this, channels); ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? log.error(e.getMessage()); ? ? ? ? ? ? if (jedis != null) ? ? ? ? ? ? ? ? jedis.close(); ? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱 ? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:"); ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);}); ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? Thread.sleep(4000); ? ? ? ? ? ? } catch (InterruptedException interruptedException) { ? ? ? ? ? ? ? ? interruptedException.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? ? ? subscribe(channels); ? ? ? ? } ? ? } ? ? //模糊訂閱 ? ? public void psubscribe(String... channels) { ? ? ? ? Jedis jedis = jedisPool.getResource(); ? ? ? ? try { ? ? ? ? ? ? jedis.psubscribe(this, channels); ? ? ? ? } catch (ArithmeticException e) {//取消訂閱故意造成的異常 ? ? ? ? ? ? if (jedis != null) ? ? ? ? ? ? ? ? jedis.close(); ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? log.error(e.getMessage()); ? ? ? ? ? ? if (jedis != null) ? ? ? ? ? ? ? ? jedis.close(); ? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱 ? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:"); ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);}); ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? Thread.sleep(4000); ? ? ? ? ? ? } catch (InterruptedException interruptedException) { ? ? ? ? ? ? ? ? interruptedException.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? ? ? psubscribe(channels); ? ? ? ? } ? ? } ? ? public void unsubscribeAndClose(String... channels){ ? ? ? ? unsubscribe(channels); ? ? ? ? if (jedis != null && !isSubscribed()) ? ? ? ? ? ? jedis.close(); ? ? } ? ? public void punsubscribeAndClose(String... channels){ ? ? ? ? punsubscribe(channels); ? ? ? ? if (jedis != null && !isSubscribed()) ? ? ? ? ? ? jedis.close(); ? ? } ? ? @Override ? ? public void onSubscribe(String channel, int subscribedChannels) { ? ? ? ? log.info("subscribe redis channel:" + channel + ", 線程id:" + Thread.currentThread().getId()); ? ? } ? ? @Override ? ? public void onPSubscribe(String pattern, int subscribedChannels) { ? ? ? ? log.info("psubscribe redis channel:" + pattern + ", 線程id:" + Thread.currentThread().getId()); ? ? } ? ? @Override ? ? public void onPMessage(String pattern, String channel, String message) { ? ? ? ? log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 線程id:" + Thread.currentThread().getId()); ? ? ? ? WebSocketServer.publish(message, pattern); ? ? ? ? WebSocketServer.publish(message, channel); ? ? } ? ? @Override ? ? public void onMessage(String channel, String message) { ? ? ? ? log.info("receive from redis channal: " + channel + ",message:" + message + ", 線程id:" + Thread.currentThread().getId()); ? ? ? ? WebSocketServer.publish(message, channel); ? ? } ? ? @Override ? ? public void onUnsubscribe(String channel, int subscribedChannels) { ? ? ? ? log.info("unsubscribe redis channel:" + channel); ? ? } ? ? @Override ? ? public void onPUnsubscribe(String pattern, int subscribedChannels) { ? ? ? ? log.info("punsubscribe redis channel:" + pattern); ? ? } }
1.jedis監(jiān)聽redis頻道的時(shí)候如果遇見異常會(huì)關(guān)閉連接導(dǎo)致后續(xù)沒有監(jiān)聽該頻道,所以這里在subscribe捕獲到異常的時(shí)候會(huì)重新創(chuàng)建一個(gè)jedis連接訂閱該redis頻道。
webSocket訂閱推送類
這個(gè)類會(huì)有兩個(gè)ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>類型類變量,分別存儲(chǔ)訂閱和模糊訂閱的信息。
外面一層的String對(duì)應(yīng)的值是topic_name,里面一層的String對(duì)應(yīng)的值是sessionId。前端發(fā)送過來(lái)的消息里面對(duì)應(yīng)的這三類操作其實(shí)就是對(duì)這兩個(gè)map里面的。
還有個(gè)ConcurrentHashMap<String, RedisPubSub>類型的變量,存儲(chǔ)的是事件-RedisPubSub,便于取消訂閱的時(shí)候找到監(jiān)聽該頻道(事件)的RedisPubSub對(duì)象。
信息進(jìn)行增加或者刪除;后端往前端推送數(shù)據(jù)也會(huì)根據(jù)不同的topic_name推送到不同的訂閱者這邊。
package com.curtain.core; import com.alibaba.fastjson.JSON; import com.curtain.config.WebsocketProperties; import com.curtain.service.Cancelable; import com.curtain.service.impl.TaskExecuteService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** ?* @Author Curtain ?* @Date 2021/5/14 16:49 ?* @Description ?*/ @ServerEndpoint("/ws") @Component @Slf4j public class WebSocketServer { ? ? /** ? ? ?* concurrent包的線程安全Set,用來(lái)存放每個(gè)客戶端對(duì)應(yīng)的MyWebSocket對(duì)象。 ? ? ?*/ ? ? private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>(); ? ? /** ? ? ?* 存放psub的事件 ? ? ?**/ ? ? private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>(); ? ? /** ? ? ?* 存放topic(pattern)-對(duì)應(yīng)的RedisPubsub ? ? ?*/ ? ? private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>(); ? ? /** ? ? ?* 與某個(gè)客戶端的連接會(huì)話,需要通過它來(lái)給客戶端發(fā)送數(shù)據(jù) ? ? ?*/ ? ? private Session session; ? ? private String sessionId = ""; ? ? //要注入的對(duì)象 ? ? private static TaskExecuteService executeService; ? ? private static WebsocketProperties properties; ? ? private Cancelable cancelable; ? ? @Autowired ? ? public void setTaskExecuteService(TaskExecuteService taskExecuteService) { ? ? ? ? WebSocketServer.executeService = taskExecuteService; ? ? } ? ? @Autowired ? ? public void setWebsocketProperties(WebsocketProperties properties) { ? ? ? ? WebSocketServer.properties = properties; ? ? } ? ? /** ? ? ?* 連接建立成功調(diào)用的方法 ? ? ?*/ ? ? @OnOpen ? ? public void onOpen(Session session) { ? ? ? ? this.session = session; ? ? ? ? this.sessionId = session.getId(); ? ? ? ? //構(gòu)造推送數(shù)據(jù) ? ? ? ? Map pubHeader = new HashMap(); ? ? ? ? pubHeader.put("name", "connect_status"); ? ? ? ? pubHeader.put("type", "create"); ? ? ? ? pubHeader.put("from", "pubsub"); ? ? ? ? pubHeader.put("time", new Date().getTime() / 1000); ? ? ? ? Map pubPayload = new HashMap(); ? ? ? ? pubPayload.put("status", "success"); ? ? ? ? Map pubMap = new HashMap(); ? ? ? ? pubMap.put("header", pubHeader); ? ? ? ? pubMap.put("payload", pubPayload); ? ? ? ? sendMessage(JSON.toJSONString(pubMap)); ? ? ? ? cancelable = executeService.runPeriodly(() -> { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? if (cancelable != null && !session.isOpen()) { ? ? ? ? ? ? ? ? ? ? log.info("斷開連接,停止發(fā)送ping"); ? ? ? ? ? ? ? ? ? ? cancelable.cancel(); ? ? ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? ? ? String data = "ping"; ? ? ? ? ? ? ? ? ? ? ByteBuffer payload = ByteBuffer.wrap(data.getBytes()); ? ? ? ? ? ? ? ? ? ? session.getBasicRemote().sendPing(payload); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? }, properties.getPeriod()); ? ? } ? ? @OnMessage ? ? public void onMessage(String message) { ? ? ? ? synchronized (session) { ? ? ? ? ? ? Map msgMap = (Map) JSON.parse(message); ? ? ? ? ? ? String cmd = (String) msgMap.get("cmd"); ? ? ? ? ? ? //訂閱消息 ? ? ? ? ? ? if ("subscribe".equals(cmd)) { ? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic"); ? ? ? ? ? ? ? ? //本地記錄訂閱信息 ? ? ? ? ? ? ? ? for (int i = 0; i < topics.size(); i++) { ? ? ? ? ? ? ? ? ? ? String topic = topics.get(i); ? ? ? ? ? ? ? ? ? ? log.info("============================subscribe-start============================"); ? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始訂閱:" + topic); ? ? ? ? ? ? ? ? ? ? if (webSocketMap.containsKey(topic)) {//有人訂閱過了 ? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.get(topic).put(this.sessionId, this); ? ? ? ? ? ? ? ? ? ? } else {//之前還沒人訂閱過,所以需要訂閱redis頻道 ? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>(); ? ? ? ? ? ? ? ? ? ? ? ? map.put(this.sessionId, this); ? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.put(topic, map); ? ? ? ? ? ? ? ? ? ? ? ? new Thread(() -> { ? ? ? ? ? ? ? ? ? ? ? ? ? ? RedisPubSub redisPubSub = new RedisPubSub(); ? ? ? ? ? ? ? ? ? ? ? ? ? ? //存入map ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.put(topic, redisPubSub); ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSub.subscribe(topic); ? ? ? ? ? ? ? ? ? ? ? ? }).start(); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成訂閱:" + topic); ? ? ? ? ? ? ? ? ? ? log(); ? ? ? ? ? ? ? ? ? ? log.info("============================subscribe-end============================"); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? //psubscribe ? ? ? ? ? ? if ("psubscribe".equals(cmd)) { ? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic"); ? ? ? ? ? ? ? ? //本地記錄訂閱信息 ? ? ? ? ? ? ? ? for (int i = 0; i < topics.size(); i++) { ? ? ? ? ? ? ? ? ? ? String topic = topics.get(i); ? ? ? ? ? ? ? ? ? ? log.info("============================psubscribe-start============================"); ? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始模糊訂閱:" + topic); ? ? ? ? ? ? ? ? ? ? if (pWebSocketMap.containsKey(topic)) {//有人訂閱過了 ? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.get(topic).put(this.sessionId, this); ? ? ? ? ? ? ? ? ? ? } else {//之前還沒人訂閱過,所以需要訂閱redis頻道 ? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>(); ? ? ? ? ? ? ? ? ? ? ? ? map.put(this.sessionId, this); ? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.put(topic, map); ? ? ? ? ? ? ? ? ? ? ? ? new Thread(() -> { ? ? ? ? ? ? ? ? ? ? ? ? ? ? RedisPubSub redisPubSub = new RedisPubSub(); ? ? ? ? ? ? ? ? ? ? ? ? ? ? //存入map ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.put(topic, redisPubSub); ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSub.psubscribe(topic); ? ? ? ? ? ? ? ? ? ? ? ? }).start(); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成模糊訂閱:" + topic); ? ? ? ? ? ? ? ? ? ? log(); ? ? ? ? ? ? ? ? ? ? log.info("============================psubscribe-end============================"); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? //取消訂閱 ? ? ? ? ? ? if ("unsubscribe".equals(cmd)) { ? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic"); ? ? ? ? ? ? ? ? //刪除本地對(duì)應(yīng)的訂閱信息 ? ? ? ? ? ? ? ? for (String topic : topics) { ? ? ? ? ? ? ? ? ? ? log.info("============================unsubscribe-start============================"); ? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始刪除訂閱:" + topic); ? ? ? ? ? ? ? ? ? ? if (webSocketMap.containsKey(topic)) { ? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); ? ? ? ? ? ? ? ? ? ? ? ? map.remove(this.sessionId); ? ? ? ? ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道 ? ? ? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic); ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic); ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? if (pWebSocketMap.containsKey(topic)) { ? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic); ? ? ? ? ? ? ? ? ? ? ? ? map.remove(this.sessionId); ? ? ? ? ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道 ? ? ? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic); ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic); ? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic); ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成刪除訂閱:" + topic); ? ? ? ? ? ? ? ? ? ? log(); ? ? ? ? ? ? ? ? ? ? log.info("============================unsubscribe-end============================"); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? @OnMessage ? ? public void onPong(PongMessage pongMessage) { ? ? ? ? try { ? ? ? ? ? ? log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong"); ? ? ? ? } catch (UnsupportedEncodingException e) { ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? } ? ? } ? ? /** ? ? ?* 連接關(guān)閉調(diào)用的方法 ? ? ?*/ ? ? @OnClose ? ? public void onClose() { ? ? ? ? synchronized (session) { ? ? ? ? ? ? log.info("============================onclose-start============================"); ? ? ? ? ? ? //刪除訂閱 ? ? ? ? ? ? Iterator iterator = webSocketMap.keySet().iterator(); ? ? ? ? ? ? while (iterator.hasNext()) { ? ? ? ? ? ? ? ? String topic = (String) iterator.next(); ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); ? ? ? ? ? ? ? ? map.remove(this.sessionId); ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道 ? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? //刪除模糊訂閱 ? ? ? ? ? ? Iterator iteratorP = pWebSocketMap.keySet().iterator(); ? ? ? ? ? ? while (iteratorP.hasNext()) { ? ? ? ? ? ? ? ? String topic = (String) iteratorP.next(); ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic); ? ? ? ? ? ? ? ? map.remove(this.sessionId); ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道 ? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",斷開連接:"); ? ? ? ? ? ? //debug ? ? ? ? ? ? log(); ? ? ? ? ? ? log.info("============================onclose-end============================"); ? ? ? ? } ? ? } ? ? /** ? ? ?* @param session ? ? ?* @param error ? ? ?*/ ? ? @OnError ? ? public void onError(Session session, Throwable error) { ? ? ? ? synchronized (session) { ? ? ? ? ? ? log.info("============================onError-start============================"); ? ? ? ? ? ? log.error("用戶錯(cuò)誤,sessionId:" + session.getId() + ",原因:" + error.getMessage()); ? ? ? ? ? ? error.printStackTrace(); ? ? ? ? ? ? log.info("關(guān)閉錯(cuò)誤用戶對(duì)應(yīng)的連接"); ? ? ? ? ? ? //刪除訂閱 ? ? ? ? ? ? Iterator iterator = webSocketMap.keySet().iterator(); ? ? ? ? ? ? while (iterator.hasNext()) { ? ? ? ? ? ? ? ? String topic = (String) iterator.next(); ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); ? ? ? ? ? ? ? ? map.remove(this.sessionId); ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道 ? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? //刪除模糊訂閱 ? ? ? ? ? ? Iterator iteratorP = pWebSocketMap.keySet().iterator(); ? ? ? ? ? ? while (iteratorP.hasNext()) { ? ? ? ? ? ? ? ? String topic = (String) iteratorP.next(); ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic); ? ? ? ? ? ? ? ? map.remove(this.sessionId); ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道 ? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic); ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? log.info("完成錯(cuò)誤用戶對(duì)應(yīng)的連接關(guān)閉"); ? ? ? ? ? ? //debug ? ? ? ? ? ? log(); ? ? ? ? ? ? log.info("============================onError-end============================"); ? ? ? ? } ? ? } ? ? /** ? ? ?* 實(shí)現(xiàn)服務(wù)器主動(dòng)推送 ? ? ?*/ ? ? public void sendMessage(String message) { ? ? ? ? synchronized (session) { ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? this.session.getBasicRemote().sendText(message); ? ? ? ? ? ? } catch (IOException e) { ? ? ? ? ? ? ? ? e.printStackTrace(); ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? public static void publish(String msg, String topic) { ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic); ? ? ? ? if (map != null && map.values() != null) { ? ? ? ? ? ? for (WebSocketServer webSocketServer : map.values()) ? ? ? ? ? ? ? ? webSocketServer.sendMessage(msg); ? ? ? ? } ? ? ? ? map = pWebSocketMap.get(topic); ? ? ? ? if (map != null && map.values() != null) { ? ? ? ? ? ? for (WebSocketServer webSocketServer : map.values()) ? ? ? ? ? ? ? ? webSocketServer.sendMessage(msg); ? ? ? ? } ? ? } ? ? private void log() { ? ? ? ? log.info("<<<<<<<<<<<完成操作后,打印訂閱信息開始>>>>>>>>>>"); ? ? ? ? Iterator iterator1 = webSocketMap.keySet().iterator(); ? ? ? ? while (iterator1.hasNext()) { ? ? ? ? ? ? String topic = (String) iterator1.next(); ? ? ? ? ? ? log.info("topic:" + topic); ? ? ? ? ? ? Iterator iterator2 = webSocketMap.get(topic).keySet().iterator(); ? ? ? ? ? ? while (iterator2.hasNext()) { ? ? ? ? ? ? ? ? String session = (String) iterator2.next(); ? ? ? ? ? ? ? ? log.info("訂閱" + topic + "的sessionId:" + session); ? ? ? ? ? ? } ? ? ? ? } ? ? ? ? log.info("<<<<<<<<<<<完成操作后,打印訂閱信息結(jié)束>>>>>>>>>>"); ? ? } }
項(xiàng)目地址
上面介紹了核心代碼,下面是完整代碼地址
https://github.com/Curtain-Wang/websocket-redis-subscribe.git
Update20220415
參考評(píng)論區(qū)老哥的建議,將redis訂閱監(jiān)聽類里面的subscribe和psubscribe方法調(diào)整如下:
? ? //訂閱 ? ? @Override ? ? public void subscribe(String... channels) { ? ? ? ? boolean done = true; ? ? ? ? while (done){ ? ? ? ? ? ? Jedis jedis = jedisPool.getResource(); ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? jedis.subscribe(this, channels); ? ? ? ? ? ? ? ? done = false; ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? log.error(e.getMessage()); ? ? ? ? ? ? ? ? if (jedis != null) ? ? ? ? ? ? ? ? ? ? jedis.close(); ? ? ? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱 ? ? ? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:"); ? ? ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);}); ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? Thread.sleep(4000); ? ? ? ? ? ? ? ? } catch (InterruptedException interruptedException) { ? ? ? ? ? ? ? ? ? ? interruptedException.printStackTrace(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? } ? ? //模糊訂閱 ? ? @Override ? ? public void psubscribe(String... channels) { ? ? ? ? boolean done = true; ? ? ? ? while (done){ ? ? ? ? ? ? Jedis jedis = jedisPool.getResource(); ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? jedis.psubscribe(this, channels); ? ? ? ? ? ? ? ? done = false; ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? log.error(e.getMessage()); ? ? ? ? ? ? ? ? if (jedis != null) ? ? ? ? ? ? ? ? ? ? jedis.close(); ? ? ? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱 ? ? ? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:"); ? ? ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);}); ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? Thread.sleep(4000); ? ? ? ? ? ? ? ? } catch (InterruptedException interruptedException) { ? ? ? ? ? ? ? ? ? ? interruptedException.printStackTrace(); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? } ? ? }
到此這篇關(guān)于websocket+redis動(dòng)態(tài)訂閱和動(dòng)態(tài)取消訂閱的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)websocket redis動(dòng)態(tài)訂閱 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何在SpringBoot中使用Redis實(shí)現(xiàn)分布式鎖
這篇文章主要介紹了如何在SpringBoot中使用Redis實(shí)現(xiàn)分布式鎖,在實(shí)際開發(fā)中有可能會(huì)遇到多個(gè)線程同時(shí)訪問同一個(gè)共享變量,那么上鎖就很重要了,需要的朋友可以參考下2023-03-03Unable?to?connect?to?Redis無(wú)法連接到Redis解決的全過程
這篇文章主要給大家介紹了關(guān)于Unable?to?connect?to?Redis無(wú)法連接到Redis解決的相關(guān)資料,文中通過圖文以及實(shí)例代碼將解決的過程介紹的非常詳細(xì),需要的朋友可以參考下2023-03-03通過redis的腳本lua如何實(shí)現(xiàn)搶紅包功能
這篇文章主要給大家介紹了關(guān)于通過redis的腳本lua如何實(shí)現(xiàn)搶紅包功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05Redis教程(六):Sorted-Sets數(shù)據(jù)類型
這篇文章主要介紹了Redis教程(六):Sorted-Sets數(shù)據(jù)類型,本文講解了Sorted-Sets數(shù)據(jù)類型概述、相關(guān)命令列表、命令使用示例、應(yīng)用范圍等內(nèi)容,需要的朋友可以參考下2015-04-04使用Redis命令操作數(shù)據(jù)庫(kù)的常見錯(cuò)誤及解決方法
由于Redis是內(nèi)存數(shù)據(jù)庫(kù),因此可能會(huì)存在一些安全問題,下面這篇文章主要給大家介紹了關(guān)于使用Redis命令操作數(shù)據(jù)庫(kù)的常見錯(cuò)誤及解決方法,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下2024-02-02