欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

生產(chǎn)redisson延時隊列不消費(fèi)問題排查解決

 更新時間:2022年09月26日 15:57:07   作者:法力損毀  
這篇文章主要為大家介紹了生產(chǎn)redisson延時隊列不消費(fèi)問題排查解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

問題描述

項目使用redisson延時隊列功能,實(shí)現(xiàn)直播的開播提醒,突然有一天業(yè)務(wù)爆出問題,未觸發(fā)開播提醒。

初步排查

首先通過查詢生產(chǎn)日志,發(fā)送端日志存在,沒有消費(fèi)日志,猜測消費(fèi)端沒有消費(fèi)到延時消息,,在dba的協(xié)助下查詢redis隊列,消息也確實(shí)存在,但已經(jīng)過了過期時間,由此證明redisson消費(fèi)者出現(xiàn)問題。通過服務(wù)日志發(fā)現(xiàn)在最后一次設(shè)置自定義推送任務(wù)是在一次服務(wù)發(fā)布之前,服務(wù)發(fā)布后,之前設(shè)置的自定義推送消息均沒有被客戶端消費(fèi),由此猜想是由發(fā)布服務(wù)導(dǎo)致消費(fèi)端失效。

排查過程

發(fā)送端代碼

public <T> void produce(String delayQueue, T t, long delay, TimeUnit timeUnit) {
    try {
        log.info("delay msg,delayQueue:{},key:{},delay:{}", delayQueue, t, delay);
        if (delay < 0) {
            delay = 0;
        }
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
    }catch (Exception e){
        log.error("添加延時任務(wù)隊列失敗",e);
    }
}

消費(fèi)端代碼

public class DelayTaskHandler implements Runnable {
    @Override
    public void run() {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue);
        while (true) {
            try {
                T value = blockingFairQueue.take();
                log.info("delay queue {},延時任務(wù)開始執(zhí)行,value - {} , timeStamp - {} , threadName - {}", delayQueue, value, System.currentTimeMillis(), Thread.currentThread().getName());
                consumer.accept(value);
            } catch (Exception e) {
                log.error("延時任務(wù)執(zhí)行失敗,", e);
            }
        }
    }
}

因為redisson 延時隊列是基于redis實(shí)現(xiàn)的,所以從redis執(zhí)行命令開始入手排查

1.打開redis監(jiān)控,啟動服務(wù),發(fā)現(xiàn)redis首先執(zhí)行了blpop命令,阻塞等待{cl-live-admin:notice_delay_queue} 隊列消息

2.提交一個延時任務(wù)后,觀察redis命令

此時發(fā)現(xiàn)redis首先執(zhí)行了一個SUBSCRIBE命令,訂閱了一個隊列,然后執(zhí)行了一段lua腳本,主要包括以下命令:

  • zrangebyscore:獲取zset中score在0至當(dāng)前時間戳范圍內(nèi)的前一百條數(shù)據(jù) 如果獲取到數(shù)據(jù)則循環(huán)執(zhí)行rpush,lrem,zrem命令
  • zrange:取zset中第一條數(shù)據(jù)
  • zadd:向zset中添加一條數(shù)據(jù),score為時間戳
  • rpush:向list右邊push一條數(shù)據(jù)
  • publish:如果添加的消息在頂部,則發(fā)布一條訂閱消息

3.消費(fèi)一條消息

同樣消費(fèi)的時候也是提交了一條lua腳本,主要執(zhí)行了以下命令 可以看到和發(fā)送端命令相似

  • zrangebyscore:獲取zset中score在0至當(dāng)前時間戳范圍內(nèi)的前一百條數(shù)據(jù)
  • rpush:向list右邊push一條數(shù)據(jù)
  • lrem:刪除一條數(shù)據(jù)
  • zrem:刪除zeset中的數(shù)據(jù)
  • zrange:獲取第一條數(shù)據(jù)
  • BLPOP:阻塞等待隊列消息

通過以上redis命令的執(zhí)行可以發(fā)現(xiàn)一個命令SUBCRIBE用于訂閱redis的一個隊列,而這個命令只在發(fā)送消息的時候執(zhí)行了,在消費(fèi)的時候沒有執(zhí)行。從而驗證了當(dāng)服務(wù)重啟后如果沒有新的消息發(fā)送,那么客戶端就不會發(fā)送SUBCRIBE命令,訂閱延時隊列,這就導(dǎo)致在服務(wù)重啟前發(fā)送的消息到時間后無法消費(fèi)。

解決方案

在消費(fèi)端啟動的時候添加一行代碼用于訂閱延時隊列

 //訂閱redis隊列
 redissonClient.getDelayedQueue(blockingFairQueue);

那么為什么沒有訂閱就消費(fèi)不到消息了呢?帶著疑問繼續(xù)深入理解redisson的實(shí)現(xiàn)

redisson 延時隊列原理

首先回到消費(fèi)端代碼

在我們沒有發(fā)送訂閱命令的時候,客戶端只是在阻塞等待一個指定隊列的消息,那么這個隊列的消息是誰放進(jìn)去的呢? 帶著疑問我們再看發(fā)送端代碼

直接進(jìn)入 delayedQueue.offer()方法內(nèi)部

可以看到發(fā)送端是提交了一個lua腳本主要執(zhí)行了zadd,rpush,publish命令,這里我們需要注意publish命令,在redis中pub/sub是對應(yīng)的,當(dāng)有publish的時候,那么subcribe端會收到該訂閱消息。

那么是誰收到了訂閱的消息,收到消息后又做了什么呢,回到redissonClient.getDelayedQueue(blockingFairQueue)代碼中

繼續(xù)進(jìn)入 new RedissonDelayedQueue()

