欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis實現(xiàn)延遲隊列的項目示例

 更新時間:2024年06月12日 11:38:32   作者:yifanghub  
延遲隊列是Redis的一個重要應用場景,本文主要介紹了Redis實現(xiàn)延遲隊列的項目示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧

最近用到一個延遲消息的功能,第一時間想到使用MQ或者MQ的插件,因為數(shù)據(jù)量不大,所以嘗試使用Redis來實現(xiàn)了,畢竟Redis也天生支持類似MQ的隊列消費,所以,在這里總結(jié)了一下Redis實現(xiàn)延遲消息隊列的方式。

一、監(jiān)聽key過期時間

處理流程:當redis的一個key過期時,redis會生成一個事件,通知訂閱了該事件的客戶端(KeyExpirationEventMessageListener),然后在客戶端的回調(diào)方法中處理邏輯。
1)新建SpringBoot項目,maven依賴及yml如下
maven依賴:

<dependencies>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-data-redis</artifactId>
     </dependency>

     <dependency>
         <groupId>org.projectlombok</groupId>
         <artifactId>lombok</artifactId>
     </dependency>

 </dependencies>

yml文件

server:
  port: 8000

spring:
  redis:
    database: 0
    host: xxxx
    port: 6379
    password: xxxxxx
    lettuce:
      pool:
        #最大連接數(shù)
        max-active: 8
        #最大阻塞等待時間
        max-wait: -1
        #最大空閑
        max-idle: 8
        #最小空閑
        min-idle: 0
    #連接超時時間
    timeout: 5000

2)修改redis.conf文件開啟事件通知配置
默認的配置:notify-keyspace-events “”
修改為:notify-keyspace-events Ex,該配置表示監(jiān)聽key的過期事件

3)設(shè)置Redis監(jiān)聽配置,注入Bean RedisMessageListenerContaine

@Configuration
public class RedisTimeoutConfiguration {

    @Autowired
    private RedisConnectionFactory redisConnectionFactory;

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer() {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpiredListener keyExpiredListener() {
        return new KeyExpiredListener(this.redisMessageListenerContainer());
    }
}

4)創(chuàng)建監(jiān)聽器類,重寫key過期回調(diào)方法onMessage

@Slf4j
public class KeyExpiredListener extends KeyExpirationEventMessageListener {

    @Autowired
    public RedisTemplate<String, String> redisTemplate;

    public KeyExpiredListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] bytes) {
        String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
        //過期的key
        String key = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("redis key 過期:bytes={},channel={},key={}", new String(bytes), channel, key);
    }
}

5)編寫測試接口:寫入一個帶過期時間的key

@RestController
@RequestMapping("/demo")
public class BasicController {

    @Autowired
    public RedisTemplate<String, String> redisTemplate;

    @GetMapping(value = "/test")
    public void redisTest() {
        redisTemplate.opsForValue().set("test", "5s后過期", 5, TimeUnit.SECONDS);
    }
}

執(zhí)行后,onMessage監(jiān)聽方法打印結(jié)果:

 redis key 過期:bytes=__keyevent@*__:expired,channel=__keyevent@0__:expired,key=test

該方案缺點:可靠性問題,Redis 是一個內(nèi)存數(shù)據(jù)庫,盡管它提供了數(shù)據(jù)持久化選項(如 RDB 和 AOF),但在某些情況下(如意外崩潰或重啟),可能會丟失一些未處理的過期事件。

二、zset + score

基本思路是將消息按需發(fā)送的時間作為分數(shù)存儲在有序集合zset中,然后定期檢查并處理到期的消息。代碼例子如下:
1)創(chuàng)建 DelayedMessageService 類

@Slf4j
@Service
public class DelayedMessageService {

    private static final String DELAYED_MESSAGES_ZSET = "delayed:messages";


    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    public void addMessage(String message, long delayMillis) {
        long score = System.currentTimeMillis() + delayMillis;
        redisTemplate.opsForZSet().add(DELAYED_MESSAGES_ZSET, message, score);
    }


