欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

redis做websocket分布式消息推送服務的實現(xiàn)

 更新時間:2024年12月16日 09:21:25   作者:青龍老賊  
本文介紹了使用Redis作為消息隊列實現(xiàn)WebSocket分布式消息推送服務的方案,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

應用場景說明:

由于redis并非專業(yè)的MQ中間件,消息的防丟失策略并不完整,存在丟失消息的可能。該方案為在再pc web管理平臺的右下角彈出,顯示新接收到的消息數(shù),哪怕沒有收到這個通知,也可以自己在消息中心看看。所以對可靠性要求不高。如果業(yè)務場景要求可靠性高,還是請使用專業(yè)的MQ中間件。該方案已在多個實際項目中運行。

流程架構(gòu):

websocket實現(xiàn)同一賬戶多點登錄、websocket服務多節(jié)點部署推送方案。

簡單架構(gòu)圖

假設用戶A在兩個地方登錄,連接到兩個websocketServer服務節(jié)點1和2,用戶B連接到2節(jié)點。

websocketServer將websocket session保存在各自的Map<String,Session>中,key為userid,value為websocket Session。節(jié)點1保存了用戶A的websocket session,節(jié)點2保存了用戶A、B的websocket session。

消息生產(chǎn)者發(fā)布消息的時候為json格式,如:[{"receive"="userid_a","msg"="您有1個未讀消息"},{"receive"="userid_b","msg"="您有3個未讀消息"}],將消息發(fā)到redis的一個Channel,如showNewestMsg。

websocketServer中訂閱redis的channel=showNewestMsg,收到消息后根據(jù)消息中receive沖map中找到對應的websocket session,發(fā)消息給客戶端。

核心代碼:

1.該項目為springboot項目,先引入jar包,由于是從實際項目中抽出來寫的記錄,可能還缺jar請自行導入。

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!--websocket-->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

<!-- redis -->
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- 工具類 -->
<dependency>
	<groupId>cn.hutool</groupId>
	<artifactId>hutool-all</artifactId>
	<version>5.3.6</version>
</dependency>

<dependency>
	<groupId>net.sf.json-lib</groupId>
	<artifactId>json-lib</artifactId>
	<version>2.4</version>
	<classifier>jdk15</classifier>
</dependency>

2.websocket配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * spring websocket組件初始化
 * @author csf
 * 
 */
//war包啟動tomcat7及以下版本要關閉@Configuration注解,否則將無法啟動websocket服務
@Configuration
public class WebSocketConfig
{
    @Bean
    public ServerEndpointExporter serverEndpointExporter()
    {
        return new ServerEndpointExporter();
    }
}

注意:war包啟動tomcat7及以下版本要關閉@Configuration注解,否則將無法啟動websocket服務。

3.websocket服務端實現(xiàn)

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import com.kingengine.plug.service.MessageService;

import cn.hutool.core.util.StrUtil;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

/**
 * WebSocket服務類
 * @author csf
 * @date 2020年8月10日
 */
@ServerEndpoint("/websocket/{custId}")
@Component
public class WebSocketServer
{
    @Resource
    private MessageService messageService;
    
    Logger log = LoggerFactory.getLogger(this.getClass());
    
    // 當前在線連接數(shù)
    private static int onlineCount = 0;
    
    // 存放每個用戶對應的WebSocket連接對象,key為custId_HHmmss,確保一個登錄用戶只建立一個連接
    private static Map<String, Session> webSocketSessionMap = new ConcurrentHashMap<String, Session>();
    
    // 與某個客戶端的連接會話,需要通過它來給客戶端發(fā)送數(shù)據(jù)
    private Session session;
    
    // 接收用戶id
    private String custId = "";
    
    private static WebSocketServer webSocketServer;
    
    // 通過@PostConstruct實現(xiàn)初始化bean之前進行的操作
    @PostConstruct
    public void init()
    {
        // 初使化時將已靜態(tài)化的webSocketServer實例化
        webSocketServer = this;
        webSocketServer.messageService = this.messageService;
    }
    
