欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Springboot Websocket Stomp 消息訂閱推送

 更新時(shí)間:2021年07月09日 11:23:22   作者:代碼大師麥克勞瑞  
本文主要介紹了Springboot Websocket Stomp 消息訂閱推送,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

需求背景

閑話不扯,直奔主題。需要和web前端建立長(zhǎng)鏈接,互相實(shí)時(shí)通訊,因此想到了websocket,后面隨著需求的變更,需要用戶訂閱主題,實(shí)現(xiàn)消息的精準(zhǔn)推送,發(fā)布訂閱等,則想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的簡(jiǎn)單文本協(xié)議。

websocket協(xié)議

想到了之前寫(xiě)的一個(gè)websocket長(zhǎng)鏈接的demo,也貼上代碼供大家參考。

pom文件
直接引入spring-boot-starter-websocket即可。

    	<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

聲明websocket endpoint

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @ClassName WebSocketConfig
 * @Author scott
 * @Date 2021/6/16
 * @Version V1.0
 **/
@Configuration
public class WebSocketConfig {

    /**
     * 注入一個(gè)ServerEndpointExporter,該Bean會(huì)自動(dòng)注冊(cè)使用@ServerEndpoint注解申明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

websocket實(shí)現(xiàn)類,其中通過(guò)注解監(jiān)聽(tīng)了各種事件,實(shí)現(xiàn)了推送消息等相關(guān)邏輯

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.ruoyi.common.core.domain.AjaxResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName: DataTypePushWebSocket
 * @Author: scott
 * @Date: 2021/6/16
**/
@ServerEndpoint(value = "/ws/dataType/push/{token}")
@Component
public class DataTypePushWebSocket {

    private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class);

    /**
     * 記錄當(dāng)前在線連接數(shù)
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);

    private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder()
            .initialCapacity(10)
            .maximumSize(300)
            .expireAfterWrite(10, TimeUnit.MINUTES)
            .build();

    /**
     * 連接建立成功調(diào)用的方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("token")String token) {
        String sessionId = session.getId();
        onlineCount.incrementAndGet(); // 在線數(shù)加1
        this.sendMessage("sessionId:" + sessionId +",已經(jīng)和server建立連接", session);
        SESSION_CACHE.put(sessionId,session);
        log.info("有新連接加入:{},當(dāng)前在線連接數(shù)為:{}", session.getId(), onlineCount.get());
    }

    /**
     * 連接關(guān)閉調(diào)用的方法
     */
    @OnClose
    public void onClose(Session session,@PathParam("token")String token) {
        onlineCount.decrementAndGet(); // 在線數(shù)減1
        SESSION_CACHE.invalidate(session.getId());
        log.info("有一連接關(guān)閉:{},當(dāng)前在線連接數(shù)為:{}", session.getId(), onlineCount.get());
    }

    /**
     * 收到客戶端消息后調(diào)用的方法
     *
     * @param message 客戶端發(fā)送過(guò)來(lái)的消息
     */
    @OnMessage
    public void onMessage(String message, Session session,@PathParam("token")String token) {
        log.info("服務(wù)端收到客戶端[{}]的消息:{}", session.getId(), message);
        this.sendMessage("服務(wù)端已收到推送消息:" + message, session);
    }

    @OnError
    public void onError(Session session, Throwable error) {
        log.error("發(fā)生錯(cuò)誤");
        error.printStackTrace();
    }

    /**
     * 服務(wù)端發(fā)送消息給客戶端
     */
    private static void sendMessage(String message, Session toSession) {
        try {
            log.info("服務(wù)端給客戶端[{}]發(fā)送消息{}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服務(wù)端發(fā)送消息給客戶端失敗:{}", e);
        }
    }

    public static AjaxResult sendMessage(String message, String sessionId){
        Session session = SESSION_CACHE.getIfPresent(sessionId);
        if(Objects.isNull(session)){
            return AjaxResult.error("token已失效");
        }
        sendMessage(message,session);
        return AjaxResult.success();
    }

