Golang熔斷器的開(kāi)發(fā)過(guò)程詳解
為什么需要熔斷
在分布式大行其道的今天,子系統(tǒng)與子系統(tǒng)之間通常使用RPC進(jìn)行通信,但由于是遠(yuǎn)程調(diào)用,如果整條鏈路中的某個(gè)服務(wù)出現(xiàn)異常,便可能雪崩從而導(dǎo)致整個(gè)系統(tǒng)崩潰(親身經(jīng)歷)。

如何解決
重試
我們可以很容易的想到加入重試機(jī)制以及超時(shí)時(shí)間來(lái)解決:在指定時(shí)間內(nèi)如果沒(méi)有返回,便觸發(fā)重試,達(dá)到重試閾值后如果還是沒(méi)有拿到正確的數(shù)據(jù)變做降級(jí)處理。
這樣做確實(shí)能夠保證整個(gè)系統(tǒng)不會(huì)雪崩,單次請(qǐng)求也能拿到合理的數(shù)據(jù)。但因?yàn)橹卦嚈C(jī)制,讓本就搖搖欲墜的故障服務(wù)雪上加霜,最后可能導(dǎo)致該服務(wù)完全不可用。
熔斷
重試行不通的原因是因?yàn)樗€會(huì)繼續(xù)調(diào)用故障服務(wù),甚至請(qǐng)求量比平時(shí)還翻了幾倍。所以我們的核心問(wèn)題是解決出現(xiàn)故障后繼續(xù)調(diào)用這個(gè)問(wèn)題,此時(shí)就可以引入熔斷器了。
簡(jiǎn)單來(lái)說(shuō),熔斷器會(huì)收集每次遠(yuǎn)程調(diào)用的結(jié)果,并根據(jù)一定的規(guī)則判斷目標(biāo)服務(wù)是否出現(xiàn)故障,如果出現(xiàn)故障,就不再調(diào)用,直接進(jìn)入降級(jí)處理并返回。

