redis實現(xiàn)延時隊列的兩種方式(小結(jié))
背景
項目中的流程監(jiān)控,有幾種節(jié)點,需要監(jiān)控每一個節(jié)點是否超時。按傳統(tǒng)的做法,肯定是通過定時任務(wù),去掃描然后判斷,但是定時任務(wù)有缺點:1,數(shù)據(jù)量大會慢;2,時間不好控制,太短,怕一次處理不完,太長狀態(tài)就會有延遲。所以就想到用延遲隊列的方式去實現(xiàn)。
一,redis的過期key監(jiān)控
1,開啟過期key監(jiān)聽
在redis的配置里把這個注釋去掉
notify-keyspace-events Ex
然后重啟redis
2,使用redis過期監(jiān)聽實現(xiàn)延遲隊列
繼承KeyExpirationEventMessageListener類,實現(xiàn)父類的方法,就可以監(jiān)聽key過期時間了。當(dāng)有key過期,就會執(zhí)行這里。這里就把需要的key過濾出來,然后發(fā)送給kafka隊列。
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
@Autowired
private KafkaProducerService kafkaProducerService;
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 針對 redis 數(shù)據(jù)失效事件,進行數(shù)據(jù)處理
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern){
if(message == null || StringUtils.isEmpty(message.toString())){
return;
}
String content = message.toString();
//key的格式為 flag:時效類型:運單號 示例如下
try {
if(content.startsWith(AbnConstant.EMS)){
kafkaProducerService.sendMessageSync(TopicConstant.EMS_WAYBILL_ABN_QUEUE,content);
}else if(content.startsWith(AbnConstant.YUNDA)){
kafkaProducerService.sendMessageSync(TopicConstant.YUNDA_WAYBILL_ABN_QUEUE,content);
}
} catch (Exception e) {
log.error("監(jiān)控過期key,發(fā)送kafka異常,",e);
}
}
}
可以看的出來,這種方式其實是很簡單的,但是有幾個問題需要注意,一是,這個盡量單機運行,因為多臺機器都會執(zhí)行,浪費cpu,增加數(shù)據(jù)庫負(fù)擔(dān)。二是,機器頻繁部署的時候,如果有時間間隔,會出現(xiàn)數(shù)據(jù)的漏處理。
二,redis的zset實現(xiàn)延遲隊列
1,生產(chǎn)者實現(xiàn)
可以看到生產(chǎn)者很簡單,其實就是利用zset的特性,給一個zset添加元素而已,而時間就是它的score。
public void produce(Integer taskId, long exeTime) {
System.out.println("加入任務(wù), taskId: " + taskId + ", exeTime: " + exeTime + ", 當(dāng)前時間:" + LocalDateTime.now());
RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
}
2,消費者實現(xiàn)
消費者的代碼也不難,就是把已經(jīng)過期的zset中的元素給刪除掉,然后處理數(shù)據(jù)。
public void consumer() {
Executors.newSingleThreadExecutor().submit(new Runnable() {
@Override
public void run() {
while (true) {
Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
if (taskIdSet == null || taskIdSet.isEmpty()) {
System.out.println("沒有任務(wù)");
} else {
taskIdSet.forEach(id -> {
long result = RedisOps.getJedis().zrem(RedisOps.key, id);
if (result == 1L) {
System.out.println("從延時隊列中獲取到任務(wù),taskId:" + id + " , 當(dāng)前時間:" + LocalDateTime.now());
}
});
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
可以看到這種方式其實是比上個方式要好的。因為,他的那兩個缺點都被克服掉了。多臺機器也沒事兒,也不用再擔(dān)心部署時間間隔長的問題。
總結(jié)
兩個方式都是不錯的,都能解決問題。碰到問題,多思考,多總結(jié)。
到此這篇關(guān)于redis實現(xiàn)延時隊列的兩種方式(小結(jié))的文章就介紹到這了,更多相關(guān)redis 延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redis數(shù)據(jù)結(jié)構(gòu)之鏈表詳解
大家好,本篇文章主要講的是Redis數(shù)據(jù)結(jié)構(gòu)之鏈表詳解,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下,方便下次瀏覽2021-12-12
Redis擊穿穿透雪崩產(chǎn)生原因分析及解決思路面試
這篇文章主要為大家介紹了Redis擊穿穿透雪崩產(chǎn)生原因及解決思路的面試問題答案參考,有需要的朋友可以借鑒參考下,希望能夠有所幫助祝大家多多進步2022-03-03
為何Redis使用跳表而非紅黑樹實現(xiàn)SortedSet
本篇文章主要介紹了為何Redis使用跳表而非紅黑樹實現(xiàn)SortedSet,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-09-09
高效異步redis客戶端aredis優(yōu)劣勢原理解析
這篇文章主要介紹了高效異步redis客戶端aredis優(yōu)劣勢原理解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-09-09
Redis 緩存實現(xiàn)存儲和讀取歷史搜索關(guān)鍵字的操作方法
這篇文章主要介紹了Redis 緩存實現(xiàn)存儲和讀取歷史搜索關(guān)鍵字,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-12-12