    /**
     * 連接建立成功調(diào)用的方法
     * @param session 連接會話,由框架創(chuàng)建
     * @param custId 用戶id, 為處理用戶多點登錄都能收到消息,需傳該格式custId_HHmmss
     * @author csf
     * @date 2020年8月10日
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("custId") String custId)
    {
        if (!webSocketSessionMap.containsKey(custId))
        {
            this.session = session;
            webSocketSessionMap.put(custId, session);
            addOnlineCount(); // 在線數(shù)加1
            log.info("有新連接[{}]接入,當前websocket連接數(shù)為:{}", custId, getOnlineCount());
        }
        
        this.custId = custId;
        try
        {
            // 第一次建立連接,推送消息給客戶端,只會執(zhí)行一次。后續(xù)的新消息由com.kingengine.plug.redis.RedisReceiver接收到redis訂閱消息推送
            // 獲取未讀消息數(shù)
            // 由于前端傳進來的custId是有時間后綴的,查詢時需要去掉后綴。
            String qryCustId = custId.split("_")[0];
            JSONObject unreadMsg = webSocketServer.messageService.getUnreadCount(qryCustId);
            
            // 獲取最新消息
            /*  JSONObject newMsg = webSocketServer.messageService.getNewestMsg(qryCustId);
            // 發(fā)送消息
            JSONArray msgArr = new JSONArray();
            if (newMsg!=null)
            {
                msgArr.add(newMsg);
            }*/
            JSONArray msgArr = new JSONArray();
            msgArr.add(unreadMsg);
            sendMessage(custId, msgArr.toString());
        }
        catch (Exception e)
        {
            log.error("客戶端連接websocket服務異常");
            e.printStackTrace();
        }
    }
    
    /**
     * 連接關閉調(diào)用的方法
     */
    @OnClose
    public void onClose(@PathParam("custId") String sessionKey)
    {
        if (webSocketSessionMap.containsKey(sessionKey))
        {
            try
            {
                webSocketSessionMap.get(sessionKey).close();
                webSocketSessionMap.remove(sessionKey);
            }
            catch (IOException e)
            {
                log.error("連接[{}]關閉失敗。", sessionKey);
                e.printStackTrace();
            }
            subOnlineCount();
            log.info("連接[{}]關閉,當前websocket連接數(shù):{}", sessionKey, onlineCount);
        }
    }
    
    /**
     * 接收客戶端發(fā)送的消息
     * @param message 客戶端發(fā)送過來的消息
     * @param session websocket會話
     */
    @OnMessage
    public void onMessage(String message, Session session)
    {
        log.info("收到來自客戶端" + custId + "的信息:" + message);
    }
    
    /**
     * 連接錯誤時觸發(fā)
     * @param session
     * @param error
     */
    @OnError
    public void onError(Session session, Throwable error)
    {
        try
        {
            session.close();
        }
        catch (IOException e)
        {
            log.error("發(fā)生錯誤,連接[{}]關閉失敗。");
            e.printStackTrace();
        }
        // log.error("websocket發(fā)生錯誤");
        // error.printStackTrace();
    }
    
    /**
     * 給指定的客戶端推送消息,可單發(fā)和群發(fā)
     * @param sessionKeys 發(fā)送消息給目標客戶端sessionKey,多個逗號“,”隔開1234,2345...
     * @param message
     * @throws IOException
     * @author csf
     * @date 2020年8月11日
     */
    public void sendMessage(String sessionKeys, String message)
    {
        if (StrUtil.isNotBlank(sessionKeys))
        {
            String[] sessionKeyArr = sessionKeys.split(",");
            for (String key : sessionKeyArr)
            {
                try
                {
                    // 可能存在一個賬號多點登錄
                    List<Session> sessionList = getLikeByMap(webSocketSessionMap, key);
                    for (Session session : sessionList)
                    {
                        session.getBasicRemote().sendText(message);
                    }
                }
                catch (IOException e)
                {
                    e.printStackTrace();
                    continue;// 某個客戶端發(fā)送異常,不影響其他客戶端發(fā)送
                }
            }
        }
        else
        {
            log.info("sessionKeys為空,沒有目標客戶端");
        }
    }
    
    /**
     * 給當前客戶端推送消息,首次建立連接時調(diào)用
     */
    public void sendMessage(String message)
        throws IOException
    {
        this.session.getBasicRemote().sendText(message);
    }
    
    /**
     * 檢查webSocket連接是否在線
     * @param sesstionKey webSocketMap中維護的key
     * @return 是否在線
     */
    public static boolean checkOnline(String sesstionKey)
    {
        if (webSocketSessionMap.containsKey(sesstionKey))
        {
            return true;
        }
        else
        {
            return false;
        }
    }
    
    /**
     * 獲取包含key的所有map值
     * @param map
     * @param keyLike
     * @return
     * @author csf
     * @date 2020年8月13日
     */
    private List<Session> getLikeByMap(Map<String, Session> map, String keyLike)
    {
        List<Session> list = new ArrayList<>();
        for (String key : map.keySet())
        {
            if (key.contains(keyLike))
            {
                list.add(map.get(key));
            }
        }
        return list;
    }
    
    public static synchronized int getOnlineCount()
    {
        return onlineCount;
    }
    
    public static synchronized void addOnlineCount()
    {
        WebSocketServer.onlineCount++;
    }
    
    public static synchronized void subOnlineCount()
    {
        WebSocketServer.onlineCount--;
    }
}

