欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文教你學(xué)會Go中singleflight的使用

 更新時(shí)間:2024年02月22日 11:19:03   作者:彭亞川Allen  
緩存在項(xiàng)目中使用應(yīng)該是非常頻繁的,提到緩存只要了解過?singleflight?,基本都會用于緩存實(shí)現(xiàn)的一部分吧,下面就跟隨小編一起來學(xué)習(xí)一下singleflight的使用吧

寫作背景

緩存在項(xiàng)目中使用應(yīng)該是非常頻繁的,提到緩存只要了解過 singleflight ,基本都會用于緩存實(shí)現(xiàn)的一部分吧?但 singleflight 要用好也不容易。

名稱解釋

singleflight 來源于準(zhǔn)官方庫(也可以說官方擴(kuò)展庫)golang.org/x/sync/singleflight 包中。它的作用是避免同一個(gè) key 對下游發(fā)起多次請求,降低下游流量。

源碼剖析

3 個(gè)結(jié)構(gòu)體

Group 是 singleflight 的核心,代表一個(gè)組,用于執(zhí)行具有重復(fù)抑制的工作單元。

type Group struct {
	mu sync.Mutex       
	m  map[string]*call
}

mu 是保護(hù) m 字段的互斥鎖,確保對調(diào)用信息的訪問是線程安全的。m 是一個(gè) map,鍵是函數(shù)的唯一標(biāo)識符,值是 call 結(jié)構(gòu)體,代表一次函數(shù)調(diào)用的信息,包括函數(shù)的返回值和錯誤。

call 代表一次函數(shù)調(diào)用的信息,把函數(shù)的調(diào)用結(jié)果封裝到 call 中

type call struct {
	wg sync.WaitGroup

	// 這些字段在 WaitGroup 完成之前只被寫入一次,并且在 WaitGroup 完成之后只被讀取
	val interface{} // 函數(shù)調(diào)用的返回值
	err error       // 函數(shù)調(diào)用可能出現(xiàn)的錯誤

	dups  int          // 相同 key 調(diào)用次數(shù)
	chans []chan<- Result // 結(jié)果通道列表,僅調(diào)用 DoChan() 方法時(shí)返回
}

Result 結(jié)構(gòu)體用于保存 DoChan() 方法的執(zhí)行結(jié)果,以便將結(jié)果傳遞給通道。

type Result struct {
	Val    interface{}
	Err    error
	Shared bool
}

4 個(gè)方法

Group 主要提供了 3 個(gè)公開方法和 1 個(gè)非公開方法。

Do() 方法,相同的 key 對應(yīng)的 fn 函數(shù)只會調(diào)用一次。返回值 v 調(diào)用 fn() 方法返回的結(jié)果;err 調(diào)用 fn() 返回的 err;shared:表示在多次調(diào)用的結(jié)果是否共享。

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()

		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	g.doCall(c, key, fn)
	return c.val, c.err, c.dups > 0
}

源碼比較簡單,如果 key 對應(yīng)的 fn 函數(shù)已被調(diào)用,則等待 fn 函數(shù)調(diào)用完成直接返回結(jié)果。如果 fn 未被調(diào)用,new(call) 存入 m 中,執(zhí)行 doCal() 方法。

doCall()  方法,調(diào)用 key 對應(yīng)的 fn 方法。

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	normalReturn := false
	recovered := false
	defer func() {
		if !normalReturn && !recovered {
			c.err = errGoexit
		}

		g.mu.Lock()
		defer g.mu.Unlock()
		c.wg.Done()
		if g.m[key] == c {
			delete(g.m, key)
		}

		if e, ok := c.err.(*panicError); ok {
			if len(c.chans) > 0 {
				go panic(e)
				select {} 
			} else {
				panic(e)
			}
		} else if c.err == errGoexit {
		} else {
			for _, ch := range c.chans {
				ch <- Result{c.val, c.err, c.dups > 0}
			}
		}
	}()

	func() {
		defer func() {
			if !normalReturn {
				if r := recover(); r != nil {
					c.err = newPanicError(r)
				}
			}
		}()

		c.val, c.err = fn()
		normalReturn = true
	}()

	if !normalReturn {
		recovered = true
	}
}

doCall() 代碼比較簡單,double defer 雙延遲機(jī)制區(qū)分 panic 和 runtime.Goexit。第二個(gè) defer 會先執(zhí)行調(diào)用 fn() 函數(shù),如果未正常返回將會補(bǔ)獲異常,并將堆棧信息存入 err 中。

第一個(gè) defer 先將 key 從 m 中移除,再就是異常處理,如果是 Goexit 正常退出,如果斷言是 panicError 將對外拋出 Panic。若正常退出將結(jié)果發(fā)送到 chans 通道列表中。

