SpringBoot整合Redis實現(xiàn)消息發(fā)布與訂閱的示例代碼
當(dāng)我們在多個集群應(yīng)用中使用到本地緩存時,在數(shù)據(jù)庫數(shù)據(jù)得到更新后,為保持各個副本當(dāng)前被修改的數(shù)據(jù)與數(shù)據(jù)庫數(shù)據(jù)保持同步,在數(shù)據(jù)被操作后向其他集群應(yīng)用發(fā)出被更新數(shù)據(jù)的通知,使其刪除;下次當(dāng)其他應(yīng)用請求該被更新的數(shù)據(jù)時,應(yīng)用會到數(shù)據(jù)庫去取,也就是最新的數(shù)據(jù),從而使得被更新數(shù)據(jù)與數(shù)據(jù)庫保持同步!
能實現(xiàn)發(fā)送與接收信息的中間介有很多,比如:RocketMQ、RabbitMQ、ActiveMQ、Kafka等,本次主要簡單介紹Redis的推送與訂閱功能并集成Spring Boot的實現(xiàn)。
1.添加SpringBoot 集成Redis maven依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2.配置Redis配置 RedisConfig.java
@Configuration
public class RedisConfig {
@Value("${redis.server.nodes}")
private String redisServerNodes;
@Value("${redis.server.password}")
private String redisServerPassword;
//Redis集群配置,單機(jī)的redis注釋掉該方法,在application配置文件里面配置就可以了
@Bean
public RedisClusterConfiguration getRedisClusterConfiguration() {
RedisClusterConfiguration redisClusterConfiguration = new RedisClusterConfiguration();
String[] serverArray = redisServerNodes.split(",");
Set<RedisNode> nodes = new HashSet<RedisNode>();
for (String ipPort : serverArray) {
String[] ipAndPort = ipPort.split(":");
nodes.add(new RedisNode(ipAndPort[0].trim(), Integer.parseInt(ipAndPort[1])));
}
redisClusterConfiguration.setClusterNodes(nodes);
RedisPassword pwd = RedisPassword.of(redisServerPassword);
redisClusterConfiguration.setPassword(pwd);
return redisClusterConfiguration;
}
//指定redisTemplate類型,如下為<String, Object>
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
// 使用JSON格式序列化對象,對緩存數(shù)據(jù)key和value進(jìn)行轉(zhuǎn)換
Jackson2JsonRedisSerializer<Object> jacksonSeial = new Jackson2JsonRedisSerializer<>(Object.class);
// 解決查詢緩存轉(zhuǎn)換異常的問題
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jacksonSeial.setObjectMapper(objectMapper);
// 設(shè)置RedisTemplate模板API的序列化方式為JSON
template.setDefaultSerializer(jacksonSeial);
return template;
}
/**
* Redis消息監(jiān)聽器容器
* 這個容器加載了RedisConnectionFactory和消息監(jiān)聽器
* 可添加多個不同話題的redis監(jiān)聽器,需要將消息監(jiān)聽器和消息頻道綁定,
* 通過反射調(diào)用消息訂閱處理器的相關(guān)方法進(jìn)行業(yè)務(wù)處理
*
* @param redisConnectionFactory 連接工廠
* @param listener Redis消息監(jiān)聽器
* @param MessageListenerAdapter Redis消息監(jiān)聽適配器
* @return RedisMessageListenerContainer 消息監(jiān)聽容器
*/
@Bean
@SuppressWarnings("all")
public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory,
RedisMessageListener listener,
MessageListenerAdapter adapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
// 所有的訂閱消息,都需要在這里進(jìn)行注冊綁定
// new PatternTopic(TOPIC_NAME1) 表示發(fā)布信息的頻道
// 可以添加多個頻道以及配置不同的頻道
container.addMessageListener(listener, new PatternTopic(SystemConstants.TOPIC_NAME1));
container.addMessageListener(adapter, new PatternTopic(SystemConstants.TOPIC_NAME2));
/**
* 設(shè)置序列化對象
* 特別注意:1. 發(fā)布的時候和訂閱方都需要設(shè)置序列化
* 2. 設(shè)置序列化對象必須放在 {加入消息監(jiān)聽器} 這步后面,不然接收器接收不到消息
*/
Jackson2JsonRedisSerializer seria = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
container.setTopicSerializer(seria);
return container;
}
/**
* 這個地方是給messageListenerAdapter 傳入一個消息接受的處理器,利用反射的方法調(diào)用“receiveMessage”
* 也有好幾個重載方法,這邊默認(rèn)調(diào)用處理器的方法 叫OnMessage
*
* @param redisMessageReceiver 消息接收對象
* @return 消息監(jiān)聽適配器
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisMessageReceiver redisMessageReceiver) {
MessageListenerAdapter receiveMessage = new MessageListenerAdapter(printMessageReceiver, "onMessage");
Jackson2JsonRedisSerializer<Object> seria = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL);
seria.setObjectMapper(objectMapper);
receiveMessage.setSerializer(seria);
return receiveMessage;
}
}3.Redis的訂閱主要有兩種實現(xiàn)方式
方式一:編寫Redis監(jiān)聽類RedisMessageListener,實現(xiàn)Redis的監(jiān)聽接口MessageListener,并重寫onMessage方法
方式二:編寫Redis消息監(jiān)聽適配器類,并在RedisConfig.java中配置消息監(jiān)聽適配器bean
方式一 與 方式二 主要是實現(xiàn)訂閱Redis推送的消息后的具體操作,這兩種方式可以同時使用來訂閱多個頻道里的消息
//方式一:
@Slf4j
@Component
public class RedisMessageListener implements MessageListener {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Autowired
private CacheManager cacheManager;
@Override
public void onMessage(Message message, byte[] pattern) {
// 接收的topic
log.info("接收消息頻道:" + new String(pattern));
//序列化對象(特別注意:發(fā)布的時候需要設(shè)置序列化;訂閱方也需要設(shè)置序列化)
MessageDto<?> messageDto = (MessageDto<?>) redisTemplate.getValueSerializer().deserialize(message.getBody());
//MessageDto<T>為自己編寫的一個消息對象類(如自定義有:String data,String title,T content 等屬性)
log.info("接收消息內(nèi)容:{}", messageDto);
//根據(jù)消息內(nèi)容做具體業(yè)務(wù)邏輯。。。。。。。。。
//。。。。。。。。。。。。。。。。。。。。。。
}
}//方式二
@Slf4j
@Component
public class RedisMessageReceiver {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
/**
* 方法命名規(guī)則必須為function(Object messageDto) / function(Object messageDto,String topic)
* @param messageDto 消息對象
* @param topic 消息頻道名稱
*/
public void onMessage(Object messageDto , String topic) {
// 接收消息頻道
log.info("接收消息頻道:" + topic);
//接收消息內(nèi)容
log.info("接收消息內(nèi)容:{}",messageDto);
}
}4.編寫Redis消息的推送工具類,在需要推送消息的地方使用來向Redis推送消息(如:操作數(shù)據(jù)的地方)
@Slf4j
@Component
public class RedisMessageUtils {
@Autowired
private RedisTemplate<String,Object> redisTemplate;
/**
* 向通道發(fā)布消息
*/
public void sendMessage(String topic, Object message) {
if (!StringUtils.hasText(topic)) {
return;
}
try {
redisTemplate.convertAndSend(topic, message);
log.info("發(fā)送消息成功,topic:{},message:{}", topic, message);
} catch (Exception e) {
log.info("發(fā)送消息失敗,topic:{},message:{}", topic, message);
e.printStackTrace();
}
}
}5.使用
@RestController
@RequestMapping("/user")
public class UserController{
@Autowired
private UserService userService;
@PostMapping("/getUsers")
public List<User> queryUsers(@RequestBody UserDto userDto){
List<User> users=userService.queryUsers(userDto);
//發(fā)送測試消息
RedisMessageUtils.sendMessage(SystemConstants.TOPIC_NAME2, new MessageDto());
return users;
}
}成功示例:
2099-12-31 23:59:59.999 [http-nio-8888-exec-1] INFO com.xxx.yyy.util.RedisMessageUtils : 發(fā)送消息成功,topic:TOPIC2,message:MessageDto(data=null, title=null, content=null)
2099-12-31 23:59:59.999 [container-2] INFO com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息頻道:TOPIC2
2099-12-31 23:59:59.999 [container-2] INFO com.xxx.yyy.zzz.RedisMessageReceiver : 接收消息內(nèi)容:MessageDto(data=null, title=null, content=null)
以上就是SpringBoot整合Redis實現(xiàn)消息發(fā)布與訂閱的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Redis消息發(fā)布 訂閱的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
springboot+vue前后端分離項目中使用jwt實現(xiàn)登錄認(rèn)證
本文介紹了如何在SpringBoot+Vue前后端分離的項目中使用JWT實現(xiàn)登錄認(rèn)證,內(nèi)容包括后端的響應(yīng)工具類、JWT工具類、登錄用戶實體類、登錄接口、測試接口、過濾器、啟動類以及前端的登錄頁面實現(xiàn),感興趣的可以了解一下2024-10-10
idea導(dǎo)入springboot項目沒有maven的解決
這篇文章主要介紹了idea導(dǎo)入springboot項目沒有maven的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04
Spring Security OAuth2認(rèn)證授權(quán)示例詳解
這篇文章主要介紹了Spring Security OAuth2認(rèn)證授權(quán)示例詳解,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09
Java數(shù)組操作經(jīng)典例題大總結(jié)
數(shù)組是在內(nèi)存中存儲相同數(shù)據(jù)類型的連續(xù)的空間,聲明一個數(shù)組就是在內(nèi)存空間中劃出一串連續(xù)的空間,下面這篇文章主要給大家介紹了關(guān)于Java數(shù)組操作經(jīng)典例題的相關(guān)資料,需要的朋友可以參考下2022-03-03
springboot 多環(huán)境配置 yml文件版的實現(xiàn)方法
這篇文章主要介紹了springboot 多環(huán)境配置 yml文件版的實現(xiàn)方法,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-06-06

