Java實(shí)現(xiàn)系統(tǒng)限流的示例代碼
限流是保障系統(tǒng)高可用的方式之一,也是大廠高頻面試題,如果面試官問一句,“如何實(shí)現(xiàn)每秒鐘1000個(gè)請(qǐng)求的限流?”,你要是分分鐘給他寫上幾種限流方案,那豈不香哉,哈哈!話不多說,我來列幾種常用限流實(shí)現(xiàn)方式。
1、Guava RateLimiter
Guava是Java領(lǐng)域很優(yōu)秀的開源項(xiàng)目,包含了日常開發(fā)常用的集合、String、緩存等, 其中RateLimiter是常用限流工具。
RateLimiter是基于令牌桶算法實(shí)現(xiàn)的,如果每秒10個(gè)令牌,內(nèi)部實(shí)現(xiàn),會(huì)每100ms生產(chǎn)1個(gè)令牌。
使用Guava RateLimiter,如下:
1.引入pom依賴:
<dependency> ??<groupId>com.google.guava</groupId> ??<artifactId>guava</artifactId> ??<version>23.0</version> </dependency>
2.代碼:
public?class?GuavaRateLimiterTest?{ ????//比如每秒生產(chǎn)10個(gè)令牌,相當(dāng)于每100ms生產(chǎn)1個(gè)令牌 ????private?RateLimiter?rateLimiter?=?RateLimiter.create(10); ????/** ?????*?模擬執(zhí)行業(yè)務(wù)方法 ?????*/ ????public?void?exeBiz()?{ ????????if?(rateLimiter.tryAcquire(1))?{ ????????????try?{ ????????????????Thread.sleep(500); ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????????System.out.println("線程"?+?Thread.currentThread().getName()?+?":執(zhí)行業(yè)務(wù)邏輯"); ????????}?else?{ ????????????System.out.println("線程"?+?Thread.currentThread().getName()?+?":被限流"); ????????} ????} ????public?static?void?main(String[]?args)?throws?InterruptedException?{ ????????GuavaRateLimiterTest?limiterTest?=?new?GuavaRateLimiterTest(); ????????Thread.sleep(500);//等待500ms,讓limiter生產(chǎn)一些令牌 ????????//模擬瞬間生產(chǎn)100個(gè)線程請(qǐng)求 ????????for?(int?i?=?0;?i?<?100;?i++)?{ ????????????new?Thread(limiterTest::exeBiz).start(); ????????} ????} }
2、滑窗計(jì)數(shù)
打個(gè)比方,某接口每秒允許100個(gè)請(qǐng)求,設(shè)置一個(gè)滑窗,窗口中有10個(gè)格子,每個(gè)格子占100ms,每100ms移動(dòng)一次?;瑒?dòng)窗口的格子劃分的越多,滑動(dòng)窗口的滾動(dòng)就越平滑,限流的統(tǒng)計(jì)就會(huì)越精確。
代碼如下:
/** ?*?滑窗計(jì)數(shù)器 ?*/ public?class?SliderWindowRateLimiter?implements?Runnable?{ ????//每秒允許的最大訪問數(shù) ????private?final?long?maxVisitPerSecond; ????//將每秒時(shí)間劃分N個(gè)塊 ????private?final?int?block; ????//每個(gè)塊存儲(chǔ)的數(shù)量 ????private?final?AtomicLong[]?countPerBlock; ????//滑動(dòng)窗口劃到了哪個(gè)塊兒,可以理解為滑動(dòng)窗口的起始下標(biāo)位置 ????private?volatile?int?index; ????//目前總的數(shù)量 ????private?AtomicLong?allCount; ????/** ?????*?構(gòu)造函數(shù) ?????* ?????*?@param?block,每秒鐘劃分N個(gè)窗口 ?????*?@param?maxVisitPerSecond?每秒最大訪問數(shù)量 ?????*/ ????public?SliderWindowRateLimiter(int?block,?long?maxVisitPerSecond)?{ ????????this.block?=?block; ????????this.maxVisitPerSecond?=?maxVisitPerSecond; ????????countPerBlock?=?new?AtomicLong[block]; ????????for?(int?i?=?0;?i?<?block;?i++)?{ ????????????countPerBlock[i]?=?new?AtomicLong(); ????????} ????????allCount?=?new?AtomicLong(0); ????} ????/** ?????*?判斷是否超過最大允許數(shù)量 ?????* ?????*?@return ?????*/ ????public?boolean?isOverLimit()?{ ????????return?currentQPS()?>?maxVisitPerSecond; ????} ????/** ?????*?獲取目前總的訪問數(shù) ?????* ?????*?@return ?????*/ ????public?long?currentQPS()?{ ????????return?allCount.get(); ????} ????/** ?????*?請(qǐng)求訪問進(jìn)來,判斷是否可以執(zhí)行業(yè)務(wù)邏輯 ?????*/ ????public?void?visit()?{ ????????countPerBlock[index].incrementAndGet(); ????????allCount.incrementAndGet(); ????????if?(isOverLimit())?{ ????????????System.out.println(Thread.currentThread().getName()?+?"被限流"?+?",currentQPS:"?+?currentQPS()?+?",index:"?+?index); ????????}?else?{ ????????????System.out.println(Thread.currentThread().getName()?+?"執(zhí)行業(yè)務(wù)邏輯"?+?",currentQPS:"?+?currentQPS()?+?",index:"?+?index); ????????} ????} ????/** ?????*?定時(shí)執(zhí)行器, ?????*?每N毫秒滑塊移動(dòng)一次,然后再設(shè)置下新滑塊的初始化數(shù)字0,然后新的請(qǐng)求會(huì)落到新的滑塊上 ?????*?同時(shí)總數(shù)減掉新滑塊上的數(shù)字,并且重置新的滑塊上的數(shù)量 ?????*/ ????@Override ????public?void?run()?{ ????????index?=?(index?+?1)?%?block; ????????long?val?=?countPerBlock[index].getAndSet(0); ????????allCount.addAndGet(-val); ????} ????public?static?void?main(String[]?args)?{ ????????SliderWindowRateLimiter?sliderWindowRateLimiter?=?new?SliderWindowRateLimiter(10,?100); ????????//固定的速率移動(dòng)滑塊 ????????ScheduledExecutorService?scheduledExecutorService?=?Executors.newSingleThreadScheduledExecutor(); ????????scheduledExecutorService.scheduleAtFixedRate(sliderWindowRateLimiter,?100,?100,?TimeUnit.MILLISECONDS); ????????//模擬不同速度的請(qǐng)求 ????????new?Thread(()?->?{ ????????????while?(true)?{ ????????????????sliderWindowRateLimiter.visit(); ????????????????try?{ ????????????????????Thread.sleep(10); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????}).start(); ????????//模擬不同速度的請(qǐng)求 ????????new?Thread(()?->?{ ????????????while?(true)?{ ????????????????sliderWindowRateLimiter.visit(); ????????????????try?{ ????????????????????Thread.sleep(50); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????} ????????}).start(); ????} }
3、信號(hào)量
利用Semaphore,每隔固定速率,釋放Semaphore的資源。線程獲取到資源,則執(zhí)行業(yè)務(wù)代碼。
代碼如下:
public?class?SemaphoreOne?{ ????private?static?Semaphore?semaphore?=?new?Semaphore(10); ????public?static?void?bizMethod()?throws?InterruptedException?{ ????????if?(!semaphore.tryAcquire())?{ ????????????System.out.println(Thread.currentThread().getName()?+?"被拒絕"); ????????????return; ????????} ????????System.out.println(Thread.currentThread().getName()?+?"執(zhí)行業(yè)務(wù)邏輯"); ????????Thread.sleep(500);//模擬處理業(yè)務(wù)邏輯需要1秒 ????????semaphore.release(); ????} ????public?static?void?main(String[]?args)?{ ????????Timer?timer?=?new?Timer(); ????????timer.scheduleAtFixedRate(new?TimerTask()?{ ????????????@Override ????????????public?void?run()?{ ????????????????semaphore.release(10); ????????????????System.out.println("釋放所有鎖"); ????????????} ????????},?1000,?1000); ????????for?(int?i?=?0;?i?<?10000;?i++)?{ ????????????try?{ ????????????????Thread.sleep(10);//模擬每隔10ms就有1個(gè)請(qǐng)求進(jìn)來 ????????????}?catch?(InterruptedException?e)?{ ????????????????e.printStackTrace(); ????????????} ????????????new?Thread(()?->?{ ????????????????try?{ ????????????????????SemaphoreOne.bizMethod(); ????????????????}?catch?(InterruptedException?e)?{ ????????????????????e.printStackTrace(); ????????????????} ????????????}).start(); ????????} ????} }
4、令牌桶
令牌桶算法:一個(gè)存放固定容量令牌的桶,按照固定速率往桶里添加令牌,如有剩余容量則添加,沒有則放棄。如果有請(qǐng)求進(jìn)來,則需要先從桶里獲取令牌,當(dāng)桶里沒有令牌可取時(shí),則拒絕任務(wù)。
令牌桶的優(yōu)點(diǎn)是:可以改變添加令牌的速率,一旦提高速率,則可以處理突發(fā)流量。
代碼如下:
public?class?TokenBucket?{ ????/** ?????*?定義的桶 ?????*/ ????public?class?Bucket?{ ????????//容量 ????????int?capacity; ????????//速率,每秒放多少 ????????int?rateCount; ????????//目前token個(gè)數(shù) ????????AtomicInteger?curCount?=?new?AtomicInteger(0); ????????public?Bucket(int?capacity,?int?rateCount)?{ ????????????this.capacity?=?capacity; ????????????this.rateCount?=?rateCount; ????????} ????????public?void?put()?{ ????????????if?(curCount.get()?<?capacity)?{ ????????????????System.out.println("目前數(shù)量=="?+?curCount.get()?+?",?我還可以繼續(xù)放"); ????????????????curCount.addAndGet(rateCount); ????????????} ????????} ????????public?boolean?get()?{ ????????????if?(curCount.get()?>=?1)?{ ????????????????curCount.decrementAndGet(); ????????????????return?true; ????????????} ????????????return?false; ????????} ????} ????@Test ????public?void?testTokenBucket()?throws?InterruptedException?{ ????????Bucket?bucket?=?new?Bucket(5,?2); ????????//固定線程,固定的速率往桶里放數(shù)據(jù),比如每秒N個(gè) ????????ScheduledThreadPoolExecutor?scheduledCheck?=?new?ScheduledThreadPoolExecutor(1); ????????scheduledCheck.scheduleAtFixedRate(()?->?{ ????????????bucket.put(); ????????},?0,?1,?TimeUnit.SECONDS); ????????//先等待一會(huì)兒,讓桶里放點(diǎn)token ????????Thread.sleep(6000); ????????//模擬瞬間10個(gè)線程進(jìn)來拿token ????????for?(int?i?=?0;?i?<?10;?i++)?{ ????????????new?Thread(()?->?{ ????????????????if?(bucket.get())?{ ????????????????????System.out.println(Thread.currentThread()?+?"獲取到了資源"); ????????????????}?else?{ ????????????????????System.out.println(Thread.currentThread()?+?"被拒絕"); ????????????????} ????????????}).start(); ????????} ????????//等待,往桶里放token ????????Thread.sleep(3000); ????????//繼續(xù)瞬間10個(gè)線程進(jìn)來拿token ????????for?(int?i?=?0;?i?<?10;?i++)?{ ????????????new?Thread(()?->?{ ????????????????if?(bucket.get())?{ ????????????????????System.out.println(Thread.currentThread()?+?"獲取到了資源"); ????????????????}?else?{ ????????????????????System.out.println(Thread.currentThread()?+?"被拒絕"); ????????????????} ????????????}).start(); ????????} ????} }
5、總結(jié)
本文主要介紹了幾種限流方法:Guava RateLimiter、簡單計(jì)數(shù)、滑窗計(jì)數(shù)、信號(hào)量、令牌桶,當(dāng)然,限流算法還有漏桶算法、nginx限流等等。我所寫的這些方法只是個(gè)人在實(shí)際項(xiàng)目總使用過的,或者是早年參加阿里筆試時(shí)寫過的方式。
以上就是Java實(shí)現(xiàn)系統(tǒng)限流的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Java系統(tǒng)限流的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法
這篇文章主要介紹了解決fastjson泛型轉(zhuǎn)換報(bào)錯(cuò)的解決方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11解決idea導(dǎo)入ssm項(xiàng)目啟動(dòng)tomcat報(bào)錯(cuò)404的問題
今天小編就為大家分享一篇解決idea導(dǎo)入ssm項(xiàng)目啟動(dòng)tomcat報(bào)錯(cuò)404的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-08-08詳解在idea 中使用Mybatis Generator逆向工程生成代碼
這篇文章主要介紹了在idea 中使用Mybatis Generator逆向工程生成代碼,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-12-12Java性能優(yōu)化之關(guān)于大對(duì)象復(fù)用的目標(biāo)和注意點(diǎn)
這篇文章主要介紹了Java性能優(yōu)化之關(guān)于大對(duì)象復(fù)用的目標(biāo)和注意點(diǎn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-03-03如何對(duì)quartz定時(shí)任務(wù)設(shè)置結(jié)束時(shí)間
這篇文章主要介紹了如何對(duì)quartz定時(shí)任務(wù)設(shè)置結(jié)束時(shí)間問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12Java如何實(shí)現(xiàn)壓縮文件與解壓縮zip文件
這篇文章主要介紹了Java如何實(shí)現(xiàn)壓縮文件與解壓縮zip文件問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-12-12SpringBoot上傳臨時(shí)文件被刪除引起報(bào)錯(cuò)的解決
這篇文章主要介紹了SpringBoot上傳臨時(shí)文件被刪除引起報(bào)錯(cuò)的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-11-11