欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

redis使用zset實(shí)現(xiàn)延時隊(duì)列的示例代碼

 更新時間:2023年06月02日 15:31:50   作者:搶老婆酸奶的小肥仔  
本文主要介紹了redis使用zset實(shí)現(xiàn)延時隊(duì)列的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

最近在使用redis時,就想能不能用其實(shí)現(xiàn)消息隊(duì)列?也在網(wǎng)上看了下其他小伙伴寫的實(shí)現(xiàn),結(jié)合自身業(yè)務(wù)實(shí)現(xiàn)了如下消息隊(duì)列,希望對大家有用。

廢話不多說,直接開擼。

1、為什么zset可以做消息隊(duì)列?

首先我們來看下,設(shè)計(jì)消息隊(duì)列需要考慮的需求:有序性,消息重復(fù)性,可靠性。

  • 有序性:zset所有元素可以根據(jù)成員關(guān)聯(lián)的score來進(jìn)行從低到高的排序,例如,我們可以利用時間戳來進(jìn)行排序
  • 消息重復(fù)性:在zset中每個元素都是唯一的,這也保證了消息的唯一性
  • 可靠性:zset會自動維護(hù)元素之間的順序,在添加或刪除元素時無需手動排序,提升操作速度。

2、使用的zset命令

命令描述
zadd將一個給定score的成員添加到有序集合中,返回添加元素的個數(shù)
zrange根據(jù)元素在有序排序中的位置,從有序集合中獲取多個元素
rank(K key, Object o)獲取指定元素在集合中的索引,索引從0開始

3、代碼實(shí)現(xiàn)

使用zset實(shí)現(xiàn)消息隊(duì)列時,具體的流程,如下:

生產(chǎn)者流程:

  • 用戶獲取消息Id,并封裝消息體
  • 用戶發(fā)送數(shù)據(jù)到生產(chǎn)者,先獲取鎖
  • 如果獲取到鎖,則校驗(yàn)該消息體是否已添加到隊(duì)列中,已添加則直接返回提醒。
  • 若未添加則調(diào)用方法將數(shù)據(jù)保存到zset集合中,否則等到指定時間后再獲取鎖。
  • 推送數(shù)據(jù)后,釋放鎖

消費(fèi)者流程:

  • 調(diào)用方法獲取數(shù)據(jù)
  • 獲取到數(shù)據(jù),則直接返回,否則到指定時間后再次獲取數(shù)據(jù),直到獲取到數(shù)據(jù)并返回。

統(tǒng)一返回類:

    /**
     * @Author: jiangjs
     * @Description:
     * @Date: 2021/11/12 15:46
     **/
    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    public class ResultUtil<T> implements Serializable {
        private int code;
        private String msg;
        private T data;
        public static <T> ResultUtil<T> success(){
            return ResultUtil.<T>builder().code(1000).msg("成功").build();
        }
        public static <T> ResultUtil<T> success(T data){
            return ResultUtil.<T>builder().code(1000).msg("成功").data(data).build();
        }
        public static <T> ResultUtil<T> error(String msg){
            return ResultUtil.<T>builder().code(5000).msg(msg).data(null).build();
        }
        public static <T> ResultUtil<T> error(int code,String msg){
            return ResultUtil.<T>builder().code(code).msg(msg).build();
        }
    }

3.1 消息實(shí)體

需添加消息Id,主要防止消息重復(fù)提交。

    /**
     * @author: jiangjs
     * @description: 消息實(shí)體
     * @date: 2023/5/30 11:11
     **/
    @Data
    @Accessors(chain = true)
    public class QueueTask<T> {
        /**
         * 消息Id
         */
        private String taskId;
        /**
         * 任務(wù)
         */
        private T task;
    }

3.2 隊(duì)列類型

隊(duì)列類型可以理解為隊(duì)列的名稱,通過枚舉,可以隨意添加隊(duì)列名稱。

    /**
     * @author: jiangjs
     * @description: 隊(duì)列類型
     * @date: 2023/5/30 10:53
     **/
    public enum QueueTypeEnum {
        /**
         * 訂單
         */
        ORDER("order");
        private final String type;
        QueueTypeEnum(String type){
            this.type = type;
        }
        public String getType(){
            return type;
        }
    }