    @Scheduled(fixedRate = 1000)
    public void processMessages() {
        long now = System.currentTimeMillis();
        Set<ZSetOperations.TypedTuple<String>> messages = redisTemplate.opsForZSet().rangeByScoreWithScores(DELAYED_MESSAGES_ZSET, 0, now);
        if (messages != null && !messages.isEmpty()) {
            for (ZSetOperations.TypedTuple<String> message : messages) {
                String msg = message.getValue();
                long score = message.getScore().longValue();
                if (score <= now) {
                    // Process the message
                    System.out.println("Processing message: " + msg);
                    // Remove the message from the zset
                    redisTemplate.opsForZSet().remove(DELAYED_MESSAGES_ZSET, msg);
                }
            }
        }else{
            log.info("定時任務執(zhí)行~");
        }
    }


}

2)編寫Controller接口測試,初始化zset內(nèi)容

@RestController
@RequestMapping("/demo")
public class BasicController {

    @Autowired
    private DelayedMessageService delayedMessageService;

    @GetMapping(value = "/test2")
    public void redisZsetTest() {
        // Add some messages with delays
        delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
        delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
        delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
    }
}

說明:

  • redisZsetTest接口通過調(diào)用DelayedMessageServiceaddMessage方法,將消息及其到期時間添加到 Redis 的 zset 中
  • 開啟一個定時任務,定期檢查和處理到期的消息。使用 @Scheduled 注解定期執(zhí)行,每秒檢查一次,注意這里使用@Scheduled,不要忘了啟動類上添加@EnableScheduling注解,否則定時任務不會生效。fixedRate 屬性表示以固定的頻率(毫秒為單位)執(zhí)行方法。即方法執(zhí)行完成后,會立即等待指定的毫秒數(shù),然后再次執(zhí)行。
  • 通過 redisTemplate.opsForZSet().rangeByScoreWithScores 方法按時間范圍獲取到期的消息,消息處理完成后,從zset 中移除處理過的消息

三、Redisson框架

利用 Redisson 提供的數(shù)據(jù)結(jié)構(gòu)RDelayedQueueRBlockingDeque,可以自動處理過期的任務并將它們移動到阻塞隊列中,這樣我們就可以從阻塞隊列中獲取任務并進行消費處理。例子如下:
1)添加依賴

<!-- Redisson 依賴項 -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>2.15.1</version>
</dependency>

2)創(chuàng)建DelayedMessageService

@Slf4j
@Service
public class DelayedMessageService {

    @Autowired
    private RedissonClient redissonClient;

    private RBlockingDeque<String> blockingDeque;
    private RDelayedQueue<String> delayedQueue;

    @PostConstruct
    public void init() {
        this.blockingDeque = redissonClient.getBlockingDeque("delayedQueue");
        this.delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

        Executors.newSingleThreadExecutor().submit(this::processMessages);
    }

    public void addMessage(String message, long delayMillis) {
        delayedQueue.offer(message, delayMillis, TimeUnit.MILLISECONDS);
    }

    public void processMessages() {
        try {
            while (true) {
                String message = blockingDeque.take();
                // Process the message
                log.info("消息被處理: " + message);
                // ..業(yè)務邏輯處理
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("中斷異常",e);
        }
    }


}

3)測試接口

@GetMapping(value = "/test3")
    public void redisQueueTest() {
        // Add some messages with delays
        delayedMessageService.addMessage("Message 1", 5000); // 5 seconds delay
        delayedMessageService.addMessage("Message 2", 10000); // 10 seconds delay
        delayedMessageService.addMessage("Message 3", 15000); // 15 seconds delay
    }

說明:

RDelayedQueue 是 Redisson 提供的延遲隊列,它將消息存儲在指定的隊列中,直到消息到期才會被轉(zhuǎn)移到該隊列。它的主要作用包括:

  • 延遲消息管理:我們可以使用 RDelayedQueue 的 offer 方法將消息添加到延遲隊列,并指定延遲時間,消息在延遲時間到期前一直保留在 RDelayedQueue 中。
  • 消息轉(zhuǎn)移:一旦消息到期,RDelayedQueue 會自動將消息轉(zhuǎn)移到指定的RBlockingDeque 中。