    public static AjaxResult sendBroadcast(String message){
        long size = SESSION_CACHE.size();
        if(size <=0){
            return AjaxResult.error("當(dāng)前沒(méi)有在線客戶端,無(wú)法推送消息");
        }
        ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap();
        Set<String> keys = sessionConcurrentMap.keySet();
        for (String key : keys) {
            Session session = SESSION_CACHE.getIfPresent(key);
            DataTypePushWebSocket.sendMessage(message,session);
        }

        return AjaxResult.success();

    }

}

至此websocket服務(wù)端代碼已經(jīng)完成。

stomp協(xié)議

前端代碼.這個(gè)是在某個(gè)vue工程中寫(xiě)的js,各位大佬自己動(dòng)手改改即可。其中Settings.wsPath是后端定義的ws地址例如ws://localhost:9003/ws

import Stomp from 'stompjs'
import Settings from '@/settings.js'

export default {
  // 是否啟用日志 默認(rèn)啟用
  debug:true,
  // 客戶端連接信息
  stompClient:{},
  // 初始化
  init(callBack){
    this.stompClient = Stomp.client(Settings.wsPath)
    this.stompClient.hasDebug = this.debug
    this.stompClient.connect({},suce =>{
      this.console("連接成功,信息如下 ↓")
      this.console(this.stompClient)
      if(callBack){
        callBack()
      }
    },err => {
      if(err) {
        this.console("連接失敗,信息如下 ↓")
        this.console(err)
      }
    })
  },
  // 訂閱
  sub(address,callBack){
    if(!this.stompClient.connected){
      this.console("沒(méi)有連接,無(wú)法訂閱")
      return
    }
    // 生成 id
    let timestamp= new Date().getTime() + address
    this.console("訂閱成功 -> "+address)
    this.stompClient.subscribe(address,message => {
      this.console(address+" 訂閱消息通知,信息如下 ↓")
      this.console(message)
      let data = message.body
      callBack(data)
    },{
      id: timestamp
    })
  },
  unSub(address){
    if(!this.stompClient.connected){
      this.console("沒(méi)有連接,無(wú)法取消訂閱 -> "+address)
      return
    }
    let id = ""
    for(let item in this.stompClient.subscriptions){
      if(item.endsWith(address)){
        id = item
        break
      }
    }
    this.stompClient.unsubscribe(id)
    this.console("取消訂閱成功 -> id:"+ id + " address:"+address)
  },
  // 斷開(kāi)連接
  disconnect(callBack){
    if(!this.stompClient.connected){
      this.console("沒(méi)有連接,無(wú)法斷開(kāi)連接")
      return
    }
    this.stompClient.disconnect(() =>{
      console.log("斷開(kāi)成功")
      if(callBack){
        callBack()
      }
    })
  },
  // 單位 秒
  reconnect(time){
    setInterval(() =>{
      if(!this.stompClient.connected){
        this.console("重新連接中...")
        this.init()
      }
    },time * 1000)
  },
  console(msg){
    if(this.debug){
      console.log(msg)
    }
  },
  // 向訂閱發(fā)送消息
  send(address,msg) {
    this.stompClient.send(address,{},msg)
  }
}

后端stomp config,里面都有注釋,寫(xiě)的很詳細(xì),并且我加入了和前端的心跳ping pong。

package com.cn.scott.config;

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

/**
 * @ClassName: WebSocketStompConfig
 * @Author: scott
 * @Date: 2021/7/8
**/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    private static long HEART_BEAT=10000;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //允許使用socketJs方式訪問(wèn),訪問(wèn)點(diǎn)為webSocket,允許跨域
        //在網(wǎng)頁(yè)上我們就可以通過(guò)這個(gè)鏈接
        //ws://127.0.0.1:port/ws來(lái)和服務(wù)器的WebSocket連接
        registry.addEndpoint("/ws").setAllowedOrigins("*");
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler();
        te.setPoolSize(1);
        te.setThreadNamePrefix("wss-heartbeat-thread-");
        te.initialize();
        //基于內(nèi)存的STOMP消息代理來(lái)代替mq的消息代理
        //訂閱Broker名稱,/user代表點(diǎn)對(duì)點(diǎn)即發(fā)指定用戶,/topic代表發(fā)布廣播即群發(fā)
        //setHeartbeatValue 設(shè)置心跳及心跳時(shí)間
        registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te);
        //點(diǎn)對(duì)點(diǎn)使用的訂閱前綴,不設(shè)置的話,默認(rèn)也是/user/
        registry.setUserDestinationPrefix("/user/");
    }
}

