Redis實現(xiàn)延遲隊列的項目示例
最近用到一個延遲消息的功能,第一時間想到使用MQ或者MQ的插件,因為數(shù)據(jù)量不大,所以嘗試使用Redis來實現(xiàn)了,畢竟Redis也天生支持類似MQ的隊列消費,所以,在這里總結了一下Redis實現(xiàn)延遲消息隊列的方式。
一、監(jiān)聽key過期時間
處理流程:當redis的一個key過期時,redis會生成一個事件,通知訂閱了該事件的客戶端(KeyExpirationEventMessageListener),然后在客戶端的回調方法中處理邏輯。
1)新建SpringBoot項目,maven依賴及yml如下
maven依賴:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> </dependencies>
yml文件
server: port: 8000 spring: redis: database: 0 host: xxxx port: 6379 password: xxxxxx lettuce: pool: #最大連接數(shù) max-active: 8 #最大阻塞等待時間 max-wait: -1 #最大空閑 max-idle: 8 #最小空閑 min-idle: 0 #連接超時時間 timeout: 5000
2)修改redis.conf
文件開啟事件通知配置
默認的配置:notify-keyspace-events “”
修改為:notify-keyspace-events Ex,該配置表示監(jiān)聽key的過期事件
3)設置Redis監(jiān)聽配置,注入Bean RedisMessageListenerContaine
@Configuration public class RedisTimeoutConfiguration { @Autowired private RedisConnectionFactory redisConnectionFactory; @Bean public RedisMessageListenerContainer redisMessageListenerContainer() { RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer(); redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory); return redisMessageListenerContainer; } @Bean public KeyExpiredListener keyExpiredListener() { return new KeyExpiredListener(this.redisMessageListenerContainer()); } }
4)創(chuàng)建監(jiān)聽器類,重寫key過期回調方法onMessage
@Slf4j public class KeyExpiredListener extends KeyExpirationEventMessageListener { @Autowired public RedisTemplate<String, String> redisTemplate; public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); } @Override public void onMessage(Message message, byte[] bytes) { String channel = new String(message.getChannel(), StandardCharsets.UTF_8); //過期的key String key = new String(message.getBody(), StandardCharsets.UTF_8); log.info("redis key 過期:bytes={},channel={},key={}", new String(bytes), channel, key); } }
5)編寫測試接口:寫入一個帶過期時間的key
@RestController @RequestMapping("/demo") public class BasicController { @Autowired public RedisTemplate<String, String> redisTemplate; @GetMapping(value = "/test") public void redisTest() { redisTemplate.opsForValue().set("test", "5s后過期", 5, TimeUnit.SECONDS); } }
執(zhí)行后,onMessage
監(jiān)聽方法打印結果:
redis key 過期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test
該方案缺點:可靠性問題,Redis 是一個內(nèi)存數(shù)據(jù)庫,盡管它提供了數(shù)據(jù)持久化選項(如 RDB 和 AOF),但在某些情況下(如意外崩潰或重啟),可能會丟失一些未處理的過期事件。
二、zset + score
基本思路是將消息按需發(fā)送的時間作為分數(shù)存儲在有序集合zset中,然后定期檢查并處理到期的消息。代碼例子如下:
1)創(chuàng)建 DelayedMessageService 類
@Slf4j @Service public class DelayedMessageService { private static final String DELAYED_MESSAGES_ZSET = "delayed:messages"; @Autowired private RedisTemplate<String, String> redisTemplate; public void addMessage(String message, long delayMillis) { long score = System.currentTimeMillis() + delayMillis; redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score); } @Scheduled(fixedRate = 1000) public void processMessages() { long now = System.currentTimeMillis(); Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now); if (messages != null && !messages.isEmpty()) { for (ZSetOperations.TypedTuple<String> message : messages) { String msg = message.getValue(); long score = message.getScore().longValue(); if (score <= now) { // Process the message System.out.println("Processing message: " + msg); // Remove the message from the zset redisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg); } } }else{ log.info("定時任務執(zhí)行~"); } } }
2)編寫Controller接口測試,初始化zset內(nèi)容
@RestController @RequestMapping("/demo") public class BasicController { @Autowired private DelayedMessageService delayedMessageService; @GetMapping(value = "/test2") public void redisZsetTest() { // Add some messages with delays delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay } }
說明:
- redisZsetTest接口通過調用
DelayedMessageService
的addMessage
方法,將消息及其到期時間添加到 Redis 的 zset 中 - 開啟一個定時任務,定期檢查和處理到期的消息。使用
@Scheduled
注解定期執(zhí)行,每秒檢查一次,注意這里使用@Scheduled
,不要忘了啟動類上添加@EnableScheduling
注解,否則定時任務不會生效。fixedRate
屬性表示以固定的頻率(毫秒為單位)執(zhí)行方法。即方法執(zhí)行完成后,會立即等待指定的毫秒數(shù),然后再次執(zhí)行。 - 通過
redisTemplate.opsForZSet().rangeByScoreWithScores
方法按時間范圍獲取到期的消息,消息處理完成后,從zset 中移除處理過的消息
三、Redisson框架
利用 Redisson 提供的數(shù)據(jù)結構RDelayedQueue
和RBlockingDeque
,可以自動處理過期的任務并將它們移動到阻塞隊列中,這樣我們就可以從阻塞隊列中獲取任務并進行消費處理。例子如下:
1)添加依賴
<!-- Redisson 依賴項 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>2.15.1</version> </dependency>
2)創(chuàng)建DelayedMessageService
@Slf4j @Service public class DelayedMessageService { @Autowired private RedissonClient redissonClient; private RBlockingDeque<String> blockingDeque; private RDelayedQueue<String> delayedQueue; @PostConstruct public void init() { this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue"); this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque); Executors.newSingleThreadExecutor().submit(this::processMessages); } public void addMessage(String message, long delayMillis) { delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS); } public void processMessages() { try { while (true) { String message = blockingDeque.take(); // Process the message log.info("消息被處理: " + message); // ..業(yè)務邏輯處理 } } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("中斷異常",e); } } }
3)測試接口
@GetMapping(value = "/test3") public void redisQueueTest() { // Add some messages with delays delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay }
說明:
RDelayedQueue
是 Redisson 提供的延遲隊列,它將消息存儲在指定的隊列中,直到消息到期才會被轉移到該隊列。它的主要作用包括:
- 延遲消息管理:我們可以使用
RDelayedQueue
的offer
方法將消息添加到延遲隊列,并指定延遲時間,消息在延遲時間到期前一直保留在RDelayedQueue
中。 - 消息轉移:一旦消息到期,
RDelayedQueue
會自動將消息轉移到指定的RBlockingDeque
中。
RBlockingQueue
是 Redisson 提供的阻塞隊列,它支持阻塞操作。主要作用包括:
- 阻塞操作:支持阻塞的
take
操作,如果隊列中沒有元素,會一直阻塞直到有元素可供消費。
總結:
個人推薦使用Redisson 的RDelayedQueue
方式,感覺更加可靠和簡單一些,當然zset+score也可以是個不錯選擇,畢竟更加靈活,延遲消息還有其他不同的方案,比如rocketmq、rabbitmq插件等,假如項目中用了redis,又不想引入更多的中間件,可以嘗試使用redis來實現(xiàn),為了測試,這里例子都比較簡單,在實際使用過程中,還要考慮補償機制、冪等性等問題。
參考:
1.https://blog.csdn.net/qq_34826261/article/details/120598731
到此這篇關于Redis實現(xiàn)延遲隊列的項目示例的文章就介紹到這了,更多相關Redis 延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!