欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis Subscribe timeout 報錯的問題解決

 更新時間:2025年08月15日 11:04:59   作者:一樂小哥  
最近系統(tǒng)偶爾報出org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)的錯誤,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

?? 介紹

Redisson版本 2.8.2

最近公司系統(tǒng)偶爾報出org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)的錯誤,觀察堆棧信息看到報錯是一段使用Redisson的redis鎖的地方,去除業(yè)務(wù)邏輯代碼基本如下

public void mockLock(String phoneNum) {
log.info("{} - prepare lock", threadName);
RLock lock = redissonClient.getLock("redis_cache_test" + phoneNum);
try {
    lock.lock();
    log.info("{} - get lock", threadName);
    //睡眠10s
    Thread.sleep(10000);
} catch (Exception e) {
    log.info("{} - exception", threadName,e);
} finally {
    log.info("{} - unlock lock", threadName);
    lock.unlock();
}

導(dǎo)致報錯的代碼是lock.lock()的實(shí)現(xiàn)

@Override
public void syncSubscription(RFuture<?> future) {
    MasterSlaveServersConfig config = connectionManager.getConfig();
    try {
        int timeout = config.getTimeout() + config.getRetryInterval()*config.getRetryAttempts();
        if (!future.await(timeout)) {
            throw new RedisTimeoutException("Subscribe timeout: (" + timeout + "ms)");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    future.syncUninterruptibly();
}

溯因

syncSubscription中的futureRedissonLock.subscribe(long threadId)方法

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager());
}

這里可以看出大概是在PUBSUB中獲取一個訂閱,再往下看源碼

public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
    //監(jiān)聽持有
    final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
    //獲取鎖訂閱隊列
    final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
    //訂閱拒絕實(shí)現(xiàn)
    final RPromise<E> newPromise = new PromiseDelegator<E>(connectionManager.<E>newPromise()) {
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return semaphore.remove(listenerHolder.get());
        }
    };

    Runnable listener = new Runnable() {

        @Override
        public void run() {
        //判斷是否已經(jīng)存在相同的entry
            E entry = entries.get(entryName);
            if (entry != null) {
                entry.aquire();
                semaphore.release();
                entry.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            //沒有則新建
            E value = createEntry(newPromise);
            value.aquire();
            
            E oldValue = entries.putIfAbsent(entryName, value);
            if (oldValue != null) {
                oldValue.aquire();
                semaphore.release();
                oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                return;
            }
            //監(jiān)聽對應(yīng)的entry
            RedisPubSubListener<Object> listener = createListener(channelName, value);
            //訂閱事件
            connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
        }
    };
    //用semaphore管理監(jiān)聽隊列,因為可能存在多個線程等待一個鎖
    semaphore.acquire(listener);
    //保證訂閱拒絕邏輯
    listenerHolder.set(listener);
    
    return newPromise;
}

這里可以看到這個方法其實(shí)只是定義了一個名叫listener的Runnable, semaphore.acquire(listener);則保證了同一個channel僅會有一個線程去監(jiān)聽,其他的繼續(xù)等待,而訂閱邏輯還在connectionManager.subscribe里面

private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener, 
        final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
    final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
    if (connEntry != null) {
        connEntry.addListener(channelName, listener);
        connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
            @Override
            public void operationComplete(Future<Void> future) throws Exception {
                lock.release();
                promise.trySuccess(connEntry);
            }
        });
        return;
    }

    freePubSubLock.acquire(new Runnable() {

        @Override
        public void run() {
            if (promise.isDone()) {
                return;
            }
            //如果沒有獲取到公共的連接直接返回
            final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
            if (freeEntry == null) {
                connect(codec, channelName, listener, promise, type, lock);
                return;
            }
            //entry有個計數(shù)器subscriptionsPerConnection
            如果為-1報錯因為下面有0的判斷
            int remainFreeAmount = freeEntry.tryAcquire();
            if (remainFreeAmount == -1) {
                throw new IllegalStateException();
            }
            
            final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
            if (oldEntry != null) {
                freeEntry.release();
                freePubSubLock.release();
                
                oldEntry.addListener(channelName, listener);
                oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(oldEntry);
                    }
                });
                return;
            }
            //subscriptionsPerConnection為0時從公共連接池中吐出
            if (remainFreeAmount == 0) {
                freePubSubConnections.poll();
            }
            freePubSubLock.release();
            
            freeEntry.addListener(channelName, listener);
            freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(freeEntry);
                }
            });
            
            if (PubSubType.PSUBSCRIBE == type) {
                freeEntry.psubscribe(codec, channelName);
            } else {
                freeEntry.subscribe(codec, channelName);
            }
        }
        
    });
}

