欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

websocket+redis動(dòng)態(tài)訂閱和動(dòng)態(tài)取消訂閱的實(shí)現(xiàn)示例

 更新時(shí)間:2022年05月17日 14:56:02   作者:柯騰_  
本文主要介紹了websocket+redis動(dòng)態(tài)訂閱和動(dòng)態(tài)取消訂閱,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

原理

websocket的訂閱就是在前后端建立ws連接之后,前端通過發(fā)送一定格式的消息,后端解析出來(lái)去訂閱或者取消訂閱redis頻道。

訂閱頻道消息格式:

{
    "cmd":"subscribe",
    "topic":[
        "topic_name"
    ]
}

模糊訂閱格式

{
    "cmd":"psubscribe",
    "topic":[
        "topic_name"
    ]
}

取消訂閱格式

{
    "cmd":"unsubscribe",
    "topic":[
        "topic_name"
    ]
}

兩個(gè)核心類,一個(gè)是redis的訂閱監(jiān)聽類,一個(gè)是websocket的發(fā)布訂閱類。

redis訂閱監(jiān)聽類

package com.curtain.core;

import com.curtain.config.GetBeanUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;

import java.util.Arrays;

/**
?* @Author Curtain
?* @Date 2021/6/7 14:27
?* @Description
?*/
@Component
@Slf4j
public class RedisPubSub extends JedisPubSub {
? ? private JedisPool jedisPool = GetBeanUtil.getBean(JedisPool.class);
? ? private Jedis jedis;

? ? //訂閱
? ? public void subscribe(String... channels) {
? ? ? ? jedis = jedisPool.getResource();
? ? ? ? try {
? ? ? ? ? ? jedis.subscribe(this, channels);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱
? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? subscribe(channels);
? ? ? ? }
? ? }

? ? //模糊訂閱
? ? public void psubscribe(String... channels) {
? ? ? ? Jedis jedis = jedisPool.getResource();
? ? ? ? try {
? ? ? ? ? ? jedis.psubscribe(this, channels);
? ? ? ? } catch (ArithmeticException e) {//取消訂閱故意造成的異常
? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? jedis.close();
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱
? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? psubscribe(channels);
? ? ? ? }
? ? }

? ? public void unsubscribeAndClose(String... channels){
? ? ? ? unsubscribe(channels);
? ? ? ? if (jedis != null && !isSubscribed())
? ? ? ? ? ? jedis.close();
? ? }

? ? public void punsubscribeAndClose(String... channels){
? ? ? ? punsubscribe(channels);
? ? ? ? if (jedis != null && !isSubscribed())
? ? ? ? ? ? jedis.close();
? ? }

? ? @Override
? ? public void onSubscribe(String channel, int subscribedChannels) {
? ? ? ? log.info("subscribe redis channel:" + channel + ", 線程id:" + Thread.currentThread().getId());
? ? }

? ? @Override
? ? public void onPSubscribe(String pattern, int subscribedChannels) {
? ? ? ? log.info("psubscribe redis channel:" + pattern + ", 線程id:" + Thread.currentThread().getId());
? ? }

? ? @Override
? ? public void onPMessage(String pattern, String channel, String message) {
? ? ? ? log.info("receive from redis channal: " + channel + ",pattern: " + pattern + ",message:" + message + ", 線程id:" + Thread.currentThread().getId());
? ? ? ? WebSocketServer.publish(message, pattern);
? ? ? ? WebSocketServer.publish(message, channel);

? ? }

? ? @Override
? ? public void onMessage(String channel, String message) {
? ? ? ? log.info("receive from redis channal: " + channel + ",message:" + message + ", 線程id:" + Thread.currentThread().getId());
? ? ? ? WebSocketServer.publish(message, channel);
? ? }

? ? @Override
? ? public void onUnsubscribe(String channel, int subscribedChannels) {
? ? ? ? log.info("unsubscribe redis channel:" + channel);
? ? }

? ? @Override
? ? public void onPUnsubscribe(String pattern, int subscribedChannels) {
? ? ? ? log.info("punsubscribe redis channel:" + pattern);
? ? }
}

1.jedis監(jiān)聽redis頻道的時(shí)候如果遇見異常會(huì)關(guān)閉連接導(dǎo)致后續(xù)沒有監(jiān)聽該頻道,所以這里在subscribe捕獲到異常的時(shí)候會(huì)重新創(chuàng)建一個(gè)jedis連接訂閱該redis頻道。

webSocket訂閱推送類

這個(gè)類會(huì)有兩個(gè)ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>>類型類變量,分別存儲(chǔ)訂閱和模糊訂閱的信息。

