欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis簡易延時(shí)隊(duì)列的實(shí)現(xiàn)示例

 更新時(shí)間:2023年12月10日 10:44:39   作者:!chen  
在實(shí)際的業(yè)務(wù)場景中,經(jīng)常會遇到需要延時(shí)處理的業(yè)務(wù),本文就來介紹有下Redis簡易延時(shí)隊(duì)列的實(shí)現(xiàn)示例,具有一定的參考價(jià)值,感興趣的可以了解一下

一、背景

在實(shí)際的業(yè)務(wù)場景中,經(jīng)常會遇到需要延時(shí)處理的業(yè)務(wù),比如訂單超時(shí)未支付,需要取消訂單,或者是用戶注冊后,需要在一段時(shí)間內(nèi)激活賬號,否則賬號失效等等。這些業(yè)務(wù)場景都可以通過延時(shí)隊(duì)列來實(shí)現(xiàn)。
最近在實(shí)際業(yè)務(wù)當(dāng)中就遇到了這樣的一個(gè)場景,需要實(shí)現(xiàn)一個(gè)延時(shí)隊(duì)列,用來處理訂單超時(shí)未支付的業(yè)務(wù)。在網(wǎng)上找了一些資料,發(fā)現(xiàn)大部分都是使用了mq來實(shí)現(xiàn),比如rabbitmq,rocketmq等等,但是這些mq都是需要安裝的,而且還需要配置,對于此項(xiàng)目來說不想增加額外的依賴,所以就想到了使用redis來實(shí)現(xiàn)一個(gè)簡易的延時(shí)隊(duì)列。

二、實(shí)現(xiàn)思路

1. 業(yè)務(wù)場景

訂單超時(shí)未支付,需要取消訂單,這個(gè)業(yè)務(wù)場景可以分為兩個(gè)步驟來實(shí)現(xiàn):

  • 用戶下單后,將訂單信息存入數(shù)據(jù)庫,并將訂單信息存入延時(shí)隊(duì)列中,設(shè)置延時(shí)時(shí)間為30分鐘。
  • 30分鐘后,從延時(shí)隊(duì)列中取出訂單信息,判斷訂單是否已支付,如果未支付,則取消訂單。
  • 如果用戶在30分鐘內(nèi)支付了訂單,則將訂單從延時(shí)隊(duì)列中刪除。

2. 實(shí)現(xiàn)思路

  • 使用redis的zset來實(shí)現(xiàn)延時(shí)隊(duì)列,zset的score用來存儲訂單的超時(shí)時(shí)間,value用來存儲訂單信息。
  • 使用redis的set來存儲已支付的訂單,set中的value為訂單id。

三、實(shí)現(xiàn)代碼

1. 使用了兩個(gè)注解類分別標(biāo)記生產(chǎn)者類、生產(chǎn)者方法,消費(fèi)者方法

/**
 * @program: 
 * @description: redis延時(shí)隊(duì)列生產(chǎn)者類注解,標(biāo)記生產(chǎn)者類,用來掃描生產(chǎn)者類中的生產(chǎn)者方法,將生產(chǎn)者方法注冊到redis延時(shí)隊(duì)列中
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:32
 */
@Component
@Documented
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueue {}
/**
 * @program: 
 * @description: 
 * 帶有此注解的方法,方法的入?yún)⑹紫葧晦D(zhuǎn)換為json字符串,然后存入redis的zset中,score為當(dāng)前時(shí)間+延時(shí)時(shí)間,value為json字符串
 * 當(dāng)延時(shí)時(shí)間到達(dá)后,會從redis的zset中取出value,然后將value轉(zhuǎn)換為入?yún)㈩愋?,調(diào)用此方法,執(zhí)行業(yè)務(wù)邏輯
 * 此注解只能標(biāo)記在方法上,且方法必須為public,且只能有一個(gè)參數(shù)
 * 此注解標(biāo)記的方法,必須在redis延時(shí)隊(duì)列生產(chǎn)者類中,否則不會生效
 * @author: jiangchengxuan
 * @created:  2023/12/09 10:37
 */