總體設(shè)計(jì)
- 存儲(chǔ)外部調(diào)用結(jié)果,并根據(jù)這些結(jié)果(成功量/失敗量)來(lái)判斷目標(biāo)服務(wù)是否出現(xiàn)故障。
- 判定目標(biāo)服務(wù)出現(xiàn)故障后,熔斷器打開(kāi),后續(xù)請(qǐng)求直接降級(jí)處理。
- 熔斷器執(zhí)行時(shí)接收兩個(gè)function,一個(gè)是外部調(diào)用的函數(shù),另一個(gè)是降級(jí)處理的函數(shù):
err := hystrix.Do("test", func() error {
// do something
return nil
}, func(err error) error {
fmt.Println(err.Error())
return err
})實(shí)現(xiàn)
指標(biāo)
- 首先我們需要存儲(chǔ)外部調(diào)用的結(jié)果,包括成功量以及失敗量。但過(guò)早的調(diào)用結(jié)果對(duì)于當(dāng)前來(lái)說(shuō)其實(shí)也沒(méi)有太多的參考意義,并且如果全部存儲(chǔ)下來(lái)也會(huì)有內(nèi)存問(wèn)題。所以這里我們存儲(chǔ)最近10秒的數(shù)據(jù)就行了。
- 我們很容易的想到使用map來(lái)存儲(chǔ)最近10秒的數(shù)據(jù),key為時(shí)間戳,value為上報(bào)量。但map會(huì)存在擴(kuò)容以及刪除10秒前的key的問(wèn)題,帶來(lái)一些額外開(kāi)銷(xiāo)。這里我們可以用一個(gè)環(huán)形數(shù)組來(lái)解決。
type (
metrics struct {
total *number
success *number
fail *number
}
number struct {
buckets [10]*bucket
mutex sync.RWMutex
}
bucket struct {
timestamp int64
value float64
}
)我們定義了三個(gè)結(jié)構(gòu)體,bucket用于存儲(chǔ)上報(bào)量,number使用環(huán)形數(shù)組存儲(chǔ)最近10秒的上報(bào)量,而metrics存儲(chǔ)了請(qǐng)求總量、成功量以及失敗量。
這三個(gè)結(jié)構(gòu)體中,我們重點(diǎn)關(guān)注指標(biāo)的上報(bào)(increment)以及讀操作即可(sum)
func (number *number) increment(now int64) {
index := now % 10
number.mutex.RLock()
if now < number.buckets[index].timestamp {
number.mutex.RUnlock()
return
}
number.mutex.RUnlock()
number.mutex.Lock()
defer number.mutex.Unlock()
if number.buckets[index].timestamp != now {
number.buckets[index].value = 0
}
number.buckets[index].timestamp = now
number.buckets[index].value++
}首先我們需要拿到當(dāng)前時(shí)間戳對(duì)應(yīng)的環(huán)形數(shù)組下標(biāo),然后加鎖,判斷當(dāng)前時(shí)間戳是否有效。注意第13~15行代碼,如果該下標(biāo)當(dāng)前存儲(chǔ)的數(shù)據(jù)是歷史數(shù)據(jù),那么重新賦值、覆蓋就好了(這就是環(huán)形數(shù)組的優(yōu)勢(shì),無(wú)需擴(kuò)容以及執(zhí)行delete操作)。
func (number *number) sum() (sum float64) {
number.mutex.RLock()
defer number.mutex.RUnlock()
now := time.Now().Unix()
for _, ele := range number.buckets {
if ele.timestamp <= now-10 {
continue
}
sum += ele.value
}
return
}如果環(huán)形數(shù)組中存儲(chǔ)的指標(biāo)數(shù)據(jù)是10秒之前的,那么就不參與計(jì)算。
熔斷器
有了指標(biāo)數(shù)據(jù),我們就可以考慮如何通過(guò)它來(lái)判斷目標(biāo)服務(wù)是否出現(xiàn)故障了。最簡(jiǎn)單的,我們可以定義一個(gè)閾值:當(dāng)最近10秒的請(qǐng)求錯(cuò)誤率達(dá)到這個(gè)閾值后,就認(rèn)為目標(biāo)服務(wù)出現(xiàn)故障。
但這個(gè)判斷得基于一定的請(qǐng)求量才能開(kāi)啟,否則得到的錯(cuò)誤率與目標(biāo)服務(wù)當(dāng)前的運(yùn)行狀態(tài)對(duì)比會(huì)存在一定誤差,例如服務(wù)啟動(dòng)后第一次請(qǐng)求目標(biāo)服務(wù),但因?yàn)橐恍┡棘F(xiàn)原因返回了error,那么此時(shí)的錯(cuò)誤率就是100%,認(rèn)為目標(biāo)服務(wù)出現(xiàn)故障,后續(xù)請(qǐng)求都會(huì)被攔截。
熔斷器開(kāi)啟后,還得想辦法將它關(guān)閉,否則就算目標(biāo)服務(wù)恢復(fù)了正常,熔斷器還是會(huì)將該請(qǐng)求攔截。我們可以設(shè)置一個(gè)時(shí)間窗口并記錄熔斷器打開(kāi)的時(shí)間,只要過(guò)了這個(gè)時(shí)間窗口,我們便可以關(guān)閉熔斷器并重新收集指標(biāo)數(shù)據(jù)進(jìn)行再次進(jìn)行判斷。
type Circuit struct {
Timeout time.Duration
RequestVolumeThreshold int // 達(dá)到這個(gè)請(qǐng)求數(shù)量后才去判斷是否要開(kāi)啟熔斷
ErrorPercentThreshold int // 請(qǐng)求數(shù)量大于等于 RequestVolumeThreshold 并且錯(cuò)誤率到達(dá)這個(gè)百分比后就會(huì)啟動(dòng)熔斷
SleepWindow int // 熔斷器被打開(kāi)后 SleepWindow 的時(shí)間就是控制過(guò)多久后去嘗試服務(wù)是否可用了 單位為毫秒
open bool
lastOpenTime int64 // 單位ms
mutex sync.RWMutex
metric *metrics
}
func (circuit *Circuit) isHealthy() bool {
// 當(dāng)前總請(qǐng)求量小于設(shè)置的閾值 返回
if int(circuit.metric.totalRequest()) < circuit.RequestVolumeThreshold {
return true
}
// 判斷錯(cuò)誤率是否大于設(shè)定的閾值,從而判斷目標(biāo)服務(wù)是否出現(xiàn)故障
return circuit.metric.errorPercent() < circuit.ErrorPercentThreshold
}
func (circuit *Circuit) isOpen() bool {
circuit.mutex.RLock()
o := circuit.open
circuit.mutex.RUnlock()
if !o {
return false
}
// 當(dāng)前時(shí)間與熔斷器打開(kāi)時(shí)間進(jìn)行對(duì)比,如果過(guò)了時(shí)間窗口,那么恢復(fù)。
if circuit.lastOpenTime+int64(circuit.SleepWindow) < time.Now().UnixMilli() {
circuit.setClose()
return false
}
return true
}
func (circuit *Circuit) setClose() {
circuit.mutex.Lock()
defer circuit.mutex.Unlock()
if !circuit.open {
return
}
circuit.open = false
// 清空指標(biāo)數(shù)據(jù) 重新計(jì)算
circuit.metric.clear()
}執(zhí)行過(guò)程
首先我們需要判斷熔斷器是否是打開(kāi)的狀態(tài),如果是,那么直接降級(jí)處理。如果不是,便執(zhí)行傳入的run()函數(shù),得到返回結(jié)果并上報(bào)。
func (cmd *command) do() error {
defer cmd.reportAllEvents()
// 判斷熔斷器是否打開(kāi)
if !cmd.circuit.allowRequest() {
cmd.report(circuitOpenEvent)
return cmd.tryFallback(ErrCircuitOpen)
}
// 設(shè)置超時(shí)時(shí)間
timer := time.NewTimer(cmd.circuit.Timeout)
defer timer.Stop()
finish, errCh := make(chan struct{}), make(chan error)
go func() {
if err := cmd.run(); err != nil {
errCh <- err
return
}
finish <- struct{}{}
}()
// 處理 超時(shí)、執(zhí)行成功、執(zhí)行失敗 這三種情況
// 超時(shí)以及執(zhí)行失敗都認(rèn)為錯(cuò)誤,降級(jí)處理
select {
case <-timer.C:
return cmd.tryFallback(ErrTimeout)
case <-finish:
cmd.report(successEvent)
return nil
case err := <-errCh:
return cmd.tryFallback(err)
}
}以上就是熔斷器的全部思路以及核心代碼。我們通過(guò)metrics來(lái)收集指標(biāo)并使用Circuit配置熔斷規(guī)則以及根據(jù)metrics收集的指標(biāo)判斷目標(biāo)服務(wù)是否出現(xiàn)故障,最后使用Command來(lái)執(zhí)行配置的run()函數(shù)以及降級(jí)邏輯。
項(xiàng)目地址
https://gitee.com/colocust/hystrix
以上就是Golang熔斷器的開(kāi)發(fā)過(guò)程詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang熔斷器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- golang高并發(fā)限流操作 ping / telnet
- 詳解Golang實(shí)現(xiàn)請(qǐng)求限流的幾種辦法
- golang?熔斷器的實(shí)現(xiàn)過(guò)程
- Golang官方限流器庫(kù)實(shí)現(xiàn)限流示例詳解
- Golang實(shí)現(xiàn)常見(jiàn)的限流算法的示例代碼
- Golang官方限流器time/rate的使用與實(shí)現(xiàn)詳解
- golang限流庫(kù)兩個(gè)大bug(半年之久無(wú)人提起)
- Golang限流器time/rate設(shè)計(jì)與實(shí)現(xiàn)詳解
- golang 熔斷限流降級(jí)實(shí)踐
相關(guān)文章
源碼剖析Golang中map擴(kuò)容底層的實(shí)現(xiàn)
之前的文章詳細(xì)介紹過(guò)Go切片和map的基本使用,以及切片的擴(kuò)容機(jī)制。本文針對(duì)map的擴(kuò)容,會(huì)從源碼的角度全面的剖析一下map擴(kuò)容的底層實(shí)現(xiàn),需要的可以參考一下2023-03-03
golang使用go test輸出單元測(cè)試覆蓋率的方式
單元測(cè)試覆蓋率是衡量代碼質(zhì)量的一個(gè)重要指標(biāo),重要的代碼文件覆蓋率應(yīng)該至少達(dá)到80%以上,Java 可以通過(guò)JaCoCo 統(tǒng)計(jì)覆蓋率,那么go 項(xiàng)目如何進(jìn)行代碼覆蓋率測(cè)試呢,本文將給大家詳細(xì)的介紹一下golang使用go test輸出單元測(cè)試覆蓋率的方式,需要的朋友可以參考下2024-02-02
Jaeger?Client?Go入門(mén)并實(shí)現(xiàn)鏈路追蹤
這篇文章介紹了Jaeger?Client?Go入門(mén)并實(shí)現(xiàn)鏈路追蹤的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-03-03
Go創(chuàng)建一個(gè)包并使用(導(dǎo)入本地包和注意事項(xiàng))
有時(shí)候需要自己寫(xiě)一個(gè)包方便多次使用,但是在導(dǎo)入自己寫(xiě)的包時(shí)遇到了問(wèn)題,本文主要介紹了Go創(chuàng)建一個(gè)包并使用(導(dǎo)入本地包和注意事項(xiàng)),感興趣的可以了解一下2023-11-11
golang bad file descriptor問(wèn)題的解決方法
這篇文章主要給大家介紹了golang bad file descriptor問(wèn)題的解決方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-02-02
Golang使用Gin框架實(shí)現(xiàn)路由分類(lèi)處理請(qǐng)求流程詳解
Gin是一個(gè)golang的微框架,封裝比較優(yōu)雅,具有快速靈活,容錯(cuò)方便等特點(diǎn),這篇文章主要介紹了Golang使用Gin框架實(shí)現(xiàn)路由分類(lèi)處理請(qǐng)求,感興趣的同學(xué)可以參考下文2023-05-05
基于Go語(yǔ)言實(shí)現(xiàn)一個(gè)并發(fā)下載器
這篇文章主要為大家詳細(xì)介紹了如何利用GO語(yǔ)言實(shí)現(xiàn)一個(gè)并發(fā)的文件下載器,可以在不重新啟動(dòng)整個(gè)下載的情況下處理錯(cuò)誤,感興趣的小伙伴可以了解一下2023-10-10

