基于AOP+Redis的簡易滑動窗口限流
在分布式系統(tǒng)設計中,限流是保障服務穩(wěn)定性的核心技術(shù)之一。滑動窗口限流算法以其精確性和平滑性優(yōu)勢,成為解決傳統(tǒng)固定窗口限流臨界突變問題的理想方案。本文將深入解析滑動窗口算法原理,并通過AOP+Redis滑動窗口限流。
固定窗口與滑動窗口對比
固定窗口限流及其缺陷
固定窗口限流將時間劃分為固定區(qū)間(如每分鐘),統(tǒng)計每個區(qū)間內(nèi)的請求數(shù)量。這種方法雖然簡單,但存在嚴重缺陷:
當大量請求集中在兩個窗口的交界處時(如00:59:59和01:00:00),系統(tǒng)會在極短時間內(nèi)接收雙倍于閾值的請求,導致服務過載?;瑒哟翱谙蘖魍ㄟ^動態(tài)時間區(qū)間解決了這個問題。
核心原理:為每個請求動態(tài)定義一個以當前時間為終點、向前回溯固定時長T的時間區(qū)間(滑動窗口),統(tǒng)計該區(qū)間內(nèi)的請求數(shù):
- 動態(tài)窗口:每個請求到達時計算
[當前時間 - T, 當前時間]
區(qū)間 - 實時統(tǒng)計:計算該區(qū)間內(nèi)的請求數(shù)量
- 決策執(zhí)行:請求數(shù) < 閾值 → 允許;否則拒絕
- 窗口滑動:過期請求自動移出統(tǒng)計范圍
Redis實現(xiàn)方案
Redis的有序集合(ZSET)是實現(xiàn)滑動窗口限流的理想數(shù)據(jù)結(jié)構(gòu):
ZSET結(jié)合了集合(Set)和哈希(Hash)的特性:
- 唯一成員:每個成員(member)在集合中唯一
- 分數(shù)排序:每個成員關(guān)聯(lián)一個分數(shù)(score),用于排序
- 自動排序:成員按分數(shù)值從小到大排序
之所以是滑動窗口限流的理想選擇,關(guān)鍵在于它完美解決了滑動窗口算法的三個核心需求:
- 時間序列的天然支持:ZSET的分數(shù)(score)機制為時間戳提供了原生支持。當我們將請求時間戳作為score存儲時,所有請求按時間順序自動排序,形成精確的時間序列。這使得界定時間窗口邊界變得簡單——只需計算
當前時間 - 窗口大小
就能得到窗口起始點,無需額外維護時間索引。這種設計讓滑動窗口的"滑動"機制得以自然實現(xiàn)。 - 高效的范圍操作能力:滑動窗口的核心操作是清理過期請求,ZSET的
ZREMRANGEBYSCORE
命令正是為此而生。它能以O(log(N)+M)
的復雜度高效刪除指定時間范圍外的歷史請求。這種高效的范圍刪除能力確保了窗口滑動時的實時性能。 - 精確的實時統(tǒng)計特性:通過
ZCARD
和ZCOUNT
命令,ZSET提供原子級的精確計數(shù)能力。在滑動窗口算法中,我們需要實時統(tǒng)計當前窗口內(nèi)的請求數(shù)并與閾值比較,這些命令能以O(1)和O(log(N))復雜度瞬間完成統(tǒng)計。這種即時反饋機制對高并發(fā)場景至關(guān)重要,確保限流決策的及時性和準確性。
命令 | 統(tǒng)計范圍 | 時間復雜度 | 典型使用場景 |
---|---|---|---|
ZCARD key | 整個 ZSET 的總成員數(shù) | O(1) | 清理過期數(shù)據(jù)后快速獲取當前窗口請求總數(shù) |
ZCOUNT key min max | 指定 score 范圍內(nèi)的成員數(shù) | O(log(N)) | 動態(tài)統(tǒng)計子窗口/特定時間段的請求量 |
代碼
自定義注解
首先自定義注解,定義限流維度、窗口大小、時間單位、窗口內(nèi)最大請求數(shù)量
@Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface SlidingWindowLimit { /** * 限流維度的 SpEL 表達式 * 示例: * - 按郵箱: "#email" * - 按 IP: "#request.remoteAddr" * - 按用戶 ID + 郵箱: "#user.id + ':' + #user.email" */ String keySpEL() default "#email"; /** * 窗口大小 */ int windowSize() default 60; /** * 時間單位 */ TimeUnit timeUnit() default TimeUnit.SECONDS; /** * 窗口內(nèi)最大請求數(shù) */ int maxRequests() default 10; }
切面類
1. 切面配置與基礎(chǔ)結(jié)構(gòu)
首先創(chuàng)建切面類
@Aspect @Component @Order(Ordered.HIGHEST_PRECEDENCE + 20) public class SlidingWindowLimitAspect { @Resource private RedissonClient redissonClient; private static final Logger log = LoggerFactory.getLogger(SlidingWindowLimitAspect.class); private static final String RATE_LIMIT_PREFIX = "rate_limit"; }
@Aspect
:聲明該類為AOP切面@Order
:設置切面執(zhí)行優(yōu)先級(數(shù)字越小優(yōu)先級越高)redissonClient
:Redis客戶端操作接口RATE_LIMIT_PREFIX
:Redis鍵名前綴,用于區(qū)分限流數(shù)據(jù)
2. 切面入口方法 - around()
@Around("@annotation(slidingWindowLimit)") public Object around(ProceedingJoinPoint joinPoint, SlidingWindowLimit slidingWindowLimit) throws Throwable { //獲取方法簽名 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); String methodName = method.getName(); //判斷是否為Http請求 ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); if(attributes == null){ log.warn("方法 {} 不在 Web 請求上下文中,跳過限流檢查。", methodName); return joinPoint.proceed(); } try { Object parseSpEL = parseSpEL(joinPoint, signature, slidingWindowLimit.keySpEL()); String rateLimitKey = buildRateLimitKey(parseSpEL, slidingWindowLimit); if(isRateLimited(rateLimitKey, slidingWindowLimit.windowSize(), slidingWindowLimit.timeUnit(), slidingWindowLimit.maxRequests())){ throw new RateLimitExceededException("Rate limit exceeded"); } return joinPoint.proceed(); }catch (RateLimitExceededException e){ log.warn("方法 {} 觸發(fā)了限流,已拒絕訪問。", methodName); throw e; }catch (Exception e){ log.error("方法 {} 觸發(fā)了異常,已拒絕訪問。", methodName, e); throw e; } }
執(zhí)行過程:
3. SpEL解析方法 - parseSpEL()
private Object parseSpEL(ProceedingJoinPoint joinPoint,MethodSignature signature, String keySpEL){ StandardEvaluationContext context = new StandardEvaluationContext(); Object[] args = joinPoint.getArgs(); String[] parameterNames = signature.getParameterNames(); for (int i = 0; i < args.length; i++) { if(parameterNames!=null && i< parameterNames.length){ context.setVariable(parameterNames[i], args[i]); }else { context.setVariable("arg" + i, args[i]); } } ExpressionParser parser = new SpelExpressionParser(); return parser.parseExpression(keySpEL).getValue(context); }
功能說明:
- 動態(tài)解析注解中的SpEL表達式(如
#email
) - 將方法參數(shù)注入表達式上下文
- 支持靈活的限流鍵生成策略
4.限流鍵構(gòu)建方法 - buildRateLimitKey()
private String buildRateLimitKey(Object keyValue, SlidingWindowLimit slidingWindowLimit){ if(keyValue == null){ throw new IllegalArgumentException("限流參數(shù)不能為空"); } return String.format("%s:%s:%s",RATE_LIMIT_PREFIX,slidingWindowLimit.keySpEL(), keyValue); }
鍵格式說明:
rate_limit:SpEL表達式:參數(shù)值 ↓ ↓ ↓ rate_limit:#email:user@example.com
5. 限流核心邏輯 - isRateLimited()
private boolean isRateLimited(String key, int windowSize, TimeUnit timeUnit, int maxRequests){ //獲取當前時間數(shù) long currentTime = System.currentTimeMillis(); long windowStartTime = currentTime - convertToMillis(windowSize, timeUnit); // 獲取 Redisson 的 ZSet 操作對象 RScoredSortedSet<Long> scoredSortedSet = redissonClient.getScoredSortedSet(key); // 1. 刪除窗口外的過期請求 scoredSortedSet.removeRangeByScore(0, true, windowStartTime, true); // [0, windowStartTime] // 2. 添加當前請求的時間戳到 ZSet scoredSortedSet.add(currentTime, currentTime); // score 和 value 均為時間戳 // 3. 統(tǒng)計窗口內(nèi)請求數(shù)量 int count = scoredSortedSet.size(); return count > maxRequests; }
Redis操作序列:
ZREMRANGEBYSCORE key 0 windowStart
:刪除過期請求ZADD key currentTime currentTime
:添加當前請求ZCARD key
:獲取當前請求數(shù)
6. 時間單位轉(zhuǎn)換 - convertToMillis()
private long convertToMillis(int windowSize, TimeUnit timeUnit) { return switch (timeUnit) { case SECONDS -> timeUnit.toMillis(windowSize); case MINUTES -> timeUnit.toMillis(windowSize); case HOURS -> timeUnit.toMillis(windowSize); case DAYS -> timeUnit.toMillis(windowSize); case MILLISECONDS -> windowSize; default -> throw new IllegalArgumentException("不支持的時間單位: " + timeUnit); }; }
完整代碼
@Aspect @Component @Order(Ordered.HIGHEST_PRECEDENCE + 20) public class SlidingWindowLimitAspect { @Resource private RedissonClient redissonClient; private static final Logger log = LoggerFactory.getLogger(SlidingWindowLimitAspect.class); private static final String RATE_LIMIT_PREFIX = "rate_limit"; @Around("@annotation(slidingWindowLimit)") public Object around(ProceedingJoinPoint joinPoint, SlidingWindowLimit slidingWindowLimit) throws Throwable { //獲取方法簽名 MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); String methodName = method.getName(); //判斷是否為Http請求 ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); if(attributes == null){ log.warn("方法 {} 不在 Web 請求上下文中,跳過限流檢查。", methodName); return joinPoint.proceed(); } try { Object parseSpEL = parseSpEL(joinPoint, signature, slidingWindowLimit.keySpEL()); String rateLimitKey = buildRateLimitKey(parseSpEL, slidingWindowLimit); if(isRateLimited(rateLimitKey, slidingWindowLimit.windowSize(), slidingWindowLimit.timeUnit(), slidingWindowLimit.maxRequests())){ throw new RateLimitExceededException("Rate limit exceeded"); } return joinPoint.proceed(); }catch (RateLimitExceededException e){ log.warn("方法 {} 觸發(fā)了限流,已拒絕訪問。", methodName); throw e; }catch (Exception e){ log.error("方法 {} 觸發(fā)了異常,已拒絕訪問。", methodName, e); throw e; } } private Object parseSpEL(ProceedingJoinPoint joinPoint,MethodSignature signature, String keySpEL){ StandardEvaluationContext context = new StandardEvaluationContext(); Object[] args = joinPoint.getArgs(); String[] parameterNames = signature.getParameterNames(); for (int i = 0; i < args.length; i++) { if(parameterNames!=null && i< parameterNames.length){ context.setVariable(parameterNames[i], args[i]); }else { context.setVariable("arg" + i, args[i]); } } ExpressionParser parser = new SpelExpressionParser(); return parser.parseExpression(keySpEL).getValue(context); } private String buildRateLimitKey(Object keyValue, SlidingWindowLimit slidingWindowLimit){ if(keyValue == null){ throw new IllegalArgumentException("限流參數(shù)不能為空"); } return String.format("%s:%s:%s",RATE_LIMIT_PREFIX,slidingWindowLimit.keySpEL(), keyValue); } private boolean isRateLimited(String key, int windowSize, TimeUnit timeUnit, int maxRequests){ //獲取當前時間數(shù) long currentTime = System.currentTimeMillis(); long windowStartTime = currentTime - convertToMillis(windowSize, timeUnit); // 獲取 Redisson 的 ZSet 操作對象 RScoredSortedSet<Long> scoredSortedSet = redissonClient.getScoredSortedSet(key); // 1. 刪除窗口外的過期請求 scoredSortedSet.removeRangeByScore(0, true, windowStartTime, true); // [0, windowStartTime] // 2. 添加當前請求的時間戳到 ZSet scoredSortedSet.add(currentTime, currentTime); // score 和 value 均為時間戳 // 3. 統(tǒng)計窗口內(nèi)請求數(shù)量 int count = scoredSortedSet.size(); return count > maxRequests; } /** * 時間單位轉(zhuǎn)換,將時間單位轉(zhuǎn)換為毫秒數(shù) * @param windowSize 窗口大小 * @param timeUnit 時間單位 * @return */ private long convertToMillis(int windowSize, TimeUnit timeUnit){ return switch (timeUnit){ case NANOSECONDS, SECONDS, MICROSECONDS, MINUTES, HOURS, DAYS -> timeUnit.toMillis(windowSize); case MILLISECONDS -> windowSize; default -> throw new IllegalArgumentException("不支持的時間單位: " + timeUnit); }; } }
注解使用
比如說我們現(xiàn)在定義發(fā)送驗證碼的方法60秒內(nèi)只能發(fā)送三次
@PostMapping("/send/code") @SlidingWindowLimit(keySpEL = "#email.email",windowSize = 60, maxRequests = 3) public Result sendVerificationCode(@RequestBody EmailSendDTO email) { userService.sendVerificationCode(email.getEmail()); return Result.success(); }
每一次訪問時,Redis都會記錄下時間戳,如果第四次訪問時的時間戳與第一次訪問的時間戳之間少于60秒,則返回
{ "code": 429, "message": "請求過于頻繁,請稍后再試", "data": null }
優(yōu)化
private boolean isRateLimited(String key, int windowSize, TimeUnit timeUnit, int maxRequests){ //獲取當前時間數(shù) long currentTime = System.currentTimeMillis(); long windowStartTime = currentTime - convertToMillis(windowSize, timeUnit); // 獲取 Redisson 的 ZSet 操作對象 RScoredSortedSet<Long> scoredSortedSet = redissonClient.getScoredSortedSet(key); // 1. 刪除窗口外的過期請求 scoredSortedSet.removeRangeByScore(0, true, windowStartTime, true); // [0, windowStartTime] // 2. 添加當前請求的時間戳到 ZSet scoredSortedSet.add(currentTime, currentTime); // score 和 value 均為時間戳 // 3. 統(tǒng)計窗口內(nèi)請求數(shù)量 int count = scoredSortedSet.size(); return count > maxRequests; }
在這個方法中,存在幾個問題:
代碼中直接將時間戳作為ZSET的成員(member)和分數(shù)(score),當同一毫秒內(nèi)有多個請求時,后寫入的請求會覆蓋先前的請求(ZSET成員唯一),導致計數(shù)不準確。
當前操作序列:
在并發(fā)場景下,多個請求可能同時通過計數(shù)檢查,導致實際請求量超過閾值
- 刪除過期請求
- 添加當前請求
- 獲取當前計數(shù)
當某個限流鍵長時間無請求時,對應的空ZSET會永久占用內(nèi)存
那么優(yōu)化時,可以利用UUID作為member 這樣不會出現(xiàn)覆蓋的情況,使用Lua腳本進行執(zhí)行避免多個請求同時通過計數(shù)檢查的情況,針對問題三可以通過設置過期時間來解決,優(yōu)化后的代碼如下:
private boolean isRateLimited(String key, int windowSize, TimeUnit timeUnit, int maxRequests) { // 1. 計算窗口大?。ê撩耄? long windowMillis = convertToMillis(windowSize, timeUnit); // 2. 獲取當前時間和窗口起始時間 long currentTime = System.currentTimeMillis(); long windowStartTime = currentTime - windowMillis; // 3. 生成唯一請求ID String requestId = UUID.randomUUID().toString(); // 4. 計算過期時間(秒) long expireSeconds = calculateExpireSeconds(windowMillis); // 5. Lua腳本(使用分數(shù)范圍精確統(tǒng)計) String luaScript = "redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', ARGV[2])\n" + // 清理過期數(shù)據(jù) "local count = redis.call('ZCOUNT', KEYS[1], ARGV[2], ARGV[1])\n" + // 精確統(tǒng)計窗口內(nèi)請求 "if count >= tonumber(ARGV[4]) then\n" + " return 1\n" + // 觸發(fā)限流 "end\n" + "redis.call('ZADD', KEYS[1], ARGV[1], ARGV[3])\n" + // 添加當前請求 "redis.call('EXPIRE', KEYS[1], ARGV[5])\n" + // 設置過期時間 "return 0"; // 允許通過 try { RScript script = redissonClient.getScript(); Long result = script.eval( RScript.Mode.READ_WRITE, luaScript, RScript.ReturnType.INTEGER, Collections.singletonList(key), currentTime, windowStartTime, requestId, maxRequests, expireSeconds ); return result != null && result == 1; } catch (Exception e) { log.error("限流服務異常,降級放行", e); return false; // Redis故障時允許請求 } } private long calculateExpireSeconds(long windowMillis) { // 過期時間 = 2 * 窗口大小(秒),向上取整 double expireSec = (windowMillis * 2.0) / 1000; long result = (long) Math.ceil(expireSec); return Math.max(1, result); // 至少1秒 }
Lua 腳本執(zhí)行邏輯:
redis.call('ZREMRANGEBYSCORE', KEYS[1], '-inf', ARGV[2])
:清理過期數(shù)據(jù),將 ZSET 中分數(shù)小于等于窗口起始時間的成員刪除。local count = redis.call('ZCOUNT', KEYS[1], ARGV[2], ARGV[1])
:精確統(tǒng)計窗口內(nèi)請求數(shù)量,即分數(shù)在窗口起始時間和當前時間之間的成員數(shù)量。if count >= tonumber(ARGV[4]) then return 1 end
:如果統(tǒng)計的請求數(shù)量大于等于閾值,則返回 1,表示觸發(fā)限流。redis.call('ZADD', KEYS[1], ARGV[1], ARGV[3])
:添加當前請求,將當前請求的唯一 ID 作為成員,當前時間作為分數(shù)添加到 ZSET 中。redis.call('EXPIRE', KEYS[1], ARGV[5])
:設置 ZSET 的過期時間,避免長時間無請求時空 ZSET 占用內(nèi)存。return 0
:如果未觸發(fā)限流,則返回 0,表示允許請求通過。
我們從命令行可以看到,每一次請求后,都會在ZSET中多一條記錄,并且每次都會重置過期時間,當觸發(fā)限流后,不再允許訪問。
127.0.0.1:6379> ZRANGE rate_limit:#email.email:6888@example.com 0 -1 1) "fbd525dd-0e1e-4abf-a578-8c1207e8f6f0" 127.0.0.1:6379> TTL rate_limit:#email.email:6888@example.com (integer) 355 127.0.0.1:6379> ZRANGE rate_limit:#email.email:6888@example.com 0 -1 1) "fbd525dd-0e1e-4abf-a578-8c1207e8f6f0" 2) "a7c8d5c2-f4da-46ce-9f98-472ad702e1ad" 127.0.0.1:6379> TTL rate_limit:#email.email:6888@example.com (integer) 355 127.0.0.1:6379> ZRANGE rate_limit:#email.email:6888@example.com 0 -1 1) "fbd525dd-0e1e-4abf-a578-8c1207e8f6f0" 2) "a7c8d5c2-f4da-46ce-9f98-472ad702e1ad" 3) "12ed502c-6225-45bd-b85e-1d3ed9e46ef6" 127.0.0.1:6379> TTL rate_limit:#email.email:6888@example.com (integer) 355
到此這篇關(guān)于基于AOP+Redis的簡易滑動窗口限流的文章就介紹到這了,更多相關(guān)AOP+Redis滑動窗口限流內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java(jdk)環(huán)境變量配置(XP、win7、win8)圖文教程詳解
對于初學java的同學來說,第一件事不是寫hello world,而是搭建好java開發(fā)環(huán)境,下載jdk,安裝,配置環(huán)境變量。這些操作在xp、win7、win8不同的操作系統(tǒng)里面配置不太一樣,下面通過本文給大家介紹如何在上面不同操作系統(tǒng)下配置2017-03-03使用Java WebSocket獲取客戶端IP地址的示例代碼
在開發(fā)Web應用程序時,我們通常需要獲取客戶端的 IP 地址用于日志記錄、身份驗證、限制訪問等操作,本文將介紹如何使用Java WebSocket API獲取客戶端IP地址,以及如何在常見的WebSocket框架中獲得客戶端 IP地址,需要的朋友可以參考下2023-11-11Java并發(fā)編程之synchronized底層實現(xiàn)原理分析
這篇文章主要介紹了Java并發(fā)編程之synchronized底層實現(xiàn)原理,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02如何使用Spring+redis實現(xiàn)對session的分布式管理
本篇文章主要介紹了如何使用Spring+redis實現(xiàn)對session的分布式管理,本文主要是在Spring中實現(xiàn)分布式session,采用redis對session進行持久化管理,感興趣的小伙伴們可以參考一下2018-06-06