通俗易懂的Java常見限流算法具體實(shí)現(xiàn)
一、漏桶算法
1.漏桶算法的思想和原理
1.固定容量的漏桶:系統(tǒng)維護(hù)一個固定容量的漏桶,用來存放請求。
2.請求入桶:當(dāng)一個請求到達(dá)系統(tǒng)時,相當(dāng)于將水倒入漏桶。如果漏桶已滿,多余的請求會被丟棄或拒絕。
3.恒定速率的出桶:漏桶以恒定的速率處理請求,就像漏斗中的水穩(wěn)定地漏出一樣。
4.平滑流量:通過漏桶的出水速率,可以平滑流入系統(tǒng)的請求,避免突發(fā)流量。
5.限流判斷:當(dāng)一個請求到達(dá)時,會檢查漏桶是否已滿,如果漏桶已滿,則觸發(fā)限流機(jī)制,拒絕請求。
漏桶算法的實(shí)現(xiàn)步驟是,先聲明一個隊(duì)列用來保存請求,這個隊(duì)列相當(dāng)于漏斗,當(dāng)隊(duì)列容量滿了之后就放棄新來的請求,然后重新聲明一個線程定期(指定速率)從任務(wù)隊(duì)列中獲取一個或多個任務(wù)進(jìn)行執(zhí)行,這樣就實(shí)現(xiàn)了漏桶算法。
優(yōu)點(diǎn):可以有效控制流量,避免突發(fā)請求的沖擊,保持系統(tǒng)穩(wěn)定性;
缺點(diǎn):可能會影響請求響應(yīng)時間,且不使用大并發(fā)量的請求系統(tǒng);
2.具體實(shí)現(xiàn)
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class LeakyBucket { private final long capacity; // 桶容量 private final long rate; // 漏水速率 private long water; // 當(dāng)前水量 private long lastLeakTime; // 上一次漏水時間 private final AtomicLong requestCount; // 請求計(jì)數(shù) public LeakyBucket(long capacity, long rate) { this.capacity = capacity; this.rate = rate; this.water = 0; this.lastLeakTime = System.currentTimeMillis(); this.requestCount = new AtomicLong(0); //以固定的速率漏水 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(this::leakWater, 0, 1, TimeUnit.SECONDS); } //限流 public synchronized boolean allowRequest() { long currentTime = System.currentTimeMillis(); long elapsedTime = currentTime - lastLeakTime; water = Math.max(0, water - elapsedTime * rate); // 漏水 lastLeakTime = currentTime; if (water < capacity) { water++; requestCount.incrementAndGet(); return true; // 請求通過 } return false; // 漏桶已滿,限流 } public long getRequestCount() { return requestCount.get(); } //以固定速率漏水 private synchronized void leakWater() { long currentTime = System.currentTimeMillis(); long elapsedTime = currentTime - lastLeakTime; water = Math.max(0, water - elapsedTime * rate); // 漏水 lastLeakTime = currentTime; } public static void main(String[] args) { // 創(chuàng)建一個容量為 10,速率為 2/S的漏桶 LeakyBucket leakyBucket = new LeakyBucket(10, 2); // 模擬請求 for (int i = 0; i < 20; i++) { boolean allowed = leakyBucket.allowRequest(); if (allowed) { System.out.println("Request " + (i + 1) + ": Allowed"); } else { System.out.println("Request " + (i + 1) + ": Limited"); } } // 輸出總請求數(shù) System.out.println("Total requests: " + leakyBucket.getRequestCount()); } }
二、令牌桶算法
1.令牌桶算法流程:
1.放入令牌到桶:按照固定的速率被放入令牌桶中,比如每秒放5個、10個、100個令牌到桶中。
2.獲取令牌:所有的請求在處理之前都需要拿到一個可用的令牌才會被處理。
3.令牌桶滿了拒絕:桶中最多能放1000個令牌,當(dāng)桶滿時,就不能繼續(xù)放入了,新添加的令牌要么被丟棄,要么就直接拒絕。
優(yōu)點(diǎn):
1.避免了突發(fā)流量對系統(tǒng)的沖擊。
2.可以根據(jù)需求調(diào)整令牌生成速率和令牌桶的容量,以適應(yīng)不同的流星控制需求。
缺點(diǎn):1.不適合瞬時突發(fā)流量,令牌桶算法可能無法處理突然涌入的大量請求,因?yàn)榱钆仆暗牧钆粕伤俾适枪潭ǖ摹?/p>
2.如果請求需要等待令牌桶中的令牌,可能會導(dǎo)致一些請求的響應(yīng)時間增加。
2.具體實(shí)現(xiàn)
2.1 編程實(shí)現(xiàn)
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; public class TokenBucket { private final long capacity; // 令牌桶容量 private final long rate; // 令牌生成速率 private AtomicLong tokens; // 當(dāng)前令牌數(shù)量 private ScheduledExecutorService scheduler; public TokenBucket(long capacity, long rate) { this.capacity = capacity; this.rate = rate; this.tokens = new AtomicLong(0); this.scheduler = Executors.newScheduledThreadPool(1); ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(this::addToken, 0, 1, TimeUnit.SECONDS); } public boolean allowRequest() { long currentTokens = tokens.get(); if (currentTokens > 0) { tokens.decrementAndGet(); return true; // 有令牌,允許請求通過 } return false; // 無令牌,限流 } //添加令牌 private void addToken() { long newTokens = Math.min(capacity, tokens.get() + rate); tokens.set(newTokens); } public void shutdown() { scheduler.shutdown(); } public static void main(String[] args) { TokenBucket tokenBucket = new TokenBucket(10, 2); // 創(chuàng)建容量為10,速率為2的令牌桶 // 模擬請求 for (int i = 0; i < 20; i++) { boolean allowed = tokenBucket.allowRequest(); if (allowed) { System.out.println("Request " + (i + 1) + ": Allowed"); } else { System.out.println("Request " + (i + 1) + ": Limited"); } } tokenBucket.shutdown(); } }
2.2 使用 Google 開源的 guava 包
(1)導(dǎo)入依賴
<!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>28.2-jre</version> </dependency>
(2)代碼實(shí)現(xiàn)
import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Limiter { int NOT_LIMITED = 0; String LIMIT_ERROR = "使用太頻繁了,稍后再試..." ; /** * 限流key,唯一 * * @return */ String key() default ""; /** * 時間單位內(nèi)允許的次數(shù) * * @return */ double qps() default NOT_LIMITED; /** * 最大等待時間 * * @return */ int timeout() default NOT_LIMITED; /** * 最大等待時間單位 * * @return */ TimeUnit timeUnit() default TimeUnit.MILLISECONDS; }
import cn.hutool.core.util.StrUtil; import com.google.common.util.concurrent.RateLimiter; import com.hytera.annotation.Limiter; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.stereotype.Component; import utils.IpUtil; import java.lang.reflect.Method; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * @Author: zt 2024/1/9 17:48 * @CreateTime: 2024/1/9 17:48 * @描述:限流 **/ @Slf4j @Aspect @Component public class RateLimiterAspect { private static final ConcurrentMap<String, RateLimiter> RATE_LIMITER_CACHE = new ConcurrentHashMap<>(); @Around("@annotation(com.hytera.annotation.Limiter)") public Object pointcut(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); Limiter rateLimiter = AnnotationUtils.findAnnotation(method, Limiter.class); if (rateLimiter != null && rateLimiter.qps() > Limiter.NOT_LIMITED) { double qps = rateLimiter.qps(); String ip = IpUtil.getIpAddress(); String key = StrUtil.isEmpty(rateLimiter.key())?method.getName()+"-"+IpUtil.getIpAddress():rateLimiter.key()+"-"+ ip; RateLimiter limiter = RATE_LIMITER_CACHE.get(key); if (limiter == null) { RATE_LIMITER_CACHE.put(key, RateLimiter.create(qps)); log.debug("【{}】的QPS設(shè)置為: {}", method.getName(), RATE_LIMITER_CACHE.get(key).getRate()); }else { //超時或者獲取不到令牌,則報(bào)錯 boolean b = limiter.tryAcquire(rateLimiter.timeout(), rateLimiter.timeUnit()); if (b) { throw new RuntimeException(Limiter.LIMIT_ERROR);//自定義異常 } } } return point.proceed(); } }
三、Nginx限流
Nginx 提供了兩種限流手段:一是控制速率,二是控制并發(fā)連接數(shù)。
一、控制速率
我們需要使用 limit_req_zone 用來限制單位時間內(nèi)的請求數(shù),即速率限制,示例配置如下:
#限制每個 IP 訪問的速度為 2r/s,因?yàn)?Nginx 的限流統(tǒng)計(jì)是基于毫秒的,我們設(shè)置的速度是 2r/s,轉(zhuǎn)換一下就是 500ms 內(nèi)單個 IP 只允許通過 1 個請求,從 501ms 開始才允許通過第 2 個請求。 limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit; } }
#使用 burst 關(guān)鍵字,控制一個 IP 單位總時間內(nèi)的總訪問次數(shù) #burst=4,設(shè)置一個大小為4的緩沖區(qū)域,當(dāng)大量請求到來,請求數(shù)量超過限流頻率時,將其放入緩沖區(qū)域 limit_req_zone $binary_remote_addr zone=mylimit:10m rate=2r/s; server { location / { limit_req zone=mylimit burst=4; } }
二、控制并發(fā)連接數(shù)
#limit_conn perip 10 表示限制單個 IP 同時最多能持有 10 個連接; #limit_conn perserver 100 表示 server 同時能處理并發(fā)連接的總數(shù)為 100 個。 limit_conn_zone $binary_remote_addr zone=perip:10m; limit_conn_zone $server_name zone=perserver:10m; server { ... limit_conn perip 10; limit_conn perserver 100; }
四、Redis+Lua限流
1.Lua介紹
Lua 是一種輕量小巧的腳本語言,用標(biāo)準(zhǔn)C語言編寫并以源代碼形式開放, 其設(shè)計(jì)目的是為了嵌入應(yīng)用程序中,從而為應(yīng)用程序提供靈活的擴(kuò)展和定制功。
2.Lua優(yōu)勢:
(1)減少網(wǎng)絡(luò)開銷: 不使用 Lua 的代碼需要向 Redis 發(fā)送多次請求, 而腳本只需一次即可, 減少網(wǎng)絡(luò)傳輸;
(2)原子操作: Redis 將整個腳本作為一個原子執(zhí)行, 無需擔(dān)心并發(fā), 也就無需事務(wù);(3)復(fù)用: 腳本會永久保存 Redis 中, 其他客戶端可繼續(xù)使用。
3.具體實(shí)現(xiàn):
(1)編寫Lua腳本(將其放在resources/scripts/redis目錄下):
-- 下標(biāo)從 1 開始 local key = KEYS[1] local now = tonumber(ARGV[1]) local ttl = tonumber(ARGV[2]) local expired = tonumber(ARGV[3]) -- 最大訪問量 local max = tonumber(ARGV[4]) -- 清除過期的數(shù)據(jù) -- 移除指定分?jǐn)?shù)區(qū)間內(nèi)的所有元素,expired 即已經(jīng)過期的 score -- 根據(jù)當(dāng)前時間毫秒數(shù) - 超時毫秒數(shù),得到過期時間 expired redis.call('zremrangebyscore', key, 0, expired) -- 獲取 zset 中的當(dāng)前元素個數(shù) local current = tonumber(redis.call('zcard', key)) local next = current + 1 if next > max then -- 達(dá)到限流大小 返回 0 return 0; else -- 往 zset 中添加一個值、得分均為當(dāng)前時間戳的元素,[value,score] redis.call("zadd", key, now, now) -- 每次訪問均重新設(shè)置 zset 的過期時間,單位毫秒 redis.call("pexpire", key, ttl) return next end
(2)代碼實(shí)現(xiàn):
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.scripting.support.ResourceScriptSource; @Configuration public class RedisConfig { @Bean @SuppressWarnings("unchecked") public RedisScript<Long> limitRedisScript() { DefaultRedisScript redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("scripts/redis/limit.lua"))); redisScript.setResultType(Long.class); return redisScript; } }
import org.springframework.core.annotation.AliasFor; import org.springframework.core.annotation.AnnotationUtils; import java.lang.annotation.*; import java.util.concurrent.TimeUnit; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RateLimiter { long DEFAULT_REQUEST = 10; /** * max 最大請求數(shù) */ @AliasFor("max") long value() default DEFAULT_REQUEST; /** * 限流key */ String key() default ""; /** * 超時時長,默認(rèn)1分鐘 */ long timeout() default 1; /** * 超時時間單位,默認(rèn) 分鐘 */ TimeUnit timeUnit() default TimeUnit.MINUTES; }
import cn.hutool.core.util.StrUtil; import com.xkcoding.ratelimit.redis.annotation.RateLimiter; import com.xkcoding.ratelimit.redis.util.IpUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.lang.reflect.Method; import java.time.Instant; import java.util.Collections; import java.util.concurrent.TimeUnit; /** * <p> * 限流切面 * </p> * * @author yangkai.shen * @date Created in 2019-09-30 10:30 */ @Slf4j @Aspect @Component @RequiredArgsConstructor(onConstructor_ = @Autowired) public class RateLimiterAspect { private final static String SEPARATOR = ":"; private final static String REDIS_LIMIT_KEY_PREFIX = "limit:"; private final StringRedisTemplate stringRedisTemplate; private final RedisScript<Long> limitRedisScript; @Around("@annotation(com.xkcoding.ratelimit.redis.annotation.RateLimiter)") public Object pointcut(ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); Method method = signature.getMethod(); // 通過 AnnotationUtils.findAnnotation 獲取 RateLimiter 注解 RateLimiter rateLimiter = AnnotationUtils.findAnnotation(method, RateLimiter.class); if (rateLimiter != null) { String key = rateLimiter.key(); // 默認(rèn)用類名+方法名做限流的 key 前綴 if (StrUtil.isBlank(key)) { key = method.getDeclaringClass().getName() + StrUtil.DOT + method.getName(); } // 最終限流的 key 為 前綴 + IP地址 key = key + SEPARATOR + IpUtil.getIpAddr(); long max = rateLimiter.max(); long timeout = rateLimiter.timeout(); TimeUnit timeUnit = rateLimiter.timeUnit(); boolean limited = shouldLimited(key, max, timeout, timeUnit); if (limited) { throw new RuntimeException("手速太快了,慢點(diǎn)兒吧~"); } } return point.proceed(); } private boolean shouldLimited(String key, long max, long timeout, TimeUnit timeUnit) { // 最終的 key 格式為: // limit:自定義key:IP // limit:類名.方法名:IP key = REDIS_LIMIT_KEY_PREFIX + key; // 統(tǒng)一使用單位毫秒 long ttl = timeUnit.toMillis(timeout); // 當(dāng)前時間毫秒數(shù) long now = Instant.now().toEpochMilli(); long expired = now - ttl; Long executeTimes = stringRedisTemplate.execute(limitRedisScript, Collections.singletonList(key), now + "", ttl + "", expired + "", max + ""); if (executeTimes != null) { if (executeTimes == 0) { log.error("【{}】在單位時間 {} 毫秒內(nèi)已達(dá)到訪問上限,當(dāng)前接口上限 {}", key, ttl, max); return true; } else { log.info("【{}】在單位時間 {} 毫秒內(nèi)訪問 {} 次", key, ttl, executeTimes); return false; } } return false; }
總結(jié)
到此這篇關(guān)于Java常見限流算法具體實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Java常見限流算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
springboot通過注解、接口創(chuàng)建定時任務(wù)詳解
使用SpringBoot創(chuàng)建定時任務(wù)其實(shí)是挺簡單的,這篇文章主要給大家介紹了關(guān)于springboot如何通過注解、接口創(chuàng)建這兩種方法實(shí)現(xiàn)定時任務(wù)的相關(guān)資料,需要的朋友可以參考下2021-07-07JAVA中的函數(shù)式接口Function和BiFunction詳解
這篇文章主要介紹了JAVA中的函數(shù)式接口Function和BiFunction詳解,JDK的函數(shù)式接口都加上了@FunctionalInterface注解進(jìn)行標(biāo)識,但是無論是否加上該注解只要接口中只有一個抽象方法,都是函數(shù)式接口,需要的朋友可以參考下2024-01-01關(guān)于SpringBoot Actuator漏洞補(bǔ)救方案
SpringBoot Actuator模塊提供了健康檢查,審計(jì),指標(biāo)收集,HTTP 跟蹤等,是幫助我們監(jiān)控和管理SpringBoot應(yīng)用的模塊,本文將主要介紹SpringBoot Actuator漏洞的補(bǔ)救方案,需要的朋友可以參考下2023-06-06Jenkins如何使用DockerFile自動部署Java項(xiàng)目
這篇文章主要介紹了Jenkins如何使用DockerFile自動部署Java項(xiàng)目,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-08-08基于strict-origin-when-cross-origin問題的解決
這篇文章主要介紹了基于strict-origin-when-cross-origin問題的解決方案,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-03-03Java實(shí)現(xiàn)讀取和寫入properties文件
這篇文章主要介紹了Java實(shí)現(xiàn)讀取和寫入properties文件方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-08-08Springboot詳解RocketMQ實(shí)現(xiàn)消息發(fā)送與接收流程
這篇文章主要介紹了SpringBoot整合RocketMQ實(shí)現(xiàn)消息發(fā)送和接收功能,我們使用主流的SpringBoot框架整合RocketMQ來講解,使用方便快捷,本文分步驟給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06