生產(chǎn)redisson延時隊列不消費(fèi)問題排查解決
問題描述
項目使用redisson延時隊列功能,實(shí)現(xiàn)直播的開播提醒,突然有一天業(yè)務(wù)爆出問題,未觸發(fā)開播提醒。
初步排查
首先通過查詢生產(chǎn)日志,發(fā)送端日志存在,沒有消費(fèi)日志,猜測消費(fèi)端沒有消費(fèi)到延時消息,,在dba的協(xié)助下查詢redis隊列,消息也確實(shí)存在,但已經(jīng)過了過期時間,由此證明redisson消費(fèi)者出現(xiàn)問題。通過服務(wù)日志發(fā)現(xiàn)在最后一次設(shè)置自定義推送任務(wù)是在一次服務(wù)發(fā)布之前,服務(wù)發(fā)布后,之前設(shè)置的自定義推送消息均沒有被客戶端消費(fèi),由此猜想是由發(fā)布服務(wù)導(dǎo)致消費(fèi)端失效。
排查過程
發(fā)送端代碼
public <T> void produce(String delayQueue, T t, long delay, TimeUnit timeUnit) { try { log.info("delay msg,delayQueue:{},key:{},delay:{}", delayQueue, t, delay); if (delay < 0) { delay = 0; } RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue); RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue); delayedQueue.offer(t, delay, timeUnit); }catch (Exception e){ log.error("添加延時任務(wù)隊列失敗",e); } }
消費(fèi)端代碼
public class DelayTaskHandler implements Runnable { @Override public void run() { RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue); while (true) { try { T value = blockingFairQueue.take(); log.info("delay queue {},延時任務(wù)開始執(zhí)行,value - {} , timeStamp - {} , threadName - {}", delayQueue, value, System.currentTimeMillis(), Thread.currentThread().getName()); consumer.accept(value); } catch (Exception e) { log.error("延時任務(wù)執(zhí)行失敗,", e); } } } }
因為redisson 延時隊列是基于redis實(shí)現(xiàn)的,所以從redis執(zhí)行命令開始入手排查
1.打開redis監(jiān)控,啟動服務(wù),發(fā)現(xiàn)redis首先執(zhí)行了blpop命令,阻塞等待{cl-live-admin:notice_delay_queue} 隊列消息
2.提交一個延時任務(wù)后,觀察redis命令
此時發(fā)現(xiàn)redis首先執(zhí)行了一個SUBSCRIBE命令,訂閱了一個隊列,然后執(zhí)行了一段lua腳本,主要包括以下命令:
- zrangebyscore:獲取zset中score在0至當(dāng)前時間戳范圍內(nèi)的前一百條數(shù)據(jù) 如果獲取到數(shù)據(jù)則循環(huán)執(zhí)行rpush,lrem,zrem命令
- zrange:取zset中第一條數(shù)據(jù)
- zadd:向zset中添加一條數(shù)據(jù),score為時間戳
- rpush:向list右邊push一條數(shù)據(jù)
- publish:如果添加的消息在頂部,則發(fā)布一條訂閱消息
3.消費(fèi)一條消息
同樣消費(fèi)的時候也是提交了一條lua腳本,主要執(zhí)行了以下命令 可以看到和發(fā)送端命令相似
- zrangebyscore:獲取zset中score在0至當(dāng)前時間戳范圍內(nèi)的前一百條數(shù)據(jù)
- rpush:向list右邊push一條數(shù)據(jù)
- lrem:刪除一條數(shù)據(jù)
- zrem:刪除zeset中的數(shù)據(jù)
- zrange:獲取第一條數(shù)據(jù)
- BLPOP:阻塞等待隊列消息
通過以上redis命令的執(zhí)行可以發(fā)現(xiàn)一個命令SUBCRIBE用于訂閱redis的一個隊列,而這個命令只在發(fā)送消息的時候執(zhí)行了,在消費(fèi)的時候沒有執(zhí)行。從而驗證了當(dāng)服務(wù)重啟后如果沒有新的消息發(fā)送,那么客戶端就不會發(fā)送SUBCRIBE命令,訂閱延時隊列,這就導(dǎo)致在服務(wù)重啟前發(fā)送的消息到時間后無法消費(fèi)。
解決方案
在消費(fèi)端啟動的時候添加一行代碼用于訂閱延時隊列
//訂閱redis隊列 redissonClient.getDelayedQueue(blockingFairQueue);
那么為什么沒有訂閱就消費(fèi)不到消息了呢?帶著疑問繼續(xù)深入理解redisson的實(shí)現(xiàn)
redisson 延時隊列原理
首先回到消費(fèi)端代碼
在我們沒有發(fā)送訂閱命令的時候,客戶端只是在阻塞等待一個指定隊列的消息,那么這個隊列的消息是誰放進(jìn)去的呢? 帶著疑問我們再看發(fā)送端代碼
直接進(jìn)入 delayedQueue.offer()方法內(nèi)部
可以看到發(fā)送端是提交了一個lua腳本主要執(zhí)行了zadd,rpush,publish命令,這里我們需要注意publish命令,在redis中pub/sub是對應(yīng)的,當(dāng)有publish的時候,那么subcribe端會收到該訂閱消息。
那么是誰收到了訂閱的消息,收到消息后又做了什么呢,回到redissonClient.getDelayedQueue(blockingFairQueue)代碼中
繼續(xù)進(jìn)入 new RedissonDelayedQueue()
可以看到這里創(chuàng)建了一個QueueTransferTask,實(shí)現(xiàn)了pushTaskAsync()方法,具體內(nèi)容是一個lua腳本,首先執(zhí)行zrangebyscore 獲取過期的前一百條數(shù)據(jù),循環(huán)調(diào)用rpush,lrem,zrem,注意這里rpush的隊列為我們指定的延時隊列,也就是consumer端take的隊列。至此明白了消費(fèi)端的消息是方法pushTaskAsync()執(zhí)行后放入的。那么什么時候執(zhí)行這個方法呢。
進(jìn)入 queueTransferService.schedule(queueName, task)方法
這里會執(zhí)行start方法,繼續(xù)跟進(jìn)
這里可以看到添加了兩個listener,onSubcribe,onMessage,當(dāng)訂閱到消息時執(zhí)行onSubcribe中的pushTash,當(dāng)redis有新的消息通知,就會觸發(fā)scheduleTask(...)方法,startTime為上述中publish通知的元素過期時間
繼續(xù)進(jìn)入pushTask方法
這里可以看到一個熟悉的方法pushTaskAsync(),也就是前邊的一段lua腳本,用于將過期的消息放入阻塞隊列,并返回排在第一個的消息執(zhí)行scheduleTask()
繼續(xù)進(jìn)入scheduleTask()方法
如果時間差小于10毫秒則執(zhí)行pushTask方法,如果大于10毫秒則啟動一個延時任務(wù),到時間后執(zhí)行pushTask方法。pushTask與scheduleTask互相調(diào)用循環(huán)往復(fù)
流程總結(jié)
至此源碼分析完畢,整個流程總結(jié)如下:
發(fā)送端只是往zset,list,添加數(shù)據(jù),并且發(fā)布一條訂閱消息
消費(fèi)端收到訂閱消息后會查詢zset中的過期消息,并放入阻塞隊列供消費(fèi)端take消息,并且獲取zset第一個消息,啟動一個延時任務(wù),到期后繼續(xù)從zset中獲取過期消息如此循環(huán)。
此時就回答了上邊的問題 那么為什么沒有訂閱就消費(fèi)不到消息了呢?
如果沒有訂閱的話消費(fèi)端就收不到訂閱消息,也就不會去獲取過期時間放入阻塞隊列進(jìn)行循環(huán)。
以上就是生產(chǎn)redisson延時隊列不消費(fèi)問題排查解決的詳細(xì)內(nèi)容,更多關(guān)于排查redisson延時隊列不消費(fèi)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Unable?to?connect?to?Redis無法連接到Redis解決的全過程
這篇文章主要給大家介紹了關(guān)于Unable?to?connect?to?Redis無法連接到Redis解決的相關(guān)資料,文中通過圖文以及實(shí)例代碼將解決的過程介紹的非常詳細(xì),需要的朋友可以參考下2023-03-03Redis常用的數(shù)據(jù)結(jié)構(gòu)及實(shí)際應(yīng)用場景
本文介紹了Redis中常用的數(shù)據(jù)結(jié)構(gòu),包括字符串、列表、集合、哈希表、有序集合和Bitmap,并詳細(xì)說明了它們在各種場景下的使用,需要的朋友可以參考下2024-05-05Redis高可用部署架構(gòu)的實(shí)現(xiàn)
本文主要介紹了Redis高可用部署架構(gòu)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-08-08