Java實(shí)現(xiàn)Redis延時(shí)消息隊(duì)列
什么是延時(shí)任務(wù)
延時(shí)任務(wù),顧名思義,就是延遲一段時(shí)間后才執(zhí)行的任務(wù)。舉個(gè)例子,假設(shè)我們有個(gè)發(fā)布資訊的功能,運(yùn)營(yíng)需要在每天早上7點(diǎn)準(zhǔn)時(shí)發(fā)布資訊,但是早上7點(diǎn)大家都還沒(méi)上班,這個(gè)時(shí)候就可以使用延時(shí)任務(wù)來(lái)實(shí)現(xiàn)資訊的延時(shí)發(fā)布了。只要在前一天下班前指定第二天要發(fā)送資訊的時(shí)間,到了第二天指定的時(shí)間點(diǎn)資訊就能準(zhǔn)時(shí)發(fā)出去了。如果大家有運(yùn)營(yíng)過(guò)公眾號(hào),就會(huì)知道公眾號(hào)后臺(tái)也有文章定時(shí)發(fā)送的功能??偠灾?,延時(shí)任務(wù)的使用還是很廣泛的。
延時(shí)任務(wù)的特點(diǎn)
- 時(shí)間有序性
- 時(shí)間具體性
- 任務(wù)中攜帶詳細(xì)的信息 ,通常包括 任務(wù)ID, 任務(wù)的類(lèi)型 ,時(shí)間點(diǎn)。
實(shí)現(xiàn)思路:
將整個(gè)Redis當(dāng)做消息池,以kv形式存儲(chǔ)消息,key為id,value為具體的消息body
使用ZSET做優(yōu)先隊(duì)列,按照score維持優(yōu)先級(jí)(用當(dāng)前時(shí)間+需要延時(shí)的時(shí)間作為score)
輪詢ZSET,拿出score比當(dāng)前時(shí)間戳大的數(shù)據(jù)(已過(guò)期的)
根據(jù)id拿到消息池的具體消息進(jìn)行消費(fèi)
消費(fèi)成功,刪除改隊(duì)列和消息
消費(fèi)失敗,讓該消息重新回到隊(duì)列
代碼實(shí)現(xiàn)

