Java中常見的4種限流算法詳解
固定窗口
FixedWindowRateLimiter 類表示一個固定窗口限流器,使用 limit 和 interval 參數(shù)分別表示限制請求數(shù)量和時間間隔(毫秒)。在 allowRequest() 方法中,通過比較當(dāng)前時間與上一次請求時間來判斷是否需要重置請求數(shù)和上一次請求時間。如果請求數(shù)還沒有達(dá)到限制數(shù)量,允許請求并增加請求數(shù),否則拒絕請求。 缺點:短時間內(nèi)可能會流量翻倍
public class FixedWindowRateLimiter {
private final int limit; // 限制請求數(shù)量
private final AtomicInteger count; // 當(dāng)前請求數(shù)
private final long interval; // 時間間隔(毫秒)
private long lastRequestTime; // 上一次請求時間
public FixedWindowRateLimiter(int limit, long interval) {
this.limit = limit;
this.interval = interval;
this.count = new AtomicInteger(0);
this.lastRequestTime = System.currentTimeMillis();
}
public synchronized boolean allowRequest() {
long currentTime = System.currentTimeMillis();
if (currentTime - lastRequestTime > interval) {
// 如果距離上一次請求時間已經(jīng)超過了時間間隔,重置請求數(shù)和上一次請求時間
count.set(0);
lastRequestTime = currentTime;
}
// 如果請求數(shù)還沒有達(dá)到限制數(shù)量,允許請求并增加請求數(shù)
if (count.get() < limit) {
count.incrementAndGet();
return true;
}
return false; // 否則拒絕請求
}
}
// 使用示例
FixedWindowRateLimiter limiter = new FixedWindowRateLimiter(10, 1000); // 每秒最多處理10個請求
for (int i = 0; i < 20; i++) { // 嘗試發(fā)起20個請求
if (limiter.allowRequest()) {
System.out.println("Allow request " + i);
} else {
System.out.println("Reject request " + i);
}
Thread.sleep(200); // 每次請求間隔200毫秒
}滑動窗口
相比于固定窗口,滑動窗口有以下幾個好處:
- 平滑限流:滑動窗口限流算法會以時間為軸將請求限制平均到每個時間段內(nèi),從而平滑了請求的涌入。相比于簡單粗暴的限制請求的數(shù)量或速率等方式,這種平滑的限流方式能夠更好地保證服務(wù)的可用性和穩(wěn)定性。
- 精確控制:滑動窗口限流算法可以根據(jù)具體的業(yè)務(wù)需要設(shè)置窗口大小和時間間隔,從而實現(xiàn)對請求的精確控制。通過適當(dāng)調(diào)整窗口大小和時間間隔,可以達(dá)到更好的限流效果。
關(guān)于這個,我覺得sentinel中的滑動窗口就非常的nice,下面是從sentinel中摘出來改一下的示例(同時也運用在我本人的中間件內(nèi)),總得來說有三部分:
- 窗口存放的實體類(監(jiān)控指標(biāo)) HystrixEntity
- 窗口的定義 HystrixWindow
- 滑動窗口具體實現(xiàn) HystrixWindowArray
// 窗口存放的實體類(監(jiān)控指標(biāo))
public class HystrixEntity {
// 窗口請求數(shù)
private AtomicInteger requestCount;
// 窗口異常數(shù)
private AtomicInteger errorCount;
public HystrixEntity(){
this.requestCount=new AtomicInteger(0);
this.errorCount=new AtomicInteger(0);
}
public int getRequestCountValue() {
return requestCount.get();
}
public int getErrorCountValue() {
return errorCount.get();
}
public void resetValue() {
this.errorCount.set(0);
this.requestCount.set(0);
}
public void addErrorCount(){
this.errorCount.addAndGet(1);
}
public void addRequestCount(){
this.requestCount.addAndGet(1);
}
}
//窗口的定義 HystrixWindow
public class HystrixWindow {
// 窗口的長度 單位:ms
private final int windowLengthInMs;
// 窗口的開始時間戳 單位:ms
private long windowStartInMs;
// 窗口內(nèi)存放的實體類
private HystrixEntity hystrixEntity;
public HystrixWindow(int windowLengthInMs, long windowStartInMs, HystrixEntity hystrixEntity) {
this.windowLengthInMs = windowLengthInMs;
this.windowStartInMs = windowStartInMs;
this.hystrixEntity = hystrixEntity;
}
public int getWindowLengthInMs() {
return windowLengthInMs;
}
public long getWindowStartInMs() {
return windowStartInMs;
}
public HystrixEntity getHystrixEntity() {
return hystrixEntity;
}
public void setHystrixEntity(HystrixEntity hystrixEntity) {
this.hystrixEntity = hystrixEntity;
}
/**
* @Description 重置窗口
**/
public HystrixWindow resetTo(long startTime) {
this.windowStartInMs = startTime;
hystrixEntity.resetValue();
return this;
}
/**
* @Description 判斷時間是否屬于該窗口
**/
public boolean isTimeInWindow(long timeMillis) {
return windowStartInMs <= timeMillis && timeMillis < windowStartInMs + windowLengthInMs;
}
}
//滑動窗口具體實現(xiàn)
public class HystrixWindowArray {
// 單個窗口的長度
private int windowLengthInMs;
// 窗口數(shù)量
private int sampleCount;
// 所有窗口的總長度
private int intervalInMs;
// 窗口數(shù)組
private final AtomicReferenceArray<HystrixWindow> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* @Param [sampleCount, intervalInMs]
* sampleCount: 窗口數(shù)量 intervalInMs:所有窗口的總長度
**/
public HystrixWindowArray(int sampleCount, int intervalInMs) {
Assert.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
Assert.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
Assert.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* 獲取當(dāng)前時間所在的窗口下標(biāo)索引
*/
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
/**
* 獲取當(dāng)前時間所在的窗口開始時間
*/
private long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
private HystrixEntity newEmptyWindowValue(long timeMillis){
return new HystrixEntity();
}
/**
* 獲取當(dāng)前窗口
*/
public HystrixWindow currentWindow() {
return currentWindow(System.currentTimeMillis());
}
private HystrixWindow currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
while (true) {
HystrixWindow old = array.get(idx);
if (old == null) {
// 如果獲取為空,說明窗口還沒創(chuàng)建,所以我們創(chuàng)建一個新的窗口(CAS保證線程安全)
HystrixWindow window = new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// 創(chuàng)建成功則返回當(dāng)前窗口
return window;
} else {
// 創(chuàng)建失敗說明發(fā)生了競爭,所以暫時先讓出CPU
Thread.yield();
}
} else if (windowStart == old.getWindowStartInMs()) {
// 如果窗口已經(jīng)存在,則對比窗口的開始時間是否相同,相同說明是用同一個窗口,直接返回窗口就可以了
return old;
} else if (windowStart > old.getWindowStartInMs()) {
// 如果窗口已經(jīng)存在,而且窗口開始時間比之前的窗口開始時間要大
// 說明原來的窗口已經(jīng)過時了,需要替換一個新的窗口
// 所以加鎖防止競爭
if (updateLock.tryLock()) {
try {
// 這里我選擇直接重置之前的窗口()
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.getWindowStartInMs()) {
// 窗口的開始時間比之前的窗口開始時間還會小,這種屬于異常情況
// 要是真出現(xiàn)了也只能新建一個窗口返回了
return new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis));
}
}
}
/**
* 獲取當(dāng)前窗口內(nèi)的值
*/
public HystrixEntity getWindowValue() {
return getWindowValue(System.currentTimeMillis());
}
public HystrixEntity getWindowValue(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
HystrixWindow bucket = array.get(idx);
if (bucket == null || !bucket.isTimeInWindow(timeMillis)) {
return null;
}
return bucket.getHystrixEntity();
}
/**
* 重置一個窗口
*/
private HystrixWindow resetWindowTo(HystrixWindow window, long startTime){
return window.resetTo(startTime);
}
public List<HystrixEntity> values() {
return values(System.currentTimeMillis());
}
private List<HystrixEntity> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<HystrixEntity>();
}
int size = array.length();
List<HystrixEntity> result = new ArrayList<HystrixEntity>(size);
for (int i = 0; i < size; i++) {
HystrixWindow window = array.get(i);
if (window == null || isWindowDeprecated(timeMillis, window)) {
continue;
}
result.add(window.getHystrixEntity());
}
return result;
}
/**
* 判斷窗口是否有效
*/
public boolean isWindowDeprecated(long time, HystrixWindow window) {
return time - window.getWindowStartInMs() > intervalInMs;
}
}使用示例:
// 初始化 代表監(jiān)控1s內(nèi)的指標(biāo) 窗口數(shù)為2 HystrixWindowArray hystrixWindowArray = new HystrixWindowArray(2, 1000); // 指標(biāo)監(jiān)控 數(shù)量+1 windowArray.currentWindow().getHystrixEntity().addErrorCount(); windowArray.currentWindow().getHystrixEntity().addRequestCount(); // 獲取所有窗口的指標(biāo)累計 判斷是否超標(biāo),也就是1s內(nèi)的總計 List<HystrixEntity> windowValues = windowArray.values(); Integer errorCount = hystrixEntities.stream().map(HystrixEntity::getErrorCountValue).reduce(Integer::sum).get(); Integer requestCount = hystrixEntities.stream().map(HystrixEntity::getRequestCountValue).reduce(Integer::sum).get();
令牌桶算法
思想: 固定時間內(nèi)(例如 1 秒)通過一個桶來存儲令牌,每當(dāng)接收到一個請求時就會消耗一個令牌。如果請求過來時沒有令牌,則無法繼續(xù)處理該請求。在sentinel中被稱為冷啟動
優(yōu)點:
相比于漏桶算法,令牌桶算法具有更好的適應(yīng)性,可以應(yīng)對短時間內(nèi)的流量波動。(漏桶算法只能處理恒定速率的流量)
代碼如下:
public class TokenBucket {
private long lastTime; // 上次請求時間
private double rate; // 令牌放入速率
private long capacity; // 令牌桶容量
private long tokens; // 當(dāng)前令牌數(shù)量
public TokenBucket(double rate, long capacity) {
this.lastTime = System.currentTimeMillis();
this.rate = rate;
this.capacity = capacity;
this.tokens = capacity;
}
public synchronized boolean getToken() {
long now = System.currentTimeMillis();
long timeElapsed = now - lastTime;
tokens += timeElapsed * rate;
if (tokens > capacity) {
tokens = capacity;
}
lastTime = now;
if (tokens >= 1) {
tokens--;
return true;
} else {
return false;
}
}
}漏斗桶算法
漏桶算法(Leaky Bucket)是網(wǎng)絡(luò)世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)時經(jīng)常使用的一種算法,它的主要目的是控制數(shù)據(jù)注入到網(wǎng)絡(luò)的速率,平滑網(wǎng)絡(luò)上的突發(fā)流量。漏桶算法提供了一種機(jī)制,通過它,突發(fā)流量可以被整形以便為網(wǎng)絡(luò)提供一個穩(wěn)定的流量。在sentinel中被稱為勻速器
優(yōu)點:
- 控制速率:漏斗桶算法可以限制數(shù)據(jù)流量的傳輸速率,確保各個環(huán)節(jié)的數(shù)據(jù)處理能力都得到了滿足,從而避免了系統(tǒng)因為數(shù)據(jù)過多而導(dǎo)致的崩潰和癱瘓。
- 平滑輸出:漏斗桶算法通過將數(shù)據(jù)分散到不同的時間段內(nèi)進(jìn)行處理,使得數(shù)據(jù)傳輸?shù)妮敵龈悠椒€(wěn),不會出現(xiàn)明顯的波動,提高了網(wǎng)絡(luò)的穩(wěn)定性。
代碼如下:
public class LeakyBucket {
private int capacity; //漏桶容量
private int rate; //漏水速率
private int water; //當(dāng)前水量
private Instant timestamp; //上次漏水時間
public LeakyBucket(int capacity, int rate) {
this.capacity = capacity;
this.rate = rate;
this.water = 0;
this.timestamp = Instant.now();
}
public synchronized boolean allow() { //判斷是否允許通過
Instant now = Instant.now();
long duration = now.toEpochMilli() - timestamp.toEpochMilli(); //計算距上次漏水過去了多久
int outflow = (int) (duration * rate / 1000); //計算過去的時間內(nèi)漏出的水量
water = Math.max(0, water - outflow); //更新當(dāng)前水量,不能小于0
if (water < capacity) { //如果漏桶還沒滿,放行
water++;
timestamp = now;
return true;
}
return false; //否則拒絕通過
}
}到此這篇關(guān)于Java中常見的4種限流算法詳解的文章就介紹到這了,更多相關(guān)Java限流算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Listener監(jiān)聽器使用規(guī)范詳細(xì)介紹
監(jiān)聽器是一個專門用于對其他對象身上發(fā)生的事件或狀態(tài)改變進(jìn)行監(jiān)聽和相應(yīng)處理的對象,當(dāng)被監(jiān)視的對象發(fā)生情況時,立即采取相應(yīng)的行動。監(jiān)聽器其實就是一個實現(xiàn)特定接口的普通java程序,這個程序?qū)iT用于監(jiān)聽另一個java對象的方法調(diào)用或?qū)傩愿淖?/div> 2023-01-01
Java詳解如何將excel數(shù)據(jù)轉(zhuǎn)為樹形
在平常的辦公工作中,excel數(shù)據(jù)的操作是最常見的需求,今天就來看一下通過Java如何來實現(xiàn)將excel數(shù)據(jù)轉(zhuǎn)為樹形,感興趣的朋友可以了解下2022-08-08
Java中Json字符串直接轉(zhuǎn)換為對象的方法(包括多層List集合)
下面小編就為大家?guī)硪黄狫ava中Json字符串直接轉(zhuǎn)換為對象的方法(包括多層List集合)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-08-08最新評論