DoChan() 方法類似于 Do() 方法,返回通道(chan),通過通道接收數(shù)據(jù)。另外通道不會被關(guān)閉。

func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
	ch := make(chan Result, 1)
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
		c.dups++
		c.chans = append(c.chans, ch)
		g.mu.Unlock()
		return ch
	}
	c := &call{chans: []chan<- Result{ch}}
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()

	go g.doCall(c, key, fn)

	return ch
}

Forget() 方法,可以理解為丟棄某一個(gè) key,后面該 key 會被立即調(diào)用,而不是等待先前的調(diào)用完成。

func (g *Group) Forget(key string) {
	g.mu.Lock()
	delete(g.m, key)
	g.mu.Unlock()
}

經(jīng)典案例

緩存場景在大家的業(yè)務(wù)場景中應(yīng)該是被廣泛使用的,大部分的場景使用應(yīng)該都是下圖吧?

從單體應(yīng)用到微服務(wù)化,調(diào)用下游服務(wù)一般如下圖吧?

假設(shè)緩存 Miss 所有流量會瞬間打到數(shù)據(jù)庫,或者所有流量都會打到 server2,如果學(xué)習(xí)過 singleflight 的同學(xué),肯定會把它用在 reids->db 或 server->server2 之間,包括我也是。如下圖(只舉數(shù)據(jù)庫案例)。

在使用 singleflight 之前你先確定下你的業(yè)務(wù)場景,key 相同的情況多嗎?(可以統(tǒng)計(jì)一些數(shù)據(jù),我們業(yè)務(wù)場景同一個(gè) key 多次調(diào)用下游概率是比較高的)如果 key 相同的情況比較少,singleflight 對你的幫助可能不大。

上面列舉 2 種方案。

1、  singleflight 介于 redis 和 db 之間,redis 是內(nèi)存緩存 qps 高、響應(yīng)也快。大部分情況不會成為瓶頸,但數(shù)據(jù)庫就不一樣了,所以這種方案可以防止緩存被擊穿流量打到數(shù)據(jù)庫。

2、  singleflight 介于 server 和 redis 之間,網(wǎng)上挺多推薦這種用法的,有必要用此方案嗎?大家可以思考下,文章末尾我給出我的想法。

我更傾向方案一。代碼如下:

func TestSingleFlight(t *testing.T) {
	var (
		n  = 10
		k  = "12344556"
		wg = sync.WaitGroup{}
		sf singleflight.Group
	)

	for i := 0; i < n; i++ {
		go func() {
			wg.Add(1)
			defer wg.Done()
			r, err, shared := sf.Do(k, func() (interface{}, error) {
				return get(k)
			})
			if err != nil {
				panic(err)
			}

			fmt.Printf("r=%v,shared=%v\n", r, shared)
		}()
	}

	wg.Wait()
}

func get(key string) (interface{}, error) {
	time.Sleep(time.Microsecond) // todo 模擬業(yè)務(wù)處理
	return key, nil
}

輸出結(jié)果如下

=== RUN   TestSingleFlight
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=false
r=12344556,shared=true
r=12344556,shared=true
--- PASS: TestSingleFlight (0.00s)
PASS

打印結(jié)果中為 true 都代表 調(diào)用 get() 函數(shù)返回結(jié)果被共享。get 函數(shù)調(diào)用明顯降低了。

這種寫法在函數(shù)正常返回情況下是能拿到正確的結(jié)果,如果下游返回異常了呢?(業(yè)務(wù)上遇過下游返回3-4s的拉低業(yè)務(wù)處理速度)因?yàn)?nbsp;Do() 方法是以阻塞的方式來控制對下游的調(diào)用的,如果某一個(gè)請求被阻塞了,同一個(gè) key 后面的請求都會被阻塞。

假設(shè)有一場景(SOP),消費(fèi) kafka 消息處理業(yè)務(wù)邏輯,業(yè)務(wù)高峰期某一時(shí)間段生產(chǎn)消息量為 100 w,單 pod 消費(fèi)速度 500/s ,請求下游用 singleflight 控制對下游(三方接口)的并發(fā)量,假設(shè)下游某一次請求耗時(shí) 2s。這時(shí)會有幾個(gè)問題:

1、若某一個(gè) key 被阻塞后續(xù)該 key 大量請求被阻塞,若這批請求失敗從而導(dǎo)致消息處理失敗,如果對消息重試會加劇業(yè)務(wù)下游壓力。

2、單 pod 消費(fèi)速度從 500/s,降低到個(gè)位數(shù),消費(fèi)時(shí)間拉長,消息堆積(如果消息堆積對實(shí)時(shí)性要求場景影響視頻很大的)。

造成這個(gè)問題主要原因如下:

singleflight 是同步阻塞且缺乏超時(shí)控制機(jī)制,若某一個(gè) key 阻塞后面次 key 都會被阻塞并且等待第一次結(jié)束。

singleflight 雖然能降低對下游的請求量,但在某些場景失敗的情況也增加了。

我們有辦法給 singleflight 加一個(gè)超時(shí)時(shí)間嗎?答案是肯定有的

下面這段代碼 singleflight 沒有增加超時(shí)控制

var (
	offset int32 = 0
)

func TestSingleFlight(t *testing.T) {
	var (
		n       int32 = 1000
		k             = "12344556"
		wg            = sync.WaitGroup{}
		sf      singleflight.Group
		failCnt int32 = 0
	)

	for i := 0; i < int(n); i++ {
		go func() {
			wg.Add(1)
			defer wg.Done()
			_, err, _ := sf.Do(k, func() (interface{}, error) {
				return get(k)
			})
			if err != nil {
				atomic.AddInt32(&failCnt, 1)
				return
			}
		}()
	}

	wg.Wait()
	fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, n-failCnt, failCnt)
}

func get(key string) (interface{}, error) {
	var err error
	if atomic.AddInt32(&offset, 1) == 3 { // 假設(shè)偏移量 offset == 3 執(zhí)行耗時(shí)長,超時(shí)失敗了
		time.Sleep(time.Microsecond * 500)
		err = fmt.Errorf("耗時(shí)長")
	}

	return key, err
}

結(jié)果輸出如下

=== RUN   TestSingleFlight
總請求數(shù)=1000,請求成功率=792,請求失敗率=208--- PASS: TestSingleFlight (0.00s)
PASS

singleflight 增加超時(shí)控制代碼如下

func TestSingleFlight(t *testing.T) {
	var (
		n       int32 = 1000
		k             = "12344556"
		wg            = sync.WaitGroup{}
		sf      singleflight.Group
		failCnt int32 = 0
	)

	for i := 0; i < int(n); i++ {
		go func() {
			wg.Add(1)
			defer wg.Done()
			_, err, _ := sf.Do(k, func() (interface{}, error) {
				ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*30)
				go func(_ctx context.Context) {
					<-_ctx.Done()
					sf.Forget(k)
				}(ctx)
				
				return get(k)
			})
			if err != nil {
				atomic.AddInt32(&failCnt, 1)
				return
			}
		}()
	}

	wg.Wait()
	fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, n-failCnt, failCnt)
}

利用 context.WithTimeout() 方法控制超時(shí),并且調(diào)用 Forget() 方法移除超時(shí) key 結(jié)果輸出如下

=== RUN   TestSingleFlight
總請求數(shù)=1000,請求成功率=992,請求失敗率=8--- PASS: TestSingleFlight (0.00s)
PASS

成功率提高了失敗率明顯降低了。

下面我用 DoChan() 函數(shù)實(shí)現(xiàn)

var (
	offset int32 = 0
)

func TestSingleFlight(t *testing.T) {
	var (
		n          int32 = 1000 // n 越大,效果越明顯
		k                = "12344556"
		wg               = sync.WaitGroup{}
		sf         singleflight.Group
		successCnt int32 = 0
	)

	for i := 0; i < int(n); i++ {
		go func() {
			wg.Add(1)
			defer wg.Done()
			ch := sf.DoChan(k, func() (interface{}, error) {
				return get(k)
			})

			ctx, _ := context.WithTimeout(context.TODO(), time.Microsecond*100)
			select {
			case <-ctx.Done():
				sf.Forget(k)
				return
			case ret := <-ch:
				if ret.Err != nil {
					return
				}
				atomic.AddInt32(&successCnt, 1)
			}
		}()
	}

	wg.Wait()
	fmt.Printf("總請求數(shù)=%d,請求成功率=%d,請求失敗率=%d", n, successCnt, n-successCnt)
}

func get(key string) (interface{}, error) {
	var err error
	if atomic.AddInt32(&offset, 1) == 3 { // 假設(shè)偏移量 offset == 3 執(zhí)行耗時(shí)長,超時(shí)失敗了
		time.Sleep(time.Microsecond * 400)
		err = fmt.Errorf("耗時(shí)長")
	}

	return key, err
}

大家自行驗(yàn)證

總結(jié)

1、singleflight 使用得當(dāng)確實(shí)能有效降低下游流量,我也推薦大家使用,但一定要注意同步阻塞問題,防止下游長耗時(shí)造成業(yè)務(wù)異常或高延遲,一定要做好正確性與降低業(yè)務(wù)下游流量權(quán)衡。

