springboot整合redis之消息隊列
一、項目準備
依賴
<!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- Redis-Jedis --> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency>
application.yaml配置文件
spring: redis: host: 127.0.0.1 port: 6379 database: 0 timeout: 4000 jedis: pool: max-wait: -1 max-active: -1 max-idle: 20 min-idle: 10
二、配置類
public class ObjectMapperConfig { public static final ObjectMapper objectMapper; private static final String PATTERN = "yyyy-MM-dd HH:mm:ss"; static { JavaTimeModule javaTimeModule = new JavaTimeModule(); javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer()); javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer()); objectMapper = new ObjectMapper() // 轉(zhuǎn)換為格式化的json(控制臺打印時,自動格式化規(guī)范) //.enable(SerializationFeature.INDENT_OUTPUT) // Include.ALWAYS 是序列化對像所有屬性(默認) // Include.NON_NULL 只有不為null的字段才被序列化,屬性為NULL 不序列化 // Include.NON_EMPTY 如果為null或者 空字符串和空集合都不會被序列化 // Include.NON_DEFAULT 屬性為默認值不序列化 .setSerializationInclusion(JsonInclude.Include.NON_NULL) // 如果是空對象的時候,不拋異常 .configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false) // 反序列化的時候如果多了其他屬性,不拋出異常 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) // 取消時間的轉(zhuǎn)化格式,默認是時間戳,可以取消,同時需要設(shè)置要表現(xiàn)的時間格式 .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false) .setDateFormat(new SimpleDateFormat(PATTERN)) // 對LocalDateTime序列化跟反序列化 .registerModule(javaTimeModule) .setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY) // 此項必須配置,否則會報java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX .enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY) ; } static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> { @Override public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException { gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN))); } } static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> { @Override public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException { return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN)); } } }
@Configuration public class RedisConfig { /** * redisTemplate配置 */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 配置連接工廠 template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer來序列化和反序列化redis的value值(默認使用JDK的序列化方式) Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class); jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper); StringRedisSerializer stringRedisSerializer = new StringRedisSerializer(); // 使用StringRedisSerializer來序列化和反序列化redis的key,value采用json序列化 template.setKeySerializer(stringRedisSerializer); template.setValueSerializer(jacksonSerializer); // 設(shè)置hash key 和value序列化模式 template.setHashKeySerializer(stringRedisSerializer); template.setHashValueSerializer(jacksonSerializer); template.afterPropertiesSet(); return template; } }
三、redis中l(wèi)ist數(shù)據(jù)類型
在Redis中,List類型是按照插入順序排序的字符串鏈表。和數(shù)據(jù)結(jié)構(gòu)中的普通鏈表一樣,我們可以在其頭部和尾部添加新的元素
優(yōu)勢:
- 順序排序,保證先進先出
- 隊列為空時,自動從Redis數(shù)據(jù)庫刪除
- 在隊列的兩頭插入或刪除元素,效率極高,即使隊列中元素達到百萬級
- List中可以包含的最大元素數(shù)量是4294967295
定時器監(jiān)聽隊列
生產(chǎn)者
@Slf4j @Component public class MessageProducer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; public void lPush() { for (int i = 0; i < 10; i++) { new Thread(() -> { Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world"); log.info(Thread.currentThread().getName() + ":put message size = " + size); }).start(); } } }
消費者:消費消息,定時器以達到監(jiān)聽隊列功能
@Slf4j @Component @EnableScheduling public class MessageConsumer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; @Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000) public void rPop() { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info(message); } }
@RestController public class RedisController { @Autowired private MessageProducer messageProducer; @GetMapping("/lPush") public void lPush() { messageProducer.lPush(); } }
測試
http://localhost:8080/lPush
可能出現(xiàn)的問題:
1.通過定時器監(jiān)聽List中是否有待處理消息,每執(zhí)行一次都會發(fā)起一次連接,這會造成不必要的浪費。
2.生產(chǎn)速度大于消費速度,隊列堆積,消息時效性差,占用內(nèi)存。
運行即監(jiān)控隊列
修改消息消費者代碼。
當隊列沒有元素時,會阻塞10秒,然后再次監(jiān)聽隊列,
需要注意的是,阻塞時間必須小于連接超時時間
@Slf4j @Component @EnableScheduling public class MessageConsumer { public static final String MESSAGE_KEY = "message:queue"; @Autowired private RedisTemplate<String,Object> redisTemplate; //@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000) public void rPop() { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY); log.info(message); } @PostConstruct public void brPop() { new Thread(() -> { while (true) { String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS); log.info(message); } }).start(); } }
阻塞時間不能為負,直接報錯超時為負
阻塞時間為零,此時阻塞時間等于超時時間,最后報錯連接超時
阻塞時間大于超時時間,報錯連接超時
測試:
消息不可重復(fù)消費,因為消息從隊列POP之后就被移除了,即不支持多個消費者消費同一批數(shù)據(jù)
消息丟失,消費期間發(fā)生異常,消息未能正常消費
四、發(fā)布/訂閱模式
消息可以重復(fù)消費,多個消費者訂閱同一頻道即可
一個消費者根據(jù)匹配規(guī)則訂閱多個頻道
消費者只能消費訂閱之后發(fā)布的消息,這意味著,消費者下線再上線這期間發(fā)布的消息將會丟失
數(shù)據(jù)不具有持久化。同樣Redis宕機也會數(shù)據(jù)丟失
消息發(fā)布后,是推送到一個緩沖區(qū)(內(nèi)存),消費者從緩沖區(qū)拉取消息,當消息堆積,緩沖區(qū)溢出,消費者就會被迫下線,同時釋放對應(yīng)的緩沖區(qū)
RedisConfig中添加監(jiān)聽器
/** * redis消息監(jiān)聽器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱頻道,通配符*表示任意多個占位符 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); return container; }
訂閱者
package com.yzm.redis08.message; import org.springframework.data.redis.connection.Message; import org.springframework.data.redis.connection.MessageListener; public class MySubscribe implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { System.out.println("訂閱頻道:" + new String(message.getChannel())); System.out.println("接收數(shù)據(jù):" + new String(message.getBody())); } }
消息發(fā)布
@GetMapping("/publish") public void publish() { redisTemplate.convertAndSend("channel_first", "hello world"); }
另一種發(fā)布方式
/** * redis消息監(jiān)聽器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱頻道,通配符*表示任意多個占位符 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); // 通配符?:表示一個占位符 MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage"); listenerAdapter.afterPropertiesSet(); container.addMessageListener(listenerAdapter, new PatternTopic("channel?")); return container; }
public class MySubscribe2 { public void getMessage(Object message, String channel) { System.out.println("訂閱頻道2:" + channel); System.out.println("接收數(shù)據(jù)2:" + message); } }
@GetMapping("/publish2") public void publish2() { redisTemplate.convertAndSend("channel2", "hello world"); }
消息是實體對象,進行轉(zhuǎn)換
@Data @Builder @NoArgsConstructor @AllArgsConstructor public class User implements Serializable { private static final long serialVersionUID = 5250232737975907491L; private Integer id; private String username; }
public class MySubscribe3 implements MessageListener { @Override public void onMessage(Message message, byte[] bytes) { Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class); jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper); User user = jacksonSerializer.deserialize(message.getBody()); System.out.println("訂閱頻道3:" + new String(message.getChannel())); System.out.println("接收數(shù)據(jù)3:" + user); } }
/** * redis消息監(jiān)聽器容器 */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); //訂閱頻道,通配符*:表示任意多個占位符 container.addMessageListener(new MySubscribe(), new PatternTopic("channel*")); // 通配符?:表示一個占位符 MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage"); listenerAdapter.afterPropertiesSet(); container.addMessageListener(listenerAdapter, new PatternTopic("channel?")); container.addMessageListener(new MySubscribe3(), new PatternTopic("user")); return container; }
@GetMapping("/publish3") public void publish3() { User user = User.builder().id(1).username("yzm").build(); redisTemplate.convertAndSend("user", user); }
五、ZSet實現(xiàn)延遲隊列
生產(chǎn)消息,score = 時間搓+60s隨機數(shù)
public static final String MESSAGE_ZKEY = "message:ZSetqueue"; public volatile AtomicInteger count = new AtomicInteger(); public void zAdd() { for (int i = 0; i < 10; i++) { new Thread(() -> { int increment = count.getAndIncrement(); log.info(Thread.currentThread().getName() + ":put message to zset = " + increment); double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000); redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score); }).start(); } }
消費者:定時任務(wù),每秒執(zhí)行一次
public static final String MESSAGE_ZKEY = "message:ZSetqueue"; public SimpleDateFormat simpleDateFormat = new SimpleDateFormat(); @Scheduled(initialDelay = 5 * 1000, fixedRate = 1000) public void zrangebysocre() { log.info("延時隊列消費。。。"); // 拉取score小于當前時間戳的消息 Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis()); if (messages != null) { for (Object message : messages) { Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message); log.info("消費了:" + message + "消費時間為:" + simpleDateFormat.format(score)); redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message); } } }
@GetMapping("/zadd") public void zadd() { messageProducer.zAdd(); }
到此這篇關(guān)于springboot整合redis之消息隊列的文章就介紹到這了,更多相關(guān)springboot redis消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java中執(zhí)行docker命令的實現(xiàn)示例
本文主要介紹了Java中執(zhí)行docker命令的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-08-08Java利用ffmpeg實現(xiàn)視頻MP4轉(zhuǎn)m3u8
本文綜合了下網(wǎng)上教程,從ffmpeg工具轉(zhuǎn)碼,ffmpeg視頻播放,java語言操控ffmpeg轉(zhuǎn)碼,轉(zhuǎn)碼后視頻上傳阿里云oss,四個方面完整記錄下這個流程,需要的朋友可以參考下2024-02-02mybatis(mybatis-plus)映射文件(XML文件)中特殊字符轉(zhuǎn)義的實現(xiàn)
XML 文件在解析時會將五種特殊字符進行轉(zhuǎn)義,本文主要介紹了mybatis(mybatis-plus)映射文件(XML文件)中特殊字符轉(zhuǎn)義的實現(xiàn),具有一定的參考價值,感興趣的可以了解一下2023-12-12阿里dubbo出錯提示Thread pool is EXHAUSTED問題及解決方法
這篇文章主要介紹了阿里dubbo出錯提示Thread pool is EXHAUSTED的問題及解決方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-08-08SpringBoot接收form-data和x-www-form-urlencoded數(shù)據(jù)的方法
form-data和x-www-form-urlencoded是兩種不同的HTTP請求體格式,本文主要介紹了SpringBoot接收form-data和x-www-form-urlencoded數(shù)據(jù)的方法,具有一定的參考價值,感興趣的可以了解一下2024-05-05springboot如何實現(xiàn)導(dǎo)入其他配置類
這篇文章主要介紹了springboot如何實現(xiàn)導(dǎo)入其他配置類問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-11-11Spring5.2.x 源碼本地環(huán)境搭建的方法步驟
這篇文章主要介紹了Spring5.2.x 源碼本地環(huán)境搭建的方法步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-09-09