3.3 創(chuàng)建消息工具

    package com.jiashn.springbootproject.redis.utils;
    import com.jiashn.springbootproject.redis.domain.QueueTask;
    import com.jiashn.springbootproject.utils.ResultUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.data.redis.core.RedisTemplate;
    import javax.annotation.Resource;
    import java.time.LocalDateTime;
    import java.util.Objects;
    import java.util.Set;
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    /**
     * @author: jiangjs
     * @description: redis實(shí)現(xiàn)消息隊(duì)列
     * @date: 2023/5/30 10:51
     **/
    public class RedisQueueUtil<T> {
        private static final Logger log = LoggerFactory.getLogger(RedisQueueUtil.class);
        private RedisTemplate<String,QueueTask<T>> redisTemplate;
        /**
         * 隊(duì)列類型,即名稱
         */
        private final QueueTypeEnum typeEnum;
        public RedisQueueUtil(QueueTypeEnum typeEnum,RedisTemplate<String,QueueTask<T>> redisTemplate){
            this.typeEnum = typeEnum;
            this.redisTemplate = redisTemplate;
        }
        /**
         * 添加消息數(shù)據(jù)
         * @param queueTask 消息
         * @param time 延遲時間,單位s
         */
        public ResultUtil<String> sendQueueTask(QueueTask<T> queueTask, long time){
            //加鎖
            if (getLock()){
                try {
                    Long rank = redisTemplate.opsForZSet().rank(typeEnum.getType(), queueTask);
                    if (Objects.nonNull(rank)){
                        return ResultUtil.error(6000,"消息數(shù)據(jù)已經(jīng)存在,不予添加......");
                    }
                    Boolean result = redisTemplate.opsForZSet().add(typeEnum.getType(), queueTask, System.currentTimeMillis() + time*1000);
                    if (Objects.nonNull(result) && result){
                        log.info("添加消息數(shù)據(jù)成功:" + queueTask + ",添加時間:" + LocalDateTime.now());
                        return ResultUtil.success("添加消息數(shù)據(jù)成功");
                    }
                    return ResultUtil.error("添加消息數(shù)據(jù)失敗");
                }finally {
                    //釋放鎖
                    releaseLock();
                }
            } else {
                log.info("未獲取到鎖,稍后再試");
                return ResultUtil.error("未獲取到鎖,稍后再試");
            }
        }
        /**
         * 獲取zset前count數(shù)據(jù)
         * @param count 數(shù)據(jù)數(shù)
         * @return 返回獲取到數(shù)據(jù)
         */
        public Set<QueueTask<T>> loopGetTask(int count) {
                //rangeByScore,根據(jù)score順序獲取zset數(shù)據(jù)的值
                return redisTemplate.opsForZSet().rangeByScore(typeEnum.getType(), 0, System.currentTimeMillis(), 0, count-1);
        }
        /**
         * 注銷消息隊(duì)列
         * @param typeEnum 消息隊(duì)列名稱
         */
        public void destroy(QueueTypeEnum typeEnum){
            redisTemplate.opsForZSet().remove(typeEnum.getType());
        }
        /**
         * 獲取任務(wù)Id
         * @return 返回消息Id
         */
        public String getTaskId(){
           return typeEnum.getType() + "_" + UUID.randomUUID().toString().replace("-","");
        }
        /**
         * 獲取鎖
         * @return 返回加鎖狀態(tài)
         */
        private boolean getLock(){
            Boolean absent = redisTemplate.opsForValue().setIfAbsent(typeEnum.getType() + "_Locked", null, 30L, TimeUnit.MINUTES);
            return Objects.nonNull(absent) ? absent : false;
        }
        /**
         * 釋放鎖
         */
        public void releaseLock(){
            redisTemplate.delete(typeEnum.getType() + "_Locked");
        }
    }

在消息工具類中,創(chuàng)建消息任務(wù)時添加了鎖,只有在獲取鎖的前提下才能添加消息任務(wù)。

提供獲取消息Id的方法是為了讓提交消息任務(wù)前,先獲取Id,即使在提交時網(wǎng)絡(luò)發(fā)生問題,提交的Id還是同一個,再進(jìn)行消息消費(fèi)時,可以根據(jù)這個Id來進(jìn)行判斷該消息任務(wù)是否已被消費(fèi),被消費(fèi)則直接丟棄。

3.4 消費(fèi)消息

    /**
     * @author: jiangjs
     * @description: 啟動消費(fèi)
     * @date: 2023/5/30 14:27
     **/
    @Component
    public class CustomerTaskLineRunner implements CommandLineRunner {
        @Resource
        private RedisTemplate<String,QueueTask<String>> redisTemplate;
        private final static String QUEUE_TYPE = QueueTypeEnum.ORDER.getType();
        private final static Logger log = LoggerFactory.getLogger(CustomerTaskLineRunner.class);
        @Override
        public void run(String... args) throws Exception {
            RedisQueueUtil<String> queueUtil = new RedisQueueUtil<>(QueueTypeEnum.ORDER,redisTemplate);
            while (true){
                Set<QueueTask<String>> queueTasks = queueUtil.loopGetTask(10);
                if (CollectionUtils.isNotEmpty(queueTasks)){
                    for (QueueTask<String> queueTask : queueTasks) {
                        //校驗(yàn)當(dāng)前消息是否已消費(fèi),主要防止網(wǎng)絡(luò)延時,導(dǎo)致多次提交同一任務(wù) 存在
                        QueueTask<String> stringQueueTask = redisTemplate.opsForValue().get(QUEUE_TYPE + "_" + queueTask.getTaskId());
                        if (Objects.nonNull(stringQueueTask)){
                            log.info("該任務(wù)已經(jīng)消費(fèi),不能重復(fù)消費(fèi)");
                            redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
                            continue;
                        }
                        Long removeNum = redisTemplate.opsForZSet().remove(QUEUE_TYPE,queueTask);
                        if (Objects.nonNull(removeNum) && removeNum > 0){
                            String task = queueTask.getTask();
                            log.info("消費(fèi)任務(wù)數(shù)據(jù):" + task);
                            //設(shè)置過期時間,10分鐘內(nèi)則默認(rèn)是重復(fù)提交
                            redisTemplate.opsForValue().set(QUEUE_TYPE + "_" + queueTask.getTaskId(),queueTask,10L, TimeUnit.MINUTES);
                        }
                    }
                }
                log.info("------1分鐘后再次獲取------");
                Thread.sleep(60000);
            }
        }
    }

