golang 熔斷限流降級實踐
限流 - 2k 但是我的服務(wù)能力只有1k,所以這個時候多出來的流量怎么辦: 1. 拒絕 2. 排隊等待。用戶體驗不太好: 當(dāng)前訪問用戶過多,請稍后重試和你的服務(wù)直接掛了
用戶體驗降級了 - 原本是訪問流暢,下單流暢 -> 當(dāng)前訪問用戶過多,請稍后重試
熔斷 - 比如A服務(wù)訪問B服務(wù),這個時候B服務(wù)很慢 - B服務(wù)壓力過大,導(dǎo)致了出現(xiàn)了不少請求錯誤,調(diào)用方很容易出現(xiàn)一個問題: 每次調(diào)用都超時 2k,結(jié)果這個時候數(shù)據(jù)庫出現(xiàn)了問題, 超時重試 - 網(wǎng)絡(luò) 2k的流量突然變成了3k。這讓原本就滿負(fù)荷的b服務(wù)雪上加霜,如果這個時候調(diào)用方有一種機(jī)制:比如說 1. 發(fā)現(xiàn)了大部分請求很慢 - 50%請求都很慢, 2. 發(fā)現(xiàn)我的請求有50%都錯誤了 3. 錯誤數(shù)量很多,比如1s出現(xiàn)了20個錯誤 。上述三種情況出現(xiàn),在一段時間內(nèi)不發(fā)給B服務(wù),直接拒絕。一段時間之后再發(fā)送。以此來緩解B服務(wù)的壓力,避免B服務(wù)掛掉的可能。
技術(shù)選型
限流
QPS限流
package main import ( "fmt" "log" "math/rand" "time" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/flow" ) const resName = "example-flow-qps-resource" func main() { //初始化sentinel err := sentinel.InitDefault() if err != nil { log.Fatal(err) } //配置限流規(guī)則 _, err = flow.LoadRules([]*flow.Rule{ { Resource: resName, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, Threshold: 10, StatIntervalInMs: 1000, }, }) if err != nil { log.Fatalf("Unexpected error: %+v", err) return } ch := make(chan struct{}) for i := 0; i < 10; i++ { go func() { for { e, b := sentinel.Entry(resName, sentinel.WithTrafficType(base.Inbound)) if b != nil { // Blocked. We could get the block reason from the BlockError. fmt.Println("被限流") time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) } else { // Passed, wrap the logic here. fmt.Println("成功") time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) // Be sure the entry is exited finally. e.Exit() } } }() } // Simulate a scenario in which flow rules are updated concurrently go func() { time.Sleep(time.Second * 10) _, err = flow.LoadRules([]*flow.Rule{ { Resource: resName, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, Threshold: 80, StatIntervalInMs: 1000, }, }) if err != nil { log.Fatalf("Unexpected error: %+v", err) return } }() <-ch }
warmup限流
WarmUp 方式,即預(yù)熱/冷啟動方式。當(dāng)系統(tǒng)長期處于低水位的情況下,當(dāng)流量突然增加時,直接把系統(tǒng)拉升到高水位可能瞬間把系統(tǒng)壓垮。通過"冷啟動",讓通過的流量緩慢增加,在一定時間內(nèi)逐漸增加到閾值上限,給冷系統(tǒng)一個預(yù)熱的時間,避免冷系統(tǒng)被壓垮。這塊設(shè)計和 Java 類似。通常冷啟動的過程系統(tǒng)允許通過的 QPS 曲線如下圖所示:
package main import ( "fmt" "log" "math/rand" "sync" "time" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/flow" ) const resName = "example-flow-qps-resource" func main() { //初始化sentinel err := sentinel.InitDefault() if err != nil { log.Fatal(err) } //配置限流規(guī)則 _, err = flow.LoadRules([]*flow.Rule{ { Resource: resName, TokenCalculateStrategy: flow.WarmUp, //冷啟動策略 ControlBehavior: flow.Reject, //直接拒絕 Threshold: 10, //1s10個并發(fā) WarmUpPeriodSec: 30, //60s預(yù)熱 }, }) if err != nil { log.Fatalf("Unexpected error: %+v", err) return } var mutex sync.Mutex lastSecond := time.Now().Second() total := 0 totalRestrict := 0 totalPass := 0 ch := make(chan struct{}) for i := 0; i < 10; i++ { go func() { for { e, b := sentinel.Entry(resName, sentinel.WithTrafficType(base.Inbound)) if b != nil { // Blocked. We could get the block reason from the BlockError. mutex.Lock() totalRestrict++ mutex.Unlock() time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) } else { // Passed, wrap the logic here. mutex.Lock() totalPass++ mutex.Unlock() time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) // Be sure the entry is exited finally. e.Exit() } mutex.Lock() total++ mutex.Unlock() if time.Now().Second() != lastSecond { mutex.Lock() lastSecond = time.Now().Second() fmt.Println("total=", total, ",totalRestrict=", totalRestrict, ",totalPass=", totalPass) total = 0 totalRestrict = 0 totalPass = 0 mutex.Unlock() } } }() } <-ch }
趨勢逐漸增加,一直到達(dá)每秒10個左右
Throttling策略
計算時間間隔,每過了時間間隔才允許通過
package main import ( "fmt" "log" "math/rand" "time" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/flow" ) const resName = "example-flow-qps-resource" func main() { //初始化sentinel err := sentinel.InitDefault() if err != nil { log.Fatal(err) } //配置限流規(guī)則 _, err = flow.LoadRules([]*flow.Rule{ { Resource: resName, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Throttling, Threshold: 10, StatIntervalInMs: 1000, }, }) if err != nil { log.Fatalf("Unexpected error: %+v", err) return } ch := make(chan struct{}) for i := 0; i < 10; i++ { go func() { for { e, b := sentinel.Entry(resName, sentinel.WithTrafficType(base.Inbound)) if b != nil { // Blocked. We could get the block reason from the BlockError. fmt.Println("被限流") time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) } else { // Passed, wrap the logic here. fmt.Println("成功") time.Sleep(time.Duration(rand.Uint64()%10) * time.Millisecond) // Be sure the entry is exited finally. e.Exit() } } }() } // Simulate a scenario in which flow rules are updated concurrently go func() { time.Sleep(time.Second * 10) _, err = flow.LoadRules([]*flow.Rule{ { Resource: resName, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, Threshold: 80, StatIntervalInMs: 1000, }, }) if err != nil { log.Fatalf("Unexpected error: %+v", err) return } }() <-ch }
package main import ( "fmt" "log" "time" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/base" "github.com/alibaba/sentinel-golang/core/flow" ) const resName = "example-flow-qps-resource" func main() { //初始化sentinel err := sentinel.InitDefault() if err != nil { log.Fatal(err) } //配置限流規(guī)則 _, err = flow.LoadRules([]*flow.Rule{ { Resource: resName, TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Throttling, Threshold: 10, StatIntervalInMs: 1000, }, }) if err != nil { log.Fatalf("Unexpected error: %+v", err) return } for i := 0; i < 10; i++ { e, b := sentinel.Entry(resName, sentinel.WithTrafficType(base.Inbound)) if b != nil { // Blocked. We could get the block reason from the BlockError. fmt.Println("被限流") } else { // Passed, wrap the logic here. fmt.Println("成功") // Be sure the entry is exited finally. e.Exit() } time.Sleep(100 * time.Millisecond) } }
熔斷
Sentinel 熔斷降級基于熔斷器模式 (circuit breaker pattern) 實現(xiàn)。熔斷器內(nèi)部維護(hù)了一個熔斷器的狀態(tài)機(jī),狀態(tài)機(jī)的轉(zhuǎn)換關(guān)系如下圖所示:
熔斷器有三種狀態(tài):
1.Closed 狀態(tài):也是初始狀態(tài),該狀態(tài)下,熔斷器會保持閉合,對資源的訪問直接通過熔斷器的檢查。
2.Open 狀態(tài):斷開狀態(tài),熔斷器處于開啟狀態(tài),對資源的訪問會被切斷。
3.Half-Open 狀態(tài):半開狀態(tài),該狀態(tài)下除了探測流量,其余對資源的訪問也會被切斷。探測流量指熔斷器處于半開狀態(tài)時,會周期性的允許一定數(shù)目的探測請求通過,如果探測請求能夠正常的返回,代表探測成功,此時熔斷器會重置狀態(tài)到 Closed 狀態(tài),結(jié)束熔斷;如果探測失敗,則回滾到 Open 狀態(tài)。
這三種狀態(tài)之間的轉(zhuǎn)換關(guān)系這里做一個更加清晰的解釋:
1.初始狀態(tài)下,熔斷器處于 Closed 狀態(tài)。如果基于熔斷器的統(tǒng)計數(shù)據(jù)表明當(dāng)前資源觸發(fā)了設(shè)定的閾值,那么熔斷器會切換狀態(tài)到 Open 狀態(tài);
2.Open 狀態(tài)即代表熔斷狀態(tài),所有請求都會直接被拒絕。熔斷器規(guī)則中會配置一個熔斷超時重試的時間,經(jīng)過熔斷超時重試時長后熔斷器會將狀態(tài)置為 Half-Open 狀態(tài),從而進(jìn)行探測機(jī)制;
3.處于 Half-Open 狀態(tài)的熔斷器會周期性去做探測。
Sentinel 提供了監(jiān)聽器去監(jiān)聽熔斷器狀態(tài)機(jī)的三種狀態(tài)的轉(zhuǎn)換,方便用戶去自定義擴(kuò)展:
熔斷策略
Sentinel 熔斷器的三種熔斷策略都支持靜默期 (規(guī)則中通過MinRequestAmount字段表示)。靜默期是指一個最小的靜默請求數(shù),在一個統(tǒng)計周期內(nèi),如果對資源的請求數(shù)小于設(shè)置的靜默數(shù),那么熔斷器將不會基于其統(tǒng)計值去更改熔斷器的狀態(tài)。靜默期的設(shè)計理由也很簡單,舉個例子,假設(shè)在一個統(tǒng)計周期剛剛開始時候,第 1 個請求碰巧是個慢請求,這個時候這個時候的慢調(diào)用比例就會是 100%,很明顯是不合理,所以存在一定的巧合性。所以靜默期提高了熔斷器的精準(zhǔn)性以及降低誤判可能性。
Sentinel 支持以下幾種熔斷策略:
慢調(diào)用比例策略 (SlowRequestRatio)
Sentinel 的熔斷器不在靜默期,并且慢調(diào)用的比例大于設(shè)置的閾值,則接下來的熔斷周期內(nèi)對資源的訪問會自動地被熔斷。該策略下需要設(shè)置允許的調(diào)用 RT 臨界值(即最大的響應(yīng)時間),對該資源訪問的響應(yīng)時間大于該閾值則統(tǒng)計為慢調(diào)用。
package main import ( "errors" "fmt" "log" "math/rand" "time" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/circuitbreaker" "github.com/alibaba/sentinel-golang/core/config" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" ) type stateChangeTestListener struct { } func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) { fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func main() { conf := config.NewDefaultConfig() // for testing, logging output to console conf.Sentinel.Log.Logger = logging.NewConsoleLogger() err := sentinel.InitWithConfig(conf) if err != nil { log.Fatal(err) } ch := make(chan struct{}) // Register a state change listener so that we could observer the state change of the internal circuit breaker. circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{}) _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{ // Statistic time span=5s, recoveryTimeout=3s, slowRtUpperBound=50ms, maxSlowRequestRatio=50% { Resource: "abc", Strategy: circuitbreaker.SlowRequestRatio, RetryTimeoutMs: 3000, MinRequestAmount: 10, StatIntervalMs: 5000, StatSlidingWindowBucketCount: 10, MaxAllowedRtMs: 50, //大于50ms是慢查詢 Threshold: 0.5, }, }) if err != nil { log.Fatal(err) } logging.Info("[CircuitBreaker SlowRtRatio] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.") go func() { for { e, b := sentinel.Entry("abc") if b != nil { // g1 blocked time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) } else { if rand.Uint64()%20 > 9 { // Record current invocation as error. sentinel.TraceError(e, errors.New("biz error")) } // g1 passed time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) e.Exit() } } }() go func() { for { e, b := sentinel.Entry("abc") if b != nil { // g2 blocked time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) } else { // g2 passed time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) e.Exit() } } }() <-ch }
錯誤比例策略 (ErrorRatio)
Sentinel 的熔斷器不在靜默期,并且在統(tǒng)計周期內(nèi)資源請求訪問異常的比例大于設(shè)定的閾值,則接下來的熔斷周期內(nèi)對資源的訪問會自動地被熔斷。
package main import ( "errors" "fmt" "log" "math/rand" "time" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/circuitbreaker" "github.com/alibaba/sentinel-golang/core/config" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" ) type stateChangeTestListener struct{} func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) { fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %.2f, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func main() { total := 0 totalPass := 0 totalBlock := 0 totalErr := 0 conf := config.NewDefaultConfig() // for testing, logging output to console conf.Sentinel.Log.Logger = logging.NewConsoleLogger() err := sentinel.InitWithConfig(conf) if err != nil { log.Fatal(err) } ch := make(chan struct{}) // Register a state change listener so that we could observer the state change of the internal circuit breaker. circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{}) _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{ // Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50 { Resource: "abc", Strategy: circuitbreaker.ErrorRatio, RetryTimeoutMs: 3000, MinRequestAmount: 10, StatIntervalMs: 5000, StatSlidingWindowBucketCount: 10, Threshold: 0.4, }, }) if err != nil { log.Fatal(err) } logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.") go func() { for { total++ e, b := sentinel.Entry("abc") if b != nil { // g1 blocked totalBlock++ fmt.Println("熔斷") time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) } else { if rand.Uint64()%20 > 9 { totalErr++ // Record current invocation as error. sentinel.TraceError(e, errors.New("biz error")) } totalPass++ // g1 passed time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) e.Exit() } } }() go func() { for { e, b := sentinel.Entry("abc") if b != nil { // g2 blocked totalBlock++ fmt.Println("寫成熔斷了") time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) } else { // g2 passed totalPass++ time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond) e.Exit() } } }() go func() { for { time.Sleep(time.Second) fmt.Println(float64(totalErr) / float64(total)) } }() <-ch }
錯誤計數(shù)策略 (ErrorCount)
Sentinel 的熔斷器不在靜默期,并且在統(tǒng)計周期內(nèi)資源請求訪問異常數(shù)大于設(shè)定的閾值,則接下來的熔斷周期內(nèi)對資源的訪問會自動地被熔斷。
注意:這里的錯誤比例熔斷和錯誤計數(shù)熔斷指的業(yè)務(wù)返回錯誤的比例或則計數(shù)。也就是說,如果規(guī)則指定熔斷器策略采用錯誤比例或則錯誤計數(shù),那么為了統(tǒng)計錯誤比例或錯誤計數(shù),需要調(diào)用API: api.TraceError(entry, err) 埋點(diǎn)每個請求的業(yè)務(wù)異常。
package main import ( "errors" "fmt" "log" "math/rand" "time" sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/circuitbreaker" "github.com/alibaba/sentinel-golang/core/config" "github.com/alibaba/sentinel-golang/logging" "github.com/alibaba/sentinel-golang/util" ) type stateChangeTestListener struct { } func (s *stateChangeTestListener) OnTransformToClosed(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Closed, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToOpen(prev circuitbreaker.State, rule circuitbreaker.Rule, snapshot interface{}) { fmt.Printf("rule.steategy: %+v, From %s to Open, snapshot: %d, time: %d\n", rule.Strategy, prev.String(), snapshot, util.CurrentTimeMillis()) } func (s *stateChangeTestListener) OnTransformToHalfOpen(prev circuitbreaker.State, rule circuitbreaker.Rule) { fmt.Printf("rule.steategy: %+v, From %s to Half-Open, time: %d\n", rule.Strategy, prev.String(), util.CurrentTimeMillis()) } func main() { total := 0 totalPass := 0 totalBlock := 0 totalErr := 0 conf := config.NewDefaultConfig() // for testing, logging output to console conf.Sentinel.Log.Logger = logging.NewConsoleLogger() err := sentinel.InitWithConfig(conf) if err != nil { log.Fatal(err) } ch := make(chan struct{}) // Register a state change listener so that we could observer the state change of the internal circuit breaker. circuitbreaker.RegisterStateChangeListeners(&stateChangeTestListener{}) _, err = circuitbreaker.LoadRules([]*circuitbreaker.Rule{ // Statistic time span=5s, recoveryTimeout=3s, maxErrorCount=50 { Resource: "abc", Strategy: circuitbreaker.ErrorCount, RetryTimeoutMs: 3000, MinRequestAmount: 10, StatIntervalMs: 5000, StatSlidingWindowBucketCount: 10, Threshold: 50, }, }) if err != nil { log.Fatal(err) } logging.Info("[CircuitBreaker ErrorCount] Sentinel Go circuit breaking demo is running. You may see the pass/block metric in the metric log.") go func() { for { total++ e, b := sentinel.Entry("abc") if b != nil { // g1 blocked totalBlock++ fmt.Println("熔斷") time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) } else { if rand.Uint64()%20 > 9 { totalErr++ // Record current invocation as error. sentinel.TraceError(e, errors.New("biz error")) } totalPass++ // g1 passed time.Sleep(time.Duration(rand.Uint64()%80+10) * time.Millisecond) e.Exit() } } }() go func() { for { e, b := sentinel.Entry("abc") if b != nil { // g2 blocked totalBlock++ fmt.Println("寫成熔斷了") time.Sleep(time.Duration(rand.Uint64()%20) * time.Millisecond) } else { // g2 passed totalPass++ time.Sleep(time.Duration(rand.Uint64()%80) * time.Millisecond) e.Exit() } } }() go func() { for { time.Sleep(time.Second) fmt.Println(totalErr) } }() <-ch }
gin集成sentinel限流
初始化
package initialize import ( sentinel "github.com/alibaba/sentinel-golang/api" "github.com/alibaba/sentinel-golang/core/flow" "go.uber.org/zap" ) func InitSentinel() { //初始化sentinel err := sentinel.InitDefault() if err != nil { zap.S().Fatal(err) } //配置限流規(guī)則 //這種配置從nacos讀取 _, err = flow.LoadRules([]*flow.Rule{ { Resource: "test", TokenCalculateStrategy: flow.Direct, ControlBehavior: flow.Reject, Threshold: 10, StatIntervalInMs: 1000, }, }) if err != nil { zap.S().Fatalf("Unexpected error: %+v", err) return } }
調(diào)用
initialize.InitSentinel()
限流
e, b := sentinel.Entry("goods-list", sentinel.WithTrafficType(base.Inbound)) if b != nil { ctx.JSON(http.StatusTooManyRequests, gin.H{ "msg": "請求過于頻繁,請稍后重試", }) return } r, err := global.GoodsSrvClient.GoodsList(context.WithValue(context.Background(), "ginContext", ctx), request) if err != nil { zap.S().Errorw("[List] 查詢 【商品列表】失敗") return } e.Exit()
到此這篇關(guān)于golang 熔斷限流降級實踐的文章就介紹到這了,更多相關(guān)golang 熔斷限流降級內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang中實現(xiàn)類似類與繼承的方法(示例代碼)
這篇文章主要介紹了Golang中實現(xiàn)類似類與繼承的方法,Go語言中通過方法接受者的類型來決定方法的歸屬和繼承關(guān)系,本文通過示例代碼講解的非常詳細(xì),需要的朋友可以參考下2024-04-04golang?四則運(yùn)算計算器yacc歸約手寫實現(xiàn)
這篇文章主要為大家介紹了golang?四則運(yùn)算?計算器?yacc?歸約的手寫實現(xiàn),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07