@Documented
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface RedisMessageQueueMethod {
    String threadName() default "redis消息隊(duì)列默認(rèn)線程";
    String queueKey();   // 隊(duì)列key值
    int threadNum() default 1;      //默認(rèn)線程數(shù)量
    int threadSleepTime() default 500;  //默認(rèn)線程休眠時(shí)間默認(rèn)500ms
}

2. 生產(chǎn)者類具體實(shí)現(xiàn)

/**
 * @program: 
 * @description:  生產(chǎn)者類具體實(shí)現(xiàn)
 * @author: jiangchengxuan
 * @created: 2023/12/09 10:44
 */
@Slf4j
@Component
public class DelayQueueWorkerConfig implements InitializingBean {
    private volatile boolean monitorStarted = false;

    private volatile boolean monitorShutDowned = false;

    private ExecutorService executorService;

    // 需要監(jiān)控的延時(shí)隊(duì)列
    @Autowired
    protected IDelayQueue<String> monitorQueue;

    @Autowired
    private ApplicationContext applicationContext;


    @Override
    public void afterPropertiesSet(){
        //spring工具類,可以獲取指定注解的類
        Map<String, Object> allNeedClass = applicationContext.getBeansWithAnnotation(RedisMessageQueue.class);
        for (Map.Entry<String, Object> entry : allNeedClass.entrySet()) {
            Object bean = entry.getValue();
            Method[] methods = bean.getClass().getMethods();
            for (Method method : methods) {
                Annotation[] annotations = method.getDeclaredAnnotations();
                for (Annotation annotation : annotations) {
                    if (annotation instanceof RedisMessageQueueMethod) {
                        RedisMessageQueueMethod queueMethod = (RedisMessageQueueMethod) annotation;
                        //找的需要使用消息隊(duì)列的方法后,
                        initExecuteQueue(queueMethod, method, bean);
                    }
                    }
                }
            }
        }


    /**
     * 初始化執(zhí)行造作
     * @param queueAnnotations 注解
     * @param method 方法
     * @param bean 對象
     */
    void initExecuteQueue(RedisMessageQueueMethod queueAnnotations ,Method method,Object bean) {
        String threadName = queueAnnotations.threadName();
        int threadNum = queueAnnotations.threadNum();
        int threadSheepTime = queueAnnotations.threadSleepTime();
        String queueKey = queueAnnotations.queueKey();
        //獲取所有消息隊(duì)列名稱
        executorService = Executors.newFixedThreadPool(threadNum);
        for (int i = 0; i < threadNum; i++) {
            final int num = i;
            executorService.execute(() -> {
                Thread.currentThread().setName(threadName + "[" + num + "]");
                //如果沒有設(shè)置隊(duì)列queuekey或者已經(jīng)暫停則不執(zhí)行
                while (!monitorShutDowned) {
                    String value = null;
                    try {
                        value = monitorQueue.get(queueKey);
                        // 獲取數(shù)據(jù)時(shí)進(jìn)行刪除操作,刪除成功,則進(jìn)行處理,業(yè)務(wù)邏輯處理失敗則繼續(xù)添加回隊(duì)列但是時(shí)間設(shè)置最大以達(dá)到保存現(xiàn)場的目的,防止并發(fā)獲取重復(fù)數(shù)據(jù)
                        if (StringUtils.isNotEmpty(value)) {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "], get from queue,value = {}", value);
                            }
                            boolean success = (Boolean) method.invoke(bean, value);
                            // 失敗重試
                            if (!success) {
                                success =  (Boolean) method.invoke(bean, value);;
                                if (!success) {
                                    log.warn("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = {}", value);
                                    monitorQueue.add(TimeUnit.DAYS,365, value, queueKey);
                                }
                            } else {
                                if (log.isDebugEnabled()) {
                                    log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:execute successfully!values = {}", value);
                                }
                            }
                        } else {
                            if (log.isDebugEnabled()) {
                                log.debug("Monitor Thread[" + Thread.currentThread().getName() + "]:monitorThreadRunning = {}", monitorStarted);
                            }
                            Thread.sleep(threadSheepTime);
                        }
                    } catch (Exception e) {
                        log.error("Monitor Thread[" + Thread.currentThread().getName() + "] execute Failed,value = " + value, e);
                    }
                }
                log.info("Monitor Thread[" + Thread.currentThread().getName() + "] Completed...");
            });
        }
        log.info("thread pool is started...");
    }

}
/**
 * @program: 
 * @description: 
 * 延時(shí)隊(duì)列接口實(shí)現(xiàn)類,
 * 使用redis的zset實(shí)現(xiàn)延時(shí)隊(duì)列,
 * @author: jiangchengxuan
 * @created:  2023/12/09 23:34
 */
