Java 常見的限流算法詳細分析并實現(xiàn)
為什么要限流
在保證可用的情況下盡可能多增加進入的人數(shù),其余的人在排隊等待,或者返回友好提示,保證里面的進行系統(tǒng)的用戶可以正常使用,防止系統(tǒng)雪崩。
限流算法
限流算法很多,常見的有三類,分別是 計數(shù)器算法 、漏桶算法、令牌桶算法 。
(1)計數(shù)器:
??????????在一段時間間隔內(nèi),處理請求的最大數(shù)量固定,超過部分不做處理。
(2)漏桶:
??????????漏桶大小固定,處理速度固定,但請求進入速度不固定(在突發(fā)情況請求過多時,會丟棄過多的請求)。
(3)令牌桶:
??????????令牌桶的大小固定,令牌的產(chǎn)生速度固定,但是消耗令牌(即請求)速度不固定(可以應(yīng)對一些某些時間請求過多的情況);每個請求都會從令牌桶中取出令牌,如果沒有令牌則丟棄該次請求。
計數(shù)器限流
在一段時間間隔內(nèi),處理請求的最大數(shù)量固定,超過部分不做處理。
舉個例子,比如我們規(guī)定對于A接口,我們1分鐘的訪問次數(shù)不能超過100次。
那么我們可以這么做:
在一開 始的時候,我們可以設(shè)置一個計數(shù)器counter,每當一個請求過來的時候,counter就加1,如果counter的值大于100并且該請求與第一個請求的間隔時間還在1分鐘之內(nèi),那么說明請求數(shù)過多,拒絕訪問;
如果該請求與第一個請求的間隔時間大于1分鐘,且counter的值還在限流范圍內(nèi),那么就重置 counter,就是這么簡單粗暴。
![[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-pkGvVRdd-1647523333397)(/upload/2021/09/image-0cbbc44ec76d446896e989962e8122e5.png)]](http://img.jbzj.com/file_images/article/202204/2022040717575537.png)
代碼實現(xiàn):?
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
//計數(shù)器 限流
public class CounterLimiter {
//起始時間
private static long startTime = System.currentTimeMillis();
//時間間隔1000ms
private static long interval = 1000;
//每個時間間隔內(nèi),限制數(shù)量
private static long limit = 3;
//累加器
private static AtomicLong accumulator = new AtomicLong();
/**
* true 代表放行,請求可已通過
* false 代表限制,不讓請求通過
*/
public static boolean tryAcquire() {
long nowTime = System.currentTimeMillis();
//判斷是否在上一個時間間隔內(nèi)
if (nowTime < startTime + interval) {
//如果還在上個時間間隔內(nèi)
long count = accumulator.incrementAndGet();
if (count <= limit) {
return true;
} else {
return false;
}
} else {
//如果不在上一個時間間隔內(nèi)
synchronized (CounterLimiter.class) {
//防止重復(fù)初始化
if (nowTime > startTime + interval) {
startTime = nowTime;
accumulator.set(0);
}
}
//再次進行判斷
long count = accumulator.incrementAndGet();
if (count <= limit) {
return true;
} else {
return false;
}
}
}
// 測試
public static void main(String[] args) {
//線程池,用于多線程模擬測試
ExecutorService pool = Executors.newFixedThreadPool(10);
// 被限制的次數(shù)
AtomicInteger limited = new AtomicInteger(0);
// 線程數(shù)
final int threads = 2;
// 每條線程的執(zhí)行輪數(shù)
final int turns = 20;
// 同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
boolean flag = tryAcquire();
if (!flag) {
// 被限制的次數(shù)累積
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有線程結(jié)束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//輸出統(tǒng)計結(jié)果
System.out.println("限制的次數(shù)為:" + limited.get() +
",通過的次數(shù)為:" + (threads * turns - limited.get()));
System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
System.out.println("運行的時長為:" + time + "s");
}
}
計數(shù)器限流的不足:?
這個算法雖然簡單,但是存在臨界問題,我們看下圖:
![[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-xUvgpono-1647523333398)(/upload/2021/09/image-d55d813bbcb544479109dd2248e90ff0.png)]](http://img.jbzj.com/file_images/article/202204/2022040717575538.png)
從上圖中我們可以看到,假設(shè)有一個惡意用戶,他在0:59時,瞬間發(fā)送了100個請求,并且1:00又瞬間發(fā)送了100個請求,那么其實這個用戶在 1秒里面,瞬間發(fā)送了200個請求。
我們剛才規(guī)定的是1分鐘最多100個請求(規(guī)劃的吞吐量),也就是每秒鐘最多1.7個請求,用戶通過在時間窗口的重置節(jié)點處突發(fā)請求, 可以瞬間超過我們的速率限制。
用戶有可能通過算法的這個漏洞,瞬間壓垮我們的應(yīng)用。
漏桶限流
漏桶算法限流的基本原理為:水(對應(yīng)請求)從進水口進入到漏桶里,漏桶以一定的速度出水(請求放行),當水流入速度過大,桶內(nèi)的總水量大于桶容量會直接溢出,請求被拒絕。
大致的漏桶限流規(guī)則如下:
(1)進水口(對應(yīng)客戶端請求)以任意速率流入進入漏桶。
(2)漏桶的容量是固定的,出水(放行)速率也是固定的。
(3)漏桶容量是不變的,如果處理速度太慢,桶內(nèi)水量會超出了桶的容量,則后面流入的水滴會溢出,表示請求拒絕。
![[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-ohzhfhmz-1647523333399)(/upload/2021/09/image-afa56bce847f4d8b99e2dfd1eb3509aa.png)]](http://img.jbzj.com/file_images/article/202204/2022040717575539.png)
?漏桶算法其實很簡單,可以粗略的認為就是注水漏水過程,往桶中以任意速率流入水,以一定速率流出水,當水超過桶容量(capacity)則丟棄,因為桶容量是不變的,保證了整體的速率。
以一定速率流出水,
![[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-hFtmsnIH-1647523333399)(/upload/2021/09/image-c7d684d693f74fa5bf4077febb8fccbc.png)]](http://img.jbzj.com/file_images/article/202204/2022040717575540.png)
削峰: 有大量流量進入時,會發(fā)生溢出,從而限流保護服務(wù)可用
緩沖: 不至于直接請求到服務(wù)器, 緩沖壓力
代碼實現(xiàn):?
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
//漏斗限流
public class LeakBucketLimiter {
//桶的大小
private static long capacity = 10;
//流出速率,每秒兩個
private static long rate = 2;
//開始時間
private static long startTime = System.currentTimeMillis();
//桶中剩余的水
private static AtomicLong water = new AtomicLong();
/**
* true 代表放行,請求可已通過
* false 代表限制,不讓請求通過
*/
public synchronized static boolean tryAcquire() {
//如果桶的余量問0,直接放行
if (water.get() == 0) {
startTime = System.currentTimeMillis();
water.set(1);
return true;
}
//計算從當前時間到開始時間流出的水,和現(xiàn)在桶中剩余的水
//桶中剩余的水
water.set(water.get() - (System.currentTimeMillis() - startTime) / 1000 * rate);
//防止出現(xiàn)<0的情況
water.set(Math.max(0, water.get()));
//設(shè)置新的開始時間
startTime += (System.currentTimeMillis() - startTime) / 1000 * 1000;
//如果當前水小于容量,表示可以放行
if (water.get() < capacity) {
water.incrementAndGet();
return true;
} else {
return false;
}
}
// 測試
public static void main(String[] args) {
//線程池,用于多線程模擬測試
ExecutorService pool = Executors.newFixedThreadPool(10);
// 被限制的次數(shù)
AtomicInteger limited = new AtomicInteger(0);
// 線程數(shù)
final int threads = 2;
// 每條線程的執(zhí)行輪數(shù)
final int turns = 20;
// 同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
boolean flag = tryAcquire();
if (!flag) {
// 被限制的次數(shù)累積
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有線程結(jié)束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//輸出統(tǒng)計結(jié)果
System.out.println("限制的次數(shù)為:" + limited.get() +
",通過的次數(shù)為:" + (threads * turns - limited.get()));
System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
System.out.println("運行的時長為:" + time + "s");
}
}
漏桶的不足:?
漏桶的出水速度固定,也就是請求放行速度是固定的。
漏桶出口的速度固定,不能靈活的應(yīng)對后端能力提升。比如,通過動態(tài)擴容,后端流量從1000QPS提升到1WQPS,漏桶沒有辦法。
令牌桶限流
令牌桶算法中新請求到來時會從桶里拿走一個令牌,如果桶內(nèi)沒有令牌可拿,就拒絕服務(wù)。 當然,令牌的數(shù)量也是有上限的。令牌的數(shù)量與時間和發(fā)放速率強相關(guān),時間流逝的時間越長,會不斷往桶里加入越多的令牌,如果令牌發(fā)放的速度比申請速度快,令牌桶會放滿令牌,直到令牌占滿整個令牌桶。
令牌桶限流大致的規(guī)則如下:
(1)進水口按照某個速度,向桶中放入令牌。
(2)令牌的容量是固定的,但是放行的速度不是固定的,只要桶中還有剩余令牌,一旦請求過來就能申請成功,然后放行。
(3)如果令牌的發(fā)放速度,慢于請求到來速度,桶內(nèi)就無牌可領(lǐng),請求就會被拒絕。
總之,令牌的發(fā)送速率可以設(shè)置,從而可以對突發(fā)的出口流量進行有效的應(yīng)對。
![[外鏈圖片轉(zhuǎn)存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-OlS9VCnB-1647523333401)(/upload/2021/09/image-f5b402eae3b2409c8939b34d8be41f9c.png)]](http://img.jbzj.com/file_images/article/202204/2022040717575641.png)
代碼實現(xiàn):?
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
//令牌桶
public class TokenBucketLimiter {
//桶的容量
private static long capacity = 10;
//放入令牌的速率,每秒2個
private static long rate = 2;
//上次放置令牌的時間
private static long lastTime = System.currentTimeMillis();
//桶中令牌的余量
private static AtomicLong tokenNum = new AtomicLong();
/**
* true 代表放行,請求可已通過
* false 代表限制,不讓請求通過
*/
public synchronized static boolean tryAcquire() {
//更新桶中剩余令牌的數(shù)量
long now = System.currentTimeMillis();
tokenNum.addAndGet((now - lastTime) / 1000 * rate);
tokenNum.set(Math.min(capacity, tokenNum.get()));
//更新時間
lastTime += (now - lastTime) / 1000 * 1000;
//桶中還有令牌就放行
if (tokenNum.get() > 0) {
tokenNum.decrementAndGet();
return true;
} else {
return false;
}
}
//測試
public static void main(String[] args) {
//線程池,用于多線程模擬測試
ExecutorService pool = Executors.newFixedThreadPool(10);
// 被限制的次數(shù)
AtomicInteger limited = new AtomicInteger(0);
// 線程數(shù)
final int threads = 2;
// 每條線程的執(zhí)行輪數(shù)
final int turns = 20;
// 同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
boolean flag = tryAcquire();
if (!flag) {
// 被限制的次數(shù)累積
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有線程結(jié)束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//輸出統(tǒng)計結(jié)果
System.out.println("限制的次數(shù)為:" + limited.get() +
",通過的次數(shù)為:" + (threads * turns - limited.get()));
System.out.println("限制的比例為:" + (float) limited.get() / (float) (threads * turns));
System.out.println("運行的時長為:" + time + "s");
}
}
令牌桶的好處:?
令牌桶的好處之一就是可以方便地應(yīng)對 突發(fā)出口流量(后端能力的提升)。
比如,可以改變令牌的發(fā)放速度,算法能按照新的發(fā)送速率調(diào)大令牌的發(fā)放數(shù)量,使得出口突發(fā)流量能被處理。
到此這篇關(guān)于Java 常見的限流算法詳細分析并實現(xiàn)的文章就介紹到這了,更多相關(guān)Java 限流算法內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
jackson 如何將實體轉(zhuǎn)json json字符串轉(zhuǎn)實體
這篇文章主要介紹了jackson 實現(xiàn)將實體轉(zhuǎn)json json字符串轉(zhuǎn)實體,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-10-10
SpringCloud Feign Jackson自定義配置方式
這篇文章主要介紹了SpringCloud Feign Jackson自定義配置方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-03-03

