redis發(fā)布訂閱Java代碼實現(xiàn)過程解析
前言
Redis除了可以用作緩存數(shù)據(jù)外,另一個重要用途是它實現(xiàn)了發(fā)布訂閱(pub/sub)消息通信模式:發(fā)送者(pub)發(fā)送消息,訂閱者(sub)接收消息。
為了實現(xiàn)redis的發(fā)布訂閱機制,首先要打開redis服務;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代碼:
<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.1.0</version> </dependency>
由于訂閱消息通道需要再tomcat啟動時觸發(fā),因此,需要創(chuàng)建一個listener監(jiān)聽器,在監(jiān)聽器里實現(xiàn)redis訂閱,在web.xml里配置監(jiān)聽器如下:
<listener> <listener-class>com.test.listener.InitListener</listener-class> </listener>
一、訂閱消息(InitListener實現(xiàn))
redis支持多通道訂閱,一個客戶端可以同時訂閱多個消息通道,如下代碼所示,訂閱了13個通道。由于訂閱機制是線程阻塞的,需要額外開啟一個線程專門用于處理訂閱消息及接收消息處理。
public class InitListener implements ServletContextListener{ private Logger logger = Logger.getLogger(InitListener.class); @Override public void contextInitialized(ServletContextEvent sce) { logger.info("啟動tomcat");// 連接redis Map<String, String> proMap = PropertyReader.getProperties(); final String url = proMap.get("redis.host"); final Integer port = Integer.parseInt(proMap.get("redis.port")); final ClassPathXmlApplicationContext classPathXmlApplicationContext = new ClassPathXmlApplicationContext("classpath*:applicationContext.xml"); final RedisSubListener redisSubListener = (RedisSubListener) classPathXmlApplicationContext.getBean("redisSubListener"); // 為防止阻塞tomcat啟動,開啟線程執(zhí)行 new Thread(new Runnable(){ public void run(){ // 連接redis,建立監(jiān)聽 Jedis jedis = null; while(true){ //解碼資源更新通知,畫面選看回復,畫面選看停止回復,預案啟動,預案停止,輪切啟動,輪切停止,預案啟動回復,預案停止回復,輪切啟動回復,輪切停止回復,監(jiān)視屏分屏狀態(tài)通知,畫面狀態(tài)通知 String[] channels = new String[] { "decodeResourceUpdateNtf", "tvSplitPlayRsp","tvSplitPlayStopRsp", "planStartStatusNtf", "planStopStatusNtf", "pollStartStatusNtf", "pollStopStatusNtf", "planStartRsp","planStopRsp","pollStartRsp","pollStopRsp","tvSplitTypeNtf","tvSplitStatusNtf"}; try{ jedis = new Jedis(url,port); logger.info("redis請求訂閱通道"); jedis.subscribe(redisSubListener,channels); logger.info("redis訂閱結(jié)束"); }catch(JedisConnectionException e){ logger.error("Jedis連接異常,異常信息 :" + e); }catch(IllegalStateException e){ logger.error("Jedis異常,異常信息 :" + e); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } if(jedis != null){ jedis = null; } } }}) .start(); }
最后在spring配置文件里接入以下配置:
<!-- redis --> <bean id="redisMessageService" class="com.test.service.impl.RedisMessageServiceImpl" scope="singleton"> <property name="webSocketService"><ref local="webSocketService" /></property> <property name="tvSplitStatusDao" ref="tvSplitStatusDao"></property> </bean> <bean id="redisSubListener" class="com.test.common.RedisSubListener" scope="singleton"> <property name="redisMessageService"><ref local="redisMessageService" /></property> </bean>
RedisMessageServiceImpl用于處理接收的redis消息。
二、發(fā)布消息
public class RedisPublishUtil { private Logger logger = Logger.getLogger(RedisPublishUtil.class); public static Jedis pubJedis; private static Map<String, String> proMap = PropertyReader.getProperties(); private static final String redisPort = proMap.get("redis.port"); private static String url = proMap.get("redis.host"); private static final int port = Integer.parseInt(redisPort); public void setPubJedis(Jedis jedis) { RedisPublishUtil.pubJedis = jedis; } public Jedis getPubJedis() { if (pubJedis == null) { createJedisConnect(); } // 返回對象 return pubJedis; } public Jedis createJedisConnect(){ // 連接redis logger.info("===創(chuàng)建連接jedis====="); try { pubJedis = new Jedis(url, port); } catch (JedisConnectionException e) { logger.error("Jedis連接異常,異常信息 :" + e.getMessage()); try { Thread.sleep(1000); logger.info("發(fā)起重新連接jedis"); createJedisConnect(); } catch (InterruptedException except) { except.printStackTrace(); } } // 返回對象 return pubJedis; } //公共發(fā)布接口 public void pubRedisMsg(String msgType,String msg){ logger.info("redis準備發(fā)布消息內(nèi)容:" + msg); try { this.getPubJedis().publish(msgType, msg); } catch (JedisConnectionException e) { logger.error("redis發(fā)布消息失?。?, e); this.setPubJedis(null); logger.info("重新發(fā)布消息,channel="+msgType); pubRedisMsg(msgType, msg); } } }
public class PropertyReader { private static Logger logger = Logger.getLogger(PropertyReader.class); /* * 獲得數(shù)據(jù)庫鏈接的配置文件 */ public static Map<String,String> getProperties(){ logger.info("讀取redis配置文件開始。。。"); Properties prop = new Properties(); Map<String,String> proMap = new HashMap<String,String>(); try { //讀取屬性文件redis.properties InputStream in= PropertyReader.class.getClassLoader().getResourceAsStream("redis.properties"); prop.load(in); ///加載屬性列表 Iterator<String> it=prop.stringPropertyNames().iterator(); while(it.hasNext()){ String key=it.next(); proMap.put(key, prop.getProperty(key)); } in.close(); logger.info("讀取redis配置文件成功。。。"); } catch (Exception e) { logger.error("讀取redis配置文件異常!", e); e.printStackTrace(); } return proMap; } }
以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Java中ExecutorService和ThreadPoolExecutor運行原理
本文主要介紹了Java中ExecutorService和ThreadPoolExecutor運行原理,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-08-08SpringBoot優(yōu)雅接收前端請求參數(shù)的詳細過程
這篇文章主要介紹了SpringBoot如何優(yōu)雅接收前端請求參數(shù),我們可以通過@RequestParm注解去綁定請求中的參數(shù),將(查詢參數(shù)或者form表單數(shù)據(jù))綁定到controller的方法參數(shù)中,本文結(jié)合示例代碼給大家講解的非常詳細,需要的朋友可以參考下2023-06-06Spring Boot 2.7.6整合redis與低版本的區(qū)別
這篇文章主要介紹了Spring Boot 2.7.6整合redis與低版本的區(qū)別,文中補充介紹了SpringBoot各個版本使用Redis之間的區(qū)別實例講解,需要的朋友可以參考下2023-02-02