redis在spring boot中異常退出的問題解決方案
問題:
Exception in thread "rtsp-consumer-3" org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1689)
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1597)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.doGetAsyncDedicatedConnection(LettuceConnection.java:1006)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.getOrCreateDedicatedConnection(LettuceConnection.java:1069)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.getAsyncDedicatedConnection(LettuceConnection.java:990)
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.getAsyncDedicatedConnection(LettuceStreamCommands.java:395)
at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xReadGroup(LettuceStreamCommands.java:346)
at org.springframework.data.redis.connection.DefaultedRedisConnection.xReadGroup(DefaultedRedisConnection.java:592)
at org.springframework.data.redis.core.DefaultStreamOperations$4.inRedis(DefaultStreamOperations.java:310)
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:387)
at org.springframework.data.redis.core.DefaultStreamOperations$RecordDeserializingRedisCallback.doInRedis(DefaultStreamOperations.java:382)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:222)
at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:189)
at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
at org.springframework.data.redis.core.DefaultStreamOperations.read(DefaultStreamOperations.java:305)
at com.ruoyi.vedioFrame.utils.RedisStreamOperations.readGroup(RedisStreamOperations.java:70)
at com.ruoyi.vedioFrame.service.impl.StreamConsumerService.consumeFrames(StreamConsumerService.java:200)
at com.ruoyi.vedioFrame.service.impl.StreamConsumerService.lambda$null$0(StreamConsumerService.java:108)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
at io.lettuce.core.AbstractRedisClient.getConnection(AbstractRedisClient.java:330)
at io.lettuce.core.RedisClient.connect(RedisClient.java:216)
at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.lambda$getConnection$1(StandaloneConnectionProvider.java:115)
at java.util.Optional.orElseGet(Optional.java:267)
at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.getConnection(StandaloneConnectionProvider.java:115)
at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1595)
... 19 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:745)
解決
根據(jù)代碼和現(xiàn)象分析,根本原因可能是處理時間過長導致Redis消息積壓觸發(fā)超時異常。以下是詳細分析和解決方案:
?? 問題根源
同步阻塞模型:
frameProcessor.process()
調(diào)用外部接口時存在網(wǎng)絡延遲,導致消息處理時間超過Redis的BLOCK
超時(2000ms)。- 未及時ACK會導致消息重新進入Pending狀態(tài),積累后觸發(fā)
READONLY
錯誤或消費者組失效。
Thread.sleep(1000)加劇阻塞:
- 即使沒有消息,每次循環(huán)強制休眠1秒,降低消費吞吐量,無法及時處理積壓消息。
異常未妥善處理:
- 注釋掉的try-catch導致處理過程中的異常(如網(wǎng)絡超時)未被捕獲,中斷循環(huán)后Redis連接未正確恢復。
??? 解決方案
1. 異步處理 + 提前ACK(關鍵步驟)
將消息處理與ACK分離,處理前立即ACK避免消息積壓,異步處理業(yè)務邏輯:
ExecutorService executor = Executors.newCachedThreadPool(); // 全局線程池 while (!Thread.currentThread().isInterrupted()) { List<MapRecord<String, Object, Object>> records = redisStreamOperations.readGroup(...); if (records != null) { for (MapRecord<String, Object, Object> record : records) { // 提前ACK釋放消息 redisStreamOperations.ack(streamKey, groupName, record.getId()); // 提交異步任務處理業(yè)務 executor.submit(() -> { try { String strdata = (String) record.getValue().get("frameData"); byte[] frameData = Base64.getDecoder().decode(strdata); BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData)); frameProcessor.process(image, streamId, commonQueues, regions); } catch (Exception e) { log.error("處理幀失敗: {}", e.getMessage()); } }); } } // 移除Thread.sleep(1000) 提升吞吐量 }
2. 調(diào)整Redis消費者組參數(shù)
增加XGROUP CREATE
時的BLOCK
和IDLE
超時:
XGROUP CREATE stream:1234 mygroup $ MKSTREAM BLOCK 5000 IDLE 30000
3. 優(yōu)化Redis連接配置
確保RedisTemplate的讀寫超時大于業(yè)務處理最長時間:
@Bean public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(redisConnectionFactory()); // 設置超時時間(示例:10秒) RedisConnectionFactory factory = template.getConnectionFactory(); if (factory instanceof LettuceConnectionFactory) { ((LettuceConnectionFactory) factory).setTimeout(Duration.ofSeconds(10)); } return template; }
4. 監(jiān)控Pending隊列
定期檢查Pending消息數(shù)量,異常時重置消費者:
// 在循環(huán)中添加檢查 Long pending = redisTemplate.opsForStream().pending(streamKey, groupName); if (pending > 1000) { // 閾值根據(jù)業(yè)務設定 redisTemplate.opsForStream().destroyGroup(streamKey, groupName); redisTemplate.opsForStream().createGroup(streamKey, groupName); }
5. 添加熔斷機制
當外部接口連續(xù)失敗時,暫停消費避免雪崩:
// 全局熔斷器 CircuitBreaker breaker = CircuitBreaker.ofDefaults("frameProcessor"); // 處理邏輯改為 breaker.executeSupplier(() -> { frameProcessor.process(...); return null; });
?? 驗證方案
日志監(jiān)控:
// 在異步任務中添加耗時日志 long start = System.currentTimeMillis(); frameProcessor.process(...); log.info("處理耗時: {}ms", System.currentTimeMillis() - start);
確保單次處理時間低于Redis超時閾值。
Redis CLI檢查:
XINFO GROUPS stream:1234 # 查看消費者組狀態(tài) XPENDING stream:1234 mygroup # 查看Pending消息
?? 關鍵優(yōu)化點總結(jié)
措施 | 作用 |
---|---|
異步處理 + 提前ACK | 徹底解耦處理耗時與Redis消費 |
增大Redis超時參數(shù) | 避免服務端主動斷開連接 |
線程池隔離 | 防止外部接口故障拖垮整個消費服務 |
熔斷機制 | 快速失敗,保護系統(tǒng)穩(wěn)定性 |
通過以上改造,即使frameProcessor.process()
調(diào)用外部平臺接口,也能通過異步化和資源隔離保證Redis消費鏈路的穩(wěn)定性。最終只需確保線程池和熔斷參數(shù)適配業(yè)務負載即可。
代碼具體添加位置:
以下是針對 優(yōu)化Redis連接配置、監(jiān)控Pending隊列 和 添加熔斷機制 的代碼具體添加位置的詳細說明:
一、優(yōu)化Redis連接配置
位置:Redis配置類
在Spring Boot的Redis配置類中調(diào)整超時參數(shù)(通常為RedisConfig.java
):
@Configuration public class RedisConfig { @Bean public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) { // 設置連接超時和讀寫超時(關鍵參數(shù)) lettuceConnectionFactory.setTimeout(Duration.ofSeconds(10)); // 命令超時時間 lettuceConnectionFactory.setShareNativeConnection(false); // 禁用共享連接,避免阻塞 RedisTemplate<String, Object> template = new RedisTemplate<>(); template.setConnectionFactory(lettuceConnectionFactory); template.setKeySerializer(new StringRedisSerializer()); template.setValueSerializer(new GenericJackson2JsonRedisSerializer()); return template; } }
關鍵參數(shù)說明:
setTimeout(10秒)
:確保超時時間大于frameProcessor.process()
的最長處理時間setShareNativeConnection(false)
:避免多個線程共享同一個連接導致阻塞。
二、監(jiān)控Pending隊列
位置:consumeFrames
方法內(nèi)的循環(huán)中
在消費消息的主循環(huán)中定期檢查Pending隊列:
private void consumeFrames(String streamId, String groupName, String consumerName, CommonQueues commonQueues, String regions) throws InterruptedException, IOException { // ... 其他初始化代碼 ... int checkPendingInterval = 10; // 每處理10次循環(huán)檢查一次Pending隊列 int loopCount = 0; while (!Thread.currentThread().isInterrupted()) { // ... 原有代碼讀取消息 ... // 監(jiān)控Pending隊列的邏輯(添加位置) loopCount++; if (loopCount % checkPendingInterval == 0) { String streamKey = "stream:" + streamId; PendingMessages pending = redisStreamOperations.pending(streamKey, groupName); if (pending != null && pending.getTotalPendingMessages() > 1000) { // 閾值根據(jù)業(yè)務調(diào)整 log.warn("檢測到Pending消息積壓 {} 條,重置消費者組", pending.getTotalPendingMessages()); redisStreamOperations.destroyGroup(streamKey, groupName); redisStreamOperations.createGroup(StreamKey.of(streamKey), groupName); } } // ... 后續(xù)處理代碼 ... } }
說明:
- 通過
redisStreamOperations.pending()
獲取當前Pending消息數(shù)。 - 當Pending消息超過閾值時,強制銷毀并重建消費者組,避免消息卡死。
三、添加熔斷機制
位置:處理消息的業(yè)務邏輯外層
使用Resilience4j熔斷器包裹frameProcessor.process()
調(diào)用:
1. 熔斷器配置類
@Configuration public class CircuitBreakerConfig { @Bean public CircuitBreaker frameProcessorCircuitBreaker() { CircuitBreakerConfig config = CircuitBreakerConfig.custom() .failureRateThreshold(50) // 失敗率閾值50% .slidingWindowType(SlidingWindowType.COUNT_BASED) .slidingWindowSize(10) // 基于最近10次調(diào)用統(tǒng)計 .minimumNumberOfCalls(5) // 最少5次調(diào)用后開始計算 .waitDurationInOpenState(Duration.ofSeconds(30)) // 熔斷后30秒進入半開狀態(tài) .build(); return CircuitBreakerRegistry.of(config).circuitBreaker("frameProcessor"); } }
2. 在消費代碼中使用熔斷器
public class YourConsumerClass { @Autowired private CircuitBreaker frameProcessorCircuitBreaker; // 注入熔斷器 private void consumeFrames(...) { // ... 原有代碼 ... for (MapRecord<String, Object, Object> record : records) { redisStreamOperations.ack(...); // 提前ACK // 使用熔斷器保護處理邏輯(添加位置) Try.runRunnable(() -> frameProcessorCircuitBreaker.executeRunnable(() -> { String strdata = (String) record.getValue().get("frameData"); byte[] frameData = Base64.getDecoder().decode(strdata); BufferedImage image = ImageIO.read(new ByteArrayInputStream(frameData)); frameProcessor.process(image, streamId, commonQueues, regions); })).onFailure(e -> log.error("處理失敗且熔斷: {}", e.getMessage())); } // ... 后續(xù)代碼 ... } }
熔斷邏輯說明:
- 當
frameProcessor.process()
連續(xù)失敗觸發(fā)閾值時,熔斷器會暫時阻止后續(xù)調(diào)用,避免雪崩效應。 - 熔斷期間直接跳過處理,但仍會ACK消息(根據(jù)業(yè)務需求選擇是否重試)。
四、代碼集成位置總結(jié)
優(yōu)化措施 | 代碼位置 | 關鍵注解 |
---|---|---|
Redis連接配置 | Redis配置類(如RedisConfig.java ) | 調(diào)整超時時間和連接池參數(shù) |
Pending隊列監(jiān)控 | consumeFrames 方法的主循環(huán)內(nèi) | 定期檢查+自動重置消費者組 |
熔斷機制 | 業(yè)務處理代碼外層(包裹frameProcessor.process ) | 依賴熔斷器庫(如Resilience4j) |
五、參數(shù)調(diào)整建議
Redis超時:
lettuceConnectionFactory.setTimeout
應大于frameProcessor.process()
的最大處理時間 + 網(wǎng)絡抖動余量(如設置為實際最大處理時間的2倍)。
Pending隊列閾值:
- 如果每秒處理100條消息,閾值可設置為
1000
(相當于10秒積壓量)。
熔斷器參數(shù):
failureRateThreshold
:根據(jù)外部接口的穩(wěn)定性調(diào)整(如頻繁超時可設為70%)。waitDurationInOpenState
:根據(jù)外部服務恢復時間調(diào)整(如30秒到5分鐘)。
通過以上改造,即使frameProcessor.process()
調(diào)用外部平臺接口,也能通過資源隔離、快速失敗和自動恢復機制保障Redis消費鏈路的穩(wěn)定性。
到此這篇關于redis在spring boot中異常退出的文章就介紹到這了,更多相關redis spring boot異常退出內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Redis中ServiceStack.Redis和StackExchange.Redis區(qū)別詳解
本文主要介紹了Redis中ServiceStack.Redis和StackExchange.Redis區(qū)別詳解,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-05-05Redis Caffeine實現(xiàn)兩級緩存的項目實踐
本文介紹了使用Redis和Caffeine實現(xiàn)兩級緩存,以提高查詢接口的性能,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2024-12-12