詳解go-zero如何實現(xiàn)令牌桶限流
上一篇文章介紹了 如何實現(xiàn)計數(shù)器限流。主要有兩種實現(xiàn)方式,分別是固定窗口和滑動窗口,并且分析了 go-zero 采用固定窗口方式實現(xiàn)的源碼。
但是采用固定窗口實現(xiàn)的限流器會有兩個問題:
- 會出現(xiàn)請求量超出限制值兩倍的情況
- 無法很好處理流量突增問題
這篇文章來介紹一下令牌桶算法,可以很好解決以上兩個問題。
工作原理
算法概念如下:
- 令牌以固定速率生成;
- 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多余的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執(zhí)行;
- 如果桶空了,那么嘗試取令牌的請求會被直接丟棄。
令牌桶算法既能夠將所有的請求平均分布到時間區(qū)間內,又能接受服務器能夠承受范圍內的突發(fā)請求,因此是目前使用較為廣泛的一種限流算法。
源碼實現(xiàn)
源碼分析我們還是以 go-zero 項目為例,首先來看生成令牌的部分,依然是使用 Redis 來實現(xiàn)。
//?core/limit/tokenlimit.go //?生成?token?速率 script?=?`local?rate?=?tonumber(ARGV[1]) //?通容量 local?capacity?=?tonumber(ARGV[2]) //?當前時間戳 local?now?=?tonumber(ARGV[3]) //?請求數(shù)量 local?requested?=?tonumber(ARGV[4]) //?需要多少秒才能把桶填滿 local?fill_time?=?capacity/rate //?向下取整,ttl?為填滿時間?2?倍 local?ttl?=?math.floor(fill_time*2) //?當前桶剩余容量,如果為?nil,說明第一次使用,賦值為桶最大容量 local?last_tokens?=?tonumber(redis.call("get",?KEYS[1])) if?last_tokens?==?nil?then ????last_tokens?=?capacity end //?上次請求時間戳,如果為?nil?則賦值?0 local?last_refreshed?=?tonumber(redis.call("get",?KEYS[2])) if?last_refreshed?==?nil?then ????last_refreshed?=?0 end //?距離上一次請求的時間跨度 local?delta?=?math.max(0,?now-last_refreshed) //?距離上一次請求的時間跨度能生成的?token?數(shù)量和桶內剩余?token?數(shù)量的和 //?與桶容量比較,取二者的小值 local?filled_tokens?=?math.min(capacity,?last_tokens+(delta*rate)) //?判斷請求數(shù)量和桶內?token?數(shù)量的大小 local?allowed?=?filled_tokens?>=?requested //?被請求消耗掉之后,更新剩余?token?數(shù)量 local?new_tokens?=?filled_tokens if?allowed?then ????new_tokens?=?filled_tokens?-?requested end //?更新?redis?token redis.call("setex",?KEYS[1],?ttl,?new_tokens) //?更新?redis?刷新時間 redis.call("setex",?KEYS[2],?ttl,?now) return?allowed`
Redis 中主要保存兩個 key,分別是 token 數(shù)量和刷新時間。
核心思想就是比較兩次請求時間間隔內生成的 token 數(shù)量 + 桶內剩余 token 數(shù)量,和請求量之間的大小,如果滿足則允許,否則則不允許。
限流器初始化:
//?A?TokenLimiter?controls?how?frequently?events?are?allowed?to?happen?with?in?one?second. type?TokenLimiter?struct?{ ????//?生成?token?速率 ????rate???????????int ????//?桶容量 ????burst??????????int ????store??????????*redis.Redis ????//?桶?key ????tokenKey???????string ????//?桶刷新時間?key ????timestampKey???string ????rescueLock?????sync.Mutex ????//?redis?健康標識 ????redisAlive?????uint32 ????//?redis?健康監(jiān)控啟動狀態(tài) ????monitorStarted?bool ????//?內置單機限流器 ????rescueLimiter??*xrate.Limiter } //?NewTokenLimiter?returns?a?new?TokenLimiter?that?allows?events?up?to?rate?and?permits //?bursts?of?at?most?burst?tokens. func?NewTokenLimiter(rate,?burst?int,?store?*redis.Redis,?key?string)?*TokenLimiter?{ ????tokenKey?:=?fmt.Sprintf(tokenFormat,?key) ????timestampKey?:=?fmt.Sprintf(timestampFormat,?key) ????return?&TokenLimiter{ ????????rate:??????????rate, ????????burst:?????????burst, ????????store:?????????store, ????????tokenKey:??????tokenKey, ????????timestampKey:??timestampKey, ????????redisAlive:????1, ????????rescueLimiter:?xrate.NewLimiter(xrate.Every(time.Second/time.Duration(rate)),?burst), ????} }
其中有一個變量 rescueLimiter
,這是一個進程內的限流器。如果 Redis 發(fā)生故障了,那么就使用這個,算是一個保障,盡量避免系統(tǒng)被突發(fā)流量拖垮。
提供了四個可調用方法:
//?Allow?is?shorthand?for?AllowN(time.Now(),?1). func?(lim?*TokenLimiter)?Allow()?bool?{ ????return?lim.AllowN(time.Now(),?1) } //?AllowCtx?is?shorthand?for?AllowNCtx(ctx,time.Now(),?1)?with?incoming?context. func?(lim?*TokenLimiter)?AllowCtx(ctx?context.Context)?bool?{ ????return?lim.AllowNCtx(ctx,?time.Now(),?1) } //?AllowN?reports?whether?n?events?may?happen?at?time?now. //?Use?this?method?if?you?intend?to?drop?/?skip?events?that?exceed?the?rate. //?Otherwise,?use?Reserve?or?Wait. func?(lim?*TokenLimiter)?AllowN(now?time.Time,?n?int)?bool?{ ????return?lim.reserveN(context.Background(),?now,?n) } //?AllowNCtx?reports?whether?n?events?may?happen?at?time?now?with?incoming?context. //?Use?this?method?if?you?intend?to?drop?/?skip?events?that?exceed?the?rate. //?Otherwise,?use?Reserve?or?Wait. func?(lim?*TokenLimiter)?AllowNCtx(ctx?context.Context,?now?time.Time,?n?int)?bool?{ ????return?lim.reserveN(ctx,?now,?n) }
最終調用的都是 reverveN
方法:
func?(lim?*TokenLimiter)?reserveN(ctx?context.Context,?now?time.Time,?n?int)?bool?{ ????//?判斷?Redis?健康狀態(tài),如果?Redis?故障,則使用進程內限流器 ????if?atomic.LoadUint32(&lim.redisAlive)?==?0?{ ????????return?lim.rescueLimiter.AllowN(now,?n) ????} ????//?執(zhí)行限流腳本 ????resp,?err?:=?lim.store.EvalCtx(ctx, ????????script, ????????[]string{ ????????????lim.tokenKey, ????????????lim.timestampKey, ????????}, ????????[]string{ ????????????strconv.Itoa(lim.rate), ????????????strconv.Itoa(lim.burst), ????????????strconv.FormatInt(now.Unix(),?10), ????????????strconv.Itoa(n), ????????}) ????//?redis?allowed?==?false ????//?Lua?boolean?false?->?r?Nil?bulk?reply ????if?err?==?redis.Nil?{ ????????return?false ????} ????if?errors.Is(err,?context.DeadlineExceeded)?||?errors.Is(err,?context.Canceled)?{ ????????logx.Errorf("fail?to?use?rate?limiter:?%s",?err) ????????return?false ????} ????if?err?!=?nil?{ ????????logx.Errorf("fail?to?use?rate?limiter:?%s,?use?in-process?limiter?for?rescue",?err) ????????//?如果有異常的話,會啟動進程內限流 ????????lim.startMonitor() ????????return?lim.rescueLimiter.AllowN(now,?n) ????} ????code,?ok?:=?resp.(int64) ????if?!ok?{ ????????logx.Errorf("fail?to?eval?redis?script:?%v,?use?in-process?limiter?for?rescue",?resp) ????????lim.startMonitor() ????????return?lim.rescueLimiter.AllowN(now,?n) ????} ????//?redis?allowed?==?true ????//?Lua?boolean?true?->?r?integer?reply?with?value?of?1 ????return?code?==?1 }
最后看一下進程內限流的啟動與恢復:
func?(lim?*TokenLimiter)?startMonitor()?{ ????lim.rescueLock.Lock() ????defer?lim.rescueLock.Unlock() ????//?需要加鎖保護,如果程序已經(jīng)啟動了,直接返回,不要重復啟動 ????if?lim.monitorStarted?{ ????????return ????} ????lim.monitorStarted?=?true ????atomic.StoreUint32(&lim.redisAlive,?0) ????go?lim.waitForRedis() } func?(lim?*TokenLimiter)?waitForRedis()?{ ????ticker?:=?time.NewTicker(pingInterval) ????//?更新監(jiān)控進程的狀態(tài) ????defer?func()?{ ????????ticker.Stop() ????????lim.rescueLock.Lock() ????????lim.monitorStarted?=?false ????????lim.rescueLock.Unlock() ????}() ????for?range?ticker.C?{ ????????//?對?redis?進行健康監(jiān)測,如果?redis?服務恢復了 ????????//?則更新?redisAlive?標識,并退出?goroutine ????????if?lim.store.Ping()?{ ????????????atomic.StoreUint32(&lim.redisAlive,?1) ????????????return ????????} ????} }
以上就是詳解go-zero如何實現(xiàn)令牌桶限流的詳細內容,更多關于go-zero令牌桶限流的資料請關注腳本之家其它相關文章!
相關文章
詳解golang避免循環(huán)import問題(“import cycle not allowed”)
這篇文章主要給大家介紹了關于golang中不允許循環(huán)import問題("import cycle not allowed")的相關資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面隨著小編來一起學習學習吧2018-08-08Golang?手寫一個簡單的并發(fā)任務?manager
這篇文章主要介紹了Golang?手寫一個簡單的并發(fā)任務?manager,文章圍繞主題展開詳細的內容介紹,具有一定的參考價值,需要的小伙伴可以參考一下2022-08-08