Go實現(xiàn)并發(fā)緩存的示例代碼
并發(fā)不安全的 Memo
首先用一個例子演示函數(shù)記憶:
// A Memo caches the results of calling a Func.
type Memo struct {
f Func
cache map[string]result
}
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
// NOTE: not concurrency-safe!
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}
其中函數(shù)f是一個重量級的計算函數(shù),調(diào)用它的代價很大,所以要將結(jié)果緩存到一個map中加快每次調(diào)用。這就是函數(shù)記憶。
每次調(diào)用Get,將從memo里查詢結(jié)果,如果沒查到,就要調(diào)用函數(shù)f計算結(jié)果,再將它記錄到緩存中。
以上實現(xiàn)Get方法在沒有使用同步的情況下更新了緩存cache,整個Get函數(shù)不是并發(fā)安全的。
安全但偽并發(fā)的 Memo
考慮每次調(diào)用Get方法都加鎖:
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]result
}
// Get is concurrency-safe.
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
memo.mu.Unlock()
return res.value, res.err
}
由于每次調(diào)用都請求互斥鎖,Get又將并行的請求操作串行化了。
會導致多余計算的 Memo
考慮以下改進:
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
res, ok := memo.cache[key]
memo.mu.Unlock()
if !ok {
res.value, res.err = memo.f(key)
// Between the two critical sections, several goroutines
// may race to compute f(key) and update the map.
memo.mu.Lock()
memo.cache[key] = res
memo.mu.Unlock()
}
return res.value, res.err
}
該版本分兩次獲取鎖:第一次用于查詢緩存,第二次用于在查詢無結(jié)果時進行更新。
在理想情況下,我們應該避免這種額外的處理。這個功能有時被稱為重復抑制(duplication suppression)。
通過通道進行重復抑制
在第四個版本的緩存中,我們?yōu)槊總€entry新加了一個通道ready。在設(shè)置完entry的result字段后,通道會關(guān)閉,正在等待的goroutine會收到廣播,就可以從entry中讀取結(jié)果了。
// Func is the type of the function to memoize.
type Func func(string) (interface{}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // guards cache
cache map[string]*entry //現(xiàn)在緩存返回的是一個entry
}
func (memo *Memo) Get(key string) (value interface{}, err error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// This is the first request for this key.
// This goroutine becomes responsible for computing
// the value and broadcasting the ready condition.
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // broadcast ready condition
} else {
// This is a repeat request for this key.
memo.mu.Unlock()
<-e.ready // wait for ready condition
}
return e.res.value, e.res.err
}
當Get函數(shù)發(fā)現(xiàn)緩存memo中沒有記錄時,它構(gòu)造一個entry放到緩存中,但這時key對應的結(jié)果還未計算。
這時,如果其他goroutine調(diào)用了Get函數(shù)查詢同樣的key時,它會到達<-e.ready語句并因等待通道數(shù)據(jù)而阻塞。只有當計算結(jié)束,負責計算結(jié)果的goroutine將通道關(guān)閉后,其它goroutine才能夠得以繼續(xù)執(zhí)行,并查詢出結(jié)果。
- 當一個goroutine試圖查詢一個不存在的結(jié)果時,它創(chuàng)建一個
entry放到緩存中,并解鎖,然后調(diào)用f進行計算。計算完成后更新相應的entry就可以將ready通道關(guān)閉; - 當一個goroutine試圖查詢一個已經(jīng)存在的結(jié)果時,他應該立即放棄鎖,并等待查到的
entry的通道的關(guān)閉。
「通過通信共享內(nèi)存」的另一設(shè)計
以上介紹了共享變量并上鎖的方法,另一種方案是通信順序進程。
在新的設(shè)計中,map變量限制在一個監(jiān)控goroutine中,而Get的調(diào)用者則改為發(fā)送消息。
// Func is the type of the function to memoize.
type Func func(key string) (interface{}, error)
// A result is the result of calling a Func.
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // closed when res is ready
}
// A request is a message requesting that the Func be applied to key.
type request struct {
key string
response chan<- result // the client wants a single result
}
type Memo struct{ requests chan request }
// New returns a memoization of f. Clients must subsequently call Close.
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)}
go memo.server(f)
return memo
}
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
//!-get
//!+monitor
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests {
e := cache[req.key]
if e == nil {
// This is the first request for this key.
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // call f(key)
}
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// Evaluate the function.
e.res.value, e.res.err = f(key)
// Broadcast the ready condition.
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// Wait for the ready condition.
<-e.ready
// Send the result to the client.
response <- e.res
}
Get方法創(chuàng)建一個response通道,并將它放在一個請求里,然后把它發(fā)送給監(jiān)控goroutine,然后從自己創(chuàng)建的response通道中讀取。
監(jiān)控goroutine(即server方法)不斷從request通道中讀取,直至該通道被關(guān)閉。對于每個請求,它先從緩存中查詢,如果沒找到則創(chuàng)建并插入一個新的entry:
監(jiān)控goroutine先創(chuàng)建一個entry放到緩存中,然后它調(diào)用go e.call(f, req.key)創(chuàng)建一個gorouitne來計算結(jié)果、關(guān)閉ready通道。與此同時它調(diào)用go e.deliver(req.response)等待ready通道關(guān)閉,并將結(jié)果發(fā)送到response通道中;
如果監(jiān)控goroutine直接從緩存找到了結(jié)果,那么根據(jù)key查到的entry已經(jīng)包含一個已經(jīng)關(guān)閉的通道,它調(diào)用go e.deliver(req.response)就可以直接將結(jié)果放到response通道中。
總結(jié)起來,server負責了從請求通道中讀取請求,對于未完成計算的key,它創(chuàng)建新的goroutine執(zhí)行計算任務,隨后通過請求中附帶的resp通道答復請求。
更進一步的改造,可以限制進行計算的goroutine數(shù)量、通過context包控制server的生命周期等。
到此這篇關(guān)于Go實現(xiàn)并發(fā)緩存的示例代碼的文章就介紹到這了,更多相關(guān)Go 并發(fā)緩存內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言map與string的相互轉(zhuǎn)換的實現(xiàn)
這篇文章主要介紹了go語言map與string的相互轉(zhuǎn)換的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2021-04-04
Go語言k8s?kubernetes使用leader?election實現(xiàn)選舉
這篇文章主要為大家介紹了Go語言?k8s?kubernetes?使用leader?election選舉,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10