4.redis消息訂閱配置

import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
@EnableCaching
public class RedisCacheConfig
{
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter)
    {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 可以添加多個 messageListener,配置不同的交換機
        container.addMessageListener(listenerAdapter, new PatternTopic("showNewestMsg"));// 訂閱最新消息頻道
        return container;
    }
    
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver receiver)
    {
        // 消息監(jiān)聽適配器
        return new MessageListenerAdapter(receiver, "onMessage");
    }
    
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory)
    {
        return new StringRedisTemplate(connectionFactory);
    }
}

5.redis配置,直接放在springboot項目application.properties或application.yml中

# 數(shù)據(jù)庫索引(默認為0)
spring.redis.database=0  
spring.redis.host=192.168.1.100
spring.redis.port=6379
spring.redis.password=123456
# 連接池最大連接數(shù)(使用負值表示沒有限制)
spring.redis.pool.max-active=8  
# 連接池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.pool.max-wait=-1  
# 連接池中的最大空閑連接
spring.redis.pool.max-idle=8  
# 連接池中的最小空閑連接
spring.redis.pool.min-idle=0  
# 連接超時時間(毫秒)
spring.redis.timeout=5000

6.接收消息生產(chǎn)者發(fā)布的消息,推送給對應的客戶端

import java.io.UnsupportedEncodingException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

import com.kingengine.plug.websocket.WebSocketServer;

import cn.hutool.core.codec.Base64;
import cn.hutool.core.util.StrUtil;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

/**
 * 消息監(jiān)聽對象,接收訂閱消息
 * @author csf
 * @date 2020年8月13日
 */
@Component
public class RedisReceiver implements MessageListener
{
    Logger log = LoggerFactory.getLogger(this.getClass());
    
    @Autowired
    WebSocketServer webSocketServer;
    
    /**
     * 處理接收到的訂閱消息
     */
    @Override
    public void onMessage(Message message, byte[] pattern)
    {
        String channel = new String(message.getChannel());// 訂閱的頻道名稱
        String msg = "";
        try
        {
            msg = new String(message.getBody(), "GBK");//注意與發(fā)布消息編碼一致,否則會亂碼
            if (StrUtil.isNotBlank(msg)){
                if ("showNewestMsg".endsWith(channel))// 最新消息
                {
                    JSONObject json = JSONObject.fromObject(msg);
                    webSocketServer.sendMessage(json.get("receive"),json.get("msg"));
                }else{
                    //TODO 其他訂閱的消息處理
                }
               
            }else{
                log.info("消息內(nèi)容為空,不處理。");
            }
        }
        catch (Exception e)
        {
            log.error("處理消息異常:"+e.toString())
            e.printStackTrace();
        }
    }
}

7.消息發(fā)布測試

import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import net.sf.json.JSONObject;

@RequestMapping("redis")
@RestController
public class RedisTestController
{
    @Autowired
    StringRedisTemplate template;
    
    /**
     * 發(fā)布消息測試
     *@param userid
     * @param msg
     * @return
     */
    @PostMapping("sendMessage")
    public String sendMessage(String userid,String msg)
    {
        try
        {
            String newMessge=new String(msg.getBytes("GBK"),"GBK");
            Map<String,String> map = new HashMap<String, String>();
            map.put("receive", userid);
            map.put("msg", newMessge);
            template.convertAndSend("showNewestMsg",         
          JSONObject.fromObject(map).toString());
        }
        catch (UnsupportedEncodingException e)
        {
            e.printStackTrace();
        }
        return "消息發(fā)布成功!";
    }
}

