go?singleflight緩存雪崩源碼分析與應(yīng)用
一、緩存雪崩的應(yīng)用
背景:
我們在重啟pod的時候,此時會導(dǎo)致gocache中重啟,然后緩存同時大批量失效。如果此時并發(fā)比較高,會有很多goroutine,去同時訪問redis。
加單飛,將一組相同的請求合并成一個請求,實際上只會去請求一次,然后對所有的請求返回相同的結(jié)果
singlefight實驗:
singlefight_test.go
需要重新從redis獲取數(shù)據(jù)存取到 gocache。
func BenchmarkUse(b *testing.B) { ctx := context.Background() wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600) goCache := cache.New(time.Second*60, time.Second*60) //sg := singleflight.Group{} for i := 0; i < b.N; i++ { _, ok := goCache.Get("k") if !ok { go func() { //_, _, _ = sg.Do("k", func() (interface{}, error) { v, _ := wordTouchRedisClient.Get(ctx, "k").Result() goCache.Set("k", v, time.Second*60) //return v, nil //}) }() } } } BenchmarkUse-8 94518 20173 ns/op
此時引入單飛
func BenchmarkUse(b *testing.B) { ctx := context.Background() wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600) goCache := cache.New(time.Second*60, time.Second*60) sg := singleflight.Group{} for i := 0; i < b.N; i++ { _, ok := goCache.Get("k") if !ok { go func() { _, _, _ = sg.Do("k", func() (interface{}, error) { v, _ := wordTouchRedisClient.Get(ctx, "k").Result() goCache.Set("k", v, time.Second*60) return v, nil }) }() } } } BenchmarkUse-8 21307608 46.96 ns/op BenchmarkUse-2 25675206 45.37 ns/op
風(fēng)險:
- 如果一個報錯, 同一批都報錯
二、源碼分析
源碼注釋
// Copyright 2013 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. // Package singleflight provides a duplicate function call suppression // mechanism. // singleflight包提供了重復(fù)函數(shù)調(diào)用抑制機制。 package singleflight // import "golang.org/x/sync/singleflight" import ( "bytes" "errors" "fmt" "runtime" "runtime/debug" "sync" ) // errGoexit indicates the runtime.Goexit was called in // the user given function. // errGoexit 表示 runtime.Goexit 被用戶的函數(shù)調(diào)用了 var errGoexit = errors.New("runtime.Goexit was called") // A panicError is an arbitrary value recovered from a panic // panicError 是從panic中 恢復(fù)的任意值 // with the stack trace during the execution of given function. // 執(zhí)行給定函數(shù)期間的堆棧跟蹤 type panicError struct { value interface{} stack []byte } // Error implements error interface. // Error 實現(xiàn)錯誤接口 func (p *panicError) Error() string { return fmt.Sprintf("%v\n\n%s", p.value, p.stack) } func newPanicError(v interface{}) error { stack := debug.Stack() // The first line of the stack trace is of the form "goroutine N [status]:" // 堆棧跟蹤的第一行的形式為“goroutine N [status]:” // but by the time the panic reaches Do the goroutine may no longer exist // 但當(dāng)panic達到 Do 時,goroutine 可能不再存在 // and its status will have changed. Trim out the misleading line. // 并且它的狀態(tài)將會改變。修剪掉誤導(dǎo)性的線條。 if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { stack = stack[line+1:] } return &panicError{value: v, stack: stack} } // call is an in-flight or completed singleflight.Do call // call 是正在進行的或已完成的 singleflight.Do() 調(diào)用 type call struct { wg sync.WaitGroup // These fields are written once before the WaitGroup is done // 這些字段在 WaitGroup 完成之前寫入一次 // and are only read after the WaitGroup is done. // 并且僅在 WaitGroup 完成后才讀取。 val interface{} err error // These fields are read and written with the singleflight // 這些字段是用 singleflight mutex 讀寫的 // mutex held before the WaitGroup is done, and are read but // 在 WaitGroup完成前。 // not written after the WaitGroup is done. // 并且 只讀不寫,在WaitGroup完成后。 dups int chans []chan<- Result } // Group represents a class of work and forms a namespace in // Group 代表一個工作類,并在其中形成一個命名空間 // which units of work can be executed with duplicate suppression. // 哪些工作單元可以通過重復(fù)抑制來執(zhí)行。 type Group struct { mu sync.Mutex // protects m 用來保護m,并發(fā)安全 m map[string]*call // lazily initialized 延遲初始化 } // Result holds the results of Do, so they can be passed // Result保存了Do的結(jié)果,因此可以傳遞 // on a channel. // 在通道上 type Result struct { Val interface{} Err error Shared bool } // Do executes and returns the results of the given function, // Do 執(zhí)行并返回給定函數(shù)的結(jié)果 // making sure that only one execution is in-flight for a given key at a time. // 確保在某一時刻對于給定的鍵只有一次正在執(zhí)行 // If a duplicate comes in, the duplicate caller waits for the original // 如果有重復(fù)的調(diào)用者進入,則重復(fù)的調(diào)用者將等待最初者 // to complete and receives the same results. // 完成并收到相同的結(jié)果。 // The return value shared indicates whether v was given to multiple callers. // 返回值shared表示v是否被給予多個調(diào)用者。 func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 } // DoChan is like Do but returns a channel that will receive the // results when they are ready. // DoChan 與 Do 類似,但返回一個chanel通道 接收準備好后的結(jié)果。 // // The returned channel will not be closed. // 返回的channel通道不會被關(guān)閉。 func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch } // doCall handles the single call for a key. // doCall 處理對key的單個調(diào)用。 func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false // use double-defer to distinguish panic from runtime.Goexit, // 使用雙重延遲 來區(qū)分panic和runtime.Goexit, // more details see https://golang.org/cl/134395 // 更多詳情參見 https://golang.org/cl/134395 defer func() { // the given function invoked runtime.Goexit // 調(diào)用給定函數(shù)runtime.Goexit if !normalReturn && !recovered { c.err = errGoexit } g.mu.Lock() defer g.mu.Unlock() c.wg.Done() if g.m[key] == c { delete(g.m, key) } if e, ok := c.err.(*panicError); ok { // In order to prevent the waiting channels from being blocked forever, // 為了防止等待通道永遠被阻塞, // needs to ensure that this panic cannot be recovered. // 需要確保這種panic恐慌無法恢復(fù)。 if len(c.chans) > 0 { go panic(e) select {} // Keep this goroutine around so that it will appear in the crash dump. // 保留此 goroutine,以便它出現(xiàn)在故障轉(zhuǎn)儲中。 } else { panic(e) } } else if c.err == errGoexit { // Already in the process of goexit, no need to call again // 已經(jīng)在goexit過程中,無需再次調(diào)用 } else { // Normal return // 正常返回 for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }() func() { defer func() { if !normalReturn { // Ideally, we would wait to take a stack trace until we've determined // 理想情況下,我們會等待獲取堆棧跟蹤,直到我們確定 // whether this is a panic or a runtime.Goexit. // 這是恐慌還是runtime.Goexit。 // // Unfortunately, the only way we can distinguish the two is to see // 不幸的是,我們區(qū)分兩者的唯一方法就是看 // whether the recover stopped the goroutine from terminating, and by // 恢復(fù)是否阻止 goroutine 終止,并且通過 // the time we know that, the part of the stack trace relevant to the // 當(dāng)我們知道時,堆棧跟蹤中與 // panic has been discarded. // 恐慌已被丟棄。 if r := recover(); r != nil { c.err = newPanicError(r) } } }() c.val, c.err = fn() normalReturn = true }() if !normalReturn { recovered = true } } // Forget tells the singleflight to forget about a key. Future calls // Forget 告訴 singleflight 忘記某個鍵。未來的calls調(diào)用 // to Do for this key will call the function rather than waiting for // 為此鍵執(zhí)行的操作將調(diào)用該函數(shù)而不是等待 // an earlier call to complete. // 較早的調(diào)用完成。 func (g *Group) Forget(key string) { g.mu.Lock() delete(g.m, key) g.mu.Unlock() }
并發(fā)情況下的goroutine執(zhí)行情況
func BenchmarkUse(b *testing.B) { ctx := context.Background() wordTouchRedisClient.Set(ctx, "k", "v", time.Second*600) goCache := cache.New(time.Second*60, time.Second*60) sg := singleflight.Group{} for i := 0; i < b.N; i++ { _, ok := goCache.Get("k") if !ok { go func() { _, _, _ = sg.Do("k", func() (interface{}, error) { v, _ := wordTouchRedisClient.Get(ctx, "k").Result() goCache.Set("k", v, time.Second*60) return v, nil }) }() } } }
如圖表展示
就是在第一個 子goroutine的從開始到結(jié)束,啟動的 其余子goroutine,都和第一個goroutine,都擁有相同的call,為同一個group。然后返回同樣的結(jié)果。
第一個子goroutine,結(jié)束完,就刪掉key,然后在下面的goroutine,為新的一組。
以上就是go singleflight緩存雪崩源碼分析與應(yīng)用的詳細內(nèi)容,更多關(guān)于go singleflight緩存雪崩的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言學(xué)習(xí)之將mp4通過rtmp推送流媒體服務(wù)的實現(xiàn)方法
對音視頻一直是小白,決定沉下心來,好好研究一下音視頻知識,下面這篇文章主要給大家介紹了關(guān)于Go語言學(xué)習(xí)之將mp4通過rtmp推送流媒體服務(wù)的實現(xiàn)方法,需要的朋友可以參考下2022-12-12golang 獲取當(dāng)前執(zhí)行程序路徑的操作
這篇文章主要介紹了golang 獲取當(dāng)前程序執(zhí)行路徑的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12golang原生http包實現(xiàn)各種情況的get請求方式
這篇文章主要介紹了golang原生http包實現(xiàn)各種情況的get請求方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-08-08