redis之基于SpringBoot實(shí)現(xiàn)Redis stream實(shí)時(shí)流事件處理方式
1、redis stream簡(jiǎn)介
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。Redis Stream 主要用于消息隊(duì)列(MQ,Message Queue),Redis 本身是有一個(gè) Redis 發(fā)布訂閱 (pub/sub) 來(lái)實(shí)現(xiàn)消息隊(duì)列的功能,但它有個(gè)缺點(diǎn)就是消息無(wú)法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開、Redis 宕機(jī)等,消息就會(huì)被丟棄。
簡(jiǎn)單來(lái)說(shuō)發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無(wú)法記錄歷史消息。
Redis5.0中發(fā)布的Stream類型,也用來(lái)實(shí)現(xiàn)典型的消息隊(duì)列。
提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶端訪問(wèn)任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶端的訪問(wèn)位置,還能保證消息不丟失。
該Stream類型的出現(xiàn),幾乎滿足了消息隊(duì)列具備的全部?jī)?nèi)容,包括但不限于:
- 消息ID的序列化生成
- 消息遍歷
- 消息的阻塞和非阻塞讀取
- 消息的分組消費(fèi)
- 未完成消息的處理
- 消息隊(duì)列監(jiān)控
Redis Stream 的結(jié)構(gòu)如下所示,它有一個(gè)消息鏈表,將所有加入的消息都串起來(lái),每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容:
在某些特定場(chǎng)景可以使用redis的stream代替kafka等消息隊(duì)列,減少系統(tǒng)復(fù)雜性,增強(qiáng)系統(tǒng)的穩(wěn)定性
每個(gè) Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。
上圖解析:
- Consumer Group :消費(fèi)組,使用 XGROUP CREATE 命令創(chuàng)建,一個(gè)消費(fèi)組有多個(gè)消費(fèi)者(Consumer)。
- last_delivered_id :游標(biāo),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。
- pending_ids :消費(fèi)者(Consumer)的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 pending_ids 記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒(méi)有 ack (Acknowledge character:確認(rèn)字符)。
2、redis stream基礎(chǔ)命令
添加:XADD
命令格式:XADD stream_name id key-value [key-value …]
127.0.0.1:6379> XADD mytopic * acctid 012 age 1 1527837352024-0
查看隊(duì)列長(zhǎng)度:xlen
命令格式:xlen xxx
127.0.0.1:6379> xlen mytopic (integer) 1 127.0.0.1:6379>
獲取數(shù)據(jù):xrange xrevrange
1.xrange 命令格式:
xrange mytopic - + xrange mytopic 生成的ID + count 2
2.xrevrange命令格式:
xrevrange mytopic + 1527837440632 count 3
該命令的意思為:反向查詢ID以無(wú)限大為開始,以1527837440632為結(jié)束的entry,但只取出查詢結(jié)果集(降序排列)中的前三個(gè)entry;
獲取數(shù)據(jù):xread
1.非阻塞
從stream 中拿ID比0大的4個(gè)Entry,按升序排列
xread count 4 streams mytopic 0
2.阻塞
監(jiān)聽name為mystream的stream,從stream中拿比ID比"$"(特殊ID:該stream中此刻最大ID)還大的Entry
xread block 0 streams mystream $
block 0:block表示命令要阻塞,0表示阻塞時(shí)間為無(wú)限大,不超時(shí),如果設(shè)置為>0的整數(shù),即為阻塞超時(shí)時(shí)間
監(jiān)聽生效后,拿到數(shù)據(jù)監(jiān)聽就失效,與zk的watcher雷同。意思是該命令執(zhí)行后,只能拿到一條ID比設(shè)置ID更大的entry,要想繼續(xù)拿,必須執(zhí)行xread命令,官方推薦下一次拿entry使用上一次得到的ID。注意千萬(wàn)別亂設(shè)置很大的ID ,否則你可能永遠(yuǎn)拿不到entry。
xread block 0 streams mystream mytopic $ $
收到任何一個(gè)stream的消息,本次監(jiān)聽就失效,只能拿到一條數(shù)據(jù),后面還需要拿數(shù)據(jù),可以將各自stream拿到的ID作為最大ID,重新執(zhí)行命令
消費(fèi)者組:Consumer groups
redis5引入了消費(fèi)者組的概念,一個(gè)stream的數(shù)據(jù)每一個(gè)消費(fèi)者組都發(fā)一份,消費(fèi)者組里面的消費(fèi)者競(jìng)爭(zhēng)同一份數(shù)據(jù),亦即在同一個(gè)消費(fèi)者組內(nèi),一個(gè)消息是不可能發(fā)給多個(gè)消費(fèi)者的:
消費(fèi)者組提供了如下5點(diǎn)保障
- 組內(nèi)消費(fèi)者消費(fèi)的消息不重復(fù)
- 組內(nèi)消費(fèi)者名稱必須唯一
- 消費(fèi)者拿到的消息肯定是沒(méi)有被組內(nèi)其他消費(fèi)者消費(fèi)過(guò)的消息
- 消費(fèi)者成功消費(fèi)消息之后要求發(fā)送ACK,然后這條消息才會(huì)從消費(fèi)者組中移除,也就是說(shuō)消息至少被消費(fèi)一次,和kafka一樣
- 消費(fèi)者組會(huì)跟蹤所有待處理的消息
基礎(chǔ)命令
1.創(chuàng)建消費(fèi)者組
xgroup create mytopic mygroup $
該命令的意思是:使用xgroup命令創(chuàng)建了一個(gè)mygroup消費(fèi)者組,該消費(fèi)者組與mytopic stream進(jìn)行了關(guān)聯(lián),以后mygroup消費(fèi)者組中的消費(fèi)者就會(huì)mytopic stream中拿數(shù)據(jù);
符號(hào)" $ "代表mytopic stream中目前最大的ID,消費(fèi)者拿到的entry的id一定會(huì)大于此刻$代表的最大ID。你也可以指定這個(gè)最大的ID,比如0;
2.從消費(fèi)者組讀數(shù)據(jù)
使用xreadgroup命令讓消費(fèi)者consumer_a從mygroup消費(fèi)者組的mytopic stream中拿最新的,并且沒(méi)有被發(fā)送給其他消費(fèi)者處理的entry:
xreadgroup group mygroup consumer_a count 1 streams mytopic >
參數(shù):
- group:該參數(shù)是必選項(xiàng)
- “>”:該符號(hào)只有在消費(fèi)者組命令xreadgroup中有效,意思為mytopic stream中最新且沒(méi)有被其他消費(fèi)者處理的ID,千萬(wàn)記住不要與上面"$"最大ID搞混了,否則拿出來(lái)的數(shù)據(jù)與你的期望值不符,如果使用的是任何數(shù)組ID,那么該消費(fèi)者就無(wú)法拿到任何新的消息,只是從它的已經(jīng)處理過(guò)的消息中拿,并且不會(huì)有ACK機(jī)制;
如果想一個(gè)消費(fèi)者組關(guān)聯(lián)多個(gè)stream可以這樣做:
xgroup create mystream mygroup $ xgroup create mytopic mygroup $ xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >
讀消息的參數(shù)多了一個(gè)block 0,就是說(shuō)讀數(shù)據(jù)需要阻塞。
3.發(fā)送ACK
將指定ID對(duì)應(yīng)的entry從consumer的已處理消息列表中刪除
XACK mystream mygroup 1527864992409-0
3、結(jié)合Spring Boot進(jìn)行redis實(shí)時(shí)流處理
樣例應(yīng)用:
項(xiàng)目依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
3.1 生產(chǎn)者
理想情況下,生產(chǎn)者和消費(fèi)者將是兩個(gè)不同的微服務(wù)/應(yīng)用程序。
在這里,我們把消費(fèi)和生產(chǎn)都弄在同一個(gè)項(xiàng)目中。
但是,我們基于名為“ app.role ”的自定義屬性來(lái)控制應(yīng)用程序的行為,使其像生產(chǎn)者或消費(fèi)者。
基于該值,將在Spring中創(chuàng)建相應(yīng)的組件。
@Service @ConditionalOnProperty(name="app.role", havingValue="producer") public class PurchaseEventProducer { private AtomicInteger atomicInteger = new AtomicInteger(0); @Value("${stream.key}") private String streamKey; @Autowired private ProductRepository repository; @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Scheduled(fixedRateString= "${publish.rate}") public void publishEvent(){ Product product = this.repository.getRandomProduct(); ObjectRecord<String, Product> record = StreamRecords.newRecord() .ofObject(product) .withStreamKey(streamKey); this.redisTemplate .opsForStream() .add(record) .subscribe(System.out::println); atomicInteger.incrementAndGet(); } @Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Events :: " + atomicInteger.get() ); } }
- publishEvent方法定期發(fā)布一些隨機(jī)購(gòu)買的產(chǎn)品訂單。
- showPublishedEventsSoFar方法僅顯示到目前為止已下的訂單數(shù)。
3.2 消費(fèi)者
我們的發(fā)布者已經(jīng)準(zhǔn)備好。讓我們創(chuàng)建一個(gè)消費(fèi)者。
要使用RedisStreams,我們需要實(shí)現(xiàn)StreamListener接口。
@Service @ConditionalOnProperty(name="app.role", havingValue="consumer") public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> { private AtomicInteger atomicInteger = new AtomicInteger(0); @Autowired private ReactiveRedisTemplate<String, String> redisTemplate; @Override @SneakyThrows public void onMessage(ObjectRecord<String, Product> record) { System.out.println( InetAddress.getLocalHost().getHostName() + " - consumed :" + record.getValue() ); this.redisTemplate .opsForZSet() .incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice()) .subscribe(); atomicInteger.incrementAndGet(); } @Scheduled(fixedRate = 10000) public void showPublishedEventsSoFar(){ System.out.println( "Total Consumed :: " + atomicInteger.get() ); } }
在消費(fèi)者端,我們只簡(jiǎn)單地顯示消費(fèi)記錄情況。
然后,我們獲得支付價(jià)格并將其添加到redis排序集中。
像發(fā)布者一樣,我們會(huì)定期顯示此使用者消費(fèi)到的事件數(shù)。
3.3 Redis流配置
創(chuàng)建使用者后,我們需要通過(guò)將上述使用者添加到StreamMessageListenerContainer實(shí)例中來(lái)創(chuàng)建訂閱。
@Configuration @ConditionalOnProperty(name="app.role", havingValue="consumer") public class RedisStreamConfig { @Value("${stream.key}") private String streamKey; @Autowired private StreamListener<String, ObjectRecord<String, Product>> streamListener; @Bean public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException { var options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(1)) .targetType(Product.class) .build(); var listenerContainer = StreamMessageListenerContainer .create(redisConnectionFactory, options); var subscription = listenerContainer.receiveAutoAck( Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()), StreamOffset.create(streamKey, ReadOffset.lastConsumed()), streamListener); listenerContainer.start(); return subscription; } }
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
- SpringBoot3集成Redis的方法詳解
- 基于SpringBoot+Redis實(shí)現(xiàn)一個(gè)簡(jiǎn)單的限流器
- SpringBoot中Redis的緩存更新策略詳解
- SpringBoot中Redis自動(dòng)配置的介紹、原理和使用詳解
- SpringBoot引入Redis報(bào)Redis?command?timed?out兩種異常情況
- 使用Spring?Boot實(shí)現(xiàn)Redis鍵過(guò)期回調(diào)功能示例詳解
- SpringBoot整合Redis實(shí)現(xiàn)緩存分頁(yè)數(shù)據(jù)查詢功能
- 基于SpringBoot + Redis實(shí)現(xiàn)密碼暴力破解防護(hù)
- Spring Boot 中的 Redis 分布式鎖
相關(guān)文章
mybatis-plus插入一條數(shù)據(jù),獲取插入數(shù)據(jù)自動(dòng)生成的主鍵問(wèn)題
這篇文章主要介紹了mybatis-plus插入一條數(shù)據(jù),獲取插入數(shù)據(jù)自動(dòng)生成的主鍵問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12SpringBoot整合jasypt進(jìn)行重要數(shù)據(jù)加密的操作代碼
Jasypt(Java?Simplified?Encryption)是一個(gè)專注于簡(jiǎn)化Java加密操作的開源工具,它提供了一種簡(jiǎn)單而強(qiáng)大的方式來(lái)處理數(shù)據(jù)的加密和解密,使開發(fā)者能夠輕松地保護(hù)應(yīng)用程序中的敏感信息,本文給大家介紹了SpringBoot整合jasypt進(jìn)行重要數(shù)據(jù)加密,需要的朋友可以參考下2024-05-05java中Hutool工具類的常見(jiàn)使用場(chǎng)景詳解
在日常開發(fā)中,我們會(huì)使用很多工具類來(lái)提升項(xiàng)目開發(fā)的速度,而國(guó)內(nèi)用的比較多的 Hutool 框架,就是其中之一,本文我們就來(lái)介紹一下Hutool的具體使用吧2023-12-12Springboot RestTemplate設(shè)置超時(shí)時(shí)間的方法(Spring boot
這篇文章主要介紹了Springboot RestTemplate設(shè)置超時(shí)時(shí)間的方法,包括Spring boot 版本<=1.3和Spring boot 版本>=1.4,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),感興趣的朋友跟隨小編一起看看吧2024-08-08解決Maven無(wú)法下載2.1.7.js7版本的itext依賴問(wèn)題
本文主要解決使用Maven編譯項(xiàng)目時(shí)出現(xiàn)的itext依賴版本問(wèn)題,通過(guò)分析,發(fā)現(xiàn)該問(wèn)題是由jasperreports依賴的特定版本itext導(dǎo)致的,解決方法是排除jasperreports中的itext依賴,并自行指定更高版本的itext依賴2024-12-12MyBatis-Plus實(shí)現(xiàn)公共字段自動(dòng)填充功能詳解
在開發(fā)中經(jīng)常遇到多個(gè)實(shí)體類有共同的屬性字段,這些字段屬于公共字段,也就是很多表中都有這些字段,能不能對(duì)于這些公共字段在某個(gè)地方統(tǒng)一處理,來(lái)簡(jiǎn)化開發(fā)呢?MyBatis-Plus就提供了這一功能,本文就來(lái)為大家詳細(xì)講講2022-08-08淺談ArrayList和LinkedList到底誰(shuí)更快
今天給大家?guī)?lái)的是關(guān)于Java的相關(guān)知識(shí),文章圍繞著ArrayList和LinkedList到底誰(shuí)更快展開,文中有非常詳細(xì)的介紹,需要的朋友可以參考下2021-06-06