Go語(yǔ)言實(shí)現(xiàn)常見(jiàn)限流算法的示例代碼
用go語(yǔ)言嘗試計(jì)數(shù)器、滑動(dòng)窗口、漏斗算法、令牌桶算法等算法
1. 計(jì)數(shù)器
計(jì)數(shù)器是一種最簡(jiǎn)單限流算法,其原理就是:在一段時(shí)間間隔內(nèi),對(duì)請(qǐng)求進(jìn)行計(jì)數(shù),與閥值進(jìn)行比較判斷是否需要限流,一旦到了時(shí)間臨界點(diǎn),將計(jì)數(shù)器清零。
- 可以在程序中設(shè)置一個(gè)變量
count
,當(dāng)過(guò)來(lái)一個(gè)請(qǐng)求我就將這個(gè)數(shù)+1
,同時(shí)記錄請(qǐng)求時(shí)間。 - 當(dāng)下一個(gè)請(qǐng)求來(lái)的時(shí)候判斷
count
的計(jì)數(shù)值是否超過(guò)設(shè)定的頻次,以及當(dāng)前請(qǐng)求的時(shí)間和第一次請(qǐng)求時(shí)間是否在1
分鐘內(nèi)。 - 如果在
1
分鐘內(nèi)并且超過(guò)設(shè)定的頻次則證明請(qǐng)求過(guò)多,后面的請(qǐng)求就拒絕掉。 - 如果該請(qǐng)求與第一個(gè)請(qǐng)求的間隔時(shí)間大于計(jì)數(shù)周期,且
count
值還在限流范圍內(nèi),就重置count
。
常見(jiàn)使用redis。比如對(duì)每秒限流,可以每秒設(shè)置一個(gè)key(limit-2021-11-20-11:11:11、limit-2021-11-20-11:11:12)。首先獲取當(dāng)前時(shí)間拼接一個(gè)如上的key。如果key存在且未超過(guò)某個(gè)閾值就自增,超過(guò)閾值就拒絕;如果key不存在就代表這新的一秒沒(méi)有請(qǐng)求則重置計(jì)數(shù)。
Go實(shí)現(xiàn)
type Counter struct { rate int // 計(jì)數(shù)周期內(nèi)最多允許的請(qǐng)求數(shù) begin time.Time // 計(jì)數(shù)開(kāi)始時(shí)間 cycle time.Duration // 計(jì)數(shù)周期 count int // 計(jì)數(shù)周期內(nèi)累計(jì)收到的請(qǐng)求數(shù) lock sync.Mutex } func (l *Counter) Allow() bool { l.lock.Lock() defer l.lock.Unlock() if l.count == l.rate-1 { now := time.Now() if now.Sub(l.begin) >= l.cycle { //速度允許范圍內(nèi), 重置計(jì)數(shù)器 l.Reset(now) return true } else { return false } } else { //沒(méi)有達(dá)到速率限制,計(jì)數(shù)加1 l.count++ return true } } func (l *Counter) Set(r int, cycle time.Duration) { l.rate = r l.begin = time.Now() l.cycle = cycle l.count = 0 } func (l *Counter) Reset(t time.Time) { l.begin = t l.count = 0 } func main() { var lr Counter lr.Set(3, time.Second) // 1s內(nèi)最多請(qǐng)求3次 var wg sync.WaitGroup wg.Add(10) for i := 0; i < 10; i++ { go func(i int) { if lr.Allow() { log.Println("ok:", i) } else { log.Println("fail:", i) } wg.Done() }(i) time.Sleep(200 * time.Millisecond) } wg.Wait() } /* 每秒三個(gè) 2021/11/20 16:22:39 ok: 0 2021/11/20 16:22:39 ok: 1 2021/11/20 16:22:39 ok: 2 2021/11/20 16:22:39 fail: 3 2021/11/20 16:22:40 fail: 4 2021/11/20 16:22:40 ok: 5 2021/11/20 16:22:40 ok: 6 2021/11/20 16:22:40 ok: 7 2021/11/20 16:22:41 ok: 8 2021/11/20 16:22:41 fail: 9 */
缺點(diǎn)
如果有個(gè)需求對(duì)于某個(gè)接口 /query
每分鐘最多允許訪問(wèn) 200 次,假設(shè)有個(gè)用戶在第 59 秒的最后幾毫秒瞬間發(fā)送 200 個(gè)請(qǐng)求,當(dāng) 59 秒結(jié)束后 Counter
清零了,他在下一秒的時(shí)候又發(fā)送 200 個(gè)請(qǐng)求。那么在 1 秒鐘內(nèi)這個(gè)用戶發(fā)送了 2 倍的請(qǐng)求,這個(gè)是符合我們的設(shè)計(jì)邏輯的,這也是計(jì)數(shù)器方法的設(shè)計(jì)缺陷,系統(tǒng)可能會(huì)承受惡意用戶的大量請(qǐng)求,甚至擊穿系統(tǒng)。
2. 滑動(dòng)窗口
滑動(dòng)窗口是針對(duì)計(jì)數(shù)器存在的臨界點(diǎn)缺陷,所謂 滑動(dòng)窗口(Sliding window) 是一種流量控制技術(shù),這個(gè)詞出現(xiàn)在 TCP
協(xié)議中?;瑒?dòng)窗口把固定時(shí)間片進(jìn)行劃分,并且隨著時(shí)間的流逝,進(jìn)行移動(dòng),固定數(shù)量的可以移動(dòng)的格子,進(jìn)行計(jì)數(shù)并判斷閥值。
算法思想
上圖中我們用紅色的虛線代表一個(gè)時(shí)間窗口(一分鐘
),每個(gè)時(shí)間窗口有 6
個(gè)格子,每個(gè)格子是 10
秒鐘。每過(guò) 10
秒鐘時(shí)間窗口向右移動(dòng)一格,可以看紅色箭頭的方向。我們?yōu)槊總€(gè)格子都設(shè)置一個(gè)獨(dú)立的計(jì)數(shù)器 Counter
,假如一個(gè)請(qǐng)求在 0:45
訪問(wèn)了那么我們將第五個(gè)格子的計(jì)數(shù)器 +1
(也是就是 0:40~0:50
),在判斷限流的時(shí)候需要把所有格子的計(jì)數(shù)加起來(lái)和設(shè)定的頻次進(jìn)行比較即可。
那么滑動(dòng)窗口如何解決我們上面遇到的問(wèn)題呢?來(lái)看下面的圖:
當(dāng)用戶在0:59
秒鐘發(fā)送了 200
個(gè)請(qǐng)求就會(huì)被第六個(gè)格子的計(jì)數(shù)器記錄 +200
,當(dāng)下一秒的時(shí)候時(shí)間窗口向右移動(dòng)了一個(gè),此時(shí)計(jì)數(shù)器已經(jīng)記錄了該用戶發(fā)送的 200
個(gè)請(qǐng)求,所以再發(fā)送的話就會(huì)觸發(fā)限流,則拒絕新的請(qǐng)求。
其實(shí)計(jì)數(shù)器就是滑動(dòng)窗口啊,只不過(guò)只有一個(gè)格子而已,所以想讓限流做的更精確只需要?jiǎng)澐指嗟母褡泳涂梢粤?,為了更精確我們也不知道到底該設(shè)置多少個(gè)格子,格子的數(shù)量影響著滑動(dòng)窗口算法的精度,依然有時(shí)間片的概念,無(wú)法根本解決臨界點(diǎn)問(wèn)題。
適用場(chǎng)景
與令牌桶一樣,有應(yīng)對(duì)突發(fā)流量的能力
Go實(shí)現(xiàn)
主要就是實(shí)現(xiàn)sliding window算法。可以參考Bilibili開(kāi)源的kratos框架里circuit breaker用循環(huán)列表保存time slot對(duì)象的實(shí)現(xiàn),他們這個(gè)實(shí)現(xiàn)的好處是不用頻繁的創(chuàng)建和銷毀time slot對(duì)象。下面給出一個(gè)簡(jiǎn)單的基本實(shí)現(xiàn):
var winMu map[string]*sync.RWMutex func init() { winMu = make(map[string]*sync.RWMutex) } type timeSlot struct { timestamp time.Time // 這個(gè)timeSlot的時(shí)間起點(diǎn) count int // 落在這個(gè)timeSlot內(nèi)的請(qǐng)求數(shù) } func countReq(win []*timeSlot) int { var count int for _, ts := range win { count += ts.count } return count } type SlidingWindowLimiter struct { SlotDuration time.Duration // time slot的長(zhǎng)度 WinDuration time.Duration // sliding window的長(zhǎng)度 numSlots int // window內(nèi)最多有多少個(gè)slot maxReq int // win duration內(nèi)允許的最大請(qǐng)求數(shù) windows map[string][]*timeSlot } func NewSliding(slotDuration time.Duration, winDuration time.Duration, maxReq int) *SlidingWindowLimiter { return &SlidingWindowLimiter{ SlotDuration: slotDuration, WinDuration: winDuration, numSlots: int(winDuration / slotDuration), windows: make(map[string][]*timeSlot), maxReq: maxReq, } } // 獲取user_id/ip的時(shí)間窗口 func (l *SlidingWindowLimiter) getWindow(uidOrIp string) []*timeSlot { win, ok := l.windows[uidOrIp] if !ok { win = make([]*timeSlot, 0, l.numSlots) } return win } func (l *SlidingWindowLimiter) storeWindow(uidOrIp string, win []*timeSlot) { l.windows[uidOrIp] = win } func (l *SlidingWindowLimiter) validate(uidOrIp string) bool { // 同一user_id/ip并發(fā)安全 mu, ok := winMu[uidOrIp] if !ok { var m sync.RWMutex mu = &m winMu[uidOrIp] = mu } mu.Lock() defer mu.Unlock() win := l.getWindow(uidOrIp) now := time.Now() // 已經(jīng)過(guò)期的time slot移出時(shí)間窗 timeoutOffset := -1 for i, ts := range win { if ts.timestamp.Add(l.WinDuration).After(now) { break } timeoutOffset = i } if timeoutOffset > -1 { win = win[timeoutOffset+1:] } // 判斷請(qǐng)求是否超限 var result bool if countReq(win) < l.maxReq { result = true } // 記錄這次的請(qǐng)求數(shù) var lastSlot *timeSlot if len(win) > 0 { lastSlot = win[len(win)-1] if lastSlot.timestamp.Add(l.SlotDuration).Before(now) { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } else { lastSlot.count++ } } else { lastSlot = &timeSlot{timestamp: now, count: 1} win = append(win, lastSlot) } l.storeWindow(uidOrIp, win) return result } func (l *SlidingWindowLimiter) getUidOrIp() string { return "127.0.0.1" } func (l *SlidingWindowLimiter) IsLimited() bool { return !l.validate(l.getUidOrIp()) } func main() { limiter := NewSliding(100*time.Millisecond, time.Second, 10) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } time.Sleep(100 * time.Millisecond) for i := 0; i < 5; i++ { fmt.Println(limiter.IsLimited()) } fmt.Println(limiter.IsLimited()) for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } fmt.Println("a thousand years later...") time.Sleep(time.Second) for i := 0; i < 7; i++ { fmt.Println(limiter.IsLimited()) } for _, v := range limiter.windows[limiter.getUidOrIp()] { fmt.Println(v.timestamp, v.count) } }
3. 漏桶算法
算法思想
與令牌桶是“反向”的算法,當(dāng)有請(qǐng)求到來(lái)時(shí)先放到木桶中,worker以固定的速度從木桶中取出請(qǐng)求進(jìn)行相應(yīng)。如果木桶已經(jīng)滿了,直接返回請(qǐng)求頻率超限的錯(cuò)誤碼或者頁(yè)面
特點(diǎn)
漏桶算法有以下特點(diǎn):
- 漏桶具有固定容量,出水速率是固定常量(流出請(qǐng)求)
- 如果桶是空的,則不需流出水滴
- 可以以任意速率流入水滴到漏桶(流入請(qǐng)求)
- 如果流入水滴超出了桶的容量,則流入的水滴溢出(新請(qǐng)求被拒絕)
漏桶限制的是常量流出速率(即流出速率是一個(gè)固定常量值),所以最大的速率就是出水的速率,不能出現(xiàn)突發(fā)流量。
適用場(chǎng)景
流量最均勻的限流方式,一般用于流量“整形”,例如保護(hù)數(shù)據(jù)庫(kù)的限流。先把對(duì)數(shù)據(jù)庫(kù)的訪問(wèn)加入到木桶中,worker再以db能夠承受的qps從木桶中取出請(qǐng)求,去訪問(wèn)數(shù)據(jù)庫(kù)。不太適合電商搶購(gòu)和微博出現(xiàn)熱點(diǎn)事件等場(chǎng)景的限流,一是應(yīng)對(duì)突發(fā)流量不是很靈活,二是為每個(gè)user_id/ip維護(hù)一個(gè)隊(duì)列(木桶),workder從這些隊(duì)列中拉取任務(wù),資源的消耗會(huì)比較大。
Go實(shí)現(xiàn)
通常使用隊(duì)列來(lái)實(shí)現(xiàn),在go語(yǔ)言中可以通過(guò)buffered channel來(lái)快速實(shí)現(xiàn),任務(wù)加入channel,開(kāi)啟一定數(shù)量的 worker 從 channel 中獲取任務(wù)執(zhí)行。
// 封裝業(yè)務(wù)邏輯的執(zhí)行結(jié)果 type Result struct { Msg string } // 執(zhí)行的業(yè)務(wù)邏輯函數(shù) type Handler func() Result // 每個(gè)請(qǐng)求來(lái)了,把需要執(zhí)行的業(yè)務(wù)邏輯封裝成Task,放入木桶,等待worker取出執(zhí)行 type Task struct { handler Handler // worker從木桶中取出請(qǐng)求對(duì)象后要執(zhí)行的業(yè)務(wù)邏輯函數(shù) resChan chan Result // 等待worker執(zhí)行并返回結(jié)果的channel taskID int } func NewTask(id int, handler Handler) Task { return Task{ handler: handler, resChan: make(chan Result), taskID: id, } } // 漏桶 type LeakyBucket struct { BucketSize int // 木桶的大小 WorkerNum int // 同時(shí)從木桶中獲取任務(wù)執(zhí)行的worker數(shù)量 bucket chan Task // 存方任務(wù)的木桶 } func NewLeakyBucket(bucketSize int, workNum int) *LeakyBucket { return &LeakyBucket{ BucketSize: bucketSize, WorkerNum: workNum, bucket: make(chan Task, bucketSize), } } func (b *LeakyBucket) AddTask(task Task) bool { // 如果木桶已經(jīng)滿了,返回false select { case b.bucket <- task: default: fmt.Printf("request[id=%d] is refused\n", task.taskID) return false } // 如果成功入桶,調(diào)用者會(huì)等待worker執(zhí)行結(jié)果 resp := <-task.resChan fmt.Printf("request[id=%d] is run ok, resp[%v]\n", task.taskID, resp) return true } func (b *LeakyBucket) Start(ctx context.Context) { // 開(kāi)啟worker從木桶拉取任務(wù)執(zhí)行 for i := 0; i < b.WorkerNum; i++ { go func(ctx context.Context) { for { select { case <-ctx.Done(): return default: task := <-b.bucket result := task.handler() task.resChan <- result } } }(ctx) } } func main() { bucket := NewLeakyBucket(10, 4) ctx, cancel := context.WithCancel(context.Background()) defer cancel() bucket.Start(ctx) // 開(kāi)啟消費(fèi)者 // 模擬20個(gè)并發(fā)請(qǐng)求 var wg sync.WaitGroup wg.Add(20) for i := 0; i < 20; i++ { go func(id int) { defer wg.Done() task := NewTask(id, func() Result { time.Sleep(300 * time.Millisecond) return Result{} }) bucket.AddTask(task) }(i) } wg.Wait() time.Sleep(10 * time.Second) }
4. 令牌桶算法
算法思想
令牌桶算法(Token Bucket)
是網(wǎng)絡(luò)流量整形(Traffic Shaping)
和速率限制(Rate Limiting)
中最常使用的一種算法。典型情況下,令牌桶算法用來(lái)控制發(fā)送到網(wǎng)絡(luò)上的數(shù)據(jù)的數(shù)目,并允許突發(fā)數(shù)據(jù)的發(fā)送。想象有一個(gè)木桶,以固定的速度往木桶里加入令牌,木桶滿了則不再加入令牌。服務(wù)收到請(qǐng)求時(shí)嘗試從木桶中取出一個(gè)令牌,如果能夠得到令牌則繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯;如果沒(méi)有得到令牌,直接返回反問(wèn)頻率超限的錯(cuò)誤碼或頁(yè)面等,不繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯
特點(diǎn)
由于木桶內(nèi)只要有令牌,請(qǐng)求就可以被處理,所以令牌桶算法可以支持突發(fā)流量。同時(shí)由于往木桶添加令牌的速度是固定的,且木桶的容量有上限,所以單位時(shí)間內(nèi)處理的請(qǐng)求數(shù)目也能夠得到控制,起到限流的目的。假設(shè)加入令牌的速度為 1token/10ms,桶的容量為500,在請(qǐng)求比較的少的時(shí)候(小于每10毫秒1個(gè)請(qǐng)求)時(shí),木桶可以先"攢"一些令牌(最多500個(gè))。當(dāng)有突發(fā)流量時(shí),一下把木桶內(nèi)的令牌取空,也就是有500個(gè)在并發(fā)執(zhí)行的業(yè)務(wù)邏輯,之后要等每10ms補(bǔ)充一個(gè)新的令牌才能接收一個(gè)新的請(qǐng)求。
木桶的容量 - 考慮業(yè)務(wù)邏輯的資源消耗和機(jī)器能承載并發(fā)處理多少業(yè)務(wù)邏輯。生成令牌的速度 - 太慢的話起不到“攢”令牌應(yīng)對(duì)突發(fā)流量的效果。
- 令牌按固定的速率被放入令牌桶中
- 桶中最多存放
B
個(gè)令牌,當(dāng)桶滿時(shí),新添加的令牌被丟棄或拒絕 - 如果桶中的令牌不足
N
個(gè),則不會(huì)刪除令牌,且請(qǐng)求將被限流(丟棄或阻塞等待)
令牌桶限制的是平均流入速率(允許突發(fā)請(qǐng)求,只要有令牌就可以處理,支持一次拿3個(gè)令牌,4個(gè)令牌...),并允許一定程度突發(fā)流量。
適用場(chǎng)景
適合電商搶購(gòu)或者微博出現(xiàn)熱點(diǎn)事件這種場(chǎng)景,因?yàn)樵谙蘖鞯耐瑫r(shí)可以應(yīng)對(duì)一定的突發(fā)流量。如果采用均勻速度處理請(qǐng)求的算法,在發(fā)生熱點(diǎn)時(shí)間的時(shí)候,會(huì)造成大量的用戶無(wú)法訪問(wèn),對(duì)用戶體驗(yàn)的損害比較大。
Go實(shí)現(xiàn)
假設(shè)每100ms生產(chǎn)一個(gè)令牌,按user_id/IP記錄訪問(wèn)最近一次訪問(wèn)的時(shí)間戳 t_last 和令牌數(shù),每次請(qǐng)求時(shí)如果 now - last > 100ms, 增加 (now - last) / 100ms個(gè)令牌。然后,如果令牌數(shù) > 0,令牌數(shù) -1 繼續(xù)執(zhí)行后續(xù)的業(yè)務(wù)邏輯,否則返回請(qǐng)求頻率超限的錯(cuò)誤碼或頁(yè)面。
type TokenBucket struct { lock sync.Mutex rate time.Duration // 多長(zhǎng)時(shí)間生成一個(gè)令牌 capacity int // 桶的容量 tokens int // 桶中當(dāng)前token數(shù)量 last time.Time // 桶上次放token的時(shí)間戳 s } func NewTokenBucket(bucketSize int, tokenRate time.Duration) *TokenBucket { return &TokenBucket{ capacity: bucketSize, rate: tokenRate, } } // 驗(yàn)證是否能獲取一個(gè)令牌 返回是否被限流 func (t *TokenBucket) Allow() bool { t.lock.Lock() defer t.lock.Unlock() now := time.Now() if t.last.IsZero() { // 第一次訪問(wèn)初始化為最大令牌數(shù) t.last, t.tokens = now, t.capacity } else { if t.last.Add(t.rate).Before(now) { // 如果 now 與上次請(qǐng)求的間隔超過(guò)了 token rate // 則增加令牌,更新last t.tokens += int(now.Sub(t.last) / t.rate) if t.tokens > t.capacity { t.tokens = t.capacity } t.last = now } } if t.tokens > 0 { // 如果令牌數(shù)大于0,取走一個(gè)令牌 t.tokens-- return true } // 沒(méi)有令牌,則拒絕 return false } func main() { tokenBucket := NewTokenBucket(5, 100*time.Millisecond) for i := 0; i < 10; i++ { fmt.Println(tokenBucket.Allow()) } time.Sleep(100 * time.Millisecond) fmt.Println(tokenBucket.Allow()) }
以上就是Go語(yǔ)言實(shí)現(xiàn)常見(jiàn)限流算法的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于Go語(yǔ)言限流算法的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang的匿名函數(shù)和普通函數(shù)的區(qū)別解析
匿名函數(shù)是不具名的函數(shù),可以在不定義函數(shù)名的情況下直接使用,通常用于函數(shù)內(nèi)部的局部作用域中,這篇文章主要介紹了golang的匿名函數(shù)和普通函數(shù)的區(qū)別,需要的朋友可以參考下2023-03-03Golang cron 定時(shí)器和定時(shí)任務(wù)的使用場(chǎng)景
Ticker是一個(gè)周期觸發(fā)定時(shí)的計(jì)時(shí)器,它會(huì)按照一個(gè)時(shí)間間隔往channel發(fā)送系統(tǒng)當(dāng)前時(shí)間,而channel的接收者可以以固定的時(shí)間間隔從channel中讀取事件,這篇文章主要介紹了Golang cron 定時(shí)器和定時(shí)任務(wù),需要的朋友可以參考下2022-09-09淺析Golang如何向已關(guān)閉的chan讀寫(xiě)數(shù)據(jù)
這篇文章主要為大家詳細(xì)介紹了Golang如何向已關(guān)閉的chan讀寫(xiě)數(shù)據(jù),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-02-02