欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

redis在spring boot中異常退出的問題解決方案

 更新時間:2025年05月22日 11:25:31   作者:簡誠  
這篇文章主要介紹了redis在spring boot中異常退出的問題解決方案,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友參考下吧

問題:

Exception in thread "rtsp-consumer-3" org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1689)
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1597)
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.doGetAsyncDedicatedConnection(LettuceConnection.java:1006)
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.getOrCreateDedicatedConnection(LettuceConnection.java:1069)
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.getAsyncDedicatedConnection(LettuceConnection.java:990)
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.getAsyncDedicatedConnection(LettuceStreamCommands.java:395)
    at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:346)
    at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:592)
    at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:310)
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:387)
    at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:382)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:222)
    at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:189)
    at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
    at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:305)
    at com.ruoyi.vedioFrame.utils.RedisStreamOperations.readGroup(RedisStreamOperations.java:70)
    at com.ruoyi.vedioFrame.service.impl.StreamConsumerService.consumeFrames(StreamConsumerService.java:200)
    at com.ruoyi.vedioFrame.service.impl.StreamConsumerService.lambda$null$0(StreamConsumerService.java:108)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
    at io.lettuce.core.AbstractRedisClient.getConnection(AbstractRedisClient.java:330)
    at io.lettuce.core.RedisClient.connect(RedisClient.java:216)
    at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.lambda$getConnection$1(StandaloneConnectionProvider.java:115)
    at java.util.Optional.orElseGet(Optional.java:267)
    at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.getConnection(StandaloneConnectionProvider.java:115)
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1595)
    ... 19 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:745) 

解決

根據(jù)代碼和現(xiàn)象分析,根本原因可能是處理時間過長導致Redis消息積壓觸發(fā)超時異常。以下是詳細分析和解決方案:

?? 問題根源

同步阻塞模型

  • frameProcessor.process()調(diào)用外部接口時存在網(wǎng)絡延遲,導致消息處理時間超過Redis的BLOCK超時(2000ms)。
  • 未及時ACK會導致消息重新進入Pending狀態(tài),積累后觸發(fā)READONLY錯誤或消費者組失效。

Thread.sleep(1000)加劇阻塞

  • 即使沒有消息,每次循環(huán)強制休眠1秒,降低消費吞吐量,無法及時處理積壓消息。

異常未妥善處理

  • 注釋掉的try-catch導致處理過程中的異常(如網(wǎng)絡超時)未被捕獲,中斷循環(huán)后Redis連接未正確恢復。

??? 解決方案

1. 異步處理 + 提前ACK(關鍵步驟)

將消息處理與ACK分離,處理前立即ACK避免消息積壓,異步處理業(yè)務邏輯:

ExecutorService executor = Executors.newCachedThreadPool(); // 全局線程池
while (!Thread.currentThread().isInterrupted()) {
    List<MapRecord<String, Object, Object>> records = redisStreamOperations.readGroup(...);
    if (records != null) {
        for (MapRecord<String, Object, Object> record : records) {
            // 提前ACK釋放消息
            redisStreamOperations.ack(streamKey, groupName, record.getId());
            // 提交異步任務處理業(yè)務
            executor.submit(() -> {
                try {
                    String strdata = (String) record.getValue().get("frameData");
                    byte[] frameData = Base64.getDecoder().decode(strdata);
                    BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData));
                    frameProcessor.process(image, streamId, commonQueues, regions);
                } catch (Exception e) {
                    log.error("處理幀失敗: {}", e.getMessage());
                }
            });
        }
    }
    // 移除Thread.sleep(1000) 提升吞吐量
}

2. 調(diào)整Redis消費者組參數(shù)

增加XGROUP CREATE時的BLOCKIDLE超時:

XGROUP CREATE stream:1234 mygroup $ MKSTREAM BLOCK 5000 IDLE 30000

3. 優(yōu)化Redis連接配置

確保RedisTemplate的讀寫超時大于業(yè)務處理最長時間:

@Bean
public RedisTemplate<String, Object> redisTemplate() {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(redisConnectionFactory());
    // 設置超時時間(示例:10秒)
    RedisConnectionFactory factory = template.getConnectionFactory();
    if (factory instanceof LettuceConnectionFactory) {
        ((LettuceConnectionFactory) factory).setTimeout(Duration.ofSeconds(10));
    }
    return template;
}

4. 監(jiān)控Pending隊列

定期檢查Pending消息數(shù)量,異常時重置消費者:

// 在循環(huán)中添加檢查
Long pending = redisTemplate.opsForStream().pending(streamKey, groupName);
if (pending > 1000) { // 閾值根據(jù)業(yè)務設定
    redisTemplate.opsForStream().destroyGroup(streamKey, groupName);
    redisTemplate.opsForStream().createGroup(streamKey, groupName);
}

5. 添加熔斷機制

當外部接口連續(xù)失敗時,暫停消費避免雪崩:

// 全局熔斷器
CircuitBreaker breaker = CircuitBreaker.ofDefaults("frameProcessor");
// 處理邏輯改為
breaker.executeSupplier(() -> {
    frameProcessor.process(...);
    return null;
});