public interface IDelayQueue <E> {
    /**
     * 向延時(shí)隊(duì)列中添加數(shù)據(jù)
     *
     * @param score 分?jǐn)?shù)
     * @param data  數(shù)據(jù)
     * @return true 成功 false 失敗
     */
    boolean add(long score, E data,String queueKey);


    /**
     * 向延時(shí)隊(duì)列中添加數(shù)據(jù)
     *
     * @param timeUnit 時(shí)間單位
     * @param time     延后時(shí)間
     * @param data     數(shù)據(jù)
     * @param queueKey
     * @return true 成功 false 失敗
     */
    boolean add(TimeUnit timeUnit, long time, E data, String queueKey);

    /**
     * 從延時(shí)隊(duì)列中獲取數(shù)據(jù)
     * @param queueKey 隊(duì)列key
     * @return 數(shù)據(jù)
     */
    String get(String queueKey);

    /**
     * 刪除數(shù)據(jù)
     *
     * @param key
     * @param data 數(shù)據(jù)
     * @return
     */
    public<T> boolean rem(String key, T data) ;
}
/**
 * @program: 
 * @description:  redis操作類,封裝了redis的操作方法,使用時(shí)直接注入即可使用,不需要關(guān)心redis的操作細(xì)節(jié),使用時(shí)只需要關(guān)心業(yè)務(wù)邏輯即可
 * @author: jiangchengxuan
 * @created: 2023/12/09 23:35
 */
@Service
public class RedisDelayQueue implements IDelayQueue<String> {

    @Autowired
    private RedisService redisService;