后端stomp協(xié)議接受、訂閱等動(dòng)作通知

package com.cn.scott.ws;

import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.DestinationVariable;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.messaging.simp.annotation.SubscribeMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName StompSocketHandler
 * @Author scott
 * @Date 2021/6/30
 * @Version V1.0
 **/
@RestController
public class StompSocketHandler {

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
    * @MethodName: subscribeMapping
     * @Description: 訂閱成功通知
     * @Param: [id]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @SubscribeMapping("/user/{id}/listener")
    public void subscribeMapping(@DestinationVariable("id") final long id) {
        System.out.println(">>>>>>用戶:"+id +",已訂閱");
        SubscribeMsg param = new SubscribeMsg(id,String.format("用戶【%s】已訂閱成功", id));
        sendToUser(param);
    }


    /**
    * @MethodName: test
     * @Description: 接收訂閱topic消息
     * @Param: [id, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    @MessageMapping(value = "/user/{id}/listener")
    public void UserSubListener(@DestinationVariable long  id, String msg) {
        System.out.println("收到客戶端:" +id+",的消息");
        SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用戶【%s】發(fā)送消息【%s】", id,msg));
        sendToUser(param);
    }
    
     @GetMapping("/refresh/{userId}")
    public void refresh(@PathVariable Long userId, String msg) {
        StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("服務(wù)端向用戶【%s】發(fā)送消息【%s】", userId,msg));
        sendToUser(param);
    }

    /**
    * @MethodName: sendToUser
     * @Description: 推送消息給訂閱用戶
     * @Param: [userId]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendToUser(SubscribeMsg screenChangeMsg){
        //這里可以控制權(quán)限等。。。
        simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg));
    }

    /**
    * @MethodName: sendBroadCast
     * @Description: 發(fā)送廣播,需要用戶事先訂閱廣播
     * @Param: [topic, msg]
     * @Return: void
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public void sendBroadCast(String topic,String msg){
        simpMessagingTemplate.convertAndSend(topic,msg);
    }


    /**
     * @ClassName: SubMsg
     * @Author: scott
     * @Date: 2021/6/30
    **/
    public static class SubscribeMsg {
        private Long userId;
        private String msg;
        public SubscribeMsg(Long UserId, String msg){
            this.userId = UserId;
            this.msg = msg;
        }
        public Long getUserId() {
            return userId;
        }
        public String getMsg() {
            return msg;
        }
    }
}

連接展示

建立連接成功,這里可以看出是基于websocket協(xié)議

在這里插入圖片描述

連接信息

在這里插入圖片描述

ping pong

在這里插入圖片描述

調(diào)用接口向訂閱用戶1發(fā)送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客戶端控制臺(tái)查看已經(jīng)收到了消息。這個(gè)時(shí)候不同用戶通過(guò)自己的userId可以區(qū)分訂閱的主題,可以做到通過(guò)userId精準(zhǔn)的往客戶端推送消息。

在這里插入圖片描述

還記得我們?cè)诤蠖伺渲玫臅r(shí)候還指定了廣播的訂閱主題/topic,這時(shí)我們前端通過(guò)js只要訂閱了這個(gè)主題,那么后端在像這個(gè)主題推送消息時(shí),所有訂閱的客戶端都能收到,感興趣的小伙伴可以自己試試,api我都寫(xiě)好了。

在這里插入圖片描述

至此,實(shí)戰(zhàn)完畢,喜歡的小伙伴麻煩關(guān)注加點(diǎn)贊。

springboot + stomp后端源碼地址:https://gitee.com/ErGouGeSiBaKe/stomp-server

到此這篇關(guān)于Springboot Websocket Stomp 消息訂閱推送的文章就介紹到這了,更多相關(guān)Springboot Websocket Stomp 消息訂閱推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

最新評(píng)論