SpringBoot中實現(xiàn)Redis?Stream隊列的代碼實例
前言
簡單實現(xiàn)一下在SpringBoot中操作Redis Stream隊列的方式,監(jiān)聽隊列中的消息進(jìn)行消費(fèi)。
- jdk:1.8
- springboot-version:2.6.3
- redis:5.0.1(5版本以上才有Stream隊列)
準(zhǔn)備工作
1、pom
redis 依賴包(version 2.6.3)
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
2、 yml
spring: redis: database: 0 host: 127.0.0.1
3、 RedisStreamUtil工具類
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.StreamInfo; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.List; import java.util.Map; @Component public class RedisStreamUtil { @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 創(chuàng)建消費(fèi)組 * * @param key 鍵名稱 * @param group 組名稱 * @return {@link String} */ public String oup(String key, String group) { return redisTemplate.opsForStream().createGroup(key, group); } /** * 獲取消費(fèi)者信息 * * @param key 鍵名稱 * @param group 組名稱 * @return {@link StreamInfo.XInfoConsumers} */ public StreamInfo.XInfoConsumers queryConsumers(String key, String group) { return redisTemplate.opsForStream().consumers(key, group); } /** * 查詢組信息 * * @param key 鍵名稱 * @return */ public StreamInfo.XInfoGroups queryGroups(String key) { return redisTemplate.opsForStream().groups(key); } // 添加Map消息 public String addMap(String key, Map<String, Object> value) { return redisTemplate.opsForStream().add(key, value).getValue(); } // 讀取消息 public List<MapRecord<String, Object, Object>> read(String key) { return redisTemplate.opsForStream().read(StreamOffset.fromStart(key)); } // 確認(rèn)消費(fèi) public Long ack(String key, String group, String... recordIds) { return redisTemplate.opsForStream().acknowledge(key, group, recordIds); } // 刪除消息。當(dāng)一個節(jié)點的所有消息都被刪除,那么該節(jié)點會自動銷毀 public Long del(String key, String... recordIds) { return redisTemplate.opsForStream().delete(key, recordIds); } // 判斷是否存在key public boolean hasKey(String key) { Boolean aBoolean = redisTemplate.hasKey(key); return aBoolean != null && aBoolean; } }
代碼實現(xiàn)
生產(chǎn)者發(fā)送消息
生產(chǎn)者發(fā)送消息,在Service層創(chuàng)建addMessage
方法,往隊列中發(fā)送消息。
代碼中addMap()
方法第一個參數(shù)為key,第二個參數(shù)為value,該key要和后續(xù)配置的保持一致,暫時先記住這個key。
@Service @Slf4j @RequiredArgsConstructor public class RedisStreamMqServiceImpl implements RedisStreamMqService { private final RedisStreamUtil redisStreamUtil; /** * 發(fā)送一個消息 * * @return {@code Object} */ @Override public Object addMessage() { RedisUser redisUser = new RedisUser(); redisUser.setAge(18); redisUser.setName("hcr"); redisUser.setEmail("156ef561@gmail.com"); Map<String, Object> message = new HashMap<>(); message.put("user", redisUser); String recordId = redisStreamUtil.addMap("mystream", message); return recordId; } }
controller接口方法
@RestController @RequestMapping("/redis") @Slf4j @RequiredArgsConstructor public class RedisController { private final RedisStreamMqService redisStreamMqService; @GetMapping("/addMessage") public Object addMessage() { return redisStreamMqService.addMessage(); } }
調(diào)用測試,查看redis中是否正常添加數(shù)據(jù)。
接口返回數(shù)據(jù)
1702622585248-0
查看redis中的數(shù)據(jù)
消費(fèi)者監(jiān)聽消息進(jìn)行消費(fèi)
創(chuàng)建RedisConsumersListener
監(jiān)聽器
import cn.hcr.utils.RedisStreamUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.stream.StreamListener; import org.springframework.stereotype.Component; import java.util.Map; @Component @Slf4j @RequiredArgsConstructor public class RedisConsumersListener implements StreamListener<String, MapRecord<String, String, String>> { public final RedisStreamUtil redisStreamUtil; /** * 監(jiān)聽器 * * @param message */ @Override public void onMessage(MapRecord<String, String, String> message) { // stream的key值 String streamKey = message.getStream(); //消息ID RecordId recordId = message.getId(); //消息內(nèi)容 Map<String, String> msg = message.getValue(); log.info("【streamKey】= " + streamKey + ",【recordId】= " + recordId + ",【msg】=" + msg); //處理邏輯 //邏輯處理完成后,ack消息,刪除消息,group為消費(fèi)組名稱 StreamInfo.XInfoGroups xInfoGroups = redisStreamUtil.queryGroups(streamKey); xInfoGroups.forEach(xInfoGroup -> redisStreamUtil.ack(streamKey, xInfoGroup.groupName(), recordId.getValue())); redisStreamUtil.del(streamKey, recordId.getValue()); } }
創(chuàng)建RedisConfig
配置類,配置監(jiān)聽
package cn.hcr.config; import cn.hcr.listener.RedisConsumersListener; import cn.hcr.utils.RedisStreamUtil; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import lombok.var; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.ReadOffset; import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.data.redis.stream.Subscription; import javax.annotation.Resource; import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Configuration @Slf4j public class RedisConfig { @Resource private RedisStreamUtil redisStreamUtil; /** * redis序列化 * * @param redisConnectionFactory * @return {@code RedisTemplate<String, Object>} */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); template.setKeySerializer(stringRedisSerializer); template.setHashKeySerializer(stringRedisSerializer); template.setValueSerializer(jackson2JsonRedisSerializer); template.setHashValueSerializer(jackson2JsonRedisSerializer); template.afterPropertiesSet(); return template; } @Bean public Subscription subscription(RedisConnectionFactory factory) { AtomicInteger index = new AtomicInteger(1); int processors = Runtime.getRuntime().availableProcessors(); ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS, new LinkedBlockingDeque<>(), r -> { Thread thread = new Thread(r); thread.setName("async-stream-consumer-" + index.getAndIncrement()); thread.setDaemon(true); return thread; }); StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> options = StreamMessageListenerContainer .StreamMessageListenerContainerOptions .builder() // 一次最多獲取多少條消息 .batchSize(5) .executor(executor) .pollTimeout(Duration.ofSeconds(1)) .errorHandler(throwable -> { log.error("[MQ handler exception]", throwable); throwable.printStackTrace(); }) .build(); //該key和group可根據(jù)需求自定義配置 String streamName = "mystream"; String groupname = "mygroup"; initStream(streamName, groupname); var listenerContainer = StreamMessageListenerContainer.create(factory, options); // 手動ask消息 Subscription subscription = listenerContainer.receive(Consumer.from(groupname, "zhuyazhou"), StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumersListener(redisStreamUtil)); // 自動ask消息 /* Subscription subscription = listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]), StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*/ listenerContainer.start(); return subscription; } private void initStream(String key, String group) { boolean hasKey = redisStreamUtil.hasKey(key); if (!hasKey) { Map<String, Object> map = new HashMap<>(1); map.put("field", "value"); //創(chuàng)建主題 String result = redisStreamUtil.addMap(key, map); //創(chuàng)建消費(fèi)組 redisStreamUtil.oup(key, group); //將初始化的值刪除掉 redisStreamUtil.del(key, result); log.info("stream:{}-group:{} initialize success", key, group); } } }
redisTemplate:該bean用于配置redis序列化
subscription:配置監(jiān)聽
initStream:初始化消費(fèi)組
監(jiān)聽測試
使用addMessage()
方法投送一條消息后,查看控制臺輸出信息。
【streamKey】= mystream, 【recordId】= 1702623008044-0, 【msg】= {user=[ "cn.hcr.pojo.RedisUser", {"name":"hcr","age":18,"email":"156ef561@gmail.com"} ] }
總結(jié)
以上就是在SpringBoot中簡單實現(xiàn)Redis Stream隊列的Demo,如有需要源碼或者哪里不清楚的請評論或者發(fā)送私信。
Template:該bean用于配置redis序列化
subscription:配置監(jiān)聽
initStream:初始化消費(fèi)組
到此這篇關(guān)于SpringBoot中實現(xiàn)Redis Stream隊列的文章就介紹到這了,更多相關(guān)SpringBoot實現(xiàn)Redis Stream隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java并發(fā) synchronized鎖住的內(nèi)容解析
這篇文章主要介紹了Java并發(fā) synchronized鎖住的內(nèi)容解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-09-09springboot在filter中如何用threadlocal存放用戶身份信息
這篇文章主要介紹了springboot中在filter中如何用threadlocal存放用戶身份信息,本文章主要描述通過springboot的filter類,在過濾器中設(shè)置jwt信息進(jìn)行身份信息保存的方法,需要的朋友可以參考下2024-07-07SpringBoot+React實現(xiàn)計算個人所得稅
本文將以個人所得稅的計算為例,使用React+SpringBoot+GcExcel來實現(xiàn)這一功能,文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價值,感興趣的小伙伴可以了解下2023-09-09SpringBoot項目設(shè)置斷點debug調(diào)試無效忽略web.xml問題的解決
這篇文章主要介紹了SpringBoot項目設(shè)置斷點debug調(diào)試無效忽略web.xml問題的解決,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08Java編程利用socket多線程訪問服務(wù)器文件代碼示例
這篇文章主要介紹了Java編程利用socket多線程訪問服務(wù)器文件代碼示例,具有一定參考價值,需要的朋友可以了解下。2017-10-10Java的@Transactional、@Aysnc、事務(wù)同步問題詳解
這篇文章主要介紹了Java的@Transactional、@Aysnc、事務(wù)同步問題詳解,現(xiàn)在我們需要在一個業(yè)務(wù)方法中插入一個用戶,這個業(yè)務(wù)方法我們需要加上事務(wù),然后插入用戶后,我們要異步的方式打印出數(shù)據(jù)庫中所有存在的用戶,需要的朋友可以參考下2023-11-11