詳解go-zero如何實(shí)現(xiàn)令牌桶限流
上一篇文章介紹了 如何實(shí)現(xiàn)計(jì)數(shù)器限流。主要有兩種實(shí)現(xiàn)方式,分別是固定窗口和滑動(dòng)窗口,并且分析了 go-zero 采用固定窗口方式實(shí)現(xiàn)的源碼。
但是采用固定窗口實(shí)現(xiàn)的限流器會(huì)有兩個(gè)問題:
- 會(huì)出現(xiàn)請(qǐng)求量超出限制值兩倍的情況
- 無法很好處理流量突增問題
這篇文章來介紹一下令牌桶算法,可以很好解決以上兩個(gè)問題。
工作原理
算法概念如下:
- 令牌以固定速率生成;
- 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多余的令牌會(huì)直接丟棄,當(dāng)請(qǐng)求到達(dá)時(shí),會(huì)嘗試從令牌桶中取令牌,取到了令牌的請(qǐng)求可以執(zhí)行;
- 如果桶空了,那么嘗試取令牌的請(qǐng)求會(huì)被直接丟棄。

令牌桶算法既能夠?qū)⑺械恼?qǐng)求平均分布到時(shí)間區(qū)間內(nèi),又能接受服務(wù)器能夠承受范圍內(nèi)的突發(fā)請(qǐng)求,因此是目前使用較為廣泛的一種限流算法。
源碼實(shí)現(xiàn)
源碼分析我們還是以 go-zero 項(xiàng)目為例,首先來看生成令牌的部分,依然是使用 Redis 來實(shí)現(xiàn)。
//?core/limit/tokenlimit.go
//?生成?token?速率
script?=?`local?rate?=?tonumber(ARGV[1])
//?通容量
local?capacity?=?tonumber(ARGV[2])
//?當(dāng)前時(shí)間戳
local?now?=?tonumber(ARGV[3])
//?請(qǐng)求數(shù)量
local?requested?=?tonumber(ARGV[4])
//?需要多少秒才能把桶填滿
local?fill_time?=?capacity/rate
//?向下取整,ttl?為填滿時(shí)間?2?倍
local?ttl?=?math.floor(fill_time*2)
//?當(dāng)前桶剩余容量,如果為?nil,說明第一次使用,賦值為桶最大容量
local?last_tokens?=?tonumber(redis.call("get",?KEYS[1]))
if?last_tokens?==?nil?then
????last_tokens?=?capacity
end
//?上次請(qǐng)求時(shí)間戳,如果為?nil?則賦值?0
local?last_refreshed?=?tonumber(redis.call("get",?KEYS[2]))
if?last_refreshed?==?nil?then
????last_refreshed?=?0
end
//?距離上一次請(qǐng)求的時(shí)間跨度
local?delta?=?math.max(0,?now-last_refreshed)
//?距離上一次請(qǐng)求的時(shí)間跨度能生成的?token?數(shù)量和桶內(nèi)剩余?token?數(shù)量的和
//?與桶容量比較,取二者的小值
local?filled_tokens?=?math.min(capacity,?last_tokens+(delta*rate))
//?判斷請(qǐng)求數(shù)量和桶內(nèi)?token?數(shù)量的大小
local?allowed?=?filled_tokens?>=?requested
//?被請(qǐng)求消耗掉之后,更新剩余?token?數(shù)量
local?new_tokens?=?filled_tokens
if?allowed?then
????new_tokens?=?filled_tokens?-?requested
end
//?更新?redis?token
redis.call("setex",?KEYS[1],?ttl,?new_tokens)
//?更新?redis?刷新時(shí)間
redis.call("setex",?KEYS[2],?ttl,?now)
return?allowed`Redis 中主要保存兩個(gè) key,分別是 token 數(shù)量和刷新時(shí)間。
核心思想就是比較兩次請(qǐng)求時(shí)間間隔內(nèi)生成的 token 數(shù)量 + 桶內(nèi)剩余 token 數(shù)量,和請(qǐng)求量之間的大小,如果滿足則允許,否則則不允許。
限流器初始化:
//?A?TokenLimiter?controls?how?frequently?events?are?allowed?to?happen?with?in?one?second.
type?TokenLimiter?struct?{
????//?生成?token?速率
????rate???????????int
????//?桶容量
????burst??????????int
????store??????????*redis.Redis
????//?桶?key
????tokenKey???????string
????//?桶刷新時(shí)間?key
????timestampKey???string
????rescueLock?????sync.Mutex
????//?redis?健康標(biāo)識(shí)
????redisAlive?????uint32
????//?redis?健康監(jiān)控啟動(dòng)狀態(tài)
????monitorStarted?bool
????//?內(nèi)置單機(jī)限流器
????rescueLimiter??*xrate.Limiter
}
//?NewTokenLimiter?returns?a?new?TokenLimiter?that?allows?events?up?to?rate?and?permits
//?bursts?of?at?most?burst?tokens.
func?NewTokenLimiter(rate,?burst?int,?store?*redis.Redis,?key?string)?*TokenLimiter?{
????tokenKey?:=?fmt.Sprintf(tokenFormat,?key)
????timestampKey?:=?fmt.Sprintf(timestampFormat,?key)
????return?&TokenLimiter{
????????rate:??????????rate,
????????burst:?????????burst,
????????store:?????????store,
????????tokenKey:??????tokenKey,
????????timestampKey:??timestampKey,
????????redisAlive:????1,
????????rescueLimiter:?xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)),?burst),
????}
}其中有一個(gè)變量 rescueLimiter,這是一個(gè)進(jìn)程內(nèi)的限流器。如果 Redis 發(fā)生故障了,那么就使用這個(gè),算是一個(gè)保障,盡量避免系統(tǒng)被突發(fā)流量拖垮。

