springboot整合redis之消息隊(duì)列
一、項(xiàng)目準(zhǔn)備
依賴
<!-- RedisTemplate -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redis-Jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
application.yaml配置文件
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
timeout: 4000
jedis:
pool:
max-wait: -1
max-active: -1
max-idle: 20
min-idle: 10
二、配置類
public class ObjectMapperConfig {
public static final ObjectMapper objectMapper;
private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";
static {
JavaTimeModule javaTimeModule = new JavaTimeModule();
javaTimeModule.addSerializer(LocalDateTime.class, new LocalDateTimeSerializer());
javaTimeModule.addDeserializer(LocalDateTime.class, new LocalDateTimeDeserializer());
objectMapper = new ObjectMapper()
// 轉(zhuǎn)換為格式化的json(控制臺(tái)打印時(shí),自動(dòng)格式化規(guī)范)
//.enable(SerializationFeature.INDENT_OUTPUT)
// Include.ALWAYS 是序列化對(duì)像所有屬性(默認(rèn))
// Include.NON_NULL 只有不為null的字段才被序列化,屬性為NULL 不序列化
// Include.NON_EMPTY 如果為null或者 空字符串和空集合都不會(huì)被序列化
// Include.NON_DEFAULT 屬性為默認(rèn)值不序列化
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
// 如果是空對(duì)象的時(shí)候,不拋異常
.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false)
// 反序列化的時(shí)候如果多了其他屬性,不拋出異常
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
// 取消時(shí)間的轉(zhuǎn)化格式,默認(rèn)是時(shí)間戳,可以取消,同時(shí)需要設(shè)置要表現(xiàn)的時(shí)間格式
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
.setDateFormat(new SimpleDateFormat(PATTERN))
// 對(duì)LocalDateTime序列化跟反序列化
.registerModule(javaTimeModule)
.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY)
// 此項(xiàng)必須配置,否則會(huì)報(bào)java.lang.ClassCastException: java.util.LinkedHashMap cannot be cast to XXX
.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY)
;
}
static class LocalDateTimeSerializer extends JsonSerializer<LocalDateTime> {
@Override
public void serialize(LocalDateTime value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
gen.writeString(value.format(DateTimeFormatter.ofPattern(PATTERN)));
}
}
static class LocalDateTimeDeserializer extends JsonDeserializer<LocalDateTime> {
@Override
public LocalDateTime deserialize(JsonParser p, DeserializationContext deserializationContext) throws IOException {
return LocalDateTime.parse(p.getValueAsString(), DateTimeFormatter.ofPattern(PATTERN));
}
}
}
@Configuration
public class RedisConfig {
/**
* redisTemplate配置
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置連接工廠
template.setConnectionFactory(factory);
//使用Jackson2JsonRedisSerializer來(lái)序列化和反序列化redis的value值(默認(rèn)使用JDK的序列化方式)
Jackson2JsonRedisSerializer<Object> jacksonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// 使用StringRedisSerializer來(lái)序列化和反序列化redis的key,value采用json序列化
template.setKeySerializer(stringRedisSerializer);
template.setValueSerializer(jacksonSerializer);
// 設(shè)置hash key 和value序列化模式
template.setHashKeySerializer(stringRedisSerializer);
template.setHashValueSerializer(jacksonSerializer);
template.afterPropertiesSet();
return template;
}
}
三、redis中l(wèi)ist數(shù)據(jù)類型
在Redis中,List類型是按照插入順序排序的字符串鏈表。和數(shù)據(jù)結(jié)構(gòu)中的普通鏈表一樣,我們可以在其頭部和尾部添加新的元素
優(yōu)勢(shì):
- 順序排序,保證先進(jìn)先出
- 隊(duì)列為空時(shí),自動(dòng)從Redis數(shù)據(jù)庫(kù)刪除
- 在隊(duì)列的兩頭插入或刪除元素,效率極高,即使隊(duì)列中元素達(dá)到百萬(wàn)級(jí)
- List中可以包含的最大元素?cái)?shù)量是4294967295
定時(shí)器監(jiān)聽(tīng)隊(duì)列
生產(chǎn)者
@Slf4j
@Component
public class MessageProducer {
public static final String MESSAGE_KEY = "message:queue";
@Autowired
private RedisTemplate<String,Object> redisTemplate;
public void lPush() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
Long size = redisTemplate.opsForList().leftPush(MESSAGE_KEY, Thread.currentThread().getName() + ":hello world");
log.info(Thread.currentThread().getName() + ":put message size = " + size);
}).start();
}
}
}
消費(fèi)者:消費(fèi)消息,定時(shí)器以達(dá)到監(jiān)聽(tīng)隊(duì)列功能
@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {
public static final String MESSAGE_KEY = "message:queue";
@Autowired
private RedisTemplate<String,Object> redisTemplate;
@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
public void rPop() {
String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
log.info(message);
}
}
@RestController
public class RedisController {
@Autowired
private MessageProducer messageProducer;
@GetMapping("/lPush")
public void lPush() {
messageProducer.lPush();
}
}
測(cè)試
http://localhost:8080/lPush

可能出現(xiàn)的問(wèn)題:
1.通過(guò)定時(shí)器監(jiān)聽(tīng)List中是否有待處理消息,每執(zhí)行一次都會(huì)發(fā)起一次連接,這會(huì)造成不必要的浪費(fèi)。
2.生產(chǎn)速度大于消費(fèi)速度,隊(duì)列堆積,消息時(shí)效性差,占用內(nèi)存。
運(yùn)行即監(jiān)控隊(duì)列
修改消息消費(fèi)者代碼。
當(dāng)隊(duì)列沒(méi)有元素時(shí),會(huì)阻塞10秒,然后再次監(jiān)聽(tīng)隊(duì)列,
需要注意的是,阻塞時(shí)間必須小于連接超時(shí)時(shí)間
@Slf4j
@Component
@EnableScheduling
public class MessageConsumer {
public static final String MESSAGE_KEY = "message:queue";
@Autowired
private RedisTemplate<String,Object> redisTemplate;
//@Scheduled(initialDelay = 5 * 1000, fixedRate = 2 * 1000)
public void rPop() {
String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY);
log.info(message);
}
@PostConstruct
public void brPop() {
new Thread(() -> {
while (true) {
String message = (String) redisTemplate.opsForList().rightPop(MESSAGE_KEY, 10, TimeUnit.SECONDS);
log.info(message);
}
}).start();
}
}

阻塞時(shí)間不能為負(fù),直接報(bào)錯(cuò)超時(shí)為負(fù)
阻塞時(shí)間為零,此時(shí)阻塞時(shí)間等于超時(shí)時(shí)間,最后報(bào)錯(cuò)連接超時(shí)
阻塞時(shí)間大于超時(shí)時(shí)間,報(bào)錯(cuò)連接超時(shí)
測(cè)試:

消息不可重復(fù)消費(fèi),因?yàn)橄年?duì)列POP之后就被移除了,即不支持多個(gè)消費(fèi)者消費(fèi)同一批數(shù)據(jù)
消息丟失,消費(fèi)期間發(fā)生異常,消息未能正常消費(fèi)
四、發(fā)布/訂閱模式
消息可以重復(fù)消費(fèi),多個(gè)消費(fèi)者訂閱同一頻道即可
一個(gè)消費(fèi)者根據(jù)匹配規(guī)則訂閱多個(gè)頻道
消費(fèi)者只能消費(fèi)訂閱之后發(fā)布的消息,這意味著,消費(fèi)者下線再上線這期間發(fā)布的消息將會(huì)丟失
數(shù)據(jù)不具有持久化。同樣Redis宕機(jī)也會(huì)數(shù)據(jù)丟失
消息發(fā)布后,是推送到一個(gè)緩沖區(qū)(內(nèi)存),消費(fèi)者從緩沖區(qū)拉取消息,當(dāng)消息堆積,緩沖區(qū)溢出,消費(fèi)者就會(huì)被迫下線,同時(shí)釋放對(duì)應(yīng)的緩沖區(qū)
RedisConfig中添加監(jiān)聽(tīng)器
/**
* redis消息監(jiān)聽(tīng)器容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//訂閱頻道,通配符*表示任意多個(gè)占位符
container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
return container;
}
訂閱者
package com.yzm.redis08.message;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
public class MySubscribe implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
System.out.println("訂閱頻道:" + new String(message.getChannel()));
System.out.println("接收數(shù)據(jù):" + new String(message.getBody()));
}
}
消息發(fā)布
@GetMapping("/publish")
public void publish() {
redisTemplate.convertAndSend("channel_first", "hello world");
}

另一種發(fā)布方式
/**
* redis消息監(jiān)聽(tīng)器容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//訂閱頻道,通配符*表示任意多個(gè)占位符
container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
// 通配符?:表示一個(gè)占位符
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
listenerAdapter.afterPropertiesSet();
container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
return container;
}
public class MySubscribe2 {
public void getMessage(Object message, String channel) {
System.out.println("訂閱頻道2:" + channel);
System.out.println("接收數(shù)據(jù)2:" + message);
}
}
@GetMapping("/publish2")
public void publish2() {
redisTemplate.convertAndSend("channel2", "hello world");
}

消息是實(shí)體對(duì)象,進(jìn)行轉(zhuǎn)換
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {
private static final long serialVersionUID = 5250232737975907491L;
private Integer id;
private String username;
}
public class MySubscribe3 implements MessageListener {
@Override
public void onMessage(Message message, byte[] bytes) {
Jackson2JsonRedisSerializer<User> jacksonSerializer = new Jackson2JsonRedisSerializer<>(User.class);
jacksonSerializer.setObjectMapper(ObjectMapperConfig.objectMapper);
User user = jacksonSerializer.deserialize(message.getBody());
System.out.println("訂閱頻道3:" + new String(message.getChannel()));
System.out.println("接收數(shù)據(jù)3:" + user);
}
}
/**
* redis消息監(jiān)聽(tīng)器容器
*/
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//訂閱頻道,通配符*:表示任意多個(gè)占位符
container.addMessageListener(new MySubscribe(), new PatternTopic("channel*"));
// 通配符?:表示一個(gè)占位符
MessageListenerAdapter listenerAdapter = new MessageListenerAdapter(new MySubscribe2(), "getMessage");
listenerAdapter.afterPropertiesSet();
container.addMessageListener(listenerAdapter, new PatternTopic("channel?"));
container.addMessageListener(new MySubscribe3(), new PatternTopic("user"));
return container;
}
@GetMapping("/publish3")
public void publish3() {
User user = User.builder().id(1).username("yzm").build();
redisTemplate.convertAndSend("user", user);
}