這里在沒有連接的情況下會進(jìn)到connect(codec, channelName, listener, promise, type, lock);中去

private void connect(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
        final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
    final int slot = calcSlot(channelName);
    //根據(jù)subscriptionConnectionPoolSize獲取下一個鏈接
    RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
    connFuture.addListener(new FutureListener<RedisPubSubConnection>() {

        @Override
        public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
            if (!future.isSuccess()) {
                freePubSubLock.release();
                lock.release();
                promise.tryFailure(future.cause());
                return;
            }

            RedisPubSubConnection conn = future.getNow();
            
            final PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
            entry.tryAcquire();
            
            final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
            if (oldEntry != null) {
                releaseSubscribeConnection(slot, entry);
                
                freePubSubLock.release();
                
                oldEntry.addListener(channelName, listener);
                oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(oldEntry);
                    }
                });
                return;
            }
            
            freePubSubConnections.add(entry);
            freePubSubLock.release();
            
            entry.addListener(channelName, listener);
            entry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(entry);
                }
            });
            
            if (PubSubType.PSUBSCRIBE == type) {
                entry.psubscribe(codec, channelName);
            } else {
                entry.subscribe(codec, channelName);
            }
            
        }
    });
}

這里的RFuture<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);最終會調(diào)用ClientConnectionsEntry#acquireSubscribeConnection方法的
freeSubscribeConnectionsCounter.acquire(runnable) 至此我們找到原因 當(dāng)同時等待鎖訂閱消息達(dá)到subscriptionConnectionPoolSize*subscriptionsPerConnection個時,再多一個訂閱消息,連接一直無法獲取導(dǎo)致MasterSlaveConnectionManager中的freePubSubLock沒有釋放。 另外由于在超時場景下MasterSlaveConnectionManager向連接池獲取連接后是直接緩存下來,不把分發(fā)訂閱鏈接釋返回給連接池的,因此導(dǎo)致freeSubscribeConnectionsCounter一直等待,出現(xiàn)死鎖情況。

最終表現(xiàn)就是org.redisson.client.RedisTimeoutException: Subscribe timeout: (7500ms)

復(fù)現(xiàn)

Redis配置

public RedissonClient redissonClient(RedisConfig redisConfig) {

    Config config = new Config();
    config.useSingleServer()
        .setAddress(redisConfig.getHost() + ":" + redisConfig.getPort())
        .setPassword(redisConfig.getPassword())
        .setDatabase(redisConfig.getDatabase())
        .setConnectTimeout(redisConfig.getConnectionTimeout())
        .setTimeout(redisConfig.getTimeout())
        //把兩個配置項設(shè)置為1
        .setSubscriptionConnectionPoolSize(1)
        .setSubscriptionsPerConnection(1);
    return Redisson.create(config);
}

測試方法

void contextLoads() throws InterruptedException {
    Runnable runnable = () -> {
        redissonLock.tryRedissonLock();
    };
    new Thread(runnable, "線程1").start();
    new Thread(runnable, "線程12").start();
    new Thread(runnable, "線程23").start();
    new Thread(runnable, "線程21").start();
    
    Thread.sleep(200000);
}

結(jié)果

org.redisson.client.RedisTimeoutException: Subscribe timeout: (5500ms)
	at org.redisson.command.CommandAsyncService.syncSubscription(CommandAsyncService.java:126) ~[redisson-2.8.2.jar:na]
	at org.redisson.RedissonLock.lockInterruptibly(RedissonLock.java:121) ~[redisson-2.8.2.jar:na]
	at org.redisson.RedissonLock.lockInterruptibly(RedissonLock.java:108) ~[redisson-2.8.2.jar:na]
	at org.redisson.RedissonLock.lock(RedissonLock.java:90) ~[redisson-2.8.2.jar:na]
	at com.rick.redislock.lock.RedissonLock.registerPersonalMember(RedissonLock.java:30) ~[classes/:na]
	at com.rick.redislock.RedisLockApplicationTests.lambda$contextLoads$0(RedisLockApplicationTests.java:15) [test-classes/:na]
	at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_144]

符合預(yù)期

