golang中sync.Map并發(fā)創(chuàng)建、讀取問(wèn)題實(shí)戰(zhàn)記錄
背景:
我們有一個(gè)用go做的項(xiàng)目,其中用到了zmq4進(jìn)行通信,一個(gè)簡(jiǎn)單的rpc過(guò)程,早期遠(yuǎn)端是使用一個(gè)map去做ip和具體socket的映射。
問(wèn)題
大概是這樣
struct SocketMap { sync.Mutex sockets map[string]*zmq4.Socket }
然后調(diào)用的時(shí)候的代碼大概就是這樣的:
func (pushList *SocketMap) push(ip string, data []byte) { pushList.Lock() defer pushList.UnLock() socket := pushList.sockets[string] if socket == nil { socket := zmq4.NewSocket() //do some initial operation like connect pushList.sockets[ip] = socket } socket.Send(data) }
相信大家都能看出問(wèn)題:當(dāng)push被并發(fā)訪問(wèn)的時(shí)候(事實(shí)上push會(huì)經(jīng)常被并發(fā)訪問(wèn)),由于這把大鎖的存在,同時(shí)只能有一個(gè)協(xié)程在臨界區(qū)工作,效率是會(huì)被大大降低的。
解決方案:會(huì)帶來(lái)crash的優(yōu)化
所以我們決定使用sync.Map來(lái)替代這個(gè)設(shè)計(jì),然后出了第一版代碼,寫的非常簡(jiǎn)單,只做了簡(jiǎn)單的替換:
struct SocketMap { sockets sync.Map } func (pushList *SocketMap) push(ip string, data []byte) { var socket *zmq4.Socket socketInter, ok = pushList.sockets.Load(ip) if !ok { socket = zmq4.NewSocket() //do some initial operation like connect pushList.sockets.Store(ip, socket) } else { socket = socketInter.(*zmq4.Socket) } socket.Send(data) }
乍一看似乎沒(méi)什么問(wèn)題?但是跑起來(lái)總是爆炸,然后一看log,提示有個(gè)非法地址。后來(lái)在github上才看到,zmq4.Socket不是線程安全的。上面的代碼恰恰會(huì)造成多個(gè)線程同時(shí)拿到socket實(shí)例,然后就crash了。
解決方案2: 加一把鎖也擋不住的沖突
然后怎么辦呢?看來(lái)也只能加鎖了,不過(guò)這次加鎖不能加到整個(gè)map上,否則還會(huì)有性能問(wèn)題,那就考慮減小鎖的粒度吧,使用鎖包裝socket。這個(gè)時(shí)候我們的代碼也就呼之欲出了:
struct SocketMutex{ sync.Mutex socket *zmq4.Socket } struct SocketMap { sockets sync.Map } func (pushList *SocketMap) push(ip string, data []byte) { var socket *SocketMutex socketInter, ok = pushList.sockets.Load(ip) if !ok { socket = &{ socket: zmq4.NewSocket() } //do some initial operation like connect pushList.sockets.Store(ip, newSocket) } else { socket = socketInter.(*SocketMutex) } socket.Lock() defer socket.Unlock() socket.socket.Send(data) }
但是這樣還是有問(wèn)題,相信經(jīng)驗(yàn)比較豐富的老哥一眼就能看出來(lái),問(wèn)題處在socketInter, ok = pushList.sockets.Load(ip)這行代碼上,如果map中沒(méi)有這個(gè)值,且有多個(gè)協(xié)程同時(shí)訪問(wèn)到這行代碼,顯然這幾個(gè)協(xié)程的ok都會(huì)置為false,然后都進(jìn)入第一個(gè)if代碼塊,創(chuàng)建多個(gè)socket實(shí)例,并且爭(zhēng)相覆蓋原有值。
單純解決這個(gè)問(wèn)題也很簡(jiǎn)單,就是使用sync.Map.LoadOrStore(key interface{}, value interface{}) (v interface{}, loaded bool)
這個(gè)api,來(lái)原子地去做讀寫。
然而這還沒(méi)完,我們的寫入新值的操作不光是調(diào)用一個(gè)api創(chuàng)建socket就完了,還要有一系列的初始化操作,我們必須保證在初始化完成之前,其他通過(guò)Load拿到這個(gè)實(shí)例的協(xié)程無(wú)法真正訪問(wèn)socket實(shí)例。
這時(shí)候顯然sync.Map自帶的機(jī)制已經(jīng)無(wú)法解決這個(gè)問(wèn)題了,那么我們必須尋求其他的手段,要么鎖,要么就sync.WaitGroup或者whatever的其他什么東西。
解決方案3: 閉包帶來(lái)的神奇體驗(yàn)
后來(lái)經(jīng)大佬指點(diǎn),我在encoder.go中看到了這么一段代碼:
func typeEncoder(t reflect.Type) encoderFunc { if fi, ok := encoderCache.Load(t); ok { return fi.(encoderFunc) } // To deal with recursive types, populate the map with an // indirect func before we build it. This type waits on the // real func (f) to be ready and then calls it. This indirect // func is only used for recursive types. var ( wg sync.WaitGroup f encoderFunc ) wg.Add(1) fi, loaded := encoderCache.LoadOrStore(t, encoderFunc(func(e *encodeState, v reflect.Value, opts encOpts) { wg.Wait() f(e, v, opts) })) if loaded { return fi.(encoderFunc) } // Compute the real encoder and replace the indirect func with it. f = newTypeEncoder(t, true) wg.Done() encoderCache.Store(t, f) return f }
豁然開朗,我們可以在sync.Map中存放一個(gè)閉包函數(shù),然后在閉包函數(shù)中等待本地的sync.WaitGroup
完成再返回實(shí)例。于是最終的代碼也就成型了。
struct SocketMutex{ sync.Mutex socket *zmq4.Socket } struct SocketMap { sockets sync.Map } func (pushList *SocketMap) push(ip string, data []byte) { type SocketFunc func()*SocketMutex var ( socket *SocketMutex w sync.WaitGroup ) socket = &SocketMutex { socket : zmq4.NewSocket() } w.Add(1) socketf, ok = pushList.sockets.LoadOrStore(ip, SocketFunc(func()*SocketMutex) { w.Wait() return socket }) if !ok { socket = &{ socket: zmq4.NewSocket() } //do some initial operation like connect w.Done() } else { socket = socketInter.(*SockeFunc)() } socket.Lock() defer socket.Unlock() socket.socket.Send(data) }
總結(jié):
并發(fā)代碼中的競(jìng)爭(zhēng)問(wèn)題,每一行代碼的重入性都要深思熟慮啊。
總的來(lái)說(shuō)要保持以下幾個(gè)準(zhǔn)則:
(1) 不可重入訪問(wèn)的系統(tǒng)資源,如socketfd, filefd,signalfd(事實(shí)上大多數(shù)這種系統(tǒng)資源都是不可重入的)等,在使用無(wú)鎖結(jié)構(gòu)的容器、讀寫鎖封裝的容器時(shí),需要給每個(gè)資源單獨(dú)加鎖或者使用其他手段保證系統(tǒng)資源在臨界區(qū)受到有效保護(hù)。
(2)如果有讀取,如果為空則寫入的邏輯,需要使用能提供原子性保證的LoadOrSave調(diào)用,或者沒(méi)有的話,自己實(shí)現(xiàn)也要保證讀取和寫入過(guò)程整體的原子性;防止并發(fā)訪問(wèn)Load調(diào)用時(shí),多個(gè)線程都返回否而創(chuàng)建多個(gè)實(shí)例,然后在Save的時(shí)候又互相覆蓋?!@個(gè)原則不光對(duì)成員是系統(tǒng)資源的時(shí)候生效,如果存放的是其他東西也同樣適用。
(3)如果資源創(chuàng)建完畢,還需要其他的初始化過(guò)程,則可以考慮在容器內(nèi)放置閉包,初始化過(guò)程使用sync.WaitGroup
保護(hù),在閉包中調(diào)用Wait方法等待初始化完成再給其他線程返回初始化好的實(shí)例。而初始化過(guò)程完成后,可以置換閉包函數(shù),不再調(diào)用Wait方法,來(lái)減少可能的開銷。
好了,以上就是這篇文章的全部?jī)?nèi)容了,希望本文的內(nèi)容對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,如果有疑問(wèn)大家可以留言交流,謝謝大家對(duì)腳本之家的支持。
相關(guān)文章
GoZero實(shí)現(xiàn)數(shù)據(jù)庫(kù)MySQL單例模式連接的簡(jiǎn)單示例
在 GoZero 框架中實(shí)現(xiàn)數(shù)據(jù)庫(kù)的單例連接可以通過(guò)以下步驟來(lái)完成,GoZero 使用 gorm 作為默認(rèn)的數(shù)據(jù)庫(kù)操作框架,接下來(lái)我會(huì)展示一個(gè)簡(jiǎn)單的單例模式實(shí)現(xiàn),需要的朋友可以參考下2025-02-02Go實(shí)現(xiàn)字符串與數(shù)字的高效轉(zhuǎn)換
在軟件開發(fā)的世界里,數(shù)據(jù)類型轉(zhuǎn)換是一項(xiàng)基礎(chǔ)而重要的技能,尤其在Go語(yǔ)言這樣類型嚴(yán)格的語(yǔ)言中,正確高效地進(jìn)行類型轉(zhuǎn)換對(duì)于性能優(yōu)化和代碼質(zhì)量至關(guān)重要,本文給大家介紹了Go實(shí)現(xiàn)字符串與數(shù)字的高效轉(zhuǎn)換,需要的朋友可以參考下2024-02-02go使用makefile腳本編譯應(yīng)用的方法小結(jié)
makefile可以看作是make工具的腳本文件, 而make主要用來(lái)處理一系列命令。常用的比如用來(lái)編譯和打包文件, 在C/C++的編譯打包中應(yīng)用最廣泛了,這篇文章主要介紹了go使用makefile腳本編譯應(yīng)用,需要的朋友可以參考下2022-08-08golang實(shí)現(xiàn)京東支付v2版本的示例代碼
這篇文章主要介紹了golang實(shí)現(xiàn)京東支付v2版本,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03Golang自定義開發(fā)Prometheus?exporter詳解
Exporter是基于Prometheus實(shí)施的監(jiān)控系統(tǒng)中重要的組成部分,承擔(dān)數(shù)據(jù)指標(biāo)的采集工作,這篇文章主要為大家介紹了如何自定義編寫開發(fā)?Prometheus?exporter,感興趣的可以了解一下2023-06-06一文帶你了解Go語(yǔ)言中的類型斷言和類型轉(zhuǎn)換
在Go中,類型斷言和類型轉(zhuǎn)換是一個(gè)令人困惑的事情,他們似乎都在做同樣的事情。最明顯的不同點(diǎn)是他們具有不同的語(yǔ)法(variable.(type)?vs?type(variable)?)。本文我們就來(lái)深入研究一下二者的區(qū)別2022-09-09