詳解Spring Cloud Gateway 限流操作
開(kāi)發(fā)高并發(fā)系統(tǒng)時(shí)有三把利器用來(lái)保護(hù)系統(tǒng):緩存、降級(jí)和限流。
API網(wǎng)關(guān)作為所有請(qǐng)求的入口,請(qǐng)求量大,我們可以通過(guò)對(duì)并發(fā)訪(fǎng)問(wèn)的請(qǐng)求進(jìn)行限速來(lái)保護(hù)系統(tǒng)的可用性。
常用的限流算法比如有令牌桶算法,漏桶算法,計(jì)數(shù)器算法等。
在Zuul中我們可以自己去實(shí)現(xiàn)限流的功能 (Zuul中如何限流在我的書(shū) 《Spring Cloud微服務(wù)-全棧技術(shù)與案例解析》 中有詳細(xì)講解) ,Spring Cloud Gateway的出現(xiàn)本身就是用來(lái)替代Zuul的。
要想替代那肯定得有強(qiáng)大的功能,除了性能上的優(yōu)勢(shì)之外,Spring Cloud Gateway還提供了很多新功能,比如今天我們要講的限流操作,使用起來(lái)非常簡(jiǎn)單,今天我們就來(lái)學(xué)習(xí)在如何在Spring Cloud Gateway中進(jìn)行限流操作。
目前限流提供了基于Redis的實(shí)現(xiàn),我們需要增加對(duì)應(yīng)的依賴(lài):
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis-reactive</artifactId> </dependency>
可以通過(guò)KeyResolver來(lái)指定限流的Key,比如我們需要根據(jù)用戶(hù)來(lái)做限流,IP來(lái)做限流等等。
IP限流
@Bean
public KeyResolver ipKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
}
通過(guò)exchange對(duì)象可以獲取到請(qǐng)求信息,這邊用了HostName,如果你想根據(jù)用戶(hù)來(lái)做限流的話(huà)這邊可以獲取當(dāng)前請(qǐng)求的用戶(hù)ID或者用戶(hù)名就可以了,比如:
用戶(hù)限流
使用這種方式限流,請(qǐng)求路徑中必須攜帶userId參數(shù)。
@Bean
KeyResolver userKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("userId"));
}
接口限流
獲取請(qǐng)求地址的uri作為限流key。
@Bean
KeyResolver apiKeyResolver() {
return exchange -> Mono.just(exchange.getRequest().getPath().value());
}
然后配置限流的過(guò)濾器信息:
server:
port: 8084
spring:
redis:
host: 127.0.0.1
port: 6379
cloud:
gateway:
routes:
- id: fsh-house
uri: lb://fsh-house
predicates:
- Path=/house/**
filters:
- name: RequestRateLimiter
args:
redis-rate-limiter.replenishRate: 10
redis-rate-limiter.burstCapacity: 20
key-resolver: "#{@ipKeyResolver}"
- filter名稱(chēng)必須是RequestRateLimiter
- redis-rate-limiter.replenishRate:允許用戶(hù)每秒處理多少個(gè)請(qǐng)求
- redis-rate-limiter.burstCapacity:令牌桶的容量,允許在一秒鐘內(nèi)完成的最大請(qǐng)求數(shù)
- key-resolver:使用SpEL按名稱(chēng)引用bean
可以訪(fǎng)問(wèn)接口進(jìn)行測(cè)試,這時(shí)候Redis中會(huì)有對(duì)應(yīng)的數(shù)據(jù):
127.0.0.1:6379> keys *
1) "request_rate_limiter.{localhost}.timestamp"
2) "request_rate_limiter.{localhost}.tokens"
大括號(hào)中就是我們的限流Key,這邊是IP,本地的就是localhost
- timestamp:存儲(chǔ)的是當(dāng)前時(shí)間的秒數(shù),也就是System.currentTimeMillis() / 1000或者Instant.now().getEpochSecond()
- tokens:存儲(chǔ)的是當(dāng)前這秒鐘的對(duì)應(yīng)的可用的令牌數(shù)量
Spring Cloud Gateway目前提供的限流還是相對(duì)比較簡(jiǎn)單的,在實(shí)際中我們的限流策略會(huì)有很多種情況,比如:
- 每個(gè)接口的限流數(shù)量不同,可以通過(guò)配置中心動(dòng)態(tài)調(diào)整
- 超過(guò)的流量被拒絕后可以返回固定的格式給調(diào)用方
- 對(duì)某個(gè)服務(wù)進(jìn)行整體限流(這個(gè)大家可以思考下用Spring Cloud Gateway如何實(shí)現(xiàn),其實(shí)很簡(jiǎn)單)
- ……
當(dāng)然我們也可以通過(guò)重新RedisRateLimiter來(lái)實(shí)現(xiàn)自己的限流策略,這個(gè)我們后面再進(jìn)行介紹。
限流源碼
// routeId也就是我們的fsh-house,id就是限流的key,也就是localhost。
public Mono<Response> isAllowed(String routeId, String id) {
// 會(huì)判斷RedisRateLimiter是否初始化了
if (!this.initialized.get()) {
throw new IllegalStateException("RedisRateLimiter is not initialized");
}
// 獲取routeId對(duì)應(yīng)的限流配置
Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
if (routeConfig == null) {
throw new IllegalArgumentException("No Configuration found for route " + routeId);
}
// 允許用戶(hù)每秒做多少次請(qǐng)求
int replenishRate = routeConfig.getReplenishRate();
// 令牌桶的容量,允許在一秒鐘內(nèi)完成的最大請(qǐng)求數(shù)
int burstCapacity = routeConfig.getBurstCapacity();
try {
// 限流key的名稱(chēng)(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens)
List<String> keys = getKeys(id);
// The arguments to the LUA script. time() returns unixtime in seconds.
List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
Instant.now().getEpochSecond() + "", "1");
// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
// 執(zhí)行LUA腳本
Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
// .log("redisratelimiter", Level.FINER);
return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
.reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);
return longs;
}) .map(results -> {
boolean allowed = results.get(0) == 1L;
Long tokensLeft = results.get(1);
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
if (log.isDebugEnabled()) {
log.debug("response: " + response);
}
return response;
});
}
catch (Exception e) {
log.error("Error determining if user allowed from redis", e);
}
return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
}
LUA腳本在:

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
--redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
--redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
--redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
--redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
--redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
--redis.log(redis.LOG_WARNING, "ttl " .. ttl)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
--redis.log(redis.LOG_WARNING, "delta " .. delta)
--redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
--redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
--redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
return { allowed_num, new_tokens }
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
java類(lèi)中使用Jfreechart的簡(jiǎn)單實(shí)例
這篇文章介紹了java類(lèi)中使用Jfreechart的簡(jiǎn)單實(shí)例,有需要的朋友可以參考一下2013-08-08
Java cglib為實(shí)體類(lèi)(javabean)動(dòng)態(tài)添加屬性方式
這篇文章主要介紹了Java cglib為實(shí)體類(lèi)(javabean)動(dòng)態(tài)添加屬性方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-02-02
SpringBoot如何使用TestEntityManager進(jìn)行JPA集成測(cè)試
TestEntityManager是Spring Framework提供的一個(gè)測(cè)試框架,它可以幫助我們進(jìn)行 JPA 集成測(cè)試,在本文中,我們將介紹如何使用 TestEntityManager 進(jìn)行 JPA 集成測(cè)試,感興趣的跟著小編一起來(lái)學(xué)習(xí)吧2023-06-06
Java使用poi生成word文檔的簡(jiǎn)單實(shí)例
Java POI是一個(gè)用于處理Microsoft Office文件(如Word、Excel和PowerPoint)的API,它是一個(gè)開(kāi)源庫(kù),允許Java開(kāi)發(fā)者讀取、創(chuàng)建和修改這些文檔,本文給大集介紹了Java使用poi生成word文檔的簡(jiǎn)單實(shí)例,感興趣的朋友可以參考下2024-06-06
Java基數(shù)排序radix sort原理及用法解析
這篇文章主要介紹了Java基數(shù)排序radix sort原理及用法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-06-06
mybatis如何使用注解實(shí)現(xiàn)一對(duì)多關(guān)聯(lián)查詢(xún)
這篇文章主要介紹了mybatis如何使用注解實(shí)現(xiàn)一對(duì)多關(guān)聯(lián)查詢(xún)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07

