redis之基于SpringBoot實現(xiàn)Redis stream實時流事件處理方式
1、redis stream簡介
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。Redis Stream 主要用于消息隊列(MQ,Message Queue),Redis 本身是有一個 Redis 發(fā)布訂閱 (pub/sub) 來實現(xiàn)消息隊列的功能,但它有個缺點就是消息無法持久化,如果出現(xiàn)網(wǎng)絡斷開、Redis 宕機等,消息就會被丟棄。
簡單來說發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無法記錄歷史消息。
Redis5.0中發(fā)布的Stream類型,也用來實現(xiàn)典型的消息隊列。
提供了消息的持久化和主備復制功能,可以讓任何客戶端訪問任何時刻的數(shù)據(jù),并且能記住每一個客戶端的訪問位置,還能保證消息不丟失。
該Stream類型的出現(xiàn),幾乎滿足了消息隊列具備的全部內(nèi)容,包括但不限于:
- 消息ID的序列化生成
- 消息遍歷
- 消息的阻塞和非阻塞讀取
- 消息的分組消費
- 未完成消息的處理
- 消息隊列監(jiān)控
Redis Stream 的結(jié)構(gòu)如下所示,它有一個消息鏈表,將所有加入的消息都串起來,每個消息都有一個唯一的 ID 和對應的內(nèi)容:
在某些特定場景可以使用redis的stream代替kafka等消息隊列,減少系統(tǒng)復雜性,增強系統(tǒng)的穩(wěn)定性

每個 Stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。
上圖解析:
- Consumer Group :消費組,使用 XGROUP CREATE 命令創(chuàng)建,一個消費組有多個消費者(Consumer)。
- last_delivered_id :游標,每個消費組會有個游標 last_delivered_id,任意一個消費者讀取了消息都會使游標 last_delivered_id 往前移動。
- pending_ids :消費者(Consumer)的狀態(tài)變量,作用是維護消費者的未確認的 id。 pending_ids 記錄了當前已經(jīng)被客戶端讀取的消息,但是還沒有 ack (Acknowledge character:確認字符)。
2、redis stream基礎命令
添加:XADD
命令格式:XADD stream_name id key-value [key-value …]
127.0.0.1:6379> XADD mytopic * acctid 012 age 1 1527837352024-0
查看隊列長度:xlen
命令格式:xlen xxx
127.0.0.1:6379> xlen mytopic (integer) 1 127.0.0.1:6379>
獲取數(shù)據(jù):xrange xrevrange
1.xrange 命令格式:
xrange mytopic - + xrange mytopic 生成的ID + count 2
2.xrevrange命令格式:
xrevrange mytopic + 1527837440632 count 3
該命令的意思為:反向查詢ID以無限大為開始,以1527837440632為結(jié)束的entry,但只取出查詢結(jié)果集(降序排列)中的前三個entry;
獲取數(shù)據(jù):xread
1.非阻塞
從stream 中拿ID比0大的4個Entry,按升序排列
xread count 4 streams mytopic 0
2.阻塞
監(jiān)聽name為mystream的stream,從stream中拿比ID比"$"(特殊ID:該stream中此刻最大ID)還大的Entry
xread block 0 streams mystream $
block 0:block表示命令要阻塞,0表示阻塞時間為無限大,不超時,如果設置為>0的整數(shù),即為阻塞超時時間
監(jiān)聽生效后,拿到數(shù)據(jù)監(jiān)聽就失效,與zk的watcher雷同。意思是該命令執(zhí)行后,只能拿到一條ID比設置ID更大的entry,要想繼續(xù)拿,必須執(zhí)行xread命令,官方推薦下一次拿entry使用上一次得到的ID。注意千萬別亂設置很大的ID ,否則你可能永遠拿不到entry。
xread block 0 streams mystream mytopic $ $
收到任何一個stream的消息,本次監(jiān)聽就失效,只能拿到一條數(shù)據(jù),后面還需要拿數(shù)據(jù),可以將各自stream拿到的ID作為最大ID,重新執(zhí)行命令
消費者組:Consumer groups
redis5引入了消費者組的概念,一個stream的數(shù)據(jù)每一個消費者組都發(fā)一份,消費者組里面的消費者競爭同一份數(shù)據(jù),亦即在同一個消費者組內(nèi),一個消息是不可能發(fā)給多個消費者的:

消費者組提供了如下5點保障
- 組內(nèi)消費者消費的消息不重復
- 組內(nèi)消費者名稱必須唯一
- 消費者拿到的消息肯定是沒有被組內(nèi)其他消費者消費過的消息
- 消費者成功消費消息之后要求發(fā)送ACK,然后這條消息才會從消費者組中移除,也就是說消息至少被消費一次,和kafka一樣
- 消費者組會跟蹤所有待處理的消息
基礎命令
1.創(chuàng)建消費者組
xgroup create mytopic mygroup $
該命令的意思是:使用xgroup命令創(chuàng)建了一個mygroup消費者組,該消費者組與mytopic stream進行了關(guān)聯(lián),以后mygroup消費者組中的消費者就會mytopic stream中拿數(shù)據(jù);
符號" $ "代表mytopic stream中目前最大的ID,消費者拿到的entry的id一定會大于此刻$代表的最大ID。你也可以指定這個最大的ID,比如0;
2.從消費者組讀數(shù)據(jù)
使用xreadgroup命令讓消費者consumer_a從mygroup消費者組的mytopic stream中拿最新的,并且沒有被發(fā)送給其他消費者處理的entry:
xreadgroup group mygroup consumer_a count 1 streams mytopic >
參數(shù):
- group:該參數(shù)是必選項
- “>”:該符號只有在消費者組命令xreadgroup中有效,意思為mytopic stream中最新且沒有被其他消費者處理的ID,千萬記住不要與上面"$"最大ID搞混了,否則拿出來的數(shù)據(jù)與你的期望值不符,如果使用的是任何數(shù)組ID,那么該消費者就無法拿到任何新的消息,只是從它的已經(jīng)處理過的消息中拿,并且不會有ACK機制;
如果想一個消費者組關(guān)聯(lián)多個stream可以這樣做:
xgroup create mystream mygroup $ xgroup create mytopic mygroup $ xreadgroup group mygroup consumer_a block 0 count 1 streams mytopic mystream > >
讀消息的參數(shù)多了一個block 0,就是說讀數(shù)據(jù)需要阻塞。
3.發(fā)送ACK
將指定ID對應的entry從consumer的已處理消息列表中刪除
XACK mystream mygroup 1527864992409-0
3、結(jié)合Spring Boot進行redis實時流處理
樣例應用:

項目依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
3.1 生產(chǎn)者
理想情況下,生產(chǎn)者和消費者將是兩個不同的微服務/應用程序。
在這里,我們把消費和生產(chǎn)都弄在同一個項目中。
但是,我們基于名為“ app.role ”的自定義屬性來控制應用程序的行為,使其像生產(chǎn)者或消費者。
基于該值,將在Spring中創(chuàng)建相應的組件。
@Service
@ConditionalOnProperty(name="app.role", havingValue="producer")
public class PurchaseEventProducer {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Value("${stream.key}")
private String streamKey;
@Autowired
private ProductRepository repository;
@Autowired
private ReactiveRedisTemplate<String, String> redisTemplate;
@Scheduled(fixedRateString= "${publish.rate}")
public void publishEvent(){
Product product = this.repository.getRandomProduct();
ObjectRecord<String, Product> record = StreamRecords.newRecord()
.ofObject(product)
.withStreamKey(streamKey);
this.redisTemplate
.opsForStream()
.add(record)
.subscribe(System.out::println);
atomicInteger.incrementAndGet();
}
@Scheduled(fixedRate = 10000)
public void showPublishedEventsSoFar(){
System.out.println(
"Total Events :: " + atomicInteger.get()
);
}
}- publishEvent方法定期發(fā)布一些隨機購買的產(chǎn)品訂單。
- showPublishedEventsSoFar方法僅顯示到目前為止已下的訂單數(shù)。
3.2 消費者
我們的發(fā)布者已經(jīng)準備好。讓我們創(chuàng)建一個消費者。
要使用RedisStreams,我們需要實現(xiàn)StreamListener接口。
@Service
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class PurchaseEventConsumer implements StreamListener<String, ObjectRecord<String, Product>> {
private AtomicInteger atomicInteger = new AtomicInteger(0);
@Autowired
private ReactiveRedisTemplate<String, String> redisTemplate;
@Override
@SneakyThrows
public void onMessage(ObjectRecord<String, Product> record) {
System.out.println(
InetAddress.getLocalHost().getHostName() + " - consumed :" +
record.getValue()
);
this.redisTemplate
.opsForZSet()
.incrementScore("revenue", record.getValue().getCategory().toString(), record.getValue().getPrice())
.subscribe();
atomicInteger.incrementAndGet();
}
@Scheduled(fixedRate = 10000)
public void showPublishedEventsSoFar(){
System.out.println(
"Total Consumed :: " + atomicInteger.get()
);
}
}在消費者端,我們只簡單地顯示消費記錄情況。
然后,我們獲得支付價格并將其添加到redis排序集中。
像發(fā)布者一樣,我們會定期顯示此使用者消費到的事件數(shù)。
3.3 Redis流配置
創(chuàng)建使用者后,我們需要通過將上述使用者添加到StreamMessageListenerContainer實例中來創(chuàng)建訂閱。
@Configuration
@ConditionalOnProperty(name="app.role", havingValue="consumer")
public class RedisStreamConfig {
@Value("${stream.key}")
private String streamKey;
@Autowired
private StreamListener<String, ObjectRecord<String, Product>> streamListener;
@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) throws UnknownHostException {
var options = StreamMessageListenerContainer
.StreamMessageListenerContainerOptions
.builder()
.pollTimeout(Duration.ofSeconds(1))
.targetType(Product.class)
.build();
var listenerContainer = StreamMessageListenerContainer
.create(redisConnectionFactory, options);
var subscription = listenerContainer.receiveAutoAck(
Consumer.from(streamKey, InetAddress.getLocalHost().getHostName()),
StreamOffset.create(streamKey, ReadOffset.lastConsumed()),
streamListener);
listenerContainer.start();
return subscription;
}
}總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
- SpringBoot3集成Redis的方法詳解
- 基于SpringBoot+Redis實現(xiàn)一個簡單的限流器
- SpringBoot中Redis的緩存更新策略詳解
- SpringBoot中Redis自動配置的介紹、原理和使用詳解
- SpringBoot引入Redis報Redis?command?timed?out兩種異常情況
- 使用Spring?Boot實現(xiàn)Redis鍵過期回調(diào)功能示例詳解
- SpringBoot整合Redis實現(xiàn)緩存分頁數(shù)據(jù)查詢功能
- 基于SpringBoot + Redis實現(xiàn)密碼暴力破解防護
- Spring Boot 中的 Redis 分布式鎖
相關(guān)文章
mybatis-plus插入一條數(shù)據(jù),獲取插入數(shù)據(jù)自動生成的主鍵問題
這篇文章主要介紹了mybatis-plus插入一條數(shù)據(jù),獲取插入數(shù)據(jù)自動生成的主鍵問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
SpringBoot整合jasypt進行重要數(shù)據(jù)加密的操作代碼
Jasypt(Java?Simplified?Encryption)是一個專注于簡化Java加密操作的開源工具,它提供了一種簡單而強大的方式來處理數(shù)據(jù)的加密和解密,使開發(fā)者能夠輕松地保護應用程序中的敏感信息,本文給大家介紹了SpringBoot整合jasypt進行重要數(shù)據(jù)加密,需要的朋友可以參考下2024-05-05
Springboot RestTemplate設置超時時間的方法(Spring boot
這篇文章主要介紹了Springboot RestTemplate設置超時時間的方法,包括Spring boot 版本<=1.3和Spring boot 版本>=1.4,本文通過實例代碼給大家介紹的非常詳細,感興趣的朋友跟隨小編一起看看吧2024-08-08
解決Maven無法下載2.1.7.js7版本的itext依賴問題
本文主要解決使用Maven編譯項目時出現(xiàn)的itext依賴版本問題,通過分析,發(fā)現(xiàn)該問題是由jasperreports依賴的特定版本itext導致的,解決方法是排除jasperreports中的itext依賴,并自行指定更高版本的itext依賴2024-12-12
MyBatis-Plus實現(xiàn)公共字段自動填充功能詳解
在開發(fā)中經(jīng)常遇到多個實體類有共同的屬性字段,這些字段屬于公共字段,也就是很多表中都有這些字段,能不能對于這些公共字段在某個地方統(tǒng)一處理,來簡化開發(fā)呢?MyBatis-Plus就提供了這一功能,本文就來為大家詳細講講2022-08-08