?? 驗證方案

日志監(jiān)控

// 在異步任務中添加耗時日志
long start = System.currentTimeMillis();
frameProcessor.process(...);
log.info("處理耗時: {}ms", System.currentTimeMillis() - start);

確保單次處理時間低于Redis超時閾值。

Redis CLI檢查

XINFO GROUPS stream:1234  # 查看消費者組狀態(tài)
XPENDING stream:1234 mygroup # 查看Pending消息

?? 關鍵優(yōu)化點總結(jié)

措施作用
異步處理 + 提前ACK徹底解耦處理耗時與Redis消費
增大Redis超時參數(shù)避免服務端主動斷開連接
線程池隔離防止外部接口故障拖垮整個消費服務
熔斷機制快速失敗,保護系統(tǒng)穩(wěn)定性

通過以上改造,即使frameProcessor.process()調(diào)用外部平臺接口,也能通過異步化和資源隔離保證Redis消費鏈路的穩(wěn)定性。最終只需確保線程池和熔斷參數(shù)適配業(yè)務負載即可。

代碼具體添加位置:

以下是針對 優(yōu)化Redis連接配置、監(jiān)控Pending隊列添加熔斷機制 的代碼具體添加位置的詳細說明:

一、優(yōu)化Redis連接配置

位置:Redis配置類
在Spring Boot的Redis配置類中調(diào)整超時參數(shù)(通常為RedisConfig.java):

@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        // 設置連接超時和讀寫超時(關鍵參數(shù))
        lettuceConnectionFactory.setTimeout(Duration.ofSeconds(10));  // 命令超時時間
        lettuceConnectionFactory.setShareNativeConnection(false);    // 禁用共享連接,避免阻塞
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(lettuceConnectionFactory);
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        return template;
    }
}

關鍵參數(shù)說明

  • setTimeout(10秒):確保超時時間大于frameProcessor.process()的最長處理時間
  • setShareNativeConnection(false):避免多個線程共享同一個連接導致阻塞。

二、監(jiān)控Pending隊列

位置:consumeFrames方法內(nèi)的循環(huán)中
在消費消息的主循環(huán)中定期檢查Pending隊列:

private void consumeFrames(String streamId, String groupName, String consumerName,
                           CommonQueues commonQueues, String regions) throws InterruptedException, IOException {
    // ... 其他初始化代碼 ...
    int checkPendingInterval = 10; // 每處理10次循環(huán)檢查一次Pending隊列
    int loopCount = 0;
    while (!Thread.currentThread().isInterrupted()) {
        // ... 原有代碼讀取消息 ...
        // 監(jiān)控Pending隊列的邏輯(添加位置)
        loopCount++;
        if (loopCount % checkPendingInterval == 0) {
            String streamKey = "stream:" + streamId;
            PendingMessages pending = redisStreamOperations.pending(streamKey, groupName);
            if (pending != null && pending.getTotalPendingMessages() > 1000) { // 閾值根據(jù)業(yè)務調(diào)整
                log.warn("檢測到Pending消息積壓 {} 條,重置消費者組", pending.getTotalPendingMessages());
                redisStreamOperations.destroyGroup(streamKey, groupName);
                redisStreamOperations.createGroup(StreamKey.of(streamKey), groupName);
            }
        }
        // ... 后續(xù)處理代碼 ...
    }
}

說明

  • 通過redisStreamOperations.pending()獲取當前Pending消息數(shù)。
  • 當Pending消息超過閾值時,強制銷毀并重建消費者組,避免消息卡死。

三、添加熔斷機制

位置:處理消息的業(yè)務邏輯外層
使用Resilience4j熔斷器包裹frameProcessor.process()調(diào)用:

1. 熔斷器配置類

@Configuration
public class CircuitBreakerConfig {
    @Bean
    public CircuitBreaker frameProcessorCircuitBreaker() {
        CircuitBreakerConfig config = CircuitBreakerConfig.custom()
            .failureRateThreshold(50)          // 失敗率閾值50%
            .slidingWindowType(SlidingWindowType.COUNT_BASED)
            .slidingWindowSize(10)             // 基于最近10次調(diào)用統(tǒng)計
            .minimumNumberOfCalls(5)           // 最少5次調(diào)用后開始計算
            .waitDurationInOpenState(Duration.ofSeconds(30)) // 熔斷后30秒進入半開狀態(tài)
            .build();
        return CircuitBreakerRegistry.of(config).circuitBreaker("frameProcessor");
    }
}

2. 在消費代碼中使用熔斷器

public class YourConsumerClass {
    @Autowired
    private CircuitBreaker frameProcessorCircuitBreaker; // 注入熔斷器
    private void consumeFrames(...) {
        // ... 原有代碼 ...
        for (MapRecord<String, Object, Object> record : records) {
            redisStreamOperations.ack(...); // 提前ACK
            // 使用熔斷器保護處理邏輯(添加位置)
            Try.runRunnable(() -> frameProcessorCircuitBreaker.executeRunnable(() -> {
                String strdata = (String) record.getValue().get("frameData");
                byte[] frameData = Base64.getDecoder().decode(strdata);
                BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData));
                frameProcessor.process(image, streamId, commonQueues, regions);
            })).onFailure(e -> log.error("處理失敗且熔斷: {}", e.getMessage()));
        }
        // ... 后續(xù)代碼 ...
    }
}

