Java中常見的4種限流算法詳解
固定窗口
FixedWindowRateLimiter 類表示一個固定窗口限流器,使用 limit 和 interval 參數(shù)分別表示限制請求數(shù)量和時間間隔(毫秒)。在 allowRequest() 方法中,通過比較當(dāng)前時間與上一次請求時間來判斷是否需要重置請求數(shù)和上一次請求時間。如果請求數(shù)還沒有達到限制數(shù)量,允許請求并增加請求數(shù),否則拒絕請求。 缺點:短時間內(nèi)可能會流量翻倍
public class FixedWindowRateLimiter { private final int limit; // 限制請求數(shù)量 private final AtomicInteger count; // 當(dāng)前請求數(shù) private final long interval; // 時間間隔(毫秒) private long lastRequestTime; // 上一次請求時間 public FixedWindowRateLimiter(int limit, long interval) { this.limit = limit; this.interval = interval; this.count = new AtomicInteger(0); this.lastRequestTime = System.currentTimeMillis(); } public synchronized boolean allowRequest() { long currentTime = System.currentTimeMillis(); if (currentTime - lastRequestTime > interval) { // 如果距離上一次請求時間已經(jīng)超過了時間間隔,重置請求數(shù)和上一次請求時間 count.set(0); lastRequestTime = currentTime; } // 如果請求數(shù)還沒有達到限制數(shù)量,允許請求并增加請求數(shù) if (count.get() < limit) { count.incrementAndGet(); return true; } return false; // 否則拒絕請求 } } // 使用示例 FixedWindowRateLimiter limiter = new FixedWindowRateLimiter(10, 1000); // 每秒最多處理10個請求 for (int i = 0; i < 20; i++) { // 嘗試發(fā)起20個請求 if (limiter.allowRequest()) { System.out.println("Allow request " + i); } else { System.out.println("Reject request " + i); } Thread.sleep(200); // 每次請求間隔200毫秒 }
滑動窗口
相比于固定窗口,滑動窗口有以下幾個好處:
- 平滑限流:滑動窗口限流算法會以時間為軸將請求限制平均到每個時間段內(nèi),從而平滑了請求的涌入。相比于簡單粗暴的限制請求的數(shù)量或速率等方式,這種平滑的限流方式能夠更好地保證服務(wù)的可用性和穩(wěn)定性。
- 精確控制:滑動窗口限流算法可以根據(jù)具體的業(yè)務(wù)需要設(shè)置窗口大小和時間間隔,從而實現(xiàn)對請求的精確控制。通過適當(dāng)調(diào)整窗口大小和時間間隔,可以達到更好的限流效果。
關(guān)于這個,我覺得sentinel中的滑動窗口就非常的nice,下面是從sentinel中摘出來改一下的示例(同時也運用在我本人的中間件內(nèi)),總得來說有三部分:
- 窗口存放的實體類(監(jiān)控指標) HystrixEntity
- 窗口的定義 HystrixWindow
- 滑動窗口具體實現(xiàn) HystrixWindowArray
// 窗口存放的實體類(監(jiān)控指標) public class HystrixEntity { // 窗口請求數(shù) private AtomicInteger requestCount; // 窗口異常數(shù) private AtomicInteger errorCount; public HystrixEntity(){ this.requestCount=new AtomicInteger(0); this.errorCount=new AtomicInteger(0); } public int getRequestCountValue() { return requestCount.get(); } public int getErrorCountValue() { return errorCount.get(); } public void resetValue() { this.errorCount.set(0); this.requestCount.set(0); } public void addErrorCount(){ this.errorCount.addAndGet(1); } public void addRequestCount(){ this.requestCount.addAndGet(1); } } //窗口的定義 HystrixWindow public class HystrixWindow { // 窗口的長度 單位:ms private final int windowLengthInMs; // 窗口的開始時間戳 單位:ms private long windowStartInMs; // 窗口內(nèi)存放的實體類 private HystrixEntity hystrixEntity; public HystrixWindow(int windowLengthInMs, long windowStartInMs, HystrixEntity hystrixEntity) { this.windowLengthInMs = windowLengthInMs; this.windowStartInMs = windowStartInMs; this.hystrixEntity = hystrixEntity; } public int getWindowLengthInMs() { return windowLengthInMs; } public long getWindowStartInMs() { return windowStartInMs; } public HystrixEntity getHystrixEntity() { return hystrixEntity; } public void setHystrixEntity(HystrixEntity hystrixEntity) { this.hystrixEntity = hystrixEntity; } /** * @Description 重置窗口 **/ public HystrixWindow resetTo(long startTime) { this.windowStartInMs = startTime; hystrixEntity.resetValue(); return this; } /** * @Description 判斷時間是否屬于該窗口 **/ public boolean isTimeInWindow(long timeMillis) { return windowStartInMs <= timeMillis && timeMillis < windowStartInMs + windowLengthInMs; } } //滑動窗口具體實現(xiàn) public class HystrixWindowArray { // 單個窗口的長度 private int windowLengthInMs; // 窗口數(shù)量 private int sampleCount; // 所有窗口的總長度 private int intervalInMs; // 窗口數(shù)組 private final AtomicReferenceArray<HystrixWindow> array; /** * The conditional (predicate) update lock is used only when current bucket is deprecated. */ private final ReentrantLock updateLock = new ReentrantLock(); /** * @Param [sampleCount, intervalInMs] * sampleCount: 窗口數(shù)量 intervalInMs:所有窗口的總長度 **/ public HystrixWindowArray(int sampleCount, int intervalInMs) { Assert.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount); Assert.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive"); Assert.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided"); this.windowLengthInMs = intervalInMs / sampleCount; this.intervalInMs = intervalInMs; this.sampleCount = sampleCount; this.array = new AtomicReferenceArray<>(sampleCount); } /** * 獲取當(dāng)前時間所在的窗口下標索引 */ private int calculateTimeIdx(long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. return (int)(timeId % array.length()); } /** * 獲取當(dāng)前時間所在的窗口開始時間 */ private long calculateWindowStart(long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; } private HystrixEntity newEmptyWindowValue(long timeMillis){ return new HystrixEntity(); } /** * 獲取當(dāng)前窗口 */ public HystrixWindow currentWindow() { return currentWindow(System.currentTimeMillis()); } private HystrixWindow currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. long windowStart = calculateWindowStart(timeMillis); while (true) { HystrixWindow old = array.get(idx); if (old == null) { // 如果獲取為空,說明窗口還沒創(chuàng)建,所以我們創(chuàng)建一個新的窗口(CAS保證線程安全) HystrixWindow window = new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis)); if (array.compareAndSet(idx, null, window)) { // 創(chuàng)建成功則返回當(dāng)前窗口 return window; } else { // 創(chuàng)建失敗說明發(fā)生了競爭,所以暫時先讓出CPU Thread.yield(); } } else if (windowStart == old.getWindowStartInMs()) { // 如果窗口已經(jīng)存在,則對比窗口的開始時間是否相同,相同說明是用同一個窗口,直接返回窗口就可以了 return old; } else if (windowStart > old.getWindowStartInMs()) { // 如果窗口已經(jīng)存在,而且窗口開始時間比之前的窗口開始時間要大 // 說明原來的窗口已經(jīng)過時了,需要替換一個新的窗口 // 所以加鎖防止競爭 if (updateLock.tryLock()) { try { // 這里我選擇直接重置之前的窗口() return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.getWindowStartInMs()) { // 窗口的開始時間比之前的窗口開始時間還會小,這種屬于異常情況 // 要是真出現(xiàn)了也只能新建一個窗口返回了 return new HystrixWindow(windowLengthInMs, windowStart, newEmptyWindowValue(timeMillis)); } } } /** * 獲取當(dāng)前窗口內(nèi)的值 */ public HystrixEntity getWindowValue() { return getWindowValue(System.currentTimeMillis()); } public HystrixEntity getWindowValue(long timeMillis) { if (timeMillis < 0) { return null; } int idx = calculateTimeIdx(timeMillis); HystrixWindow bucket = array.get(idx); if (bucket == null || !bucket.isTimeInWindow(timeMillis)) { return null; } return bucket.getHystrixEntity(); } /** * 重置一個窗口 */ private HystrixWindow resetWindowTo(HystrixWindow window, long startTime){ return window.resetTo(startTime); } public List<HystrixEntity> values() { return values(System.currentTimeMillis()); } private List<HystrixEntity> values(long timeMillis) { if (timeMillis < 0) { return new ArrayList<HystrixEntity>(); } int size = array.length(); List<HystrixEntity> result = new ArrayList<HystrixEntity>(size); for (int i = 0; i < size; i++) { HystrixWindow window = array.get(i); if (window == null || isWindowDeprecated(timeMillis, window)) { continue; } result.add(window.getHystrixEntity()); } return result; } /** * 判斷窗口是否有效 */ public boolean isWindowDeprecated(long time, HystrixWindow window) { return time - window.getWindowStartInMs() > intervalInMs; } }
使用示例:
// 初始化 代表監(jiān)控1s內(nèi)的指標 窗口數(shù)為2 HystrixWindowArray hystrixWindowArray = new HystrixWindowArray(2, 1000); // 指標監(jiān)控 數(shù)量+1 windowArray.currentWindow().getHystrixEntity().addErrorCount(); windowArray.currentWindow().getHystrixEntity().addRequestCount(); // 獲取所有窗口的指標累計 判斷是否超標,也就是1s內(nèi)的總計 List<HystrixEntity> windowValues = windowArray.values(); Integer errorCount = hystrixEntities.stream().map(HystrixEntity::getErrorCountValue).reduce(Integer::sum).get(); Integer requestCount = hystrixEntities.stream().map(HystrixEntity::getRequestCountValue).reduce(Integer::sum).get();
令牌桶算法
思想: 固定時間內(nèi)(例如 1 秒)通過一個桶來存儲令牌,每當(dāng)接收到一個請求時就會消耗一個令牌。如果請求過來時沒有令牌,則無法繼續(xù)處理該請求。在sentinel中被稱為冷啟動
優(yōu)點:
相比于漏桶算法,令牌桶算法具有更好的適應(yīng)性,可以應(yīng)對短時間內(nèi)的流量波動。(漏桶算法只能處理恒定速率的流量)
代碼如下:
public class TokenBucket { private long lastTime; // 上次請求時間 private double rate; // 令牌放入速率 private long capacity; // 令牌桶容量 private long tokens; // 當(dāng)前令牌數(shù)量 public TokenBucket(double rate, long capacity) { this.lastTime = System.currentTimeMillis(); this.rate = rate; this.capacity = capacity; this.tokens = capacity; } public synchronized boolean getToken() { long now = System.currentTimeMillis(); long timeElapsed = now - lastTime; tokens += timeElapsed * rate; if (tokens > capacity) { tokens = capacity; } lastTime = now; if (tokens >= 1) { tokens--; return true; } else { return false; } } }
漏斗桶算法
漏桶算法(Leaky Bucket)是網(wǎng)絡(luò)世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)時經(jīng)常使用的一種算法,它的主要目的是控制數(shù)據(jù)注入到網(wǎng)絡(luò)的速率,平滑網(wǎng)絡(luò)上的突發(fā)流量。漏桶算法提供了一種機制,通過它,突發(fā)流量可以被整形以便為網(wǎng)絡(luò)提供一個穩(wěn)定的流量。在sentinel中被稱為勻速器
優(yōu)點:
- 控制速率:漏斗桶算法可以限制數(shù)據(jù)流量的傳輸速率,確保各個環(huán)節(jié)的數(shù)據(jù)處理能力都得到了滿足,從而避免了系統(tǒng)因為數(shù)據(jù)過多而導(dǎo)致的崩潰和癱瘓。
- 平滑輸出:漏斗桶算法通過將數(shù)據(jù)分散到不同的時間段內(nèi)進行處理,使得數(shù)據(jù)傳輸?shù)妮敵龈悠椒€(wěn),不會出現(xiàn)明顯的波動,提高了網(wǎng)絡(luò)的穩(wěn)定性。
代碼如下:
public class LeakyBucket { private int capacity; //漏桶容量 private int rate; //漏水速率 private int water; //當(dāng)前水量 private Instant timestamp; //上次漏水時間 public LeakyBucket(int capacity, int rate) { this.capacity = capacity; this.rate = rate; this.water = 0; this.timestamp = Instant.now(); } public synchronized boolean allow() { //判斷是否允許通過 Instant now = Instant.now(); long duration = now.toEpochMilli() - timestamp.toEpochMilli(); //計算距上次漏水過去了多久 int outflow = (int) (duration * rate / 1000); //計算過去的時間內(nèi)漏出的水量 water = Math.max(0, water - outflow); //更新當(dāng)前水量,不能小于0 if (water < capacity) { //如果漏桶還沒滿,放行 water++; timestamp = now; return true; } return false; //否則拒絕通過 } }
到此這篇關(guān)于Java中常見的4種限流算法詳解的文章就介紹到這了,更多相關(guān)Java限流算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java Listener監(jiān)聽器使用規(guī)范詳細介紹
監(jiān)聽器是一個專門用于對其他對象身上發(fā)生的事件或狀態(tài)改變進行監(jiān)聽和相應(yīng)處理的對象,當(dāng)被監(jiān)視的對象發(fā)生情況時,立即采取相應(yīng)的行動。監(jiān)聽器其實就是一個實現(xiàn)特定接口的普通java程序,這個程序?qū)iT用于監(jiān)聽另一個java對象的方法調(diào)用或?qū)傩愿淖?/div> 2023-01-01Java詳解如何將excel數(shù)據(jù)轉(zhuǎn)為樹形
在平常的辦公工作中,excel數(shù)據(jù)的操作是最常見的需求,今天就來看一下通過Java如何來實現(xiàn)將excel數(shù)據(jù)轉(zhuǎn)為樹形,感興趣的朋友可以了解下2022-08-08Java中Json字符串直接轉(zhuǎn)換為對象的方法(包括多層List集合)
下面小編就為大家?guī)硪黄狫ava中Json字符串直接轉(zhuǎn)換為對象的方法(包括多層List集合)。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-08-08最新評論