Springboot Websocket Stomp 消息訂閱推送
需求背景
閑話不扯,直奔主題。需要和web前端建立長(zhǎng)鏈接,互相實(shí)時(shí)通訊,因此想到了websocket,后面隨著需求的變更,需要用戶訂閱主題,實(shí)現(xiàn)消息的精準(zhǔn)推送,發(fā)布訂閱等,則想到了STOMP(Simple Text-Orientated Messaging Protocol) 面向消息的簡(jiǎn)單文本協(xié)議。
websocket協(xié)議
想到了之前寫(xiě)的一個(gè)websocket長(zhǎng)鏈接的demo,也貼上代碼供大家參考。
pom文件
直接引入spring-boot-starter-websocket即可。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
聲明websocket endpoint
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /** * @ClassName WebSocketConfig * @Author scott * @Date 2021/6/16 * @Version V1.0 **/ @Configuration public class WebSocketConfig { /** * 注入一個(gè)ServerEndpointExporter,該Bean會(huì)自動(dòng)注冊(cè)使用@ServerEndpoint注解申明的websocket endpoint */ @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
websocket實(shí)現(xiàn)類,其中通過(guò)注解監(jiān)聽(tīng)了各種事件,實(shí)現(xiàn)了推送消息等相關(guān)邏輯
import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.ruoyi.common.core.domain.AjaxResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @ClassName: DataTypePushWebSocket * @Author: scott * @Date: 2021/6/16 **/ @ServerEndpoint(value = "/ws/dataType/push/{token}") @Component public class DataTypePushWebSocket { private static final Logger log = LoggerFactory.getLogger(DataTypePushWebSocket.class); /** * 記錄當(dāng)前在線連接數(shù) */ private static AtomicInteger onlineCount = new AtomicInteger(0); private static Cache<String, Session> SESSION_CACHE = CacheBuilder.newBuilder() .initialCapacity(10) .maximumSize(300) .expireAfterWrite(10, TimeUnit.MINUTES) .build(); /** * 連接建立成功調(diào)用的方法 */ @OnOpen public void onOpen(Session session, @PathParam("token")String token) { String sessionId = session.getId(); onlineCount.incrementAndGet(); // 在線數(shù)加1 this.sendMessage("sessionId:" + sessionId +",已經(jīng)和server建立連接", session); SESSION_CACHE.put(sessionId,session); log.info("有新連接加入:{},當(dāng)前在線連接數(shù)為:{}", session.getId(), onlineCount.get()); } /** * 連接關(guān)閉調(diào)用的方法 */ @OnClose public void onClose(Session session,@PathParam("token")String token) { onlineCount.decrementAndGet(); // 在線數(shù)減1 SESSION_CACHE.invalidate(session.getId()); log.info("有一連接關(guān)閉:{},當(dāng)前在線連接數(shù)為:{}", session.getId(), onlineCount.get()); } /** * 收到客戶端消息后調(diào)用的方法 * * @param message 客戶端發(fā)送過(guò)來(lái)的消息 */ @OnMessage public void onMessage(String message, Session session,@PathParam("token")String token) { log.info("服務(wù)端收到客戶端[{}]的消息:{}", session.getId(), message); this.sendMessage("服務(wù)端已收到推送消息:" + message, session); } @OnError public void onError(Session session, Throwable error) { log.error("發(fā)生錯(cuò)誤"); error.printStackTrace(); } /** * 服務(wù)端發(fā)送消息給客戶端 */ private static void sendMessage(String message, Session toSession) { try { log.info("服務(wù)端給客戶端[{}]發(fā)送消息{}", toSession.getId(), message); toSession.getBasicRemote().sendText(message); } catch (Exception e) { log.error("服務(wù)端發(fā)送消息給客戶端失敗:{}", e); } } public static AjaxResult sendMessage(String message, String sessionId){ Session session = SESSION_CACHE.getIfPresent(sessionId); if(Objects.isNull(session)){ return AjaxResult.error("token已失效"); } sendMessage(message,session); return AjaxResult.success(); } public static AjaxResult sendBroadcast(String message){ long size = SESSION_CACHE.size(); if(size <=0){ return AjaxResult.error("當(dāng)前沒(méi)有在線客戶端,無(wú)法推送消息"); } ConcurrentMap<String, Session> sessionConcurrentMap = SESSION_CACHE.asMap(); Set<String> keys = sessionConcurrentMap.keySet(); for (String key : keys) { Session session = SESSION_CACHE.getIfPresent(key); DataTypePushWebSocket.sendMessage(message,session); } return AjaxResult.success(); } }
至此websocket服務(wù)端代碼已經(jīng)完成。
stomp協(xié)議
前端代碼.這個(gè)是在某個(gè)vue工程中寫(xiě)的js,各位大佬自己動(dòng)手改改即可。其中Settings.wsPath是后端定義的ws地址例如ws://localhost:9003/ws
import Stomp from 'stompjs' import Settings from '@/settings.js' export default { // 是否啟用日志 默認(rèn)啟用 debug:true, // 客戶端連接信息 stompClient:{}, // 初始化 init(callBack){ this.stompClient = Stomp.client(Settings.wsPath) this.stompClient.hasDebug = this.debug this.stompClient.connect({},suce =>{ this.console("連接成功,信息如下 ↓") this.console(this.stompClient) if(callBack){ callBack() } },err => { if(err) { this.console("連接失敗,信息如下 ↓") this.console(err) } }) }, // 訂閱 sub(address,callBack){ if(!this.stompClient.connected){ this.console("沒(méi)有連接,無(wú)法訂閱") return } // 生成 id let timestamp= new Date().getTime() + address this.console("訂閱成功 -> "+address) this.stompClient.subscribe(address,message => { this.console(address+" 訂閱消息通知,信息如下 ↓") this.console(message) let data = message.body callBack(data) },{ id: timestamp }) }, unSub(address){ if(!this.stompClient.connected){ this.console("沒(méi)有連接,無(wú)法取消訂閱 -> "+address) return } let id = "" for(let item in this.stompClient.subscriptions){ if(item.endsWith(address)){ id = item break } } this.stompClient.unsubscribe(id) this.console("取消訂閱成功 -> id:"+ id + " address:"+address) }, // 斷開(kāi)連接 disconnect(callBack){ if(!this.stompClient.connected){ this.console("沒(méi)有連接,無(wú)法斷開(kāi)連接") return } this.stompClient.disconnect(() =>{ console.log("斷開(kāi)成功") if(callBack){ callBack() } }) }, // 單位 秒 reconnect(time){ setInterval(() =>{ if(!this.stompClient.connected){ this.console("重新連接中...") this.init() } },time * 1000) }, console(msg){ if(this.debug){ console.log(msg) } }, // 向訂閱發(fā)送消息 send(address,msg) { this.stompClient.send(address,{},msg) } }
后端stomp config,里面都有注釋,寫(xiě)的很詳細(xì),并且我加入了和前端的心跳ping pong。
package com.cn.scott.config; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; /** * @ClassName: WebSocketStompConfig * @Author: scott * @Date: 2021/7/8 **/ @Configuration @EnableWebSocketMessageBroker public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer { private static long HEART_BEAT=10000; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { //允許使用socketJs方式訪問(wèn),訪問(wèn)點(diǎn)為webSocket,允許跨域 //在網(wǎng)頁(yè)上我們就可以通過(guò)這個(gè)鏈接 //ws://127.0.0.1:port/ws來(lái)和服務(wù)器的WebSocket連接 registry.addEndpoint("/ws").setAllowedOrigins("*"); } @Override public void configureMessageBroker(MessageBrokerRegistry registry) { ThreadPoolTaskScheduler te = new ThreadPoolTaskScheduler(); te.setPoolSize(1); te.setThreadNamePrefix("wss-heartbeat-thread-"); te.initialize(); //基于內(nèi)存的STOMP消息代理來(lái)代替mq的消息代理 //訂閱Broker名稱,/user代表點(diǎn)對(duì)點(diǎn)即發(fā)指定用戶,/topic代表發(fā)布廣播即群發(fā) //setHeartbeatValue 設(shè)置心跳及心跳時(shí)間 registry.enableSimpleBroker("/user", "/topic").setHeartbeatValue(new long[]{HEART_BEAT,HEART_BEAT}).setTaskScheduler(te); //點(diǎn)對(duì)點(diǎn)使用的訂閱前綴,不設(shè)置的話,默認(rèn)也是/user/ registry.setUserDestinationPrefix("/user/"); } }
后端stomp協(xié)議接受、訂閱等動(dòng)作通知
package com.cn.scott.ws; import com.alibaba.fastjson.JSON; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.DestinationVariable; import org.springframework.messaging.handler.annotation.MessageMapping; import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.messaging.simp.annotation.SubscribeMapping; import org.springframework.web.bind.annotation.RestController; /** * @ClassName StompSocketHandler * @Author scott * @Date 2021/6/30 * @Version V1.0 **/ @RestController public class StompSocketHandler { @Autowired private SimpMessagingTemplate simpMessagingTemplate; /** * @MethodName: subscribeMapping * @Description: 訂閱成功通知 * @Param: [id] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ @SubscribeMapping("/user/{id}/listener") public void subscribeMapping(@DestinationVariable("id") final long id) { System.out.println(">>>>>>用戶:"+id +",已訂閱"); SubscribeMsg param = new SubscribeMsg(id,String.format("用戶【%s】已訂閱成功", id)); sendToUser(param); } /** * @MethodName: test * @Description: 接收訂閱topic消息 * @Param: [id, msg] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ @MessageMapping(value = "/user/{id}/listener") public void UserSubListener(@DestinationVariable long id, String msg) { System.out.println("收到客戶端:" +id+",的消息"); SubscribeMsg param = new SubscribeMsg(id,String.format("已收到用戶【%s】發(fā)送消息【%s】", id,msg)); sendToUser(param); } @GetMapping("/refresh/{userId}") public void refresh(@PathVariable Long userId, String msg) { StompSocketHandler.SubscribeMsg param = new StompSocketHandler.SubscribeMsg(userId,String.format("服務(wù)端向用戶【%s】發(fā)送消息【%s】", userId,msg)); sendToUser(param); } /** * @MethodName: sendToUser * @Description: 推送消息給訂閱用戶 * @Param: [userId] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ public void sendToUser(SubscribeMsg screenChangeMsg){ //這里可以控制權(quán)限等。。。 simpMessagingTemplate.convertAndSendToUser(screenChangeMsg.getUserId().toString(),"/listener", JSON.toJSONString(screenChangeMsg)); } /** * @MethodName: sendBroadCast * @Description: 發(fā)送廣播,需要用戶事先訂閱廣播 * @Param: [topic, msg] * @Return: void * @Author: scott * @Date: 2021/6/30 **/ public void sendBroadCast(String topic,String msg){ simpMessagingTemplate.convertAndSend(topic,msg); } /** * @ClassName: SubMsg * @Author: scott * @Date: 2021/6/30 **/ public static class SubscribeMsg { private Long userId; private String msg; public SubscribeMsg(Long UserId, String msg){ this.userId = UserId; this.msg = msg; } public Long getUserId() { return userId; } public String getMsg() { return msg; } } }
連接展示
建立連接成功,這里可以看出是基于websocket協(xié)議
連接信息
ping pong
調(diào)用接口向訂閱用戶1發(fā)送消息,http://localhost:9003/refresh/1?msg=HelloStomp,可以在客戶端控制臺(tái)查看已經(jīng)收到了消息。這個(gè)時(shí)候不同用戶通過(guò)自己的userId可以區(qū)分訂閱的主題,可以做到通過(guò)userId精準(zhǔn)的往客戶端推送消息。
還記得我們?cè)诤蠖伺渲玫臅r(shí)候還指定了廣播的訂閱主題/topic,這時(shí)我們前端通過(guò)js只要訂閱了這個(gè)主題,那么后端在像這個(gè)主題推送消息時(shí),所有訂閱的客戶端都能收到,感興趣的小伙伴可以自己試試,api我都寫(xiě)好了。
至此,實(shí)戰(zhàn)完畢,喜歡的小伙伴麻煩關(guān)注加點(diǎn)贊。
springboot + stomp后端源碼地址:https://gitee.com/ErGouGeSiBaKe/stomp-server
到此這篇關(guān)于Springboot Websocket Stomp 消息訂閱推送的文章就介紹到這了,更多相關(guān)Springboot Websocket Stomp 消息訂閱推送內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java實(shí)現(xiàn)從網(wǎng)上下載圖片到本地的方法
這篇文章主要介紹了java實(shí)現(xiàn)從網(wǎng)上下載圖片到本地的方法,涉及java針對(duì)文件操作的相關(guān)技巧,非常簡(jiǎn)單實(shí)用,需要的朋友可以參考下2015-07-07Java SpringBoot實(shí)現(xiàn)帶界面的代碼生成器詳解
這篇文章主要介紹了Java SpringBoot如何實(shí)現(xiàn)帶界面的代碼生成器,幫助大家更好的理解和使用Java SpringBoot編程語(yǔ)言,感興趣的朋友可以了解下2021-09-09hadoop?詳解如何實(shí)現(xiàn)數(shù)據(jù)排序
在很多業(yè)務(wù)場(chǎng)景下,需要對(duì)原始的數(shù)據(jù)讀取分析后,將輸出的結(jié)果按照指定的業(yè)務(wù)字段進(jìn)行排序輸出,方便上層應(yīng)用對(duì)結(jié)果數(shù)據(jù)進(jìn)行展示或使用,減少二次排序的成本2022-02-02淺析Bean?Searcher?與?MyBatis?Plus?區(qū)別介紹
Bean?Searcher號(hào)稱任何復(fù)雜的查詢都可以一行代碼搞定,但?Mybatis?Plus?似乎也有類似的動(dòng)態(tài)查詢功能,最近火起的?Bean?Searcher?與?MyBatis?Plus?倒底有啥區(qū)別?帶著這個(gè)問(wèn)題一起通過(guò)本文學(xué)習(xí)下吧2022-05-05Java基礎(chǔ)詳解之集合框架工具Collections
這篇文章主要介紹了Java基礎(chǔ)詳解之集合框架工具Collections,文中有非常詳細(xì)的代碼示例,對(duì)正在學(xué)習(xí)java的小伙伴們有很好地幫助,需要的朋友可以參考下2021-04-04Java實(shí)現(xiàn)將html字符串插入到PPT幻燈片
Java后端代碼操作PPT幻燈片時(shí),可直接在幻燈片中繪制形狀,并在形狀中添加文本字符串內(nèi)容。本篇文章主要介紹通過(guò)java實(shí)現(xiàn)將html字符串添加到PPT幻燈片的的方法,可添加文字、圖片、視頻、音頻等。以下是具體方法和步驟。2021-11-11解決Lombok使用@Builder無(wú)法build父類屬性的問(wèn)題
這篇文章主要介紹了解決Lombok使用@Builder無(wú)法build父類屬性的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-09-09Eclipse連接Mysql數(shù)據(jù)庫(kù)操作總結(jié)
這篇文章主要介紹了Eclipse連接Mysql數(shù)據(jù)庫(kù)操作總結(jié)的相關(guān)資料,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-08-08EasyUi+Spring Data 實(shí)現(xiàn)按條件分頁(yè)查詢的實(shí)例代碼
這篇文章主要介紹了EasyUi+Spring Data 實(shí)現(xiàn)按條件分頁(yè)查詢的實(shí)例代碼,非常具有實(shí)用價(jià)值,需要的朋友可以參考下2017-07-07Spring 源碼解析CommonAnnotationBeanPostProcessor
這篇文章主要為大家介紹了Spring 源碼解析CommonAnnotationBeanPostProcessor示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10