springboot基于Redis發(fā)布訂閱集群下WebSocket的解決方案
一、背景
單機(jī)節(jié)點(diǎn)下,WebSocket連接成功后,可以直接發(fā)送消息。而多節(jié)點(diǎn)下,連接時(shí)通過nginx會(huì)代理到不同節(jié)點(diǎn)。
假設(shè)一開始用戶連接了node1的socket服務(wù)。觸發(fā)消息發(fā)送的條件的時(shí)候也通過nginx進(jìn)行代理,假如代理轉(zhuǎn)到了node2節(jié)點(diǎn)上,那么node2節(jié)點(diǎn)的socket服務(wù)就發(fā)送不了消息,因?yàn)橐婚_始用戶注冊(cè)的是node1節(jié)點(diǎn)。這就導(dǎo)致了消息發(fā)送失敗。
為了解決這一方案,消息發(fā)送時(shí),就需要一個(gè)中間件來記錄,這樣,三個(gè)節(jié)點(diǎn)都可以獲取消息,然后在根據(jù)條件進(jìn)行消息推送。
二、解決方案(springboot 基于 Redis發(fā)布訂閱)
1、依賴
<!-- redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- websocket --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
2、創(chuàng)建業(yè)務(wù)處理類 Demo.class,該類可以實(shí)現(xiàn)MessageListener接口后重寫onMessage方法,也可以不實(shí)現(xiàn),自己寫方法。
import com.alibaba.fastjson.JSON; import com.dy.service.impl.OrdersServiceImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; import org.springframework.stereotype.Component; import java.util.HashMap; /** * @program: * @description: redis消息訂閱-業(yè)務(wù)處理 * @author: zhang yi * @create: 2021-01-25 16:46 */ @Component public class Demo implements MessageListener { Logger logger = LoggerFactory.getLogger(this.getClass()); @Override public void onMessage(Message message, byte[] pattern) { logger.info("消息訂閱成功---------"); logger.info("內(nèi)容:"+message.getBody()); logger.info("交換機(jī):"+message.getChannel()); } }
3、創(chuàng)建PubSubConfig配置類
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.PatternTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; /** * @program: * @description: redis發(fā)布訂閱配置 * @author: zhang yi * @create: 2021-01-25 16:49 */ @Configuration @EnableCaching public class PubSubConfig { Logger logger = LoggerFactory.getLogger(this.getClass()); //如果是多個(gè)交換機(jī),則參數(shù)為(RedisConnectionFactory connectionFactory, // MessageListenerAdapter listenerAdapter, // MessageListenerAdapter listenerAdapter2) @Bean RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多個(gè) messageListener,配置不同的交換機(jī) container.addMessageListener(listenerAdapter, new PatternTopic("channel:demo")); //container.addMessageListener(listenerAdapter2, new PatternTopic("channel:demo2")); return container; } /** * 消息監(jiān)聽器適配器,綁定消息處理器,利用反射技術(shù)調(diào)用消息處理器的業(yè)務(wù)方法 * @param demo 第一步的業(yè)務(wù)處理類 * @return */ @Bean MessageListenerAdapter listenerAdapter(Demo demo) { logger.info("----------------消息監(jiān)聽器加載成功----------------"); // onMessage 就是方法名,基于反射調(diào)用 return new MessageListenerAdapter(demo, "onMessage"); } /** * 多個(gè)交換機(jī)就多寫一個(gè) * @param subCheckOrder * @return */ //@Bean //MessageListenerAdapter listenerAdapter2(SubCheckOrder subCheckOrder) { // logger.info("----------------消息監(jiān)聽器加載成功----------------"); // return new MessageListenerAdapter(subCheckOrder, "onMessage"); //} @Bean StringRedisTemplate template(RedisConnectionFactory connectionFactory) { return new StringRedisTemplate(connectionFactory); } }
4、消息發(fā)布
@Autowired private RedisTemplate<String, Object> redisTemplate; redisTemplate.convertAndSend("channel:demo", "我是內(nèi)容");
三、具體用法
- socket連接成功。
- socket消息推送時(shí),把信息發(fā)布到redis中。socket服務(wù)訂閱redis的消息,訂閱成功后進(jìn)行推送。集群下的socket都能訂閱到消息,但是只有之前連接成功的節(jié)點(diǎn)能推送成功,其余的無法推送。
相關(guān)文章
Java API方式調(diào)用Kafka各種協(xié)議的方法
本篇文章主要介紹了Java API方式調(diào)用Kafka各種協(xié)議的方法,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09SpringCloud如何使用Eureka實(shí)現(xiàn)服務(wù)之間的傳遞數(shù)據(jù)
這篇文章主要介紹了SpringCloud使用Eureka實(shí)現(xiàn)服務(wù)之間的傳遞數(shù)據(jù)操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-06-06解決spring cloud服務(wù)啟動(dòng)之后回到命令行會(huì)自動(dòng)掛掉問題
這篇文章主要介紹了解決spring cloud服務(wù)啟動(dòng)之后回到命令行會(huì)自動(dòng)掛掉問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-09-09SpringBoot整合Echarts實(shí)現(xiàn)用戶人數(shù)和性別展示功能(詳細(xì)步驟)
這篇文章主要介紹了SpringBoot整合Echarts實(shí)現(xiàn)用戶人數(shù)和性別展示,通過數(shù)據(jù)庫設(shè)計(jì)、實(shí)現(xiàn)數(shù)據(jù)訪問層、業(yè)務(wù)邏輯層和控制層的代碼編寫,以及前端頁面的開發(fā),本文詳細(xì)地介紹了SpringBoot整合Echarts的實(shí)現(xiàn)步驟和代碼,需要的朋友可以參考下2023-05-05MyBatis自定義映射resultMap的實(shí)現(xiàn)
本文主要介紹了MyBatis自定義映射resultMap的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-03-03JDBC連接MySql數(shù)據(jù)庫步驟 以及查詢、插入、刪除、更新等
這篇文章主要介紹了JDBC連接MySql數(shù)據(jù)庫步驟,以及查詢、插入、刪除、更新等十一個(gè)處理數(shù)據(jù)庫信息的功能,需要的朋友可以參考下2018-05-05Java8 實(shí)現(xiàn)stream將對(duì)象集合list中抽取屬性集合轉(zhuǎn)化為map或list
這篇文章主要介紹了Java8 實(shí)現(xiàn)stream將對(duì)象集合list中抽取屬性集合轉(zhuǎn)化為map或list的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-02-02