五、ZSet實(shí)現(xiàn)延遲隊(duì)列
生產(chǎn)消息,score = 時(shí)間搓+60s隨機(jī)數(shù)
public static final String MESSAGE_ZKEY = "message:ZSetqueue";
public volatile AtomicInteger count = new AtomicInteger();
public void zAdd() {
for (int i = 0; i < 10; i++) {
new Thread(() -> {
int increment = count.getAndIncrement();
log.info(Thread.currentThread().getName() + ":put message to zset = " + increment);
double score = System.currentTimeMillis() + new Random().nextInt(60 * 1000);
redisTemplate.opsForZSet().add(MESSAGE_ZKEY, Thread.currentThread().getName() + " hello zset:" + increment, score);
}).start();
}
}
消費(fèi)者:定時(shí)任務(wù),每秒執(zhí)行一次
public static final String MESSAGE_ZKEY = "message:ZSetqueue";
public SimpleDateFormat simpleDateFormat = new SimpleDateFormat();
@Scheduled(initialDelay = 5 * 1000, fixedRate = 1000)
public void zrangebysocre() {
log.info("延時(shí)隊(duì)列消費(fèi)。。。");
// 拉取score小于當(dāng)前時(shí)間戳的消息
Set<Object> messages = redisTemplate.opsForZSet().rangeByScore(MESSAGE_ZKEY, 0, System.currentTimeMillis());
if (messages != null) {
for (Object message : messages) {
Double score = redisTemplate.opsForZSet().score(MESSAGE_ZKEY, message);
log.info("消費(fèi)了:" + message + "消費(fèi)時(shí)間為:" + simpleDateFormat.format(score));
redisTemplate.opsForZSet().remove(MESSAGE_ZKEY, message);
}
}
}
@GetMapping("/zadd")
public void zadd() {
messageProducer.zAdd();
}

