一文教你學會Go中singleflight的使用
寫作背景
緩存在項目中使用應該是非常頻繁的,提到緩存只要了解過 singleflight ,基本都會用于緩存實現(xiàn)的一部分吧?但 singleflight 要用好也不容易。
名稱解釋
singleflight 來源于準官方庫(也可以說官方擴展庫)golang.org/x/sync/singleflight 包中。它的作用是避免同一個 key 對下游發(fā)起多次請求,降低下游流量。
源碼剖析
3 個結構體
Group 是 singleflight 的核心,代表一個組,用于執(zhí)行具有重復抑制的工作單元。
type Group struct {
mu sync.Mutex
m map[string]*call
}
mu 是保護 m 字段的互斥鎖,確保對調用信息的訪問是線程安全的。m 是一個 map,鍵是函數(shù)的唯一標識符,值是 call 結構體,代表一次函數(shù)調用的信息,包括函數(shù)的返回值和錯誤。
call 代表一次函數(shù)調用的信息,把函數(shù)的調用結果封裝到 call 中
type call struct {
wg sync.WaitGroup
// 這些字段在 WaitGroup 完成之前只被寫入一次,并且在 WaitGroup 完成之后只被讀取
val interface{} // 函數(shù)調用的返回值
err error // 函數(shù)調用可能出現(xiàn)的錯誤
dups int // 相同 key 調用次數(shù)
chans []chan<- Result // 結果通道列表,僅調用 DoChan() 方法時返回
}
Result 結構體用于保存 DoChan() 方法的執(zhí)行結果,以便將結果傳遞給通道。
type Result struct {
Val interface{}
Err error
Shared bool
}4 個方法
Group 主要提供了 3 個公開方法和 1 個非公開方法。
Do() 方法,相同的 key 對應的 fn 函數(shù)只會調用一次。返回值 v 調用 fn() 方法返回的結果;err 調用 fn() 返回的 err;shared:表示在多次調用的結果是否共享。
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
g.mu.Unlock()
c.wg.Wait()
if e, ok := c.err.(*panicError); ok {
panic(e)
} else if c.err == errGoexit {
runtime.Goexit()
}
return c.val, c.err, true
}
c := new(call)
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
g.doCall(c, key, fn)
return c.val, c.err, c.dups > 0
}
源碼比較簡單,如果 key 對應的 fn 函數(shù)已被調用,則等待 fn 函數(shù)調用完成直接返回結果。如果 fn 未被調用,new(call) 存入 m 中,執(zhí)行 doCal() 方法。
doCall() 方法,調用 key 對應的 fn 方法。
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
normalReturn := false
recovered := false
defer func() {
if !normalReturn && !recovered {
c.err = errGoexit
}
g.mu.Lock()
defer g.mu.Unlock()
c.wg.Done()
if g.m[key] == c {
delete(g.m, key)
}
if e, ok := c.err.(*panicError); ok {
if len(c.chans) > 0 {
go panic(e)
select {}
} else {
panic(e)
}
} else if c.err == errGoexit {
} else {
for _, ch := range c.chans {
ch <- Result{c.val, c.err, c.dups > 0}
}
}
}()
func() {
defer func() {
if !normalReturn {
if r := recover(); r != nil {
c.err = newPanicError(r)
}
}
}()
c.val, c.err = fn()
normalReturn = true
}()
if !normalReturn {
recovered = true
}
}
doCall() 代碼比較簡單,double defer 雙延遲機制區(qū)分 panic 和 runtime.Goexit。第二個 defer 會先執(zhí)行調用 fn() 函數(shù),如果未正常返回將會補獲異常,并將堆棧信息存入 err 中。
第一個 defer 先將 key 從 m 中移除,再就是異常處理,如果是 Goexit 正常退出,如果斷言是 panicError 將對外拋出 Panic。若正常退出將結果發(fā)送到 chans 通道列表中。
DoChan() 方法類似于 Do() 方法,返回通道(chan),通過通道接收數(shù)據(jù)。另外通道不會被關閉。
func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
ch := make(chan Result, 1)
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call)
}
if c, ok := g.m[key]; ok {
c.dups++
c.chans = append(c.chans, ch)
g.mu.Unlock()
return ch
}
c := &call{chans: []chan<- Result{ch}}
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()
go g.doCall(c, key, fn)
return ch
}
Forget() 方法,可以理解為丟棄某一個 key,后面該 key 會被立即調用,而不是等待先前的調用完成。
func (g *Group) Forget(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}
經(jīng)典案例
緩存場景在大家的業(yè)務場景中應該是被廣泛使用的,大部分的場景使用應該都是下圖吧?

