欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實現(xiàn)Redis延時消息隊列

 更新時間:2021年08月12日 15:38:37   作者:shikanatsu  
本文主要介紹了Java實現(xiàn)Redis延時消息隊列,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下

什么是延時任務(wù)

延時任務(wù),顧名思義,就是延遲一段時間后才執(zhí)行的任務(wù)。舉個例子,假設(shè)我們有個發(fā)布資訊的功能,運營需要在每天早上7點準時發(fā)布資訊,但是早上7點大家都還沒上班,這個時候就可以使用延時任務(wù)來實現(xiàn)資訊的延時發(fā)布了。只要在前一天下班前指定第二天要發(fā)送資訊的時間,到了第二天指定的時間點資訊就能準時發(fā)出去了。如果大家有運營過公眾號,就會知道公眾號后臺也有文章定時發(fā)送的功能??偠灾?,延時任務(wù)的使用還是很廣泛的。

延時任務(wù)的特點

  • 時間有序性
  • 時間具體性
  • 任務(wù)中攜帶詳細的信息 ,通常包括 任務(wù)ID, 任務(wù)的類型 ,時間點。

實現(xiàn)思路:

將整個Redis當(dāng)做消息池,以kv形式存儲消息,key為id,value為具體的消息body
使用ZSET做優(yōu)先隊列,按照score維持優(yōu)先級(用當(dāng)前時間+需要延時的時間作為score)
輪詢ZSET,拿出score比當(dāng)前時間戳大的數(shù)據(jù)(已過期的)
根據(jù)id拿到消息池的具體消息進行消費
消費成功,刪除改隊列和消息
消費失敗,讓該消息重新回到隊列

代碼實現(xiàn)

1.消息模型

import lombok.Data;
import lombok.experimental.Accessors;

import javax.validation.constraints.NotNull;
import java.io.Serializable;

/**
 * Redis 消息隊列中的消息體
 * @author shikanatsu
 */
@Data
@Accessors(chain = true)
public class RedisMessage implements Serializable {

    /** 消息隊列組 **/
    private String group;

    /**
     * 消息id
     */
    private String id;

    /**
     * 消息延遲/ 秒
     */
    @NotNull(message = "消息延時時間不能為空")
    private long delay;

    /**
     * 消息存活時間 單位:秒
     */
    @NotNull(message = "消息存活時間不能為空")
    private int ttl;
    /**
     * 消息體,對應(yīng)業(yè)務(wù)內(nèi)容
     */
    private Object body;
    /**
     * 創(chuàng)建時間,如果只有優(yōu)先級沒有延遲,可以設(shè)置創(chuàng)建時間為0
     * 用來消除時間的影響
     */
    private long createTime;
}

2.RedisMq 消息隊列實現(xiàn)類

package com.shixun.base.redisMq;

import com.shixun.base.jedis.service.RedisService;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Redis消息隊列
 *
 * @author shikanatsu
 */
@Component
public class RedisMq {

    /**
     * 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL}
     * 的消息體body作為值存儲
     */
    public static final String MSG_POOL = "Message:Pool:";

    /**
     * zset隊列 名稱 queue
     */
    public static final String QUEUE_NAME = "Message:Queue:";

//    private static final int SEMIH = 30 * 60;


    @Resource
    private RedisService redisService;

    /**
     * 存入消息池
     *
     * @param message
     * @return
     */
    public boolean addMsgPool(RedisMessage message) {
        if (null != message) {
            redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl());
            return true;
        }
        return false;
    }

    /**
     * 從消息池中刪除消息
     *
     * @param id
     * @return
     */
    public void deMsgPool(String group, String id) {
        redisService.remove(MSG_POOL + group + id);
    }

    /**
     * 向隊列中添加消息
     *
     * @param key
     * @param score 優(yōu)先級
     * @param val
     * @return 返回消息id
     */
    public void enMessage(String key, long score, String val) {
        redisService.zsset(key, val, score);
    }

    /**
     * 從隊列刪除消息
     *
     * @param id
     * @return
     */
    public boolean deMessage(String key, String id) {
        return redisService.zdel(key, id);
    }
}

