SpringBoot?Redis?發(fā)布訂閱模式(Pub/Sub)的具體使用
作者:實(shí)習(xí)小生
注意:redis的發(fā)布訂閱模式不可以將消息進(jìn)行持久化,訂閱者發(fā)生網(wǎng)絡(luò)斷開、宕機(jī)等可能導(dǎo)致錯(cuò)過消息。
Redis命令行下使用發(fā)布訂閱
publish 發(fā)布
發(fā)布者通過以下命令可以往指定channel發(fā)布message
redis> publish channel message
subscribe 訂閱
訂閱者通過以下命令可以訂閱一個(gè)或多個(gè)頻道,如果頻道不存在則會(huì)創(chuàng)建
redis> subscribe channel [channel ...]
對(duì)于redis的發(fā)布訂閱的命令就這么簡(jiǎn)單。那么接下來(lái)我們?cè)趕pringboot中如何使用發(fā)布訂閱的功能呢?
SpringBoot中使用Redis的發(fā)布訂閱功能
添加依賴配置redis信息和連接池什么的就不說了,如果添加的有commons-pool2依賴的話,會(huì)自動(dòng)幫我們配置redis連接池的
發(fā)布者
相對(duì)于訂閱者來(lái)說,發(fā)布者的實(shí)現(xiàn)方式很簡(jiǎn)單,以下方式就可以往channel中發(fā)送message了。
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void publish(){
// 使用高級(jí)的redisTemplate
redisTemplate.convertAndSend("channel","message");
// 使用低級(jí)的connection 實(shí)際上redisTemplate的底層就是使用的下面的方式
redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
connection.publish("channel".getBytes(StandardCharsets.UTF_8), "message".getBytes(StandardCharsets.UTF_8));
return null;
}
}, true);
// true這個(gè)參數(shù)意思是 是否將redis連接暴露給回調(diào)代碼,大多數(shù)情況下設(shè)置true就可以了,往后深入的話可以看到
RedisConnection connToExpose = (exposeConnection ? connToUse : createRedisConnectionProxy(connToUse)); 如果為false的話會(huì)創(chuàng)建redis連接的代理
}
訂閱者
訂閱者因?yàn)樯婕暗竭B接、線程等 所以內(nèi)容相對(duì)會(huì)多一點(diǎn)
@Resource
private RedisTemplate<String, Object> redisTemplate;
public void subscribe() {
redisTemplate.execute(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
// 我定義了一個(gè)全局的 ConcurrentHashMap 用來(lái)存放連接 因?yàn)楹竺娴娜∠嗛喌木€程要和訂閱的線程用同一個(gè)連接
map.put("connection",connection);
// subscribe 按頻道訂閱 該方法會(huì)阻塞該線程 只有取消訂閱才會(huì)釋放該線程
connection.subscribe(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
log.info("接收到消息");
System.out.println(new String(message.getBody()));
}
}, "channelOne".getBytes(StandardCharsets.UTF_8), "channelTwo".getBytes(StandardCharsets.UTF_8));
// 按模式訂閱 pSubscribe 只有取消訂閱才會(huì)釋放該線程
// connection.pSubscribe(new MessageListener() {
// @Override
// public void onMessage(Message message, byte[] pattern) {
// System.out.println(new String(message.getBody()));
// }
// }, "patternOne".getBytes(StandardCharsets.UTF_8), "patternOne".getBytes(StandardCharsets.UTF_8));
return null;
}
}, true);
}
如何取消訂閱呢?從剛才的map里取到連接
RedisConnection the = map.get("connection");
Subscription subscription = the.getSubscription();
subscription.unsubscribe();
消息監(jiān)聽容器
上面的那種訂閱為低級(jí)訂閱,由于連接在調(diào)用subscribe的時(shí)候會(huì)導(dǎo)致當(dāng)前線程阻塞,這種方式需要對(duì)每個(gè)監(jiān)聽器連接和線程管理,所以spring提供了RedisMessageListenerContainer類來(lái)幫我們完成這些工作。
RedisMessageListenerContainer顧名思義可以知道它是一個(gè)消息監(jiān)聽容器
詳情請(qǐng)參考官方文檔
如何實(shí)現(xiàn)
@Configuration
public class DefaultMessageListenerContainerConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
// 官方推薦我們使用自定義的線程池或者使用TaskExecutor
container.setTaskExecutor(executor());
container.addMessageListener(new MessageListener() {
@Override
public void onMessage(Message message, byte[] pattern) {
System.out.println(Thread.currentThread().getName() + ": " + new String(message.getBody()));
}
}, new ChannelTopic("message"));
return container;
}
@Bean
public TaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setQueueCapacity(100);
executor.initialize();
return executor;
}
}
這個(gè)時(shí)候我們?cè)趓edis命令行內(nèi)使用 publish channel message 的時(shí)候,我們的spring程序就可以訂閱到消息了。
再說下 MessageListenerAdapter
我們可以通過 MessageListenerAdapter 消息接收者包裝進(jìn)去,消息接收者不會(huì)和redis有任何耦合。
官方文檔給了spring傳統(tǒng)的xml的方式配置的,下面我給出基于configuration配置的代碼
public interface MessageDelegate {
void handleMessage(String message);
}
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
System.out.println(message);
}
}
@Configuration
public class MessageListenerContainerConfig {
@Autowired
private DefaultMessageDelegate defaultMessageDelegate;
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory,
MessageListenerAdapter messageListenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.setTaskExecutor(executor());
Map<MessageListenerAdapter, Collection<? extends Topic>> map = new HashMap<>();
List<ChannelTopic> channelTopics = new ArrayList<>();
ChannelTopic channelTopic = new ChannelTopic("message");
channelTopics.add(channelTopic);
map.put(messageListenerAdapter, channelTopics);
container.setMessageListeners(map);
return container;
}
@Bean
public TaskExecutor executor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setQueueCapacity(100);
executor.initialize();
return executor;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
// handleMessage 參數(shù)消息來(lái)的時(shí)候要調(diào)用的方法 默認(rèn)是 handleMessage
return new MessageListenerAdapter(defaultMessageDelegate, "handleMessage");
}
}
如果我們要在程序運(yùn)行時(shí)添加訂閱或者取消訂閱的時(shí)候該怎么辦呢?
我們需要提前準(zhǔn)備好消息偵聽器,添加的時(shí)候把偵聽器注入到消息容器
取消的時(shí)候就調(diào)用消息容器的remove方法把偵聽器刪除掉即可。
到此這篇關(guān)于SpringBoot Redis 發(fā)布訂閱模式(Pub/Sub)的具體使用的文章就介紹到這了,更多相關(guān)SpringBoot Redis發(fā)布訂閱模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!