到此這篇關(guān)于Redis Subscribe timeout 報錯的問題解決的文章就介紹到這了,更多相關(guān)Redis Subscribe timeout 報錯內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis憑啥可以這么快

    Redis憑啥可以這么快

    本文詳細(xì)的介紹了為啥使用Redis的時候,可以做到非常快的讀取速度,對于大家學(xué)習(xí)Redis非常有幫助,希望大家喜歡
    2021-02-02
  • Redis?腳本和連接命令示例詳解

    Redis?腳本和連接命令示例詳解

    Redis腳本是一種可以實(shí)現(xiàn)復(fù)雜任務(wù)的腳本語言,可以用來快速履行復(fù)雜任務(wù),靈活處理數(shù)據(jù)管理和管理復(fù)雜的利用場景,這篇文章主要介紹了Redis?腳本和連接命令,需要的朋友可以參考下
    2023-09-09
  • redis快速部署為docker容器的方法實(shí)現(xiàn)

    redis快速部署為docker容器的方法實(shí)現(xiàn)

    部署 Redis 作為 Docker 容器是一種快速、靈活且可重復(fù)使用的方式,特別適合開發(fā)、測試和部署環(huán)境,本文主要介紹了redis快速部署為docker容器的方法實(shí)現(xiàn),具有一定的參考價值,感興趣的可以了解一下
    2024-05-05
  • Redis的數(shù)據(jù)類型和內(nèi)部編碼詳解

    Redis的數(shù)據(jù)類型和內(nèi)部編碼詳解

    Redis是通過Key-Value的形式來組織數(shù)據(jù)的,而Key的類型都是String,而Value的類型可以有很多,在Redis中最通用的數(shù)據(jù)類型大致有這幾種:String、List、Set、Hash、Sorted Set,下面通過本文介紹Redis數(shù)據(jù)類型和內(nèi)部編碼,感興趣的朋友一起看看吧
    2024-04-04
  • 詳解Redis SCAN命令實(shí)現(xiàn)有限保證的原理

    詳解Redis SCAN命令實(shí)現(xiàn)有限保證的原理

    這篇文章主要介紹了Redis SCAN命令實(shí)現(xiàn)有限保證的原理,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價值 ,需要的朋友可以參考下
    2019-07-07
  • Redis集群擴(kuò)容的實(shí)現(xiàn)示例

    Redis集群擴(kuò)容的實(shí)現(xiàn)示例

    本文介紹了在虛擬機(jī)上新建Redis集群,并將新增節(jié)點(diǎn)加入現(xiàn)有集群,通過配置文件和`redis-cli`命令,成功實(shí)現(xiàn)了Redis集群的擴(kuò)容,感興趣的可以了解一下
    2025-02-02
  • Windows安裝Redis的幾種方式與測試流程總結(jié)

    Windows安裝Redis的幾種方式與測試流程總結(jié)

    本文系統(tǒng)梳理了在 Windows 系統(tǒng)上安裝和使用 Redis 的多種方式,涵蓋通過端口號識別運(yùn)行中的 Redis 實(shí)例、進(jìn)程定位方法,并提供了 Java 環(huán)境下的連接與測試示例,同時還介紹了常見的圖形化管理工具,便于可視化管理與調(diào)試,需要的朋友可以參考下
    2025-05-05
  • Redis的五種基本類型和業(yè)務(wù)場景和使用方式

    Redis的五種基本類型和業(yè)務(wù)場景和使用方式

    Redis是一種高性能的鍵值存儲數(shù)據(jù)庫,支持多種數(shù)據(jù)結(jié)構(gòu)如字符串、列表、集合、哈希表和有序集合等,它提供豐富的API和持久化功能,適用于緩存、消息隊列、排行榜等多種場景,Redis能夠?qū)崿F(xiàn)高速讀寫操作,尤其適合需要快速響應(yīng)的應(yīng)用
    2024-10-10
  • 從MySQL到Redis的簡單數(shù)據(jù)庫遷移方法

    從MySQL到Redis的簡單數(shù)據(jù)庫遷移方法

    這篇文章主要介紹了從MySQL到Redis的簡單數(shù)據(jù)庫遷移方法,注意Redis數(shù)據(jù)庫基于內(nèi)存,并不能代替?zhèn)鹘y(tǒng)數(shù)據(jù)庫,需要的朋友可以參考下
    2015-06-06
  • Redis報錯UnrecognizedPropertyException: Unrecognized field問題

    Redis報錯UnrecognizedPropertyException: Unrecognized 

    在使用SpringBoot訪問Redis時,報錯提示識別不了屬性headPart,經(jīng)過排查,發(fā)現(xiàn)并非Serializable或getset方法問題,而是存在一個方法getHeadPart,但無headPart屬性,解決方案是將getHeadPart改為makeHeadPart
    2024-10-10

最新評論