3.消息生產(chǎn)者

import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;

import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * 消息生產(chǎn)者
 *
 * @author shikanatsu
 */
@Component
public class MessageProvider {

    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);

    @Resource
    private RedisMq redisMq;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    public boolean sendMessage(@Validated RedisMessage message) {
        Assert.notNull(message);
        //The priority is if there is no creation time
//        message.setCreateTime(System.currentTimeMillis());
        message.setId(IdUtil.fastUUID());
        Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        try {
            redisMq.addMsgPool(message);
            redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId());
            logger.info("RedisMq發(fā)送消費信息{},當(dāng)前時間:{},消費時間預(yù)計{}",message.toString(),new Date(),sdf.format(delayTime));
        }catch (Exception e){
            e.printStackTrace();
            logger.error("RedisMq 消息發(fā)送失敗,當(dāng)前時間:{}",new Date());
            return false;
        }
        return true;
    }
}

4.消息消費者

/**
 * Redis消息消費者
 * @author shikanatsu
 */
@Component
public class RedisMqConsumer {

    private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class);

    @Resource
    private RedisMq redisMq;

    @Resource
    private RedisService redisService;

    @Resource
    private MessageProvider provider;

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    //@Scheduled(cron = "*/1 * * * * ? ")
    /**
     Instead of a thread loop, you can use Cron expressions to perform periodic tasks
     */
    public void baseMonitor(RedisMqExecute mqExecute){
        String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();
        //The query is currently expired
        Set<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());
        if (null != set) {
            long current = System.currentTimeMillis();
            for (Object id : set) {
                long  score = redisService.getScore(queueName, id.toString()).longValue();
                //Once again the guarantee has expired , And then perform the consumption
                if (current >= score) {
                    String str = "";
                    RedisMessage message = null;
                    String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName();
                    try {
                        message = (RedisMessage)redisService.get(msgPool + id.toString());
                        log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis()));
                        if(null==message){
                            return;
                        }
                        //Do something ; You can add a judgment here and if it fails you can add it to the queue again
                        mqExecute.execute(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //If an exception occurs, it is put back into the queue
                        // todo:  If repeated, this can lead to repeated cycles
                        log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date());
                        provider.sendMessage(message);
                    } finally {
                        redisMq.deMessage(queueName, id.toString());
                        redisMq.deMsgPool(message.getGroup(),id.toString());
                    }
                }
            }
        }
    }
}

5. 消息執(zhí)接口

/**
 * @author shikanatsu
 */


public interface RedisMqExecute {

    /**
     * 獲取隊列名稱
     * @return
     */
    public String getQueueName();


    /**
     * 統(tǒng)一的通過執(zhí)行期執(zhí)行
     * @param message
     * @return
     */
    public boolean execute(RedisMessage message);


    /**
     * Perform thread polling
     */

    public void   threadPolling();

}

6. 任務(wù)類型的實現(xiàn)類:可以根據(jù)自己的情況去實現(xiàn)對應(yīng)的隊列需求 

/**
 * 訂單執(zhí)行
 *
 * @author shikanatsu
 */
@Service
public class OrderMqExecuteImpl implements RedisMqExecute {


    private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class);

    public final static String name = "orderPoll:";

    @Resource
    private RedisMqConsumer redisMqConsumer;

    private RedisMqExecute mqExecute = this;

    @Resource
    private OrderService orderService;


    @Override
    public String getQueueName() {
        return name;
    }

    @Override
    /**
     * For the time being, only all orders will be processed. You can change to make orders
     */
    public boolean execute(RedisMessage message) {
        logger.info("Do orderMqPoll ; Time:{}",new Date());
  //Do 
        return true;
    }

    @Override
    /**  通過線程去執(zhí)行輪詢的過程,時間上可以自由控制 **/
    public void threadPolling() {
        ThreadUtil.execute(() -> {
            while (true) {
                redisMqConsumer.baseMonitor(mqExecute);
                ThreadUtil.sleep(5, TimeUnit.MICROSECONDS);
            }
        });
    }
}