提供了四個(gè)可調(diào)用方法:
//?Allow?is?shorthand?for?AllowN(time.Now(),?1).
func?(lim?*TokenLimiter)?Allow()?bool?{
????return?lim.AllowN(time.Now(),?1)
}
//?AllowCtx?is?shorthand?for?AllowNCtx(ctx,time.Now(),?1)?with?incoming?context.
func?(lim?*TokenLimiter)?AllowCtx(ctx?context.Context)?bool?{
????return?lim.AllowNCtx(ctx,?time.Now(),?1)
}
//?AllowN?reports?whether?n?events?may?happen?at?time?now.
//?Use?this?method?if?you?intend?to?drop?/?skip?events?that?exceed?the?rate.
//?Otherwise,?use?Reserve?or?Wait.
func?(lim?*TokenLimiter)?AllowN(now?time.Time,?n?int)?bool?{
????return?lim.reserveN(context.Background(),?now,?n)
}
//?AllowNCtx?reports?whether?n?events?may?happen?at?time?now?with?incoming?context.
//?Use?this?method?if?you?intend?to?drop?/?skip?events?that?exceed?the?rate.
//?Otherwise,?use?Reserve?or?Wait.
func?(lim?*TokenLimiter)?AllowNCtx(ctx?context.Context,?now?time.Time,?n?int)?bool?{
????return?lim.reserveN(ctx,?now,?n)
}最終調(diào)用的都是 reverveN 方法:
func?(lim?*TokenLimiter)?reserveN(ctx?context.Context,?now?time.Time,?n?int)?bool?{
????//?判斷?Redis?健康狀態(tài),如果?Redis?故障,則使用進(jìn)程內(nèi)限流器
????if?atomic.LoadUint32(&lim.redisAlive)?==?0?{
????????return?lim.rescueLimiter.AllowN(now,?n)
????}
????//?執(zhí)行限流腳本
????resp,?err?:=?lim.store.EvalCtx(ctx,
????????script,
????????[]string{
????????????lim.tokenKey,
????????????lim.timestampKey,
????????},
????????[]string{
????????????strconv.Itoa(lim.rate),
????????????strconv.Itoa(lim.burst),
????????????strconv.FormatInt(now.Unix(),?10),
????????????strconv.Itoa(n),
????????})
????//?redis?allowed?==?false
????//?Lua?boolean?false?->?r?Nil?bulk?reply
????if?err?==?redis.Nil?{
????????return?false
????}
????if?errors.Is(err,?context.DeadlineExceeded)?||?errors.Is(err,?context.Canceled)?{
????????logx.Errorf("fail?to?use?rate?limiter:?%s",?err)
????????return?false
????}
????if?err?!=?nil?{
????????logx.Errorf("fail?to?use?rate?limiter:?%s,?use?in-process?limiter?for?rescue",?err)
????????//?如果有異常的話,會(huì)啟動(dòng)進(jìn)程內(nèi)限流
????????lim.startMonitor()
????????return?lim.rescueLimiter.AllowN(now,?n)
????}
????code,?ok?:=?resp.(int64)
????if?!ok?{
????????logx.Errorf("fail?to?eval?redis?script:?%v,?use?in-process?limiter?for?rescue",?resp)
????????lim.startMonitor()
????????return?lim.rescueLimiter.AllowN(now,?n)
????}
????//?redis?allowed?==?true
????//?Lua?boolean?true?->?r?integer?reply?with?value?of?1
????return?code?==?1
}最后看一下進(jìn)程內(nèi)限流的啟動(dòng)與恢復(fù):
func?(lim?*TokenLimiter)?startMonitor()?{
????lim.rescueLock.Lock()
????defer?lim.rescueLock.Unlock()
????//?需要加鎖保護(hù),如果程序已經(jīng)啟動(dòng)了,直接返回,不要重復(fù)啟動(dòng)
????if?lim.monitorStarted?{
????????return
????}
????lim.monitorStarted?=?true
????atomic.StoreUint32(&lim.redisAlive,?0)
????go?lim.waitForRedis()
}
func?(lim?*TokenLimiter)?waitForRedis()?{
????ticker?:=?time.NewTicker(pingInterval)
????//?更新監(jiān)控進(jìn)程的狀態(tài)
????defer?func()?{
????????ticker.Stop()
????????lim.rescueLock.Lock()
????????lim.monitorStarted?=?false
????????lim.rescueLock.Unlock()
????}()
????for?range?ticker.C?{
????????//?對(duì)?redis?進(jìn)行健康監(jiān)測(cè),如果?redis?服務(wù)恢復(fù)了
????????//?則更新?redisAlive?標(biāo)識(shí),并退出?goroutine
????????if?lim.store.Ping()?{
????????????atomic.StoreUint32(&lim.redisAlive,?1)
????????????return
????????}
????}
}以上就是詳解go-zero如何實(shí)現(xiàn)令牌桶限流的詳細(xì)內(nèi)容,更多關(guān)于go-zero令牌桶限流的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang使用Docker進(jìn)行集成測(cè)試的示例詳解
集成測(cè)試需要解決外部依賴問題,如?MySQL、Redis、網(wǎng)絡(luò)等依賴,本文就來聊聊?Go?程序如何使用?Docker?來解決集成測(cè)試中外部依賴問題吧2023-07-07
詳解Go?flag實(shí)現(xiàn)二級(jí)子命令的方法
這篇文章主要介紹了Go?flag?詳解,實(shí)現(xiàn)二級(jí)子命令,本文就探討一下?Go?語言中如何寫一個(gè)擁有類似特性的命令行程序,需要的朋友可以參考下2022-07-07
Golang標(biāo)準(zhǔn)庫之errors包應(yīng)用方式
Go語言的errors包提供了基礎(chǔ)的錯(cuò)誤處理能力,允許通過errors.New創(chuàng)建自定義error對(duì)象,error在Go中是一個(gè)接口,通過實(shí)現(xiàn)Error方法來定義錯(cuò)誤文本,對(duì)錯(cuò)誤的比較通常基于對(duì)象地址,而非文本內(nèi)容,因此即使兩個(gè)錯(cuò)誤文本相同2024-10-10
Go文件操作(新建打開寫入讀取刪除關(guān)閉)學(xué)習(xí)筆記
這篇文章主要為大家介紹了Go文件操作(新建打開寫入讀取刪除關(guān)閉)學(xué)習(xí)筆記,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
Go語言使用buffer讀取文件的實(shí)現(xiàn)示例
本文主要介紹了Go語言使用buffer讀取文件的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04
go語言題解LeetCode674最長(zhǎng)連續(xù)遞增序列
這篇文章主要為大家介紹了go語言題解LeetCode674最長(zhǎng)連續(xù)遞增序列示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
go?module化?import?調(diào)用本地模塊?tidy的方法
這篇文章主要介紹了go?module化?import?調(diào)用本地模塊?tidy的相關(guān)知識(shí),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-09-09
Golang發(fā)送http GET請(qǐng)求的示例代碼
這篇文章主要介紹了Golang發(fā)送http GET請(qǐng)求的示例代碼,幫助大家更好的理解和使用golang,感興趣的朋友可以了解下2020-12-12
Golang實(shí)現(xiàn)獲取與解析命令行參數(shù)
這篇文章主要為大家詳細(xì)介紹了Golang如何實(shí)現(xiàn)獲取與解析命令行參數(shù),文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的小伙伴可以參考一下2024-01-01