熔斷邏輯說明

  • frameProcessor.process()連續(xù)失敗觸發(fā)閾值時,熔斷器會暫時阻止后續(xù)調(diào)用,避免雪崩效應。
  • 熔斷期間直接跳過處理,但仍會ACK消息(根據(jù)業(yè)務需求選擇是否重試)。

四、代碼集成位置總結(jié)

優(yōu)化措施代碼位置關鍵注解
Redis連接配置Redis配置類(如RedisConfig.java調(diào)整超時時間和連接池參數(shù)
Pending隊列監(jiān)控consumeFrames方法的主循環(huán)內(nèi)定期檢查+自動重置消費者組
熔斷機制業(yè)務處理代碼外層(包裹frameProcessor.process依賴熔斷器庫(如Resilience4j)

五、參數(shù)調(diào)整建議

Redis超時

  • lettuceConnectionFactory.setTimeout應大于frameProcessor.process()的最大處理時間 + 網(wǎng)絡抖動余量(如設置為實際最大處理時間的2倍)。

Pending隊列閾值

  • 如果每秒處理100條消息,閾值可設置為1000(相當于10秒積壓量)。

熔斷器參數(shù)

  • failureRateThreshold:根據(jù)外部接口的穩(wěn)定性調(diào)整(如頻繁超時可設為70%)。
  • waitDurationInOpenState:根據(jù)外部服務恢復時間調(diào)整(如30秒到5分鐘)。

通過以上改造,即使frameProcessor.process()調(diào)用外部平臺接口,也能通過資源隔離、快速失敗和自動恢復機制保障Redis消費鏈路的穩(wěn)定性。

到此這篇關于redis在spring boot中異常退出的文章就介紹到這了,更多相關redis spring boot異常退出內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

相關文章

  • Redis緩存異常常用解決方案總結(jié)

    Redis緩存異常常用解決方案總結(jié)

    Redis緩存異常問題分別是緩存雪崩,緩存預熱,緩存穿透,緩存降級,緩存擊穿,本文主要介紹了Redis緩存異常常用解決方案總結(jié),具有一定的參考價值,感興趣的可以了解一下
    2023-12-12
  • Redis配置文件代碼講解

    Redis配置文件代碼講解

    在本篇文章里小編給大家整理的是一篇關于Redis配置文件的說明內(nèi)容,需要的朋友們可以學習下。
    2020-03-03
  • 如何利用 Redis 實現(xiàn)接口頻次限制

    如何利用 Redis 實現(xiàn)接口頻次限制

    這篇文章主要介紹了如何利用 Redis 實現(xiàn)接口頻次限制,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-02-02
  • redis中bind配置的詳細步驟

    redis中bind配置的詳細步驟

    本文主要介紹了redis中bind配置的詳細步驟,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-07-07
  • Redis的持久化方式

    Redis的持久化方式

    Redis提供了兩種主要的持久化方式:RDB和AOF,RDB通過定時快照的方式保存數(shù)據(jù)狀態(tài),而AOF記錄每個寫操作以便于重啟時重放,兩者可以結(jié)合使用,且在重啟時AOF文件會被優(yōu)先用于數(shù)據(jù)恢復,RDB快照具有速度快、節(jié)省磁盤空間的優(yōu)點,但可能會丟失最近的數(shù)據(jù)
    2024-10-10
  • 緩存替換策略及應用(以Redis、InnoDB為例)

    緩存替換策略及應用(以Redis、InnoDB為例)

    本文以Redis、InnoDB為例給大家講解緩存替換策略及應用,本文給大家提到五種置換策略,通過實例代碼給大家介紹的非常詳細,需要的朋友參考下吧
    2021-07-07
  • Redis實現(xiàn)延遲隊列的項目示例

    Redis實現(xiàn)延遲隊列的項目示例

    延遲隊列是Redis的一個重要應用場景,本文主要介紹了Redis實現(xiàn)延遲隊列的項目示例,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-06-06
  • Redis中ServiceStack.Redis和StackExchange.Redis區(qū)別詳解

    Redis中ServiceStack.Redis和StackExchange.Redis區(qū)別詳解

    本文主要介紹了Redis中ServiceStack.Redis和StackExchange.Redis區(qū)別詳解,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-05-05
  • 淺談內(nèi)存耗盡后Redis會發(fā)生什么

    淺談內(nèi)存耗盡后Redis會發(fā)生什么

    這篇文章主要介紹了淺談內(nèi)存耗盡后Redis會發(fā)生什么,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2021-03-03
  • Redis Caffeine實現(xiàn)兩級緩存的項目實踐

    Redis Caffeine實現(xiàn)兩級緩存的項目實踐

    本文介紹了使用Redis和Caffeine實現(xiàn)兩級緩存,以提高查詢接口的性能,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-12-12

最新評論