Go實(shí)現(xiàn)并發(fā)緩存的示例代碼
并發(fā)不安全的 Memo
首先用一個(gè)例子演示函數(shù)記憶:
// A Memo caches the results of calling a Func. type Memo struct { f Func cache map[string]result } // Func is the type of the function to memoize. type Func func(key string) (interface{}, error) type result struct { value interface{} err error } func New(f Func) *Memo { return &Memo{f: f, cache: make(map[string]result)} } // NOTE: not concurrency-safe! func (memo *Memo) Get(key string) (interface{}, error) { res, ok := memo.cache[key] if !ok { res.value, res.err = memo.f(key) memo.cache[key] = res } return res.value, res.err }
其中函數(shù)f
是一個(gè)重量級(jí)的計(jì)算函數(shù),調(diào)用它的代價(jià)很大,所以要將結(jié)果緩存到一個(gè)map
中加快每次調(diào)用。這就是函數(shù)記憶。
每次調(diào)用Get
,將從memo
里查詢結(jié)果,如果沒查到,就要調(diào)用函數(shù)f
計(jì)算結(jié)果,再將它記錄到緩存中。
以上實(shí)現(xiàn)Get
方法在沒有使用同步的情況下更新了緩存cache
,整個(gè)Get
函數(shù)不是并發(fā)安全的。
安全但偽并發(fā)的 Memo
考慮每次調(diào)用Get
方法都加鎖:
type Memo struct { f Func mu sync.Mutex // guards cache cache map[string]result } // Get is concurrency-safe. func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() res, ok := memo.cache[key] if !ok { res.value, res.err = memo.f(key) memo.cache[key] = res } memo.mu.Unlock() return res.value, res.err }
由于每次調(diào)用都請(qǐng)求互斥鎖,Get
又將并行的請(qǐng)求操作串行化了。
會(huì)導(dǎo)致多余計(jì)算的 Memo
考慮以下改進(jìn):
func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() res, ok := memo.cache[key] memo.mu.Unlock() if !ok { res.value, res.err = memo.f(key) // Between the two critical sections, several goroutines // may race to compute f(key) and update the map. memo.mu.Lock() memo.cache[key] = res memo.mu.Unlock() } return res.value, res.err }
該版本分兩次獲取鎖:第一次用于查詢緩存,第二次用于在查詢無結(jié)果時(shí)進(jìn)行更新。
在理想情況下,我們應(yīng)該避免這種額外的處理。這個(gè)功能有時(shí)被稱為重復(fù)抑制(duplication suppression)。
通過通道進(jìn)行重復(fù)抑制
在第四個(gè)版本的緩存中,我們?yōu)槊總€(gè)entry
新加了一個(gè)通道ready
。在設(shè)置完entry
的result
字段后,通道會(huì)關(guān)閉,正在等待的goroutine會(huì)收到廣播,就可以從entry
中讀取結(jié)果了。
// Func is the type of the function to memoize. type Func func(string) (interface{}, error) type result struct { value interface{} err error } type entry struct { res result ready chan struct{} // closed when res is ready } func New(f Func) *Memo { return &Memo{f: f, cache: make(map[string]*entry)} } type Memo struct { f Func mu sync.Mutex // guards cache cache map[string]*entry //現(xiàn)在緩存返回的是一個(gè)entry } func (memo *Memo) Get(key string) (value interface{}, err error) { memo.mu.Lock() e := memo.cache[key] if e == nil { // This is the first request for this key. // This goroutine becomes responsible for computing // the value and broadcasting the ready condition. e = &entry{ready: make(chan struct{})} memo.cache[key] = e memo.mu.Unlock() e.res.value, e.res.err = memo.f(key) close(e.ready) // broadcast ready condition } else { // This is a repeat request for this key. memo.mu.Unlock() <-e.ready // wait for ready condition } return e.res.value, e.res.err }
當(dāng)Get
函數(shù)發(fā)現(xiàn)緩存memo
中沒有記錄時(shí),它構(gòu)造一個(gè)entry
放到緩存中,但這時(shí)key
對(duì)應(yīng)的結(jié)果還未計(jì)算。
這時(shí),如果其他goroutine調(diào)用了Get
函數(shù)查詢同樣的key
時(shí),它會(huì)到達(dá)<-e.ready
語句并因等待通道數(shù)據(jù)而阻塞。只有當(dāng)計(jì)算結(jié)束,負(fù)責(zé)計(jì)算結(jié)果的goroutine將通道關(guān)閉后,其它goroutine才能夠得以繼續(xù)執(zhí)行,并查詢出結(jié)果。
- 當(dāng)一個(gè)goroutine試圖查詢一個(gè)不存在的結(jié)果時(shí),它創(chuàng)建一個(gè)
entry
放到緩存中,并解鎖,然后調(diào)用f
進(jìn)行計(jì)算。計(jì)算完成后更新相應(yīng)的entry
就可以將ready
通道關(guān)閉; - 當(dāng)一個(gè)goroutine試圖查詢一個(gè)已經(jīng)存在的結(jié)果時(shí),他應(yīng)該立即放棄鎖,并等待查到的
entry
的通道的關(guān)閉。
「通過通信共享內(nèi)存」的另一設(shè)計(jì)
以上介紹了共享變量并上鎖的方法,另一種方案是通信順序進(jìn)程。
在新的設(shè)計(jì)中,map
變量限制在一個(gè)監(jiān)控goroutine中,而Get
的調(diào)用者則改為發(fā)送消息。
// Func is the type of the function to memoize. type Func func(key string) (interface{}, error) // A result is the result of calling a Func. type result struct { value interface{} err error } type entry struct { res result ready chan struct{} // closed when res is ready } // A request is a message requesting that the Func be applied to key. type request struct { key string response chan<- result // the client wants a single result } type Memo struct{ requests chan request } // New returns a memoization of f. Clients must subsequently call Close. func New(f Func) *Memo { memo := &Memo{requests: make(chan request)} go memo.server(f) return memo } func (memo *Memo) Get(key string) (interface{}, error) { response := make(chan result) memo.requests <- request{key, response} res := <-response return res.value, res.err } func (memo *Memo) Close() { close(memo.requests) } //!-get //!+monitor func (memo *Memo) server(f Func) { cache := make(map[string]*entry) for req := range memo.requests { e := cache[req.key] if e == nil { // This is the first request for this key. e = &entry{ready: make(chan struct{})} cache[req.key] = e go e.call(f, req.key) // call f(key) } go e.deliver(req.response) } } func (e *entry) call(f Func, key string) { // Evaluate the function. e.res.value, e.res.err = f(key) // Broadcast the ready condition. close(e.ready) } func (e *entry) deliver(response chan<- result) { // Wait for the ready condition. <-e.ready // Send the result to the client. response <- e.res }
Get
方法創(chuàng)建一個(gè)response
通道,并將它放在一個(gè)請(qǐng)求里,然后把它發(fā)送給監(jiān)控goroutine,然后從自己創(chuàng)建的response
通道中讀取。
監(jiān)控goroutine(即server
方法)不斷從request
通道中讀取,直至該通道被關(guān)閉。對(duì)于每個(gè)請(qǐng)求,它先從緩存中查詢,如果沒找到則創(chuàng)建并插入一個(gè)新的entry
:
監(jiān)控goroutine先創(chuàng)建一個(gè)entry
放到緩存中,然后它調(diào)用go e.call(f, req.key)
創(chuàng)建一個(gè)gorouitne來計(jì)算結(jié)果、關(guān)閉ready
通道。與此同時(shí)它調(diào)用go e.deliver(req.response)
等待ready
通道關(guān)閉,并將結(jié)果發(fā)送到response
通道中;
如果監(jiān)控goroutine
直接從緩存找到了結(jié)果,那么根據(jù)key
查到的entry
已經(jīng)包含一個(gè)已經(jīng)關(guān)閉的通道,它調(diào)用go e.deliver(req.response)
就可以直接將結(jié)果放到response
通道中。
總結(jié)起來,server
負(fù)責(zé)了從請(qǐng)求通道中讀取請(qǐng)求,對(duì)于未完成計(jì)算的key
,它創(chuàng)建新的goroutine執(zhí)行計(jì)算任務(wù),隨后通過請(qǐng)求中附帶的resp
通道答復(fù)請(qǐng)求。
更進(jìn)一步的改造,可以限制進(jìn)行計(jì)算的goroutine數(shù)量、通過context
包控制server
的生命周期等。
到此這篇關(guān)于Go實(shí)現(xiàn)并發(fā)緩存的示例代碼的文章就介紹到這了,更多相關(guān)Go 并發(fā)緩存內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言map與string的相互轉(zhuǎn)換的實(shí)現(xiàn)
這篇文章主要介紹了go語言map與string的相互轉(zhuǎn)換的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04Go語言k8s?kubernetes使用leader?election實(shí)現(xiàn)選舉
這篇文章主要為大家介紹了Go語言?k8s?kubernetes?使用leader?election選舉,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10go進(jìn)行http請(qǐng)求偶發(fā)EOF問題分析
go使用連接池進(jìn)行http請(qǐng)求,一般都能請(qǐng)求成功,但偶然會(huì)出現(xiàn)請(qǐng)求失敗返回EOF錯(cuò)誤的情況,本文主要來帶大家分析一下為什么會(huì)出現(xiàn)這樣的問題并提供解決方法,需要的可以參考下2025-01-01