欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Java實(shí)現(xiàn)系統(tǒng)限流的示例代碼

 更新時(shí)間:2023年09月04日 10:23:42   作者:不焦躁的程序員  
限流是保障系統(tǒng)高可用的方式之一,也是大廠高頻面試題,它在微服務(wù)系統(tǒng)中,緩存、限流、熔斷是保證系統(tǒng)高可用的三板斧,所以本文我們就來聊聊如何實(shí)現(xiàn)系統(tǒng)限流吧

限流是保障系統(tǒng)高可用的方式之一,也是大廠高頻面試題,如果面試官問一句,“如何實(shí)現(xiàn)每秒鐘1000個(gè)請(qǐng)求的限流?”,你要是分分鐘給他寫上幾種限流方案,那豈不香哉,哈哈!話不多說,我來列幾種常用限流實(shí)現(xiàn)方式。

1、Guava RateLimiter

Guava是Java領(lǐng)域很優(yōu)秀的開源項(xiàng)目,包含了日常開發(fā)常用的集合、String、緩存等, 其中RateLimiter是常用限流工具。

RateLimiter是基于令牌桶算法實(shí)現(xiàn)的,如果每秒10個(gè)令牌,內(nèi)部實(shí)現(xiàn),會(huì)每100ms生產(chǎn)1個(gè)令牌。

使用Guava RateLimiter,如下:

1.引入pom依賴:

<dependency>
??<groupId>com.google.guava</groupId>
??<artifactId>guava</artifactId>
??<version>23.0</version>
</dependency>

2.代碼:

public?class?GuavaRateLimiterTest?{
????//比如每秒生產(chǎn)10個(gè)令牌,相當(dāng)于每100ms生產(chǎn)1個(gè)令牌
????private?RateLimiter?rateLimiter?=?RateLimiter.create(10);
????/**
?????*?模擬執(zhí)行業(yè)務(wù)方法
?????*/
????public?void?exeBiz()?{
????????if?(rateLimiter.tryAcquire(1))?{
????????????try?{
????????????????Thread.sleep(500);
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????System.out.println("線程"?+?Thread.currentThread().getName()?+?":執(zhí)行業(yè)務(wù)邏輯");
????????}?else?{
????????????System.out.println("線程"?+?Thread.currentThread().getName()?+?":被限流");
????????}
????}
????public?static?void?main(String[]?args)?throws?InterruptedException?{
????????GuavaRateLimiterTest?limiterTest?=?new?GuavaRateLimiterTest();
????????Thread.sleep(500);//等待500ms,讓limiter生產(chǎn)一些令牌
????????//模擬瞬間生產(chǎn)100個(gè)線程請(qǐng)求
????????for?(int?i?=?0;?i?<?100;?i++)?{
????????????new?Thread(limiterTest::exeBiz).start();
????????}
????}
}

2、滑窗計(jì)數(shù)

打個(gè)比方,某接口每秒允許100個(gè)請(qǐng)求,設(shè)置一個(gè)滑窗,窗口中有10個(gè)格子,每個(gè)格子占100ms,每100ms移動(dòng)一次?;瑒?dòng)窗口的格子劃分的越多,滑動(dòng)窗口的滾動(dòng)就越平滑,限流的統(tǒng)計(jì)就會(huì)越精確。

代碼如下:

/**
?*?滑窗計(jì)數(shù)器
?*/
public?class?SliderWindowRateLimiter?implements?Runnable?{
????//每秒允許的最大訪問數(shù)
????private?final?long?maxVisitPerSecond;
????//將每秒時(shí)間劃分N個(gè)塊
????private?final?int?block;
????//每個(gè)塊存儲(chǔ)的數(shù)量
????private?final?AtomicLong[]?countPerBlock;
????//滑動(dòng)窗口劃到了哪個(gè)塊兒,可以理解為滑動(dòng)窗口的起始下標(biāo)位置
????private?volatile?int?index;
????//目前總的數(shù)量
????private?AtomicLong?allCount;
????/**
?????*?構(gòu)造函數(shù)
?????*
?????*?@param?block,每秒鐘劃分N個(gè)窗口
?????*?@param?maxVisitPerSecond?每秒最大訪問數(shù)量
?????*/
????public?SliderWindowRateLimiter(int?block,?long?maxVisitPerSecond)?{
????????this.block?=?block;
????????this.maxVisitPerSecond?=?maxVisitPerSecond;
????????countPerBlock?=?new?AtomicLong[block];
????????for?(int?i?=?0;?i?<?block;?i++)?{
????????????countPerBlock[i]?=?new?AtomicLong();
????????}
????????allCount?=?new?AtomicLong(0);
????}
????/**
?????*?判斷是否超過最大允許數(shù)量
?????*
?????*?@return
?????*/
????public?boolean?isOverLimit()?{
????????return?currentQPS()?>?maxVisitPerSecond;
????}
????/**
?????*?獲取目前總的訪問數(shù)
?????*
?????*?@return
?????*/
????public?long?currentQPS()?{
????????return?allCount.get();
????}
????/**
?????*?請(qǐng)求訪問進(jìn)來,判斷是否可以執(zhí)行業(yè)務(wù)邏輯
?????*/
????public?void?visit()?{
????????countPerBlock[index].incrementAndGet();
????????allCount.incrementAndGet();
????????if?(isOverLimit())?{
????????????System.out.println(Thread.currentThread().getName()?+?"被限流"?+?",currentQPS:"?+?currentQPS()?+?",index:"?+?index);
????????}?else?{
????????????System.out.println(Thread.currentThread().getName()?+?"執(zhí)行業(yè)務(wù)邏輯"?+?",currentQPS:"?+?currentQPS()?+?",index:"?+?index);
????????}
????}
????/**
?????*?定時(shí)執(zhí)行器,
?????*?每N毫秒滑塊移動(dòng)一次,然后再設(shè)置下新滑塊的初始化數(shù)字0,然后新的請(qǐng)求會(huì)落到新的滑塊上
?????*?同時(shí)總數(shù)減掉新滑塊上的數(shù)字,并且重置新的滑塊上的數(shù)量
?????*/
????@Override
????public?void?run()?{
????????index?=?(index?+?1)?%?block;
????????long?val?=?countPerBlock[index].getAndSet(0);
????????allCount.addAndGet(-val);
????}
????public?static?void?main(String[]?args)?{
????????SliderWindowRateLimiter?sliderWindowRateLimiter?=?new?SliderWindowRateLimiter(10,?100);
????????//固定的速率移動(dòng)滑塊
????????ScheduledExecutorService?scheduledExecutorService?=?Executors.newSingleThreadScheduledExecutor();
????????scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter,?100,?100,?TimeUnit.MILLISECONDS);
????????//模擬不同速度的請(qǐng)求
????????new?Thread(()?->?{
????????????while?(true)?{
????????????????sliderWindowRateLimiter.visit();
????????????????try?{
????????????????????Thread.sleep(10);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}).start();
????????//模擬不同速度的請(qǐng)求
????????new?Thread(()?->?{
????????????while?(true)?{
????????????????sliderWindowRateLimiter.visit();
????????????????try?{
????????????????????Thread.sleep(50);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????}).start();
????}
}

3、信號(hào)量

利用Semaphore,每隔固定速率,釋放Semaphore的資源。線程獲取到資源,則執(zhí)行業(yè)務(wù)代碼。

代碼如下:

public?class?SemaphoreOne?{
????private?static?Semaphore?semaphore?=?new?Semaphore(10);
????public?static?void?bizMethod()?throws?InterruptedException?{
????????if?(!semaphore.tryAcquire())?{
????????????System.out.println(Thread.currentThread().getName()?+?"被拒絕");
????????????return;
????????}
????????System.out.println(Thread.currentThread().getName()?+?"執(zhí)行業(yè)務(wù)邏輯");
????????Thread.sleep(500);//模擬處理業(yè)務(wù)邏輯需要1秒
????????semaphore.release();
????}
????public?static?void?main(String[]?args)?{
????????Timer?timer?=?new?Timer();
????????timer.scheduleAtFixedRate(new?TimerTask()?{
????????????@Override
????????????public?void?run()?{
????????????????semaphore.release(10);
????????????????System.out.println("釋放所有鎖");
????????????}
????????},?1000,?1000);
????????for?(int?i?=?0;?i?<?10000;?i++)?{
????????????try?{
????????????????Thread.sleep(10);//模擬每隔10ms就有1個(gè)請(qǐng)求進(jìn)來
????????????}?catch?(InterruptedException?e)?{
????????????????e.printStackTrace();
????????????}
????????????new?Thread(()?->?{
????????????????try?{
????????????????????SemaphoreOne.bizMethod();
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}).start();
????????}
????}
}

4、令牌桶

令牌桶算法:一個(gè)存放固定容量令牌的桶,按照固定速率往桶里添加令牌,如有剩余容量則添加,沒有則放棄。如果有請(qǐng)求進(jìn)來,則需要先從桶里獲取令牌,當(dāng)桶里沒有令牌可取時(shí),則拒絕任務(wù)。

令牌桶的優(yōu)點(diǎn)是:可以改變添加令牌的速率,一旦提高速率,則可以處理突發(fā)流量。

代碼如下:

public?class?TokenBucket?{
????/**
?????*?定義的桶
?????*/
????public?class?Bucket?{
????????//容量
????????int?capacity;
????????//速率,每秒放多少
????????int?rateCount;
????????//目前token個(gè)數(shù)
????????AtomicInteger?curCount?=?new?AtomicInteger(0);
????????public?Bucket(int?capacity,?int?rateCount)?{
????????????this.capacity?=?capacity;
????????????this.rateCount?=?rateCount;
????????}
????????public?void?put()?{
????????????if?(curCount.get()?<?capacity)?{
????????????????System.out.println("目前數(shù)量=="?+?curCount.get()?+?",?我還可以繼續(xù)放");
????????????????curCount.addAndGet(rateCount);
????????????}
????????}
????????public?boolean?get()?{
????????????if?(curCount.get()?>=?1)?{
????????????????curCount.decrementAndGet();
????????????????return?true;
????????????}
????????????return?false;
????????}
????}
????@Test
????public?void?testTokenBucket()?throws?InterruptedException?{
????????Bucket?bucket?=?new?Bucket(5,?2);
????????//固定線程,固定的速率往桶里放數(shù)據(jù),比如每秒N個(gè)
????????ScheduledThreadPoolExecutor?scheduledCheck?=?new?ScheduledThreadPoolExecutor(1);
????????scheduledCheck.scheduleAtFixedRate(()?->?{
????????????bucket.put();
????????},?0,?1,?TimeUnit.SECONDS);
????????//先等待一會(huì)兒,讓桶里放點(diǎn)token
????????Thread.sleep(6000);
????????//模擬瞬間10個(gè)線程進(jìn)來拿token
????????for?(int?i?=?0;?i?<?10;?i++)?{
????????????new?Thread(()?->?{
????????????????if?(bucket.get())?{
????????????????????System.out.println(Thread.currentThread()?+?"獲取到了資源");
????????????????}?else?{
????????????????????System.out.println(Thread.currentThread()?+?"被拒絕");
????????????????}
????????????}).start();
????????}
????????//等待,往桶里放token
????????Thread.sleep(3000);
????????//繼續(xù)瞬間10個(gè)線程進(jìn)來拿token
????????for?(int?i?=?0;?i?<?10;?i++)?{
????????????new?Thread(()?->?{
????????????????if?(bucket.get())?{
????????????????????System.out.println(Thread.currentThread()?+?"獲取到了資源");
????????????????}?else?{
????????????????????System.out.println(Thread.currentThread()?+?"被拒絕");
????????????????}
????????????}).start();
????????}
????}
}

5、總結(jié)

本文主要介紹了幾種限流方法:Guava RateLimiter、簡單計(jì)數(shù)、滑窗計(jì)數(shù)、信號(hào)量、令牌桶,當(dāng)然,限流算法還有漏桶算法、nginx限流等等。我所寫的這些方法只是個(gè)人在實(shí)際項(xiàng)目總使用過的,或者是早年參加阿里筆試時(shí)寫過的方式。

以上就是Java實(shí)現(xiàn)系統(tǒng)限流的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Java系統(tǒng)限流的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

最新評(píng)論