到此這篇關(guān)于springboot整合redis之消息隊(duì)列的文章就介紹到這了,更多相關(guān)springboot redis消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Springboot3+Redis實(shí)現(xiàn)消息隊(duì)列的多種方法小結(jié)
- SpringBoot集成Redisson實(shí)現(xiàn)消息隊(duì)列的示例代碼
- SpringBoot使用Redis Stream實(shí)現(xiàn)輕量消息隊(duì)列的示例代碼
- SpringBoot使用Redis實(shí)現(xiàn)消息隊(duì)列的方法小結(jié)
- SpringBoot集成Redis實(shí)現(xiàn)消息隊(duì)列的方法
- SpringBoot利用redis集成消息隊(duì)列的方法
- SpringBoot集成Redis消息隊(duì)列的實(shí)現(xiàn)示例
相關(guān)文章
Java中執(zhí)行docker命令的實(shí)現(xiàn)示例
本文主要介紹了Java中執(zhí)行docker命令的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08
Java利用ffmpeg實(shí)現(xiàn)視頻MP4轉(zhuǎn)m3u8
本文綜合了下網(wǎng)上教程,從ffmpeg工具轉(zhuǎn)碼,ffmpeg視頻播放,java語(yǔ)言操控ffmpeg轉(zhuǎn)碼,轉(zhuǎn)碼后視頻上傳阿里云oss,四個(gè)方面完整記錄下這個(gè)流程,需要的朋友可以參考下2024-02-02
mybatis(mybatis-plus)映射文件(XML文件)中特殊字符轉(zhuǎn)義的實(shí)現(xiàn)
XML 文件在解析時(shí)會(huì)將五種特殊字符進(jìn)行轉(zhuǎn)義,本文主要介紹了mybatis(mybatis-plus)映射文件(XML文件)中特殊字符轉(zhuǎn)義的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12
阿里dubbo出錯(cuò)提示Thread pool is EXHAUSTED問(wèn)題及解決方法
這篇文章主要介紹了阿里dubbo出錯(cuò)提示Thread pool is EXHAUSTED的問(wèn)題及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08
SpringBoot接收f(shuō)orm-data和x-www-form-urlencoded數(shù)據(jù)的方法
form-data和x-www-form-urlencoded是兩種不同的HTTP請(qǐng)求體格式,本文主要介紹了SpringBoot接收f(shuō)orm-data和x-www-form-urlencoded數(shù)據(jù)的方法,具有一定的參考價(jià)值,感興趣的可以了解一下2024-05-05
springboot如何實(shí)現(xiàn)導(dǎo)入其他配置類
這篇文章主要介紹了springboot如何實(shí)現(xiàn)導(dǎo)入其他配置類問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11
使用MultipartFile來(lái)上傳單個(gè)及多個(gè)文件代碼示例
這篇文章主要介紹了使用MultipartFile來(lái)上傳單個(gè)及多個(gè)文件代碼示例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-01-01
Spring5.2.x 源碼本地環(huán)境搭建的方法步驟
這篇文章主要介紹了Spring5.2.x 源碼本地環(huán)境搭建的方法步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09