可以看到這里創(chuàng)建了一個QueueTransferTask,實(shí)現(xiàn)了pushTaskAsync()方法,具體內(nèi)容是一個lua腳本,首先執(zhí)行zrangebyscore 獲取過期的前一百條數(shù)據(jù),循環(huán)調(diào)用rpush,lrem,zrem,注意這里rpush的隊列為我們指定的延時隊列,也就是consumer端take的隊列。至此明白了消費(fèi)端的消息是方法pushTaskAsync()執(zhí)行后放入的。那么什么時候執(zhí)行這個方法呢。

進(jìn)入 queueTransferService.schedule(queueName, task)方法

這里會執(zhí)行start方法,繼續(xù)跟進(jìn)

這里可以看到添加了兩個listener,onSubcribe,onMessage,當(dāng)訂閱到消息時執(zhí)行onSubcribe中的pushTash,當(dāng)redis有新的消息通知,就會觸發(fā)scheduleTask(...)方法,startTime為上述中publish通知的元素過期時間

繼續(xù)進(jìn)入pushTask方法

這里可以看到一個熟悉的方法pushTaskAsync(),也就是前邊的一段lua腳本,用于將過期的消息放入阻塞隊列,并返回排在第一個的消息執(zhí)行scheduleTask()

繼續(xù)進(jìn)入scheduleTask()方法

如果時間差小于10毫秒則執(zhí)行pushTask方法,如果大于10毫秒則啟動一個延時任務(wù),到時間后執(zhí)行pushTask方法。pushTask與scheduleTask互相調(diào)用循環(huán)往復(fù)

流程總結(jié)

至此源碼分析完畢,整個流程總結(jié)如下:

發(fā)送端只是往zset,list,添加數(shù)據(jù),并且發(fā)布一條訂閱消息

消費(fèi)端收到訂閱消息后會查詢zset中的過期消息,并放入阻塞隊列供消費(fèi)端take消息,并且獲取zset第一個消息,啟動一個延時任務(wù),到期后繼續(xù)從zset中獲取過期消息如此循環(huán)。

此時就回答了上邊的問題 那么為什么沒有訂閱就消費(fèi)不到消息了呢?

如果沒有訂閱的話消費(fèi)端就收不到訂閱消息,也就不會去獲取過期時間放入阻塞隊列進(jìn)行循環(huán)。

以上就是生產(chǎn)redisson延時隊列不消費(fèi)問題排查解決的詳細(xì)內(nèi)容,更多關(guān)于排查redisson延時隊列不消費(fèi)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Unable?to?connect?to?Redis無法連接到Redis解決的全過程

    Unable?to?connect?to?Redis無法連接到Redis解決的全過程

    這篇文章主要給大家介紹了關(guān)于Unable?to?connect?to?Redis無法連接到Redis解決的相關(guān)資料,文中通過圖文以及實(shí)例代碼將解決的過程介紹的非常詳細(xì),需要的朋友可以參考下
    2023-03-03
  • redis使用Lua腳本解決多線程下的超賣問題及原因解析

    redis使用Lua腳本解決多線程下的超賣問題及原因解析

    這篇文章主要介紹了redis使用Lua腳本解決多線程下的超賣問題,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2023-05-05
  • Redis實(shí)現(xiàn)延遲隊列的項目示例

    Redis實(shí)現(xiàn)延遲隊列的項目示例

    延遲隊列是Redis的一個重要應(yīng)用場景,本文主要介紹了Redis實(shí)現(xiàn)延遲隊列的項目示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2024-06-06
  • 淺談Redis變慢的原因及排查方法

    淺談Redis變慢的原因及排查方法

    本文主要介紹了淺談Redis變慢的原因及排查方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2022-06-06
  • Redis常用的數(shù)據(jù)結(jié)構(gòu)及實(shí)際應(yīng)用場景

    Redis常用的數(shù)據(jù)結(jié)構(gòu)及實(shí)際應(yīng)用場景

    本文介紹了Redis中常用的數(shù)據(jù)結(jié)構(gòu),包括字符串、列表、集合、哈希表、有序集合和Bitmap,并詳細(xì)說明了它們在各種場景下的使用,需要的朋友可以參考下
    2024-05-05
  • Redis的9種數(shù)據(jù)類型用法解讀

    Redis的9種數(shù)據(jù)類型用法解讀

    這篇文章主要介紹了Redis的9種數(shù)據(jù)類型用法及說明,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-02-02
  • Redis高可用部署架構(gòu)的實(shí)現(xiàn)

    Redis高可用部署架構(gòu)的實(shí)現(xiàn)

    本文主要介紹了Redis高可用部署架構(gòu)的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-08-08
  • Django使用redis配置緩存的方法

    Django使用redis配置緩存的方法

    Redis是一個內(nèi)存數(shù)據(jù)庫由于其性能極高,因此經(jīng)常作為中間件、緩存使用,緩存某些內(nèi)容是為了保存昂貴計算的結(jié)果,這樣就不必在下次執(zhí)行計算,接下來通過本文給大家分享redis配置緩存的方法,感興趣的朋友一起看看吧
    2021-06-06
  • Redis如何使用lua腳本實(shí)例教程

    Redis如何使用lua腳本實(shí)例教程

    這篇文章主要給大家介紹了關(guān)于Redis如何使用lua腳本的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-10-10
  • Redis分布式鎖詳細(xì)介紹

    Redis分布式鎖詳細(xì)介紹

    大家好,本篇文章主要講的是Redis分布式鎖詳細(xì)介紹,感興趣的同學(xué)趕快來看一看吧,對你有幫助的話記得收藏一下,方便下次瀏覽
    2021-12-12

最新評論