從單體應用到微服務化,調用下游服務一般如下圖吧?

假設緩存 Miss 所有流量會瞬間打到數(shù)據(jù)庫,或者所有流量都會打到 server2,如果學習過 singleflight 的同學,肯定會把它用在 reids->db 或 server->server2 之間,包括我也是。如下圖(只舉數(shù)據(jù)庫案例)。

在使用 singleflight 之前你先確定下你的業(yè)務場景,key 相同的情況多嗎?(可以統(tǒng)計一些數(shù)據(jù),我們業(yè)務場景同一個 key 多次調用下游概率是比較高的)如果 key 相同的情況比較少,singleflight 對你的幫助可能不大。
上面列舉 2 種方案。
1、 singleflight 介于 redis 和 db 之間,redis 是內存緩存 qps 高、響應也快。大部分情況不會成為瓶頸,但數(shù)據(jù)庫就不一樣了,所以這種方案可以防止緩存被擊穿流量打到數(shù)據(jù)庫。
2、 singleflight 介于 server 和 redis 之間,網(wǎng)上挺多推薦這種用法的,有必要用此方案嗎?大家可以思考下,文章末尾我給出我的想法。
我更傾向方案一。代碼如下:
func TestSingleFlight(t *testing.T) {
var (
n = 10
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
)
for i := 0; i < n; i++ {
go func() {
wg.Add(1)
defer wg.Done()
r, err, shared := sf.Do(k, func() (interface{}, error) {
return get(k)
})
if err != nil {
panic(err)
}
fmt.Printf("r=%v,shared=%v\n", r, shared)
}()
}
wg.Wait()
}
func get(key string) (interface{}, error) {
time.Sleep(time.Microsecond) // todo 模擬業(yè)務處理
return key, nil
}
輸出結果如下
=== RUN TestSingleFlight
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=true
--- PASS: TestSingleFlight (0.00s)
PASS
打印結果中為 true 都代表 調用 get() 函數(shù)返回結果被共享。get 函數(shù)調用明顯降低了。
這種寫法在函數(shù)正常返回情況下是能拿到正確的結果,如果下游返回異常了呢?(業(yè)務上遇過下游返回3-4s的拉低業(yè)務處理速度)因為 Do() 方法是以阻塞的方式來控制對下游的調用的,如果某一個請求被阻塞了,同一個 key 后面的請求都會被阻塞。
假設有一場景(SOP),消費 kafka 消息處理業(yè)務邏輯,業(yè)務高峰期某一時間段生產(chǎn)消息量為 100 w,單 pod 消費速度 500/s ,請求下游用 singleflight 控制對下游(三方接口)的并發(fā)量,假設下游某一次請求耗時 2s。這時會有幾個問題:
1、若某一個 key 被阻塞后續(xù)該 key 大量請求被阻塞,若這批請求失敗從而導致消息處理失敗,如果對消息重試會加劇業(yè)務下游壓力。
2、單 pod 消費速度從 500/s,降低到個位數(shù),消費時間拉長,消息堆積(如果消息堆積對實時性要求場景影響視頻很大的)。
造成這個問題主要原因如下:
singleflight 是同步阻塞且缺乏超時控制機制,若某一個 key 阻塞后面次 key 都會被阻塞并且等待第一次結束。
singleflight 雖然能降低對下游的請求量,但在某些場景失敗的情況也增加了。
我們有辦法給 singleflight 加一個超時時間嗎?答案是肯定有的
下面這段代碼 singleflight 沒有增加超時控制
var (
offset int32 = 0
)
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
failCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
_, err, _ := sf.Do(k, func() (interface{}, error) {
return get(k)
})
if err != nil {
atomic.AddInt32(&failCnt, 1)
return
}
}()
}
wg.Wait()
fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, n-failCnt, failCnt)
}
func get(key string) (interface{}, error) {
var err error
if atomic.AddInt32(&offset, 1) == 3 { // 假設偏移量 offset == 3 執(zhí)行耗時長,超時失敗了
time.Sleep(time.Microsecond * 500)
err = fmt.Errorf("耗時長")
}
return key, err
}
結果輸出如下
=== RUN TestSingleFlight
總請求數(shù)=1000,請求成功率=792,請求失敗率=208--- PASS: TestSingleFlight (0.00s)
PASS
singleflight 增加超時控制代碼如下
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
failCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
_, err, _ := sf.Do(k, func() (interface{}, error) {
ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*30)
go func(_ctx context.Context) {
<-_ctx.Done()
sf.Forget(k)
}(ctx)
return get(k)
})
if err != nil {
atomic.AddInt32(&failCnt, 1)
return
}
}()
}
wg.Wait()
fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, n-failCnt, failCnt)
}
利用 context.WithTimeout() 方法控制超時,并且調用 Forget() 方法移除超時 key 結果輸出如下
=== RUN TestSingleFlight
總請求數(shù)=1000,請求成功率=992,請求失敗率=8--- PASS: TestSingleFlight (0.00s)
PASS
成功率提高了失敗率明顯降低了。
下面我用 DoChan() 函數(shù)實現(xiàn)
var (
offset int32 = 0
)
func TestSingleFlight(t *testing.T) {
var (
n int32 = 1000 // n 越大,效果越明顯
k = "12344556"
wg = sync.WaitGroup{}
sf singleflight.Group
successCnt int32 = 0
)
for i := 0; i < int(n); i++ {
go func() {
wg.Add(1)
defer wg.Done()
ch := sf.DoChan(k, func() (interface{}, error) {
return get(k)
})
ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*100)
select {
case <-ctx.Done():
sf.Forget(k)
return
case ret := <-ch:
if ret.Err != nil {
return
}
atomic.AddInt32(&successCnt, 1)
}
}()
}
wg.Wait()
fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, successCnt, n-successCnt)
}
func get(key string) (interface{}, error) {
var err error
if atomic.AddInt32(&offset, 1) == 3 { // 假設偏移量 offset == 3 執(zhí)行耗時長,超時失敗了
time.Sleep(time.Microsecond * 400)
err = fmt.Errorf("耗時長")
}
return key, err
}
大家自行驗證
總結
1、singleflight 使用得當確實能有效降低下游流量,我也推薦大家使用,但一定要注意同步阻塞問題,防止下游長耗時造成業(yè)務異常或高延遲,一定要做好正確性與降低業(yè)務下游流量權衡。
2、上面我留了一個問題,singleflight 有必要放在 server 應用和 redis 之間嗎?我認為沒必要,redis 是內存數(shù)據(jù)庫,響應快,高 qps 本身不會是瓶頸,保護 redis 沒有意義。另外 singleflight 用途是防止 redis 擊穿流量打到數(shù)據(jù)庫,如果你業(yè)務 qps 非常高并且對數(shù)據(jù)實時性要求高,為啥不通過其他手段把數(shù)據(jù)庫數(shù)據(jù)刷新到 redis 中?比如數(shù)據(jù)創(chuàng)建同步寫入 redis、或通過 binlog 寫入。
到此這篇關于一文教你學會Go中singleflight的使用的文章就介紹到這了,更多相關Go singleflight內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
GoLang?socket網(wǎng)絡編程傳輸數(shù)據(jù)包時進行長度校驗的方法
在GoLang?socket網(wǎng)絡編程中,為了確保數(shù)據(jù)交互的穩(wěn)定性和安全性,通常會通過傳輸數(shù)據(jù)的長度進行校驗,發(fā)送端首先發(fā)送數(shù)據(jù)長度,然后發(fā)送數(shù)據(jù)本體,接收端則根據(jù)接收到的數(shù)據(jù)長度和數(shù)據(jù)本體進行比較,以此來確認數(shù)據(jù)是否傳輸成功2024-11-11
golang 并發(fā)安全Map以及分段鎖的實現(xiàn)方法
這篇文章主要介紹了golang 并發(fā)安全Map以及分段鎖的實現(xiàn)方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-03-03
Go?chassis云原生微服務開發(fā)框架應用編程實戰(zhàn)
這篇文章主要為大家介紹了Go?chassis云原生微服務開發(fā)框架應用編程實戰(zhàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08