外面一層的String對(duì)應(yīng)的值是topic_name,里面一層的String對(duì)應(yīng)的值是sessionId。前端發(fā)送過來(lái)的消息里面對(duì)應(yīng)的這三類操作其實(shí)就是對(duì)這兩個(gè)map里面的。

還有個(gè)ConcurrentHashMap<String, RedisPubSub>類型的變量,存儲(chǔ)的是事件-RedisPubSub,便于取消訂閱的時(shí)候找到監(jiān)聽該頻道(事件)的RedisPubSub對(duì)象。

信息進(jìn)行增加或者刪除;后端往前端推送數(shù)據(jù)也會(huì)根據(jù)不同的topic_name推送到不同的訂閱者這邊。

package com.curtain.core;

import com.alibaba.fastjson.JSON;
import com.curtain.config.WebsocketProperties;
import com.curtain.service.Cancelable;
import com.curtain.service.impl.TaskExecuteService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;


/**
?* @Author Curtain
?* @Date 2021/5/14 16:49
?* @Description
?*/
@ServerEndpoint("/ws")
@Component
@Slf4j
public class WebSocketServer {
? ? /**
? ? ?* concurrent包的線程安全Set,用來(lái)存放每個(gè)客戶端對(duì)應(yīng)的MyWebSocket對(duì)象。
? ? ?*/
? ? private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 存放psub的事件
? ? ?**/
? ? private static volatile ConcurrentHashMap<String, ConcurrentHashMap<String, WebSocketServer>> pWebSocketMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 存放topic(pattern)-對(duì)應(yīng)的RedisPubsub
? ? ?*/
? ? private static volatile ConcurrentHashMap<String, RedisPubSub> redisPubSubMap = new ConcurrentHashMap<>();
? ? /**
? ? ?* 與某個(gè)客戶端的連接會(huì)話,需要通過它來(lái)給客戶端發(fā)送數(shù)據(jù)
? ? ?*/
? ? private Session session;
? ? private String sessionId = "";
? ? //要注入的對(duì)象
? ? private static TaskExecuteService executeService;
? ? private static WebsocketProperties properties;

? ? private Cancelable cancelable;

? ? @Autowired
? ? public void setTaskExecuteService(TaskExecuteService taskExecuteService) {
? ? ? ? WebSocketServer.executeService = taskExecuteService;
? ? }

? ? @Autowired
? ? public void setWebsocketProperties(WebsocketProperties properties) {
? ? ? ? WebSocketServer.properties = properties;
? ? }

? ? /**
? ? ?* 連接建立成功調(diào)用的方法
? ? ?*/
? ? @OnOpen
? ? public void onOpen(Session session) {
? ? ? ? this.session = session;
? ? ? ? this.sessionId = session.getId();
? ? ? ? //構(gòu)造推送數(shù)據(jù)
? ? ? ? Map pubHeader = new HashMap();
? ? ? ? pubHeader.put("name", "connect_status");
? ? ? ? pubHeader.put("type", "create");
? ? ? ? pubHeader.put("from", "pubsub");
? ? ? ? pubHeader.put("time", new Date().getTime() / 1000);
? ? ? ? Map pubPayload = new HashMap();
? ? ? ? pubPayload.put("status", "success");
? ? ? ? Map pubMap = new HashMap();
? ? ? ? pubMap.put("header", pubHeader);
? ? ? ? pubMap.put("payload", pubPayload);
? ? ? ? sendMessage(JSON.toJSONString(pubMap));
? ? ? ? cancelable = executeService.runPeriodly(() -> {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? if (cancelable != null && !session.isOpen()) {
? ? ? ? ? ? ? ? ? ? log.info("斷開連接,停止發(fā)送ping");
? ? ? ? ? ? ? ? ? ? cancelable.cancel();
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? String data = "ping";
? ? ? ? ? ? ? ? ? ? ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
? ? ? ? ? ? ? ? ? ? session.getBasicRemote().sendPing(payload);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }, properties.getPeriod());

? ? }

? ? @OnMessage
? ? public void onMessage(String message) {
? ? ? ? synchronized (session) {
? ? ? ? ? ? Map msgMap = (Map) JSON.parse(message);
? ? ? ? ? ? String cmd = (String) msgMap.get("cmd");
? ? ? ? ? ? //訂閱消息
? ? ? ? ? ? if ("subscribe".equals(cmd)) {
? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic");
? ? ? ? ? ? ? ? //本地記錄訂閱信息
? ? ? ? ? ? ? ? for (int i = 0; i < topics.size(); i++) {
? ? ? ? ? ? ? ? ? ? String topic = topics.get(i);
? ? ? ? ? ? ? ? ? ? log.info("============================subscribe-start============================");
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? if (webSocketMap.containsKey(topic)) {//有人訂閱過了
? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.get(topic).put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? } else {//之前還沒人訂閱過,所以需要訂閱redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
? ? ? ? ? ? ? ? ? ? ? ? map.put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.put(topic, map);
? ? ? ? ? ? ? ? ? ? ? ? new Thread(() -> {
? ? ? ? ? ? ? ? ? ? ? ? ? ? RedisPubSub redisPubSub = new RedisPubSub();
? ? ? ? ? ? ? ? ? ? ? ? ? ? //存入map
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.put(topic, redisPubSub);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSub.subscribe(topic);
? ? ? ? ? ? ? ? ? ? ? ? }).start();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? log();
? ? ? ? ? ? ? ? ? ? log.info("============================subscribe-end============================");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //psubscribe
? ? ? ? ? ? if ("psubscribe".equals(cmd)) {
? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic");
? ? ? ? ? ? ? ? //本地記錄訂閱信息
? ? ? ? ? ? ? ? for (int i = 0; i < topics.size(); i++) {
? ? ? ? ? ? ? ? ? ? String topic = topics.get(i);
? ? ? ? ? ? ? ? ? ? log.info("============================psubscribe-start============================");
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始模糊訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? if (pWebSocketMap.containsKey(topic)) {//有人訂閱過了
? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.get(topic).put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? } else {//之前還沒人訂閱過,所以需要訂閱redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = new ConcurrentHashMap<>();
? ? ? ? ? ? ? ? ? ? ? ? map.put(this.sessionId, this);
? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.put(topic, map);
? ? ? ? ? ? ? ? ? ? ? ? new Thread(() -> {
? ? ? ? ? ? ? ? ? ? ? ? ? ? RedisPubSub redisPubSub = new RedisPubSub();
? ? ? ? ? ? ? ? ? ? ? ? ? ? //存入map
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.put(topic, redisPubSub);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSub.psubscribe(topic);
? ? ? ? ? ? ? ? ? ? ? ? }).start();
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成模糊訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? log();
? ? ? ? ? ? ? ? ? ? log.info("============================psubscribe-end============================");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //取消訂閱
? ? ? ? ? ? if ("unsubscribe".equals(cmd)) {
? ? ? ? ? ? ? ? List<String> topics = (List<String>) msgMap.get("topic");
? ? ? ? ? ? ? ? //刪除本地對(duì)應(yīng)的訂閱信息
? ? ? ? ? ? ? ? for (String topic : topics) {
? ? ? ? ? ? ? ? ? ? log.info("============================unsubscribe-start============================");
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",開始刪除訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? if (webSocketMap.containsKey(topic)) {
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? ? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? if (pWebSocketMap.containsKey(topic)) {
? ? ? ? ? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
? ? ? ? ? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",完成刪除訂閱:" + topic);
? ? ? ? ? ? ? ? ? ? log();
? ? ? ? ? ? ? ? ? ? log.info("============================unsubscribe-end============================");
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }

? ? @OnMessage
? ? public void onPong(PongMessage pongMessage) {
? ? ? ? try {
? ? ? ? ? ? log.debug(new String(pongMessage.getApplicationData().array(), "utf-8") + "接收到pong");
? ? ? ? } catch (UnsupportedEncodingException e) {
? ? ? ? ? ? e.printStackTrace();
? ? ? ? }
? ? }

? ? /**
? ? ?* 連接關(guān)閉調(diào)用的方法
? ? ?*/
? ? @OnClose
? ? public void onClose() {
? ? ? ? synchronized (session) {
? ? ? ? ? ? log.info("============================onclose-start============================");
? ? ? ? ? ? //刪除訂閱
? ? ? ? ? ? Iterator iterator = webSocketMap.keySet().iterator();
? ? ? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iterator.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //刪除模糊訂閱
? ? ? ? ? ? Iterator iteratorP = pWebSocketMap.keySet().iterator();
? ? ? ? ? ? while (iteratorP.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iteratorP.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? log.info("sessionId:" + this.sessionId + ",斷開連接:");
? ? ? ? ? ? //debug
? ? ? ? ? ? log();
? ? ? ? ? ? log.info("============================onclose-end============================");
? ? ? ? }
? ? }


? ? /**
? ? ?* @param session
? ? ?* @param error
? ? ?*/
? ? @OnError
? ? public void onError(Session session, Throwable error) {
? ? ? ? synchronized (session) {
? ? ? ? ? ? log.info("============================onError-start============================");
? ? ? ? ? ? log.error("用戶錯(cuò)誤,sessionId:" + session.getId() + ",原因:" + error.getMessage());
? ? ? ? ? ? error.printStackTrace();
? ? ? ? ? ? log.info("關(guān)閉錯(cuò)誤用戶對(duì)應(yīng)的連接");
? ? ? ? ? ? //刪除訂閱
? ? ? ? ? ? Iterator iterator = webSocketMap.keySet().iterator();
? ? ? ? ? ? while (iterator.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iterator.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? webSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).unsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? //刪除模糊訂閱
? ? ? ? ? ? Iterator iteratorP = pWebSocketMap.keySet().iterator();
? ? ? ? ? ? while (iteratorP.hasNext()) {
? ? ? ? ? ? ? ? String topic = (String) iteratorP.next();
? ? ? ? ? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = pWebSocketMap.get(topic);
? ? ? ? ? ? ? ? map.remove(this.sessionId);
? ? ? ? ? ? ? ? if (map.size() == 0) {//如果這個(gè)頻道沒有用戶訂閱了,則取消訂閱該redis頻道
? ? ? ? ? ? ? ? ? ? pWebSocketMap.remove(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.get(topic).punsubscribeAndClose(topic);
? ? ? ? ? ? ? ? ? ? redisPubSubMap.remove(topic);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? log.info("完成錯(cuò)誤用戶對(duì)應(yīng)的連接關(guān)閉");
? ? ? ? ? ? //debug
? ? ? ? ? ? log();
? ? ? ? ? ? log.info("============================onError-end============================");
? ? ? ? }
? ? }

? ? /**
? ? ?* 實(shí)現(xiàn)服務(wù)器主動(dòng)推送
? ? ?*/
? ? public void sendMessage(String message) {
? ? ? ? synchronized (session) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? this.session.getBasicRemote().sendText(message);
? ? ? ? ? ? } catch (IOException e) {
? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? }
? ? ? ? }
? ? }

? ? public static void publish(String msg, String topic) {
? ? ? ? ConcurrentHashMap<String, WebSocketServer> map = webSocketMap.get(topic);
? ? ? ? if (map != null && map.values() != null) {
? ? ? ? ? ? for (WebSocketServer webSocketServer : map.values())
? ? ? ? ? ? ? ? webSocketServer.sendMessage(msg);
? ? ? ? }
? ? ? ? map = pWebSocketMap.get(topic);
? ? ? ? if (map != null && map.values() != null) {
? ? ? ? ? ? for (WebSocketServer webSocketServer : map.values())
? ? ? ? ? ? ? ? webSocketServer.sendMessage(msg);
? ? ? ? }
? ? }

? ? private void log() {
? ? ? ? log.info("<<<<<<<<<<<完成操作后,打印訂閱信息開始>>>>>>>>>>");
? ? ? ? Iterator iterator1 = webSocketMap.keySet().iterator();
? ? ? ? while (iterator1.hasNext()) {
? ? ? ? ? ? String topic = (String) iterator1.next();
? ? ? ? ? ? log.info("topic:" + topic);
? ? ? ? ? ? Iterator iterator2 = webSocketMap.get(topic).keySet().iterator();
? ? ? ? ? ? while (iterator2.hasNext()) {
? ? ? ? ? ? ? ? String session = (String) iterator2.next();
? ? ? ? ? ? ? ? log.info("訂閱" + topic + "的sessionId:" + session);
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? log.info("<<<<<<<<<<<完成操作后,打印訂閱信息結(jié)束>>>>>>>>>>");
? ? }
}

項(xiàng)目地址

上面介紹了核心代碼,下面是完整代碼地址

https://github.com/Curtain-Wang/websocket-redis-subscribe.git

Update20220415

參考評(píng)論區(qū)老哥的建議,將redis訂閱監(jiān)聽類里面的subscribe和psubscribe方法調(diào)整如下:

? ? //訂閱
? ? @Override
? ? public void subscribe(String... channels) {
? ? ? ? boolean done = true;
? ? ? ? while (done){
? ? ? ? ? ? Jedis jedis = jedisPool.getResource();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? jedis.subscribe(this, channels);
? ? ? ? ? ? ? ? done = false;
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱
? ? ? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? //模糊訂閱
? ? @Override
? ? public void psubscribe(String... channels) {
? ? ? ? boolean done = true;
? ? ? ? while (done){
? ? ? ? ? ? Jedis jedis = jedisPool.getResource();
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? jedis.psubscribe(this, channels);
? ? ? ? ? ? ? ? done = false;
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? log.error(e.getMessage());
? ? ? ? ? ? ? ? if (jedis != null)
? ? ? ? ? ? ? ? ? ? jedis.close();
? ? ? ? ? ? ? ? //遇到異常后關(guān)閉連接重新訂閱
? ? ? ? ? ? ? ? log.info("監(jiān)聽遇到異常,四秒后重新訂閱頻道:");
? ? ? ? ? ? ? ? Arrays.asList(channels).forEach(s -> {log.info(s);});
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? Thread.sleep(4000);
? ? ? ? ? ? ? ? } catch (InterruptedException interruptedException) {
? ? ? ? ? ? ? ? ? ? interruptedException.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }

到此這篇關(guān)于websocket+redis動(dòng)態(tài)訂閱和動(dòng)態(tài)取消訂閱的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)websocket redis動(dòng)態(tài)訂閱 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • redis基本安裝判斷、啟動(dòng)使用方法示例

    redis基本安裝判斷、啟動(dòng)使用方法示例

    這篇文章主要介紹了redis基本安裝判斷、啟動(dòng)使用方法,結(jié)合實(shí)例形式分析了Redis針對(duì)是否安裝的判斷、啟動(dòng)等使用方法,需要的朋友可以參考下
    2020-02-02
  • Redis?腳本和連接命令示例詳解

    Redis?腳本和連接命令示例詳解

    Redis腳本是一種可以實(shí)現(xiàn)復(fù)雜任務(wù)的腳本語(yǔ)言,可以用來(lái)快速履行復(fù)雜任務(wù),靈活處理數(shù)據(jù)管理和管理復(fù)雜的利用場(chǎng)景,這篇文章主要介紹了Redis?腳本和連接命令,需要的朋友可以參考下
    2023-09-09
  • React中immutable的使用

    React中immutable的使用

    這篇文章主要介紹了React中immutable的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-04-04
  • 如何在SpringBoot中使用Redis實(shí)現(xiàn)分布式鎖

    如何在SpringBoot中使用Redis實(shí)現(xiàn)分布式鎖

    這篇文章主要介紹了如何在SpringBoot中使用Redis實(shí)現(xiàn)分布式鎖,在實(shí)際開發(fā)中有可能會(huì)遇到多個(gè)線程同時(shí)訪問同一個(gè)共享變量,那么上鎖就很重要了,需要的朋友可以參考下
    2023-03-03
  • Unable?to?connect?to?Redis無(wú)法連接到Redis解決的全過程

    Unable?to?connect?to?Redis無(wú)法連接到Redis解決的全過程

    這篇文章主要給大家介紹了關(guān)于Unable?to?connect?to?Redis無(wú)法連接到Redis解決的相關(guān)資料,文中通過圖文以及實(shí)例代碼將解決的過程介紹的非常詳細(xì),需要的朋友可以參考下
    2023-03-03
  • 通過redis的腳本lua如何實(shí)現(xiàn)搶紅包功能

    通過redis的腳本lua如何實(shí)現(xiàn)搶紅包功能

    這篇文章主要給大家介紹了關(guān)于通過redis的腳本lua如何實(shí)現(xiàn)搶紅包功能的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • Redis教程(六):Sorted-Sets數(shù)據(jù)類型

    Redis教程(六):Sorted-Sets數(shù)據(jù)類型

    這篇文章主要介紹了Redis教程(六):Sorted-Sets數(shù)據(jù)類型,本文講解了Sorted-Sets數(shù)據(jù)類型概述、相關(guān)命令列表、命令使用示例、應(yīng)用范圍等內(nèi)容,需要的朋友可以參考下
    2015-04-04
  • 使用Redis命令操作數(shù)據(jù)庫(kù)的常見錯(cuò)誤及解決方法

    使用Redis命令操作數(shù)據(jù)庫(kù)的常見錯(cuò)誤及解決方法

    由于Redis是內(nèi)存數(shù)據(jù)庫(kù),因此可能會(huì)存在一些安全問題,下面這篇文章主要給大家介紹了關(guān)于使用Redis命令操作數(shù)據(jù)庫(kù)的常見錯(cuò)誤及解決方法,文中通過代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2024-02-02
  • Redis 設(shè)置密碼無(wú)效問題解決

    Redis 設(shè)置密碼無(wú)效問題解決

    本文主要介紹了Redis 設(shè)置密碼無(wú)效問題解決,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02
  • 大家都應(yīng)該知道的Redis過期鍵與過期策略

    大家都應(yīng)該知道的Redis過期鍵與過期策略

    這篇文章主要給大家介紹了一些應(yīng)該知道的Redis過期鍵與過期策略的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用Redis具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11

最新評(píng)論