使用事例
 1. 實現(xiàn)RedisMqExecute 接口 創(chuàng)建對應(yīng)的輪詢或者采取定時器的方式執(zhí)行 和實現(xiàn)具體的任務(wù)。
 2.  通過MessageProvider 實現(xiàn)相對應(yīng)的消息服務(wù)和綁定隊列組,通過隊列組的方式執(zhí)行。
 3. 提示: 采取線程的方式需要在項目啟動過程中執(zhí)行,采取定時器或者調(diào)度的方式可以更加動態(tài)的調(diào)整。

到此這篇關(guān)于Java實現(xiàn)Redis延時消息隊列的文章就介紹到這了,更多相關(guān)Java Redis延時消息隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java后端向前端返回文件流實現(xiàn)下載功能

    Java后端向前端返回文件流實現(xiàn)下載功能

    后端可以使用Java中servlet提供的HttpServletResponse,核心步驟是要設(shè)置響應(yīng)的數(shù)據(jù)類型,設(shè)置為某一類文件類型或二進制格式,以及響應(yīng)頭,然后用ServletOutputStream將文件以流的形式發(fā)送到前端,本文介紹Java后端向前端返回文件流實現(xiàn)下載功能,感興趣的朋友一起看看吧
    2023-12-12
  • mybatis xml中特殊字符處理及特殊符號

    mybatis xml中特殊字符處理及特殊符號

    這篇文章主要介紹了mybatis xml中特殊字符處理及mybatis特殊符號處理技巧,mybatis特殊符號處理給大家介紹了兩種寫法,感興趣的朋友一起看看吧
    2018-03-03
  • Java PhantomJs完成html圖片輸出功能

    Java PhantomJs完成html圖片輸出功能

    給大家?guī)硪黄P(guān)于用Java PhantomJs完成html圖片輸出功能的教學(xué)內(nèi)容,有興趣的朋友學(xué)習(xí)參考下吧。
    2017-12-12
  • Java設(shè)計模式之享元模式實例詳解

    Java設(shè)計模式之享元模式實例詳解

    這篇文章主要介紹了Java設(shè)計模式之享元模式,結(jié)合實例形式詳細分析了享元模式的概念、功能、定義及使用方法,需要的朋友可以參考下
    2018-04-04
  • 關(guān)于idea中出現(xiàn)nbsp和zwsp的完美解決辦法

    關(guān)于idea中出現(xiàn)nbsp和zwsp的完美解決辦法

    本文給大家介紹關(guān)于idea中出現(xiàn)nbsp和zwsp的解決辦法,本文給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧
    2022-06-06
  • Spring Batch入門教程篇

    Spring Batch入門教程篇

    這篇文章主要給大家介紹了Spring Batch入門的相關(guān)資料,文中介紹的非常詳細,對大家具有一定的參考學(xué)習(xí)價值,需要的朋友們下面跟著小編一起來學(xué)習(xí)學(xué)習(xí)吧。
    2017-06-06
  • springdata jpa使用Example快速實現(xiàn)動態(tài)查詢功能

    springdata jpa使用Example快速實現(xiàn)動態(tài)查詢功能

    這篇文章主要介紹了springdata jpa使用Example快速實現(xiàn)動態(tài)查詢功能,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2021-11-11
  • Java8如何從一個Stream中過濾null值

    Java8如何從一個Stream中過濾null值

    這篇文章主要介紹了Java8如何從一個Stream中過濾null值,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2022-05-05
  • Spring Boot使用Druid進行維度的統(tǒng)計和監(jiān)控

    Spring Boot使用Druid進行維度的統(tǒng)計和監(jiān)控

    這篇文章主要介紹了Spring Boot使用Druid進行維度的統(tǒng)計和監(jiān)控,非常不錯,具有參考借鑒價值,需要的朋友可以參考下
    2017-04-04
  • java 并發(fā)編程之共享變量的實現(xiàn)方法

    java 并發(fā)編程之共享變量的實現(xiàn)方法

    這篇文章主要介紹了java 并發(fā)編程之共享變量的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-09-09

最新評論