    @Override
    public boolean add(long score, String data,String queueKey) {
        return redisService.opsForZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, data, score);
    }

    @Override
    public boolean add(TimeUnit timeUnit, long time, String data, String queueKey) {
        switch (timeUnit) {
            case SECONDS:
                return add(LocalDateTime.now().plusSeconds(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data, queueKey);
            case MINUTES:
                return add(LocalDateTime.now().plusMinutes(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case HOURS:
                return add(LocalDateTime.now().plusHours(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            case DAYS:
                return add(LocalDateTime.now().plusDays(time).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(), data,queueKey);
            default:
                return false;
        }
    }


    @Override
    public String get(String queueKey) {
        long now = System.currentTimeMillis();
        long min = Long.MIN_VALUE;
        Set<String> res = redisService.rangeByScoreZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+queueKey, min, now, 0, 10);
        if (!CollectionUtils.isEmpty(res)) {
            for (String data : res){
                // 刪除成功,則進(jìn)行處理,防止并發(fā)獲取重復(fù)數(shù)據(jù)
                if (rem(queueKey, data)){
                    return data;
                }
            }
        }
        return null;
    }


    @Override
    public<T> boolean rem(String key, T data) {
        return redisService.remZSet(Constant.DEFAULT_REDIS_QUEUE_KEY_PREFIX+key, data);
    }
}
  • 使用
@RedisMessageQueue
public class SomethingClass
{
    @Autowired
    private IDelayQueue<String> messageQueue;

    /**
     * 生產(chǎn)者,向隊(duì)列中添加數(shù)據(jù),30秒后消費(fèi)者進(jìn)行消費(fèi)
     */
    public void test(){
        messageQueue.add(TimeUnit.SECONDS,30L,"這是參數(shù)數(shù)據(jù)","new_queue");
    }
    
    /**
     * 消費(fèi)者,如果按此配置的話,會啟動一個(gè)線程,線程名稱為:測試線程名稱,線程數(shù)量為1,線程休眠時(shí)間為10毫秒
     * 注意:queueKey需要與生產(chǎn)者中的queueKey保持一致才能進(jìn)行消費(fèi)
     * @param data 
     */
    @Override
    @RedisMessageQueueMethod(threadName = "測試線程名稱",queueKey = "new_queue",threadNum = 1,threadSleepTime = 10)
    public void testMethod(String data) {
        //do something
    }

}

 到此這篇關(guān)于Redis簡易延時(shí)隊(duì)列的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Redis 延時(shí)隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redisson分布式限流的實(shí)現(xiàn)原理解析

    Redisson分布式限流的實(shí)現(xiàn)原理解析

    這篇文章主要為大家介紹了Redisson分布式限流的實(shí)現(xiàn)原理解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-02-02
  • Redis數(shù)據(jù)過期策略的實(shí)現(xiàn)詳解

    Redis數(shù)據(jù)過期策略的實(shí)現(xiàn)詳解

    最近項(xiàng)目當(dāng)中遇到一個(gè)需求場景,需要清空一些存放在Redis的數(shù)據(jù),本文對Redis的過期機(jī)制簡單的講解一下,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-09-09
  • Redis 刪除策略的三種實(shí)現(xiàn)

    Redis 刪除策略的三種實(shí)現(xiàn)

    本文主要介紹了Redis 刪除策略的三種實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • redis延時(shí)隊(duì)列的項(xiàng)目實(shí)踐

    redis延時(shí)隊(duì)列的項(xiàng)目實(shí)踐

    本文主要介紹了redis延時(shí)隊(duì)列的項(xiàng)目實(shí)踐,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-11-11
  • 關(guān)于Redis未授權(quán)訪問的問題

    關(guān)于Redis未授權(quán)訪問的問題

    這篇文章主要介紹了Redis未授權(quán)訪問的問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-07-07
  • Redis實(shí)現(xiàn)限量優(yōu)惠券的秒殺功能

    Redis實(shí)現(xiàn)限量優(yōu)惠券的秒殺功能

    文章詳細(xì)分析了避免超賣問題的方法,包括確保一人一單的業(yè)務(wù)邏輯,并提供了代碼實(shí)現(xiàn)步驟和代碼示例,感興趣的朋友跟隨小編一起看看吧
    2024-12-12
  • 基于Redis的List實(shí)現(xiàn)特價(jià)商品列表功能

    基于Redis的List實(shí)現(xiàn)特價(jià)商品列表功能

    本文通過場景分析給大家介紹了基于Redis的List實(shí)現(xiàn)特價(jià)商品列表,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2021-08-08
  • redis使用skiplist跳表的原因解析

    redis使用skiplist跳表的原因解析

    經(jīng)常會有人問這個(gè)問題,redis中為什么要使用跳表?這個(gè)問題,redis作者已經(jīng)給出過明確答案,今天通過本文再給大家講解下這個(gè)問題,對redis?skiplist跳表知識感興趣的朋友一起看看吧
    2022-10-10
  • 利用redisson快速實(shí)現(xiàn)自定義限流注解(接口防刷)

    利用redisson快速實(shí)現(xiàn)自定義限流注解(接口防刷)

    利用redis的有序集合即Sorted?Set數(shù)據(jù)結(jié)構(gòu),構(gòu)造一個(gè)令牌桶來實(shí)施限流,而redisson已經(jīng)幫我們封裝成了RRateLimiter,通過redisson,即可快速實(shí)現(xiàn)我們的目標(biāo),這篇文章主要介紹了利用redisson快速實(shí)現(xiàn)自定義限流注解,需要的朋友可以參考下
    2024-07-07
  • Redis中常見的幾種集群部署方案

    Redis中常見的幾種集群部署方案

    本文主要介紹了Redis中常見的幾種集群部署方案,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2022-03-03

最新評論