8.客戶端代碼

<!DOCTYPE html>

<html>

<head>
    <title>WebSocket測試</title>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
</head>

<body>
<div>
    來自服務端消息:
    <p id="message"></p>
</div>
</body>


<script src="http://apps.bdimg.com/libs/jquery/1.6.4/jquery.min.js"></script>

<script>
    let webSocketClient;
    if (window.WebSocket)
    {
       let custid="132456_" + Math.random();//該參數(shù)會作為websocketServer中存儲session的key,要保證唯一。
        webSocketClient = new WebSocket("ws://127.0.0.1:8082/bootapp/websocket/" + custid);

        //連通之后的回調(diào)事件
        webSocketClient.onopen = function () {
            webSocketClient.send("這里是地球,收到請回答。。。");
            //	webSocket.send('{"type":"1","data":"121"}');
        };

        //接收后臺服務端的消息
        webSocketClient.onmessage = function (evt) {
            console.log("數(shù)據(jù)已接收:" + evt.data);
            showMessage("未讀消息:" + evt.data);
        };

        //連接關閉的回調(diào)事件
        webSocketClient.onclose = function () {
            alert("連接已關閉...");
        };

    }else{
        alert("瀏覽器不支持websocket");
    }

    function showMessage(message) {
        $("#message").html(message);
    }
</script>
</html>

核心代碼至此。

到此這篇關于redis做websocket分布式消息推送服務的實現(xiàn)的文章就介紹到這了,更多相關redis websocket分布式消息推送服務內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • 解讀緩存db redis local的取舍之道

    解讀緩存db redis local的取舍之道

    這篇文章主要介紹了解讀緩存db redis local的取舍之道,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Redis字符串類型的常用命令小結(jié)

    Redis字符串類型的常用命令小結(jié)

    這篇文章給大家整理了在操作Redis字符串類型中的常用命令,文章總結(jié)的很全面,對大家學習Redis具有一定的參考借鑒價值,下面來一起看看吧。
    2016-09-09
  • redis啟動失敗問題之完美解決方案

    redis啟動失敗問題之完美解決方案

    這篇文章主要介紹了redis啟動失敗問題之完美解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-09-09
  • Redis集群方案

    Redis集群方案

    前段時間搞了搞Redis集群,想用做推薦系統(tǒng)的線上存儲,說來挺有趣,這邊基礎架構(gòu)不太完善,因此需要我們做推薦系統(tǒng)的自己來搭這個存儲環(huán)境,就自己折騰了折騰
    2020-07-07
  • Satoken+Redis實現(xiàn)短信登錄、注冊、鑒權功能

    Satoken+Redis實現(xiàn)短信登錄、注冊、鑒權功能

    這篇文章主要介紹了Satoken+Redis實現(xiàn)短信登錄、注冊、鑒權功能,本文通過實例代碼給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2024-01-01
  • Redis 單機安裝和哨兵模式集群安裝的實現(xiàn)

    Redis 單機安裝和哨兵模式集群安裝的實現(xiàn)

    本文主要介紹了Redis 單機安裝和哨兵模式集群安裝的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2022-07-07
  • redis命令行查看中文不亂碼的方法(十六進制字符串處理)

    redis命令行查看中文不亂碼的方法(十六進制字符串處理)

    這篇文章主要給大家介紹了關于redis命令行查看中文不亂碼的方法,其中詳細介紹了十六進制字符串處理的相關資料,文中給出了詳細的示例代碼,供大家參考學習,下面隨著小編來一起學習學習吧。
    2017-10-10
  • 使用Redis實現(xiàn)向量相似度搜索

    使用Redis實現(xiàn)向量相似度搜索

    在自然語言處理領域,有一個常見且重要的任務就是文本相似度搜索,所以本文為大家介紹一下如何利用Redis實現(xiàn)向量相似度搜索,解決文本、圖像和音頻之間的相似度匹配問題,需要的可以了解下
    2023-07-07
  • Redis鍵值設計的具體實現(xiàn)

    Redis鍵值設計的具體實現(xiàn)

    本文主要介紹了Redis鍵值設計的具體實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-06-06
  • 使用SpringBoot集成redis的方法

    使用SpringBoot集成redis的方法

    這篇文章主要介紹了SpringBoot集成redis的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-03-03

最新評論