校驗(yàn)重復(fù)消息,若消息重復(fù)且在10分鐘內(nèi)未被消費(fèi),則直接將該消息從隊(duì)列中刪除。在消息任務(wù)被消費(fèi)后,將數(shù)據(jù)從隊(duì)列中移除。

執(zhí)行結(jié)果:

到此這篇關(guān)于redis使用zset實(shí)現(xiàn)延時隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)redis zset延時隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 在Centos?8.0中安裝Redis服務(wù)器的教程詳解

    在Centos?8.0中安裝Redis服務(wù)器的教程詳解

    由于考慮到linux服務(wù)器的性能,所以經(jīng)常需要把一些中間件安裝在linux服務(wù)上,今天通過本文給大家介紹下在Centos?8.0中安裝Redis服務(wù)器的詳細(xì)過程,感興趣的朋友一起看看吧
    2022-03-03
  • 深入解析Redis中常見的應(yīng)用場景

    深入解析Redis中常見的應(yīng)用場景

    這篇文章主要給大家介紹了關(guān)于Redis中常見的應(yīng)用場景的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-09-09
  • redis快速部署為docker容器的方法實(shí)現(xiàn)

    redis快速部署為docker容器的方法實(shí)現(xiàn)

    部署 Redis 作為 Docker 容器是一種快速、靈活且可重復(fù)使用的方式,特別適合開發(fā)、測試和部署環(huán)境,本文主要介紹了redis快速部署為docker容器的方法實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下
    2024-05-05
  • Redis不同數(shù)據(jù)類型的命令語句詳解

    Redis不同數(shù)據(jù)類型的命令語句詳解

    這篇文章主要介紹了Redis不同數(shù)據(jù)類型的命令語句,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-10-10
  • Redis慢查詢的具體使用

    Redis慢查詢的具體使用

    慢查詢顧名思義就是比較慢的查詢,但是究竟是哪里慢呢?本文詳細(xì)的介紹了Redis慢查詢的具體使用,具有一定的參考價(jià)值,感興趣的可以了解一下
    2023-06-06
  • Redis高可用集群redis-cluster詳解

    Redis高可用集群redis-cluster詳解

    redis?cluster?是redis官方提供的分布式解決方案,在3.0版本后推出的,有效地解決了redis分布式的需求,當(dāng)一個redis節(jié)點(diǎn)掛了可以快速的切換到另一個節(jié)點(diǎn),對redis-cluster高可用集群相關(guān)知識感興趣的朋友一起看看吧
    2022-03-03
  • 關(guān)于redisson緩存序列化的幾枚大坑說明

    關(guān)于redisson緩存序列化的幾枚大坑說明

    這篇文章主要介紹了redisson緩存序列化幾枚大坑,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-08-08
  • Redis特殊數(shù)據(jù)類型Geospatial地理空間

    Redis特殊數(shù)據(jù)類型Geospatial地理空間

    這篇文章主要為大家介紹了Redis特殊數(shù)據(jù)類型Geospatial地理空間,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-05-05
  • Redis讀寫分離搭建的完整步驟

    Redis讀寫分離搭建的完整步驟

    為滿足讀多寫少的業(yè)務(wù)場景.最大化節(jié)約用戶成本.云數(shù)據(jù)庫Redis版推出了讀寫分離規(guī)格,為用戶提供透明、高可用、高性能、高靈活的讀寫分離服務(wù),這篇文章主要給大家介紹了關(guān)于Redis讀寫分離搭建的相關(guān)資料,需要的朋友可以參考下
    2021-09-09
  • Redis中一個String類型引發(fā)的慘案

    Redis中一個String類型引發(fā)的慘案

    著存儲的數(shù)據(jù)量越來越大,Redis的內(nèi)存的使用量也快速上升,結(jié)果遇到了大內(nèi)存Redis實(shí)例因?yàn)樯蒖DB而響應(yīng)變慢的問題。很顯然String類型并不是一種好的選擇,那有什么辦法可以降低內(nèi)存消耗嗎?帶著這個問題一起通過本文學(xué)習(xí)下吧
    2021-07-07

最新評論