SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼
引言
Redis Stream 是 Redis 5.0 引入的一種數(shù)據(jù)結(jié)構(gòu),用于處理日志類型的數(shù)據(jù)。它提供了高效、可靠的方式來處理和存儲(chǔ)時(shí)間序列數(shù)據(jù),如事件、消息等。其設(shè)計(jì)靈感源于 Kafka 和類似的消息隊(duì)列系統(tǒng),且完全集成在 Redis 中,利用了 Redis 的高性能和持久化特性。
依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
說明:此部分定義了 Redis 相關(guān)的依賴,確保項(xiàng)目能夠引入并使用 Spring Boot 提供的 Redis 啟動(dòng)器。
RedisTemplate 配置
package com.mjg.config; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); // 注冊(cè) Java 8 日期時(shí)間模塊 om.registerModule(new JavaTimeModule()); om.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); om.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); jackson2JsonRedisSerializer.serialize(om); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // key 采用 String 的序列化方式 template.setKeySerializer(stringRedisSerializer); // hash 的 key 也采用 String 的序列化方式 template.setHashKeySerializer(stringRedisSerializer); // value 序列化方式采用 jackson template.setValueSerializer(jackson2JsonRedisSerializer); // hash 的 value 序列化方式采用 jackson template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } }
說明:此配置類用于設(shè)置 RedisTemplate 的序列化方式,以滿足不同數(shù)據(jù)類型的存儲(chǔ)和讀取需求。
RedisStreamConfig
package com.mjg.config; import cn.hutool.core.convert.Convert; import cn.hutool.core.util.StrUtil; import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.RedisServerCommands; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StreamOperations; import org.springframework.data.redis.stream.StreamListener; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.Assert; import java.net.InetAddress; import java.time.Duration; import java.util.Properties; @Slf4j @RequiredArgsConstructor @Configuration public class RedisStreamConfig implements InitializingBean, DisposableBean { private final RedisTemplate<String, Object> redisTemplate; public static String streamName = "user-event-stream"; public static String userEventGroup = "user-event-group"; private final ThreadPoolTaskExecutor threadPoolTaskExecutor; /** * 消息偵聽器容器,用于監(jiān)聽 Redis Stream 中的消息 * * @param connectionFactory Redis 連接工廠,用于創(chuàng)建 Redis 連接 * @param messageConsumer 消息消費(fèi)者,用于處理接收到的消息 * @return 返回 {@link StreamMessageListenerContainer}<{@link String}, {@link ObjectRecord}<{@link String}, {@link String}>> 類型的消息偵聽器容器 */ @Bean public StreamMessageListenerContainer<String, ObjectRecord<String, String>> messageListenerContainer(RedisConnectionFactory connectionFactory, MessageConsumer messageConsumer) { StreamMessageListenerContainer<String, ObjectRecord<String, String>> listenerContainer = streamContainer(streamName, connectionFactory, messageConsumer); listenerContainer.start(); return listenerContainer; } /** * 創(chuàng)建一個(gè)流容器,用于監(jiān)聽 Redis Stream 中的數(shù)據(jù) * * @param streamName Redis Stream 的名稱 * @param connectionFactory Redis 連接工廠 * @param streamListener 綁定的監(jiān)聽類 * @return 返回 StreamMessageListenerContainer 對(duì)象 */ @SneakyThrows private StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamContainer(String streamName, RedisConnectionFactory connectionFactory, StreamListener<String, ObjectRecord<String, String>> streamListener) { StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> options = StreamMessageListenerContainer.StreamMessageListenerContainerOptions .builder() .pollTimeout(Duration.ofSeconds(5)) // 拉取消息超時(shí)時(shí)間 .batchSize(10) // 批量抓取消息 .targetType(String.class) // 傳遞的數(shù)據(jù)類型 .executor(threadPoolTaskExecutor) .build(); StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer .create(connectionFactory, options); // 指定消費(fèi)最新的消息 StreamOffset<String> offset = StreamOffset.create(streamName, ReadOffset.lastConsumed()); // 創(chuàng)建消費(fèi)者 StreamMessageListenerContainer.StreamReadRequest<String> streamReadRequest = buildStreamReadRequest(offset, streamListener); // 指定消費(fèi)者對(duì)象 container.register(streamReadRequest, streamListener); return container; } /** * 生成流讀取請(qǐng)求 * * @param offset 偏移量,用于指定從 Redis Stream 中的哪個(gè)位置開始讀取消息 * @param streamListener 流偵聽器,用于處理接收到的消息 * @return 返回一個(gè) StreamReadRequest 對(duì)象,表示一個(gè)流讀取請(qǐng)求 * @throws Exception 當(dāng) streamListener 無法識(shí)別為 MessageConsumer 類型時(shí),拋出異常 */ private StreamMessageListenerContainer.StreamReadRequest<String> buildStreamReadRequest(StreamOffset<String> offset, StreamListener<String, ObjectRecord<String, String>> streamListener) throws Exception { Consumer consumer; if (streamListener instanceof MessageConsumer) { consumer = Consumer.from(userEventGroup, InetAddress.getLocalHost().getHostName()); } else { throw new Exception("無法識(shí)別的 stream key"); } // 關(guān)閉自動(dòng) ack 確認(rèn) return StreamMessageListenerContainer.StreamReadRequest.builder(offset) .errorHandler((error) -> { log.error(error.getMessage()); }) .cancelOnError(e -> false) .consumer(consumer) // 關(guān)閉自動(dòng) ack 確認(rèn) .autoAcknowledge(false) .build(); } /** * 檢查 Redis 版本是否符合要求 * * @throws IllegalStateException 如果 Redis 版本小于 5.0.0 版本,拋出該異常 */ private void checkRedisVersion() { // 獲得 Redis 版本 Properties info = redisTemplate.execute((RedisCallback<Properties>) RedisServerCommands::info); Assert.notNull(info, "Redis info is null"); Object redisVersion = info.get("redis_version"); Integer anInt = Convert.toInt(redisVersion); if (anInt < 5) { throw new IllegalStateException(StrUtil.format("您當(dāng)前的 Redis 版本為 {},小于最低要求的 5.0.0 版本!", redisVersion)); } } @Override public void destroy() throws Exception { } @Override public void afterPropertiesSet() throws Exception { checkRedisVersion(); StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream(); if (Boolean.FALSE.equals(redisTemplate.hasKey(streamName))) { streamOperations.createGroup(streamName, ReadOffset.from("0"), userEventGroup); } } }
說明:該配置類實(shí)現(xiàn)了對(duì) Redis Stream 的相關(guān)配置,包括消息監(jiān)聽容器的創(chuàng)建、流讀取請(qǐng)求的生成、Redis 版本的檢查以及組的創(chuàng)建等功能。
生產(chǎn)者
package com.mjg.config; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.Collections; @Component @RequiredArgsConstructor @Slf4j public class MessageProducer { private final RedisTemplate<String, Object> redisTemplate; public void sendMessage(String streamKey, Object message) { RecordId recordId = redisTemplate .opsForStream().add(StreamRecords.newRecord() .ofMap(Collections.singletonMap("data", message)) .withStreamKey(streamKey)); if (recordId!= null) { log.info("Message sent to Stream '{}' with RecordId: {}", streamKey, recordId); } } }
說明:MessageProducer 類負(fù)責(zé)向 Redis Stream 發(fā)送消息。
消費(fèi)者
package com.mjg.config; import lombok.RequiredArgsConstructor; import org.springframework.data.redis.connection.stream.ObjectRecord; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Component; @RequiredArgsConstructor @Component public class MessageConsumer implements StreamListener<String, ObjectRecord<String, String>> { private final RedisTemplate<String, Object> redisTemplate; @Override public void onMessage(ObjectRecord<String, String> message) { String stream = message.getStream(); String messageId = message.getId().toString(); String messageBody = message.getValue(); System.out.println("Received message from Stream '" + stream + "' with messageId: " + messageId); System.out.println("Message body: " + messageBody); // 消息應(yīng)答 redisTemplate.opsForStream().acknowledge(RedisStreamConfig.streamName, RedisStreamConfig.userEventGroup, message.getId()); } }
說明:MessageConsumer 類實(shí)現(xiàn)了 StreamListener 接口,用于處理從 Redis Stream 接收到的消息,并進(jìn)行相應(yīng)的應(yīng)答操作。
測(cè)試
@RequiredArgsConstructor @Slf4j @RestController public class MessageController { public static String streamName = "user-event-stream"; private final MessageProducer messageProducer; @GetMapping("/send") public void send() { messageProducer.sendMessage(streamName, "hello 啦啦啦啦" + LocalDateTime.now()); } }
說明:MessageController 類中的 send 方法通過調(diào)用 MessageProducer 來發(fā)送消息到指定的 Redis Stream 中。
以上就是SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于SpringBoot Redis Stream輕量消息隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解JavaEE 使用 Redis 數(shù)據(jù)庫進(jìn)行內(nèi)容緩存和高訪問負(fù)載
本篇文章主要介紹了JavaEE 使用 Redis 數(shù)據(jù)庫進(jìn)行內(nèi)容緩存和高訪問負(fù)載,具有一定的參考價(jià)值,有興趣的可以了解一下2017-08-08Spring中ApplicationListener的使用解析
這篇文章主要介紹了Spring中ApplicationListener的使用解析,ApplicationContext事件機(jī)制是觀察者設(shè)計(jì)模式的實(shí)現(xiàn),通過ApplicationEvent類和ApplicationListener接口,需要的朋友可以參考下2023-12-12老生常談Java中instanceof關(guān)鍵字的理解
java 中的instanceof 運(yùn)算符是用來在運(yùn)行時(shí)指出對(duì)象是否是特定類的一個(gè)實(shí)例。這篇文章主要介紹了老生常談Java中instanceof關(guān)鍵字的理解,需要的朋友可以參考下2018-10-10詳談hibernate,jpa與spring?data?jpa三者之間的關(guān)系
這篇文章主要介紹了hibernate,jpa與spring?data?jpa三者之間的關(guān)系,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11Java實(shí)現(xiàn)發(fā)送HTML內(nèi)容并帶附件的電子郵件
這篇文章主要為大家詳細(xì)介紹了如何使用Java實(shí)現(xiàn)發(fā)送HTML內(nèi)容并帶附件的電子郵件,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考一下2025-01-01Spring MVC過濾器-登錄過濾的代碼實(shí)現(xiàn)
本篇文章主要介紹了Spring MVC過濾器-登錄過濾,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧。2017-01-01