SpringBoot中使用Redis?Stream實現(xiàn)消息監(jiān)聽示例
Demo環(huán)境
- JDK8
- Maven3.6.3
- springboot2.4.3
- redis_version:6.2.1
倉庫地址
https://gitee.com/hlovez/redismq.git.
POM依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.3</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>vip.huhailong</groupId> <artifactId>redismq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>redismq</name> <description>base redis stream mq</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
這里是一個簡單的Demo,所以關(guān)于redis的一些序列化配置就省略了。
配置監(jiān)聽消息類
配置監(jiān)聽消息類,這里類需要實現(xiàn)StreamListener接口,該接口下只有一個要實現(xiàn)的方法——onMessage方法,代碼:
package vip.huhailong.redismq.redistool; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Component; /** * @author Huhailong * @Description 監(jiān)聽消息 * @Date 2021/3/10. */ @Slf4j @Component public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> { @Override public void onMessage(MapRecord<String, String, String> entries) { log.info("接受到來自redis的消息"); System.out.println("message id "+entries.getId()); System.out.println("stream "+entries.getStream()); System.out.println("body "+entries.getValue()); } }
配置完該類后我們再創(chuàng)建一個類將該監(jiān)聽器注入進去,代碼:
package vip.huhailong.redismq.config; import lombok.var; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.Subscription; import vip.huhailong.redismq.redistool.ListenerMessage; import java.time.Duration; /** * @author Huhailong * @Description * @Date 2021/3/12. */ @Configuration public class RedisStreamConfig { @Autowired private ListenerMessage streamListener; @Bean public Subscription subscription(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; } }
代碼分析:
首先將我們實現(xiàn)了StreamListener的監(jiān)聽器類注入。
subscription方法返回的是一個Subscription類型,它是與當前正在運行任務(wù)的鏈接,可以理解為訂閱的鏈接,它有倆個方法
- boolean await(Duration timeout): 當訂閱變?yōu)榛顒踊虺瑫r時,同步阻塞呼叫將返回
- boolean isActive(): 如果當前正在訂閱則為true
代碼中的var是使用了Lombok的可變局部變量。主要是為了方便
StreamMessageListenerContainer: 消息偵聽容器,不能在外部實現(xiàn)。創(chuàng)建后,StreamMessageListenerContainer可以訂閱Redis流并使用傳入的消息。 StreamMessageListenerContainer允許多個流讀取請求,并為每個讀取請求返回一個Subscription句柄。取消訂閱最終將終止后臺輪詢。使用鍵和值序列化器轉(zhuǎn)換消息以支持各種序列化策略。具體文檔
__StreamMessageListenerContainerOptions: __ 它是上面4的選項,代碼中pollTimeout表示輪詢超時時間
create方法使用給定的RedisConnectionFactory和上面的配置選項創(chuàng)建偵聽器。
接下來使用receiveAutoAck創(chuàng)建一個新訂閱,注意這里接受到消息后會被自動的確認,如果不想自動確認請使用其他的創(chuàng)建訂閱方式。該方法共有三個參數(shù)
- 消費組 consumer group ,它不能為null (Consumer類型)
- stream offset ,stream的偏移量(StreamOffset 類型)
- listener 不能為null (StreamListener<K,V> 類型)
代碼中表示消費者來自名稱為mygroup的組,消費者名稱為huhailong,這里偏移量設(shè)置為了lastConsumed,它表示讀取ID大于消費者組使用的最后一個元素的所有新到達的元素。
現(xiàn)在就可以運行項目來驗證了,將項目運行起來后通過終端給對應key的stream添加一條消息
> XADD mystream * message springboot
這時可以看到spring boot的控制臺打印除了一下消息:
message id 1615532778588-0
stream mystream
body {message=springboot}
說明偵聽成功,它會一直處于監(jiān)聽狀態(tài),只要對應key的stream添加了新的消息都會被偵聽到,到此也就簡單的實現(xiàn)了消息隊列功能。
監(jiān)聽倆個stream的實現(xiàn)
有網(wǎng)友想實現(xiàn)倆個或倆個以上的stream監(jiān)聽,可以這樣實現(xiàn)(有其他更好的方法請留言評論,謝謝)
在上面監(jiān)聽一個stream的基礎(chǔ)上在RedisStreamConfig類里增加監(jiān)聽方法,如下:
@Configuration public class RedisStreamConfig { @Autowired private ListenerMessage streamListener; @Bean public Subscription subscription(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); initStream("mystream","mygroup"); //詳細描述請看下方的問題補充——初始化key和group var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; } @Bean public Subscription subscription2(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); initStream("mystream","mygroup"); //詳細描述請看下方的問題補充——初始化key和group var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream2", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; } }
記得創(chuàng)建該stream和對應的組,讓后啟動程序,在控制臺模擬添加數(shù)據(jù),如圖:
然后idea控制臺的打印可以看到分別接收到了來自不同的stream的消息
其他類和配置與監(jiān)聽一個的時候相同,不需要改變。
【問題補充】確認完消息刪除消息
消息確認完后消息實際上沒有在redis中消失,這也是redis stream中的一個特性,如果要想真正的刪除該消息,需要我們進行指定字段ID刪除,如下:
在RedisUtil中添加刪除方法:
public void delField(String key, String fieldId){ redisTemplate.opsForStream().delete(key,fieldId); }
然后再ListenerMessage中的onMessage方法里處理完消息后添加刪除該消息的方法:
@Component public class ListenerMessage implements StreamListener<String, MapRecord<String, String, String>> { @Autowired RedisUtil redisUtil; @Override public void onMessage(MapRecord<String, String, String> entries) { log.info("接受到來自redis的消息"); System.out.println("message id "+entries.getId().getValue()); System.out.println("stream "+entries.getStream()); System.out.println("body "+entries.getValue()); //下面是刪除該消息的方法 redisUtil.delField("mystream",entries.getId().getValue()); } }
【問題補充】自動初始化stream的key和group問題-最新更新-2021年12月4日
最近看到評論大部分有報找不到key和group的錯信息,這個是因為沒有初始化導致的,當時在寫這個demo的時候為了簡單測試直接寫死了我已經(jīng)存在的key和group,部分朋友不知道這個是需要初始化的,所以我更新了一下代碼,增加了初始化key和group的方法,這樣大家直接運行代碼就可以了,不需要在做多余的操作。
RedisUtil中增加增加的代碼:
public RecordId addStream(String key, Map<String,Object> message){ RecordId add = redisTemplate.opsForStream().add(key, message); return add; //返回增加后的id } public void addGroup(String key, String groupName){ redisTemplate.opsForStream().createGroup(key,groupName); } /** * 用來判斷key是否存在 */ public boolean hasKey(String key){ if(key==null){ return false; }else{ return redisTemplate.hasKey(key); } }
然后再RedisStreamConfig中增加初始化方法,這里注意要判斷key是否存在:
private void initStream(String key, String group){ //判斷key是否存在,如果不存在則創(chuàng)建 boolean hasKey = redisUtil.hasKey(key); if(!hasKey){ Map<String,Object> map = new HashMap<>(); map.put("field","value"); RecordId recordId = redisUtil.addStream(key, map); redisUtil.addGroup(key,group); //將初始化的值刪除掉 redisUtil.delField(key,recordId.getValue()); log.info("stream:{}-group:{} initialize success",key,group); } }
然后再上面配置監(jiān)聽的方法里調(diào)用這個初始化方法就可以了
@Bean public Subscription subscription(RedisConnectionFactory factory){ var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .build(); initStream("mystream","mygroup"); //調(diào)用初始化 var listenerContainer = StreamMessageListenerContainer.create(factory,options); var subscription = listenerContainer.receiveAutoAck(Consumer.from("mygroup","huhailong"), StreamOffset.create("mystream", ReadOffset.lastConsumed()),streamListener); listenerContainer.start(); return subscription; }
https://gitee.com/hlovez/redismq.git已更新,代碼已優(yōu)化整理上傳
到此這篇關(guān)于SpringBoot中使用Redis Stream實現(xiàn)消息監(jiān)聽示例的文章就介紹到這了,更多相關(guān)SpringBoot 消息監(jiān)聽內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA神器一鍵查看Java字節(jié)碼及其他類信息插件
這篇文章主要為大家介紹了一款I(lǐng)DEA神器,可以一鍵查看Java字節(jié)碼及其他類信息,有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-01-01jquery對輸入框內(nèi)容的數(shù)字校驗代碼實例
這篇文章主要介紹了jquery對輸入框內(nèi)容的數(shù)字校驗代碼實例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2019-09-09Java數(shù)據(jù)結(jié)構(gòu)之鏈表的增刪查改詳解
在這篇文章中,小編將帶大家了解一下Java數(shù)據(jù)結(jié)構(gòu)中鏈表的增刪查改(以下結(jié)果均在IDEA中編譯)希望在方便自己復習的同時也能幫助到大家2022-09-09基于Springboot商品進銷存管理系統(tǒng)的設(shè)計與實現(xiàn)
本項目基于springboot實現(xiàn)一個進銷存管理系統(tǒng),主要用戶開設(shè)網(wǎng)店的相關(guān)商品的進貨、銷售、庫存的管理,功能比較完整,需要的可以參考一下2022-08-08easycode配置成mybatis-plus模板的實現(xiàn)方法
本文主要介紹了easycode配置成mybatis-plus模板的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09Java關(guān)鍵字volatile知識點總結(jié)
在本篇文章里小編給大家整理的是一篇關(guān)于Java關(guān)鍵字volatile知識點總結(jié)內(nèi)容,有興趣的朋友們可以學習參考下。2021-01-01