Golang熔斷器的開發(fā)過程詳解
為什么需要熔斷
在分布式大行其道的今天,子系統(tǒng)與子系統(tǒng)之間通常使用RPC進(jìn)行通信,但由于是遠(yuǎn)程調(diào)用,如果整條鏈路中的某個服務(wù)出現(xiàn)異常,便可能雪崩從而導(dǎo)致整個系統(tǒng)崩潰(親身經(jīng)歷)。
如何解決
重試
我們可以很容易的想到加入重試機(jī)制以及超時時間來解決:在指定時間內(nèi)如果沒有返回,便觸發(fā)重試,達(dá)到重試閾值后如果還是沒有拿到正確的數(shù)據(jù)變做降級處理。
這樣做確實(shí)能夠保證整個系統(tǒng)不會雪崩,單次請求也能拿到合理的數(shù)據(jù)。但因?yàn)橹卦嚈C(jī)制,讓本就搖搖欲墜的故障服務(wù)雪上加霜,最后可能導(dǎo)致該服務(wù)完全不可用。
熔斷
重試行不通的原因是因?yàn)樗€會繼續(xù)調(diào)用故障服務(wù),甚至請求量比平時還翻了幾倍。所以我們的核心問題是解決出現(xiàn)故障后繼續(xù)調(diào)用這個問題,此時就可以引入熔斷器了。
簡單來說,熔斷器會收集每次遠(yuǎn)程調(diào)用的結(jié)果,并根據(jù)一定的規(guī)則判斷目標(biāo)服務(wù)是否出現(xiàn)故障,如果出現(xiàn)故障,就不再調(diào)用,直接進(jìn)入降級處理并返回。
總體設(shè)計(jì)
- 存儲外部調(diào)用結(jié)果,并根據(jù)這些結(jié)果(成功量/失敗量)來判斷目標(biāo)服務(wù)是否出現(xiàn)故障。
- 判定目標(biāo)服務(wù)出現(xiàn)故障后,熔斷器打開,后續(xù)請求直接降級處理。
- 熔斷器執(zhí)行時接收兩個function,一個是外部調(diào)用的函數(shù),另一個是降級處理的函數(shù):
err := hystrix.Do("test", func() error { // do something return nil }, func(err error) error { fmt.Println(err.Error()) return err })
實(shí)現(xiàn)
指標(biāo)
- 首先我們需要存儲外部調(diào)用的結(jié)果,包括成功量以及失敗量。但過早的調(diào)用結(jié)果對于當(dāng)前來說其實(shí)也沒有太多的參考意義,并且如果全部存儲下來也會有內(nèi)存問題。所以這里我們存儲最近10秒的數(shù)據(jù)就行了。
- 我們很容易的想到使用map來存儲最近10秒的數(shù)據(jù),key為時間戳,value為上報(bào)量。但map會存在擴(kuò)容以及刪除10秒前的key的問題,帶來一些額外開銷。這里我們可以用一個環(huán)形數(shù)組來解決。
type ( metrics struct { total *number success *number fail *number } number struct { buckets [10]*bucket mutex sync.RWMutex } bucket struct { timestamp int64 value float64 } )
我們定義了三個結(jié)構(gòu)體,bucket用于存儲上報(bào)量,number使用環(huán)形數(shù)組存儲最近10秒的上報(bào)量,而metrics存儲了請求總量、成功量以及失敗量。
這三個結(jié)構(gòu)體中,我們重點(diǎn)關(guān)注指標(biāo)的上報(bào)(increment)以及讀操作即可(sum)
func (number *number) increment(now int64) { index := now % 10 number.mutex.RLock() if now < number.buckets[index].timestamp { number.mutex.RUnlock() return } number.mutex.RUnlock() number.mutex.Lock() defer number.mutex.Unlock() if number.buckets[index].timestamp != now { number.buckets[index].value = 0 } number.buckets[index].timestamp = now number.buckets[index].value++ }
首先我們需要拿到當(dāng)前時間戳對應(yīng)的環(huán)形數(shù)組下標(biāo),然后加鎖,判斷當(dāng)前時間戳是否有效。注意第13~15行代碼,如果該下標(biāo)當(dāng)前存儲的數(shù)據(jù)是歷史數(shù)據(jù),那么重新賦值、覆蓋就好了(這就是環(huán)形數(shù)組的優(yōu)勢,無需擴(kuò)容以及執(zhí)行delete操作)。
func (number *number) sum() (sum float64) { number.mutex.RLock() defer number.mutex.RUnlock() now := time.Now().Unix() for _, ele := range number.buckets { if ele.timestamp <= now-10 { continue } sum += ele.value } return }
如果環(huán)形數(shù)組中存儲的指標(biāo)數(shù)據(jù)是10秒之前的,那么就不參與計(jì)算。
熔斷器
有了指標(biāo)數(shù)據(jù),我們就可以考慮如何通過它來判斷目標(biāo)服務(wù)是否出現(xiàn)故障了。最簡單的,我們可以定義一個閾值:當(dāng)最近10秒的請求錯誤率達(dá)到這個閾值后,就認(rèn)為目標(biāo)服務(wù)出現(xiàn)故障。
但這個判斷得基于一定的請求量才能開啟,否則得到的錯誤率與目標(biāo)服務(wù)當(dāng)前的運(yùn)行狀態(tài)對比會存在一定誤差,例如服務(wù)啟動后第一次請求目標(biāo)服務(wù),但因?yàn)橐恍┡棘F(xiàn)原因返回了error,那么此時的錯誤率就是100%,認(rèn)為目標(biāo)服務(wù)出現(xiàn)故障,后續(xù)請求都會被攔截。
熔斷器開啟后,還得想辦法將它關(guān)閉,否則就算目標(biāo)服務(wù)恢復(fù)了正常,熔斷器還是會將該請求攔截。我們可以設(shè)置一個時間窗口并記錄熔斷器打開的時間,只要過了這個時間窗口,我們便可以關(guān)閉熔斷器并重新收集指標(biāo)數(shù)據(jù)進(jìn)行再次進(jìn)行判斷。
type Circuit struct { Timeout time.Duration RequestVolumeThreshold int // 達(dá)到這個請求數(shù)量后才去判斷是否要開啟熔斷 ErrorPercentThreshold int // 請求數(shù)量大于等于 RequestVolumeThreshold 并且錯誤率到達(dá)這個百分比后就會啟動熔斷 SleepWindow int // 熔斷器被打開后 SleepWindow 的時間就是控制過多久后去嘗試服務(wù)是否可用了 單位為毫秒 open bool lastOpenTime int64 // 單位ms mutex sync.RWMutex metric *metrics } func (circuit *Circuit) isHealthy() bool { // 當(dāng)前總請求量小于設(shè)置的閾值 返回 if int(circuit.metric.totalRequest()) < circuit.RequestVolumeThreshold { return true } // 判斷錯誤率是否大于設(shè)定的閾值,從而判斷目標(biāo)服務(wù)是否出現(xiàn)故障 return circuit.metric.errorPercent() < circuit.ErrorPercentThreshold } func (circuit *Circuit) isOpen() bool { circuit.mutex.RLock() o := circuit.open circuit.mutex.RUnlock() if !o { return false } // 當(dāng)前時間與熔斷器打開時間進(jìn)行對比,如果過了時間窗口,那么恢復(fù)。 if circuit.lastOpenTime+int64(circuit.SleepWindow) < time.Now().UnixMilli() { circuit.setClose() return false } return true } func (circuit *Circuit) setClose() { circuit.mutex.Lock() defer circuit.mutex.Unlock() if !circuit.open { return } circuit.open = false // 清空指標(biāo)數(shù)據(jù) 重新計(jì)算 circuit.metric.clear() }
執(zhí)行過程
首先我們需要判斷熔斷器是否是打開的狀態(tài),如果是,那么直接降級處理。如果不是,便執(zhí)行傳入的run()函數(shù),得到返回結(jié)果并上報(bào)。
func (cmd *command) do() error { defer cmd.reportAllEvents() // 判斷熔斷器是否打開 if !cmd.circuit.allowRequest() { cmd.report(circuitOpenEvent) return cmd.tryFallback(ErrCircuitOpen) } // 設(shè)置超時時間 timer := time.NewTimer(cmd.circuit.Timeout) defer timer.Stop() finish, errCh := make(chan struct{}), make(chan error) go func() { if err := cmd.run(); err != nil { errCh <- err return } finish <- struct{}{} }() // 處理 超時、執(zhí)行成功、執(zhí)行失敗 這三種情況 // 超時以及執(zhí)行失敗都認(rèn)為錯誤,降級處理 select { case <-timer.C: return cmd.tryFallback(ErrTimeout) case <-finish: cmd.report(successEvent) return nil case err := <-errCh: return cmd.tryFallback(err) } }
以上就是熔斷器的全部思路以及核心代碼。我們通過metrics來收集指標(biāo)并使用Circuit配置熔斷規(guī)則以及根據(jù)metrics收集的指標(biāo)判斷目標(biāo)服務(wù)是否出現(xiàn)故障,最后使用Command來執(zhí)行配置的run()函數(shù)以及降級邏輯。
項(xiàng)目地址
https://gitee.com/colocust/hystrix
以上就是Golang熔斷器的開發(fā)過程詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang熔斷器的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
源碼剖析Golang中map擴(kuò)容底層的實(shí)現(xiàn)
之前的文章詳細(xì)介紹過Go切片和map的基本使用,以及切片的擴(kuò)容機(jī)制。本文針對map的擴(kuò)容,會從源碼的角度全面的剖析一下map擴(kuò)容的底層實(shí)現(xiàn),需要的可以參考一下2023-03-03Jaeger?Client?Go入門并實(shí)現(xiàn)鏈路追蹤
這篇文章介紹了Jaeger?Client?Go入門并實(shí)現(xiàn)鏈路追蹤的方法,文中通過示例代碼介紹的非常詳細(xì)。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-03-03Go創(chuàng)建一個包并使用(導(dǎo)入本地包和注意事項(xiàng))
有時候需要自己寫一個包方便多次使用,但是在導(dǎo)入自己寫的包時遇到了問題,本文主要介紹了Go創(chuàng)建一個包并使用(導(dǎo)入本地包和注意事項(xiàng)),感興趣的可以了解一下2023-11-11golang bad file descriptor問題的解決方法
這篇文章主要給大家介紹了golang bad file descriptor問題的解決方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02Golang使用Gin框架實(shí)現(xiàn)路由分類處理請求流程詳解
Gin是一個golang的微框架,封裝比較優(yōu)雅,具有快速靈活,容錯方便等特點(diǎn),這篇文章主要介紹了Golang使用Gin框架實(shí)現(xiàn)路由分類處理請求,感興趣的同學(xué)可以參考下文2023-05-05基于Go語言實(shí)現(xiàn)一個并發(fā)下載器
這篇文章主要為大家詳細(xì)介紹了如何利用GO語言實(shí)現(xiàn)一個并發(fā)的文件下載器,可以在不重新啟動整個下載的情況下處理錯誤,感興趣的小伙伴可以了解一下2023-10-10