SpringBoot整合Redis實(shí)現(xiàn)消息發(fā)布與訂閱的示例代碼
當(dāng)我們在多個(gè)集群應(yīng)用中使用到本地緩存時(shí),在數(shù)據(jù)庫數(shù)據(jù)得到更新后,為保持各個(gè)副本當(dāng)前被修改的數(shù)據(jù)與數(shù)據(jù)庫數(shù)據(jù)保持同步,在數(shù)據(jù)被操作后向其他集群應(yīng)用發(fā)出被更新數(shù)據(jù)的通知,使其刪除;下次當(dāng)其他應(yīng)用請求該被更新的數(shù)據(jù)時(shí),應(yīng)用會(huì)到數(shù)據(jù)庫去取,也就是最新的數(shù)據(jù),從而使得被更新數(shù)據(jù)與數(shù)據(jù)庫保持同步!
能實(shí)現(xiàn)發(fā)送與接收信息的中間介有很多,比如:RocketMQ、RabbitMQ、ActiveMQ、Kafka等,本次主要簡單介紹Redis的推送與訂閱功能并集成Spring Boot的實(shí)現(xiàn)。
1.添加SpringBoot 集成Redis maven依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2.配置Redis配置 RedisConfig.java
@Configuration public class RedisConfig { @Value("${redis.server.nodes}") private String redisServerNodes; @Value("${redis.server.password}") private String redisServerPassword; //Redis集群配置,單機(jī)的redis注釋掉該方法,在application配置文件里面配置就可以了 @Bean public RedisClusterConfiguration getRedisClusterConfiguration() { RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration(); String[] serverArray = redisServerNodes.split(","); Set<RedisNode> nodes = new HashSet<RedisNode>(); for (String ipPort : serverArray) { String[] ipAndPort = ipPort.split(":"); nodes.add(new RedisNode(ipAndPort[0].trim(), Integer.parseInt(ipAndPort[1]))); } redisClusterConfiguration.setClusterNodes(nodes); RedisPassword pwd = RedisPassword.of(redisServerPassword); redisClusterConfiguration.setPassword(pwd); return redisClusterConfiguration; } //指定redisTemplate類型,如下為<String, Object> @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate(); template.setConnectionFactory(redisConnectionFactory); // 使用JSON格式序列化對象,對緩存數(shù)據(jù)key和value進(jìn)行轉(zhuǎn)換 Jackson2JsonRedisSerializer<Object> jacksonSeial = new Jackson2JsonRedisSerializer<>(Object.class); // 解決查詢緩存轉(zhuǎn)換異常的問題 ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jacksonSeial.setObjectMapper(objectMapper); // 設(shè)置RedisTemplate模板API的序列化方式為JSON template.setDefaultSerializer(jacksonSeial); return template; } /** * Redis消息監(jiān)聽器容器 * 這個(gè)容器加載了RedisConnectionFactory和消息監(jiān)聽器 * 可添加多個(gè)不同話題的redis監(jiān)聽器,需要將消息監(jiān)聽器和消息頻道綁定, * 通過反射調(diào)用消息訂閱處理器的相關(guān)方法進(jìn)行業(yè)務(wù)處理 * * @param redisConnectionFactory 連接工廠 * @param listener Redis消息監(jiān)聽器 * @param MessageListenerAdapter Redis消息監(jiān)聽適配器 * @return RedisMessageListenerContainer 消息監(jiān)聽容器 */ @Bean @SuppressWarnings("all") public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, RedisMessageListener listener, MessageListenerAdapter adapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); // 所有的訂閱消息,都需要在這里進(jìn)行注冊綁定 // new PatternTopic(TOPIC_NAME1) 表示發(fā)布信息的頻道 // 可以添加多個(gè)頻道以及配置不同的頻道 container.addMessageListener(listener, new PatternTopic(SystemConstants.TOPIC_NAME1)); container.addMessageListener(adapter, new PatternTopic(SystemConstants.TOPIC_NAME2)); /** * 設(shè)置序列化對象 * 特別注意:1. 發(fā)布的時(shí)候和訂閱方都需要設(shè)置序列化 * 2. 設(shè)置序列化對象必須放在 {加入消息監(jiān)聽器} 這步后面,不然接收器接收不到消息 */ Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); container.setTopicSerializer(seria); return container; } /** * 這個(gè)地方是給messageListenerAdapter 傳入一個(gè)消息接受的處理器,利用反射的方法調(diào)用“receiveMessage” * 也有好幾個(gè)重載方法,這邊默認(rèn)調(diào)用處理器的方法 叫OnMessage * * @param redisMessageReceiver 消息接收對象 * @return 消息監(jiān)聽適配器 */ @Bean public MessageListenerAdapter listenerAdapter(RedisMessageReceiver redisMessageReceiver) { MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "onMessage"); Jackson2JsonRedisSerializer<Object> seria = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL); seria.setObjectMapper(objectMapper); receiveMessage.setSerializer(seria); return receiveMessage; } }
3.Redis的訂閱主要有兩種實(shí)現(xiàn)方式
方式一:編寫Redis監(jiān)聽類RedisMessageListener,實(shí)現(xiàn)Redis的監(jiān)聽接口MessageListener,并重寫onMessage方法
方式二:編寫Redis消息監(jiān)聽適配器類,并在RedisConfig.java中配置消息監(jiān)聽適配器bean
方式一 與 方式二 主要是實(shí)現(xiàn)訂閱Redis推送的消息后的具體操作,這兩種方式可以同時(shí)使用來訂閱多個(gè)頻道里的消息
//方式一: @Slf4j @Component public class RedisMessageListener implements MessageListener { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired private CacheManager cacheManager; @Override public void onMessage(Message message, byte[] pattern) { // 接收的topic log.info("接收消息頻道:" + new String(pattern)); //序列化對象(特別注意:發(fā)布的時(shí)候需要設(shè)置序列化;訂閱方也需要設(shè)置序列化) MessageDto<?> messageDto = (MessageDto<?>) redisTemplate.getValueSerializer().deserialize(message.getBody()); //MessageDto<T>為自己編寫的一個(gè)消息對象類(如自定義有:String data,String title,T content 等屬性) log.info("接收消息內(nèi)容:{}", messageDto); //根據(jù)消息內(nèi)容做具體業(yè)務(wù)邏輯。。。。。。。。。 //。。。。。。。。。。。。。。。。。。。。。。 } }
//方式二 @Slf4j @Component public class RedisMessageReceiver { @Autowired private RedisTemplate<String,Object> redisTemplate; /** * 方法命名規(guī)則必須為function(Object messageDto) / function(Object messageDto,String topic) * @param messageDto 消息對象 * @param topic 消息頻道名稱 */ public void onMessage(Object messageDto , String topic) { // 接收消息頻道 log.info("接收消息頻道:" + topic); //接收消息內(nèi)容 log.info("接收消息內(nèi)容:{}",messageDto); } }
4.編寫Redis消息的推送工具類,在需要推送消息的地方使用來向Redis推送消息(如:操作數(shù)據(jù)的地方)
@Slf4j @Component public class RedisMessageUtils { @Autowired private RedisTemplate<String,Object> redisTemplate; /** * 向通道發(fā)布消息 */ public void sendMessage(String topic, Object message) { if (!StringUtils.hasText(topic)) { return; } try { redisTemplate.convertAndSend(topic, message); log.info("發(fā)送消息成功,topic:{},message:{}", topic, message); } catch (Exception e) { log.info("發(fā)送消息失敗,topic:{},message:{}", topic, message); e.printStackTrace(); } } }
5.使用
@RestController @RequestMapping("/user") public class UserController{ @Autowired private UserService userService; @PostMapping("/getUsers") public List<User> queryUsers(@RequestBody UserDto userDto){ List<User> users=userService.queryUsers(userDto); //發(fā)送測試消息 RedisMessageUtils.sendMessage(SystemConstants.TOPIC_NAME2, new MessageDto()); return users; } }
成功示例:
2099-12-31 23:59:59.999 [http-nio-8888-exec-1] INFO com.xxx.yyy.util.RedisMessageUtils : 發(fā)送消息成功,topic:TOPIC2,message:MessageDto(data=null, title=null, content=null)
2099-12-31 23:59:59.999 [container-2] INFO com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息頻道:TOPIC2
2099-12-31 23:59:59.999 [container-2] INFO com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息內(nèi)容:MessageDto(data=null, title=null, content=null)
以上就是SpringBoot整合Redis實(shí)現(xiàn)消息發(fā)布與訂閱的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Redis消息發(fā)布 訂閱的資料請關(guān)注腳本之家其它相關(guān)文章!
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot+Redis實(shí)現(xiàn)消息的發(fā)布與訂閱的示例代碼
- SpringBoot中使用Redis?Stream實(shí)現(xiàn)消息監(jiān)聽示例
- springboot整合redis之消息隊(duì)列
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- springboot集成redis實(shí)現(xiàn)消息的訂閱與發(fā)布
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
相關(guān)文章
springboot+vue前后端分離項(xiàng)目中使用jwt實(shí)現(xiàn)登錄認(rèn)證
本文介紹了如何在SpringBoot+Vue前后端分離的項(xiàng)目中使用JWT實(shí)現(xiàn)登錄認(rèn)證,內(nèi)容包括后端的響應(yīng)工具類、JWT工具類、登錄用戶實(shí)體類、登錄接口、測試接口、過濾器、啟動(dòng)類以及前端的登錄頁面實(shí)現(xiàn),感興趣的可以了解一下2024-10-10idea導(dǎo)入springboot項(xiàng)目沒有maven的解決
這篇文章主要介紹了idea導(dǎo)入springboot項(xiàng)目沒有maven的解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04Spring Security OAuth2認(rèn)證授權(quán)示例詳解
這篇文章主要介紹了Spring Security OAuth2認(rèn)證授權(quán)示例詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09Java數(shù)組操作經(jīng)典例題大總結(jié)
數(shù)組是在內(nèi)存中存儲相同數(shù)據(jù)類型的連續(xù)的空間,聲明一個(gè)數(shù)組就是在內(nèi)存空間中劃出一串連續(xù)的空間,下面這篇文章主要給大家介紹了關(guān)于Java數(shù)組操作經(jīng)典例題的相關(guān)資料,需要的朋友可以參考下2022-03-03springboot 多環(huán)境配置 yml文件版的實(shí)現(xiàn)方法
這篇文章主要介紹了springboot 多環(huán)境配置 yml文件版的實(shí)現(xiàn)方法,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06