2、上面我留了一個(gè)問題,singleflight 有必要放在 server 應(yīng)用和 redis 之間嗎?我認(rèn)為沒必要,redis 是內(nèi)存數(shù)據(jù)庫,響應(yīng)快,高 qps 本身不會是瓶頸,保護(hù) redis 沒有意義。另外 singleflight 用途是防止 redis 擊穿流量打到數(shù)據(jù)庫,如果你業(yè)務(wù) qps 非常高并且對數(shù)據(jù)實(shí)時(shí)性要求高,為啥不通過其他手段把數(shù)據(jù)庫數(shù)據(jù)刷新到 redis 中?比如數(shù)據(jù)創(chuàng)建同步寫入 redis、或通過 binlog 寫入。

到此這篇關(guān)于一文教你學(xué)會Go中singleflight的使用的文章就介紹到這了,更多相關(guān)Go singleflight內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Golang中常用的語法糖分享

    Golang中常用的語法糖分享

    語法糖,也稱糖語法,是由英國計(jì)算機(jī)科學(xué)家彼得·蘭丁提出的,用于表示編程語言中的某種類型的語法,這些語法不會影響功能,但使用起來卻很方便,本文就來看看Golang中常用的語法糖有哪些吧
    2023-05-05
  • GoLang?socket網(wǎng)絡(luò)編程傳輸數(shù)據(jù)包時(shí)進(jìn)行長度校驗(yàn)的方法

    GoLang?socket網(wǎng)絡(luò)編程傳輸數(shù)據(jù)包時(shí)進(jìn)行長度校驗(yàn)的方法

    在GoLang?socket網(wǎng)絡(luò)編程中,為了確保數(shù)據(jù)交互的穩(wěn)定性和安全性,通常會通過傳輸數(shù)據(jù)的長度進(jìn)行校驗(yàn),發(fā)送端首先發(fā)送數(shù)據(jù)長度,然后發(fā)送數(shù)據(jù)本體,接收端則根據(jù)接收到的數(shù)據(jù)長度和數(shù)據(jù)本體進(jìn)行比較,以此來確認(rèn)數(shù)據(jù)是否傳輸成功
    2024-11-11
  • golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法

    golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法

    這篇文章主要介紹了golang 并發(fā)安全Map以及分段鎖的實(shí)現(xiàn)方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧
    2019-03-03
  • Go?gRPC進(jìn)階教程服務(wù)超時(shí)設(shè)置

    Go?gRPC進(jìn)階教程服務(wù)超時(shí)設(shè)置

    這篇文章主要為大家介紹了Go?gRPC進(jìn)階,gRPC請求的超時(shí)時(shí)間設(shè)置,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06
  • golang實(shí)現(xiàn)枚舉的幾種方式

    golang實(shí)現(xiàn)枚舉的幾種方式

    在Go語言中,雖沒有內(nèi)置枚舉類型,但可通過常量、結(jié)構(gòu)體或自定義類型和方法實(shí)現(xiàn)枚舉功能,這些方法提高了代碼的可讀性和維護(hù)性,避免了魔法數(shù)字的使用,感興趣的可以了解一下
    2024-09-09
  • Go?chassis云原生微服務(wù)開發(fā)框架應(yīng)用編程實(shí)戰(zhàn)

    Go?chassis云原生微服務(wù)開發(fā)框架應(yīng)用編程實(shí)戰(zhàn)

    這篇文章主要為大家介紹了Go?chassis云原生微服務(wù)開發(fā)框架應(yīng)用編程實(shí)戰(zhàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • GO語言求100以內(nèi)的素?cái)?shù)

    GO語言求100以內(nèi)的素?cái)?shù)

    這篇文章主要介紹了GO語言求100以內(nèi)的素?cái)?shù),主要通過篩選法來實(shí)現(xiàn),涉及GO語言基本的循環(huán)與函數(shù)調(diào)用方法,需要的朋友可以參考下
    2014-12-12
  • golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解

    golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解

    常見的IO模型有阻塞、非阻塞、IO多路復(fù)用,異,下面這篇文章主要給大家介紹了關(guān)于golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2017-09-09
  • golang開發(fā)中channel使用

    golang開發(fā)中channel使用

    channel[通道]是golang的一種重要特性,正是因?yàn)閏hannel的存在才使得golang不同于其它語言。這篇文章主要介紹了golang開發(fā)中channel使用,需要的朋友可以參考下
    2020-09-09
  • Go中時(shí)間與時(shí)區(qū)問題的深入講解

    Go中時(shí)間與時(shí)區(qū)問題的深入講解

    go語言中如果不設(shè)置指定的時(shí)區(qū),通過time.Now()獲取到的就是本地時(shí)區(qū),下面這篇文章主要給大家介紹了關(guān)于Go中時(shí)間與時(shí)區(qū)問題的相關(guān)資料,需要的朋友可以參考下
    2021-12-12

最新評論