RBlockingQueue是 Redisson 提供的阻塞隊列,它支持阻塞操作。主要作用包括:

  • 阻塞操作:支持阻塞的 take 操作,如果隊列中沒有元素,會一直阻塞直到有元素可供消費。

總結(jié)
個人推薦使用Redisson 的RDelayedQueue 方式,感覺更加可靠和簡單一些,當然zset+score也可以是個不錯選擇,畢竟更加靈活,延遲消息還有其他不同的方案,比如rocketmq、rabbitmq插件等,假如項目中用了redis,又不想引入更多的中間件,可以嘗試使用redis來實現(xiàn),為了測試,這里例子都比較簡單,在實際使用過程中,還要考慮補償機制、冪等性等問題。

參考:

1.https://blog.csdn.net/qq_34826261/article/details/120598731

到此這篇關(guān)于Redis實現(xiàn)延遲隊列的項目示例的文章就介紹到這了,更多相關(guān)Redis 延遲隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • redis 過期策略及內(nèi)存回收機制解析

    redis 過期策略及內(nèi)存回收機制解析

    這篇文章主要介紹了redis 過期策略及內(nèi)存回收機制,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Redis教程(四):Hashes數(shù)據(jù)類型

    Redis教程(四):Hashes數(shù)據(jù)類型

    這篇文章主要介紹了Redis教程(四):Hashes數(shù)據(jù)類型,本文講解了Hashes數(shù)據(jù)類型概述、相關(guān)命令列表和命令使用示例等內(nèi)容,需要的朋友可以參考下
    2015-04-04
  • Redis設(shè)置密碼的實現(xiàn)步驟

    Redis設(shè)置密碼的實現(xiàn)步驟

    本文主要介紹了Redis設(shè)置密碼的實現(xiàn)步驟,主要包括兩種方法:臨時密碼和持久密碼,具有一定的參考價值,感興趣的可以了解一下
    2023-08-08
  • 解決redis sentinel 頻繁主備切換的問題

    解決redis sentinel 頻繁主備切換的問題

    這篇文章主要介紹了解決redis sentinel 頻繁主備切換的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • redis實現(xiàn)分布式session的解決方案

    redis實現(xiàn)分布式session的解決方案

    session存放在服務器,關(guān)閉瀏覽器不會失效,本文主要介紹了redis實現(xiàn)分布式session的解決方案,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-03-03
  • Redis生成全局唯一ID的實現(xiàn)方法

    Redis生成全局唯一ID的實現(xiàn)方法

    全局唯一ID生成器是一種在分布式系統(tǒng)下用來生成全局唯一ID的工具,本文主要介紹了Redis生成全局唯一ID的實現(xiàn)方法,具有一定的參考價值,感興趣的可以了解一下
    2022-06-06
  • 在redis中存儲ndarray的示例代碼

    在redis中存儲ndarray的示例代碼

    在Redis中存儲NumPy數(shù)組(ndarray)通常需要將數(shù)組轉(zhuǎn)換為二進制格式,然后將其存儲為字符串,這篇文章給大家介紹了在redis中存儲ndarray的示例代碼,感興趣的朋友一起看看吧
    2024-02-02
  • Redisson分布式限流的實現(xiàn)原理分析

    Redisson分布式限流的實現(xiàn)原理分析

    這篇文章主要介紹了Redisson分布式限流的實現(xiàn)原理分析,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-07-07
  • Redis數(shù)據(jù)遷移RedisShake的實現(xiàn)方法

    Redis數(shù)據(jù)遷移RedisShake的實現(xiàn)方法

    本文主要介紹了Redis數(shù)據(jù)遷移RedisShake的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-04-04
  • Redis Cluster模式配置

    Redis Cluster模式配置

    這篇文章主要介紹了Redis Cluster模式配置,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2025-06-06

最新評論