1.消息模型
import lombok.Data;
import lombok.experimental.Accessors;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
/**
* Redis 消息隊(duì)列中的消息體
* @author shikanatsu
*/
@Data
@Accessors(chain = true)
public class RedisMessage implements Serializable {
/** 消息隊(duì)列組 **/
private String group;
/**
* 消息id
*/
private String id;
/**
* 消息延遲/ 秒
*/
@NotNull(message = "消息延時(shí)時(shí)間不能為空")
private long delay;
/**
* 消息存活時(shí)間 單位:秒
*/
@NotNull(message = "消息存活時(shí)間不能為空")
private int ttl;
/**
* 消息體,對(duì)應(yīng)業(yè)務(wù)內(nèi)容
*/
private Object body;
/**
* 創(chuàng)建時(shí)間,如果只有優(yōu)先級(jí)沒(méi)有延遲,可以設(shè)置創(chuàng)建時(shí)間為0
* 用來(lái)消除時(shí)間的影響
*/
private long createTime;
}
2.RedisMq 消息隊(duì)列實(shí)現(xiàn)類(lèi)
package com.shixun.base.redisMq;
import com.shixun.base.jedis.service.RedisService;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* Redis消息隊(duì)列
*
* @author shikanatsu
*/
@Component
public class RedisMq {
/**
* 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL}
* 的消息體body作為值存儲(chǔ)
*/
public static final String MSG_POOL = "Message:Pool:";
/**
* zset隊(duì)列 名稱(chēng) queue
*/
public static final String QUEUE_NAME = "Message:Queue:";
// private static final int SEMIH = 30 * 60;
@Resource
private RedisService redisService;
/**
* 存入消息池
*
* @param message
* @return
*/
public boolean addMsgPool(RedisMessage message) {
if (null != message) {
redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl());
return true;
}
return false;
}
/**
* 從消息池中刪除消息
*
* @param id
* @return
*/
public void deMsgPool(String group, String id) {
redisService.remove(MSG_POOL + group + id);
}
/**
* 向隊(duì)列中添加消息
*
* @param key
* @param score 優(yōu)先級(jí)
* @param val
* @return 返回消息id
*/
public void enMessage(String key, long score, String val) {
redisService.zsset(key, val, score);
}
/**
* 從隊(duì)列刪除消息
*
* @param id
* @return
*/
public boolean deMessage(String key, String id) {
return redisService.zdel(key, id);
}
}
3.消息生產(chǎn)者
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
* 消息生產(chǎn)者
*
* @author shikanatsu
*/
@Component
public class MessageProvider {
static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
@Resource
private RedisMq redisMq;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public boolean sendMessage(@Validated RedisMessage message) {
Assert.notNull(message);
//The priority is if there is no creation time
// message.setCreateTime(System.currentTimeMillis());
message.setId(IdUtil.fastUUID());
Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
try {
redisMq.addMsgPool(message);
redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId());
logger.info("RedisMq發(fā)送消費(fèi)信息{},當(dāng)前時(shí)間:{},消費(fèi)時(shí)間預(yù)計(jì){}",message.toString(),new Date(),sdf.format(delayTime));
}catch (Exception e){
e.printStackTrace();
logger.error("RedisMq 消息發(fā)送失敗,當(dāng)前時(shí)間:{}",new Date());
return false;
}
return true;
}
}
4.消息消費(fèi)者
/**
* Redis消息消費(fèi)者
* @author shikanatsu
*/
@Component
public class RedisMqConsumer {
private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class);
@Resource
private RedisMq redisMq;
@Resource
private RedisService redisService;
@Resource
private MessageProvider provider;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//@Scheduled(cron = "*/1 * * * * ? ")
/**
Instead of a thread loop, you can use Cron expressions to perform periodic tasks
*/
public void baseMonitor(RedisMqExecute mqExecute){
String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();
//The query is currently expired
Set<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());
if (null != set) {
long current = System.currentTimeMillis();
for (Object id : set) {
long score = redisService.getScore(queueName, id.toString()).longValue();
//Once again the guarantee has expired , And then perform the consumption
if (current >= score) {
String str = "";
RedisMessage message = null;
String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName();
try {
message = (RedisMessage)redisService.get(msgPool + id.toString());
log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis()));
if(null==message){
return;
}
//Do something ; You can add a judgment here and if it fails you can add it to the queue again
mqExecute.execute(message);
} catch (Exception e) {
e.printStackTrace();
//If an exception occurs, it is put back into the queue
// todo: If repeated, this can lead to repeated cycles
log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date());
provider.sendMessage(message);
} finally {
redisMq.deMessage(queueName, id.toString());
redisMq.deMsgPool(message.getGroup(),id.toString());
}
}
}
}
}
}
5. 消息執(zhí)接口
/**
* @author shikanatsu
*/
public interface RedisMqExecute {
/**
* 獲取隊(duì)列名稱(chēng)
* @return
*/
public String getQueueName();
/**
* 統(tǒng)一的通過(guò)執(zhí)行期執(zhí)行
* @param message
* @return
*/
public boolean execute(RedisMessage message);
/**
* Perform thread polling
*/
public void threadPolling();
}
6. 任務(wù)類(lèi)型的實(shí)現(xiàn)類(lèi):可以根據(jù)自己的情況去實(shí)現(xiàn)對(duì)應(yīng)的隊(duì)列需求
/**
* 訂單執(zhí)行
*
* @author shikanatsu
*/
@Service
public class OrderMqExecuteImpl implements RedisMqExecute {
private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class);
public final static String name = "orderPoll:";
@Resource
private RedisMqConsumer redisMqConsumer;
private RedisMqExecute mqExecute = this;
@Resource
private OrderService orderService;
@Override
public String getQueueName() {
return name;
}
@Override
/**
* For the time being, only all orders will be processed. You can change to make orders
*/
public boolean execute(RedisMessage message) {
logger.info("Do orderMqPoll ; Time:{}",new Date());
//Do
return true;
}
@Override
/** 通過(guò)線程去執(zhí)行輪詢的過(guò)程,時(shí)間上可以自由控制 **/
public void threadPolling() {
ThreadUtil.execute(() -> {
while (true) {
redisMqConsumer.baseMonitor(mqExecute);
ThreadUtil.sleep(5, TimeUnit.MICROSECONDS);
}
});
}
}
使用事例
1. 實(shí)現(xiàn)RedisMqExecute 接口 創(chuàng)建對(duì)應(yīng)的輪詢或者采取定時(shí)器的方式執(zhí)行 和實(shí)現(xiàn)具體的任務(wù)。
2. 通過(guò)MessageProvider 實(shí)現(xiàn)相對(duì)應(yīng)的消息服務(wù)和綁定隊(duì)列組,通過(guò)隊(duì)列組的方式執(zhí)行。
3. 提示: 采取線程的方式需要在項(xiàng)目啟動(dòng)過(guò)程中執(zhí)行,采取定時(shí)器或者調(diào)度的方式可以更加動(dòng)態(tài)的調(diào)整。
到此這篇關(guān)于Java實(shí)現(xiàn)Redis延時(shí)消息隊(duì)列的文章就介紹到這了,更多相關(guān)Java Redis延時(shí)消息隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java后端向前端返回文件流實(shí)現(xiàn)下載功能
后端可以使用Java中servlet提供的HttpServletResponse,核心步驟是要設(shè)置響應(yīng)的數(shù)據(jù)類(lèi)型,設(shè)置為某一類(lèi)文件類(lèi)型或二進(jìn)制格式,以及響應(yīng)頭,然后用ServletOutputStream將文件以流的形式發(fā)送到前端,本文介紹Java后端向前端返回文件流實(shí)現(xiàn)下載功能,感興趣的朋友一起看看吧2023-12-12
Java設(shè)計(jì)模式之享元模式實(shí)例詳解
這篇文章主要介紹了Java設(shè)計(jì)模式之享元模式,結(jié)合實(shí)例形式詳細(xì)分析了享元模式的概念、功能、定義及使用方法,需要的朋友可以參考下2018-04-04
關(guān)于idea中出現(xiàn)nbsp和zwsp的完美解決辦法
本文給大家介紹關(guān)于idea中出現(xiàn)nbsp和zwsp的解決辦法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2022-06-06
springdata jpa使用Example快速實(shí)現(xiàn)動(dòng)態(tài)查詢功能
這篇文章主要介紹了springdata jpa使用Example快速實(shí)現(xiàn)動(dòng)態(tài)查詢功能,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11
Java8如何從一個(gè)Stream中過(guò)濾null值
這篇文章主要介紹了Java8如何從一個(gè)Stream中過(guò)濾null值,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05
Spring Boot使用Druid進(jìn)行維度的統(tǒng)計(jì)和監(jiān)控
這篇文章主要介紹了Spring Boot使用Druid進(jìn)行維度的統(tǒng)計(jì)和監(jiān)控,非常不錯(cuò),具有參考借鑒價(jià)值,需要的朋友可以參考下2017-04-04
java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法
這篇文章主要介紹了java 并發(fā)編程之共享變量的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-09-09

