一文教你學(xué)會Go中singleflight的使用
寫作背景
緩存在項(xiàng)目中使用應(yīng)該是非常頻繁的,提到緩存只要了解過 singleflight ,基本都會用于緩存實(shí)現(xiàn)的一部分吧?但 singleflight 要用好也不容易。
名稱解釋
singleflight 來源于準(zhǔn)官方庫(也可以說官方擴(kuò)展庫)golang.org/x/sync/singleflight 包中。它的作用是避免同一個(gè) key 對下游發(fā)起多次請求,降低下游流量。
源碼剖析
3 個(gè)結(jié)構(gòu)體
Group 是 singleflight 的核心,代表一個(gè)組,用于執(zhí)行具有重復(fù)抑制的工作單元。
type Group struct { mu sync.Mutex m map[string]*call }
mu 是保護(hù) m 字段的互斥鎖,確保對調(diào)用信息的訪問是線程安全的。m 是一個(gè) map,鍵是函數(shù)的唯一標(biāo)識符,值是 call 結(jié)構(gòu)體,代表一次函數(shù)調(diào)用的信息,包括函數(shù)的返回值和錯誤。
call 代表一次函數(shù)調(diào)用的信息,把函數(shù)的調(diào)用結(jié)果封裝到 call 中
type call struct { wg sync.WaitGroup // 這些字段在 WaitGroup 完成之前只被寫入一次,并且在 WaitGroup 完成之后只被讀取 val interface{} // 函數(shù)調(diào)用的返回值 err error // 函數(shù)調(diào)用可能出現(xiàn)的錯誤 dups int // 相同 key 調(diào)用次數(shù) chans []chan<- Result // 結(jié)果通道列表,僅調(diào)用 DoChan() 方法時(shí)返回 }
Result 結(jié)構(gòu)體用于保存 DoChan() 方法的執(zhí)行結(jié)果,以便將結(jié)果傳遞給通道。
type Result struct { Val interface{} Err error Shared bool }
4 個(gè)方法
Group 主要提供了 3 個(gè)公開方法和 1 個(gè)非公開方法。
Do() 方法,相同的 key 對應(yīng)的 fn 函數(shù)只會調(diào)用一次。返回值 v 調(diào)用 fn() 方法返回的結(jié)果;err 調(diào)用 fn() 返回的 err;shared:表示在多次調(diào)用的結(jié)果是否共享。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ g.mu.Unlock() c.wg.Wait() if e, ok := c.err.(*panicError); ok { panic(e) } else if c.err == errGoexit { runtime.Goexit() } return c.val, c.err, true } c := new(call) c.wg.Add(1) g.m[key] = c g.mu.Unlock() g.doCall(c, key, fn) return c.val, c.err, c.dups > 0 }
源碼比較簡單,如果 key 對應(yīng)的 fn 函數(shù)已被調(diào)用,則等待 fn 函數(shù)調(diào)用完成直接返回結(jié)果。如果 fn 未被調(diào)用,new(call) 存入 m 中,執(zhí)行 doCal() 方法。
doCall() 方法,調(diào)用 key 對應(yīng)的 fn 方法。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { normalReturn := false recovered := false defer func() { if !normalReturn && !recovered { c.err = errGoexit } g.mu.Lock() defer g.mu.Unlock() c.wg.Done() if g.m[key] == c { delete(g.m, key) } if e, ok := c.err.(*panicError); ok { if len(c.chans) > 0 { go panic(e) select {} } else { panic(e) } } else if c.err == errGoexit { } else { for _, ch := range c.chans { ch <- Result{c.val, c.err, c.dups > 0} } } }() func() { defer func() { if !normalReturn { if r := recover(); r != nil { c.err = newPanicError(r) } } }() c.val, c.err = fn() normalReturn = true }() if !normalReturn { recovered = true } }
doCall() 代碼比較簡單,double defer 雙延遲機(jī)制區(qū)分 panic 和 runtime.Goexit。第二個(gè) defer 會先執(zhí)行調(diào)用 fn() 函數(shù),如果未正常返回將會補(bǔ)獲異常,并將堆棧信息存入 err 中。
第一個(gè) defer 先將 key 從 m 中移除,再就是異常處理,如果是 Goexit 正常退出,如果斷言是 panicError 將對外拋出 Panic。若正常退出將結(jié)果發(fā)送到 chans 通道列表中。
DoChan() 方法類似于 Do() 方法,返回通道(chan),通過通道接收數(shù)據(jù)。另外通道不會被關(guān)閉。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { ch := make(chan Result, 1) g.mu.Lock() if g.m == nil { g.m = make(map[string]*call) } if c, ok := g.m[key]; ok { c.dups++ c.chans = append(c.chans, ch) g.mu.Unlock() return ch } c := &call{chans: []chan<- Result{ch}} c.wg.Add(1) g.m[key] = c g.mu.Unlock() go g.doCall(c, key, fn) return ch }
Forget() 方法,可以理解為丟棄某一個(gè) key,后面該 key 會被立即調(diào)用,而不是等待先前的調(diào)用完成。
func (g *Group) Forget(key string) { g.mu.Lock() delete(g.m, key) g.mu.Unlock() }
經(jīng)典案例
緩存場景在大家的業(yè)務(wù)場景中應(yīng)該是被廣泛使用的,大部分的場景使用應(yīng)該都是下圖吧?
從單體應(yīng)用到微服務(wù)化,調(diào)用下游服務(wù)一般如下圖吧?
假設(shè)緩存 Miss 所有流量會瞬間打到數(shù)據(jù)庫,或者所有流量都會打到 server2,如果學(xué)習(xí)過 singleflight 的同學(xué),肯定會把它用在 reids->db 或 server->server2 之間,包括我也是。如下圖(只舉數(shù)據(jù)庫案例)。
在使用 singleflight 之前你先確定下你的業(yè)務(wù)場景,key 相同的情況多嗎?(可以統(tǒng)計(jì)一些數(shù)據(jù),我們業(yè)務(wù)場景同一個(gè) key 多次調(diào)用下游概率是比較高的)如果 key 相同的情況比較少,singleflight 對你的幫助可能不大。
上面列舉 2 種方案。
1、 singleflight 介于 redis 和 db 之間,redis 是內(nèi)存緩存 qps 高、響應(yīng)也快。大部分情況不會成為瓶頸,但數(shù)據(jù)庫就不一樣了,所以這種方案可以防止緩存被擊穿流量打到數(shù)據(jù)庫。
2、 singleflight 介于 server 和 redis 之間,網(wǎng)上挺多推薦這種用法的,有必要用此方案嗎?大家可以思考下,文章末尾我給出我的想法。
我更傾向方案一。代碼如下:
func TestSingleFlight(t *testing.T) { var ( n = 10 k = "12344556" wg = sync.WaitGroup{} sf singleflight.Group ) for i := 0; i < n; i++ { go func() { wg.Add(1) defer wg.Done() r, err, shared := sf.Do(k, func() (interface{}, error) { return get(k) }) if err != nil { panic(err) } fmt.Printf("r=%v,shared=%v\n", r, shared) }() } wg.Wait() } func get(key string) (interface{}, error) { time.Sleep(time.Microsecond) // todo 模擬業(yè)務(wù)處理 return key, nil }
輸出結(jié)果如下
=== RUN TestSingleFlight
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=true
--- PASS: TestSingleFlight (0.00s)
PASS
打印結(jié)果中為 true 都代表 調(diào)用 get() 函數(shù)返回結(jié)果被共享。get 函數(shù)調(diào)用明顯降低了。
這種寫法在函數(shù)正常返回情況下是能拿到正確的結(jié)果,如果下游返回異常了呢?(業(yè)務(wù)上遇過下游返回3-4s的拉低業(yè)務(wù)處理速度)因?yàn)?nbsp;Do() 方法是以阻塞的方式來控制對下游的調(diào)用的,如果某一個(gè)請求被阻塞了,同一個(gè) key 后面的請求都會被阻塞。
假設(shè)有一場景(SOP),消費(fèi) kafka 消息處理業(yè)務(wù)邏輯,業(yè)務(wù)高峰期某一時(shí)間段生產(chǎn)消息量為 100 w,單 pod 消費(fèi)速度 500/s ,請求下游用 singleflight 控制對下游(三方接口)的并發(fā)量,假設(shè)下游某一次請求耗時(shí) 2s。這時(shí)會有幾個(gè)問題:
1、若某一個(gè) key 被阻塞后續(xù)該 key 大量請求被阻塞,若這批請求失敗從而導(dǎo)致消息處理失敗,如果對消息重試會加劇業(yè)務(wù)下游壓力。
2、單 pod 消費(fèi)速度從 500/s,降低到個(gè)位數(shù),消費(fèi)時(shí)間拉長,消息堆積(如果消息堆積對實(shí)時(shí)性要求場景影響視頻很大的)。
造成這個(gè)問題主要原因如下:
singleflight 是同步阻塞且缺乏超時(shí)控制機(jī)制,若某一個(gè) key 阻塞后面次 key 都會被阻塞并且等待第一次結(jié)束。
singleflight 雖然能降低對下游的請求量,但在某些場景失敗的情況也增加了。
我們有辦法給 singleflight 加一個(gè)超時(shí)時(shí)間嗎?答案是肯定有的
下面這段代碼 singleflight 沒有增加超時(shí)控制
var ( offset int32 = 0 ) func TestSingleFlight(t *testing.T) { var ( n int32 = 1000 k = "12344556" wg = sync.WaitGroup{} sf singleflight.Group failCnt int32 = 0 ) for i := 0; i < int(n); i++ { go func() { wg.Add(1) defer wg.Done() _, err, _ := sf.Do(k, func() (interface{}, error) { return get(k) }) if err != nil { atomic.AddInt32(&failCnt, 1) return } }() } wg.Wait() fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, n-failCnt, failCnt) } func get(key string) (interface{}, error) { var err error if atomic.AddInt32(&offset, 1) == 3 { // 假設(shè)偏移量 offset == 3 執(zhí)行耗時(shí)長,超時(shí)失敗了 time.Sleep(time.Microsecond * 500) err = fmt.Errorf("耗時(shí)長") } return key, err }
結(jié)果輸出如下
=== RUN TestSingleFlight
總請求數(shù)=1000,請求成功率=792,請求失敗率=208--- PASS: TestSingleFlight (0.00s)
PASS
singleflight 增加超時(shí)控制代碼如下
func TestSingleFlight(t *testing.T) { var ( n int32 = 1000 k = "12344556" wg = sync.WaitGroup{} sf singleflight.Group failCnt int32 = 0 ) for i := 0; i < int(n); i++ { go func() { wg.Add(1) defer wg.Done() _, err, _ := sf.Do(k, func() (interface{}, error) { ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*30) go func(_ctx context.Context) { <-_ctx.Done() sf.Forget(k) }(ctx) return get(k) }) if err != nil { atomic.AddInt32(&failCnt, 1) return } }() } wg.Wait() fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, n-failCnt, failCnt) }
利用 context.WithTimeout() 方法控制超時(shí),并且調(diào)用 Forget() 方法移除超時(shí) key 結(jié)果輸出如下
=== RUN TestSingleFlight
總請求數(shù)=1000,請求成功率=992,請求失敗率=8--- PASS: TestSingleFlight (0.00s)
PASS
成功率提高了失敗率明顯降低了。
下面我用 DoChan() 函數(shù)實(shí)現(xiàn)
var ( offset int32 = 0 ) func TestSingleFlight(t *testing.T) { var ( n int32 = 1000 // n 越大,效果越明顯 k = "12344556" wg = sync.WaitGroup{} sf singleflight.Group successCnt int32 = 0 ) for i := 0; i < int(n); i++ { go func() { wg.Add(1) defer wg.Done() ch := sf.DoChan(k, func() (interface{}, error) { return get(k) }) ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*100) select { case <-ctx.Done(): sf.Forget(k) return case ret := <-ch: if ret.Err != nil { return } atomic.AddInt32(&successCnt, 1) } }() } wg.Wait() fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, successCnt, n-successCnt) } func get(key string) (interface{}, error) { var err error if atomic.AddInt32(&offset, 1) == 3 { // 假設(shè)偏移量 offset == 3 執(zhí)行耗時(shí)長,超時(shí)失敗了 time.Sleep(time.Microsecond * 400) err = fmt.Errorf("耗時(shí)長") } return key, err }
大家自行驗(yàn)證
總結(jié)
1、singleflight 使用得當(dāng)確實(shí)能有效降低下游流量,我也推薦大家使用,但一定要注意同步阻塞問題,防止下游長耗時(shí)造成業(yè)務(wù)異常或高延遲,一定要做好正確性與降低業(yè)務(wù)下游流量權(quán)衡。
2、上面我留了一個(gè)問題,singleflight 有必要放在 server 應(yīng)用和 redis 之間嗎?我認(rèn)為沒必要,redis 是內(nèi)存數(shù)據(jù)庫,響應(yīng)快,高 qps 本身不會是瓶頸,保護(hù) redis 沒有意義。另外 singleflight 用途是防止 redis 擊穿流量打到數(shù)據(jù)庫,如果你業(yè)務(wù) qps 非常高并且對數(shù)據(jù)實(shí)時(shí)性要求高,為啥不通過其他手段把數(shù)據(jù)庫數(shù)據(jù)刷新到 redis 中?比如數(shù)據(jù)創(chuàng)建同步寫入 redis、或通過 binlog 寫入。
到此這篇關(guān)于一文教你學(xué)會Go中singleflight的使用的文章就介紹到這了,更多相關(guān)Go singleflight內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GoLang?socket網(wǎng)絡(luò)編程傳輸數(shù)據(jù)包時(shí)進(jìn)行長度校驗(yàn)的方法
在GoLang?socket網(wǎng)絡(luò)編程中,為了確保數(shù)據(jù)交互的穩(wěn)定性和安全性,通常會通過傳輸數(shù)據(jù)的長度進(jìn)行校驗(yàn),發(fā)送端首先發(fā)送數(shù)據(jù)長度,然后發(fā)送數(shù)據(jù)本體,接收端則根據(jù)接收到的數(shù)據(jù)長度和數(shù)據(jù)本體進(jìn)行比較,以此來確認(rèn)數(shù)據(jù)是否傳輸成功2024-11-11golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法
這篇文章主要介紹了golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-03-03Go?gRPC進(jìn)階教程服務(wù)超時(shí)設(shè)置
這篇文章主要為大家介紹了Go?gRPC進(jìn)階,gRPC請求的超時(shí)時(shí)間設(shè)置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06Go?chassis云原生微服務(wù)開發(fā)框架應(yīng)用編程實(shí)戰(zhàn)
這篇文章主要為大家介紹了Go?chassis云原生微服務(wù)開發(fā)框架應(yīng)用編程實(shí)戰(zhàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解
常見的IO模型有阻塞、非阻塞、IO多路復(fù)用,異,下面這篇文章主要給大家介紹了關(guān)于golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09