golang中三種線程安全的MAP小結(jié)
一、map 是什么
map 是 Go 中用于存儲 key-value 關(guān)系數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu),類似 C++ 中的 map,Python 中的 dict。Go 中 map 的使用很簡單,但是對于初學者,經(jīng)常會犯兩個錯誤:沒有初始化,并發(fā)讀寫。
1、未初始化的 map 都是 nil,直接賦值會報 panic。map 作為結(jié)構(gòu)體成員的時候,很容易忘記對它的初始化。
2、并發(fā)讀寫是我們使用 map 中很常見的一個錯誤。多個協(xié)程并發(fā)讀寫同一個 key 的時候,會出現(xiàn)沖突,導致 panic。
Go 內(nèi)置的 map 類型并沒有對并發(fā)場景場景進行優(yōu)化,但是并發(fā)場景又很常見,如何實現(xiàn)線程安全(并發(fā)安全)的 map就很重要了
二、三種線程安全的 map
1、加讀寫鎖(RWMutex)
這是最容易想到的一種方式。常見的 map 的操作有增刪改查和遍歷,這里面查和遍歷是讀操作,增刪改是寫操作,因此對查和遍歷需要加讀鎖,對增刪改需要加寫鎖。
以 map[int]int 為例,借助 RWMutex,具體的實現(xiàn)方式如下:
type RWMap struct { // 一個讀寫鎖保護的線程安全的map sync.RWMutex // 讀寫鎖保護下面的map字段 m map[int]int } // 新建一個RWMap func NewRWMap(n int) *RWMap { return &RWMap{ m: make(map[int]int, n), } } func (m *RWMap) Get(k int) (int, bool) { //從map中讀取一個值 m.RLock() defer m.RUnlock() v, existed := m.m[k] // 在鎖的保護下從map中讀取 return v, existed } func (m *RWMap) Set(k int, v int) { // 設(shè)置一個鍵值對 m.Lock() // 鎖保護 defer m.Unlock() m.m[k] = v } func (m *RWMap) Delete(k int) { //刪除一個鍵 m.Lock() // 鎖保護 defer m.Unlock() delete(m.m, k) } func (m *RWMap) Len() int { // map的長度 m.RLock() // 鎖保護 defer m.RUnlock() return len(m.m) } func (m *RWMap) Each(f func(k, v int) bool) { // 遍歷map m.RLock() //遍歷期間一直持有讀鎖 defer m.RUnlock() for k, v := range m.m { if !f(k, v) { return } } }
2、分片加鎖
通過讀寫鎖 RWMutex 實現(xiàn)的線程安全的 map,功能上已經(jīng)完全滿足了需要,但是面對高并發(fā)的場景,僅僅功能滿足可不行,性能也得跟上。鎖是性能下降的萬惡之源之一。所以并發(fā)編程的原則就是盡可能減少鎖的使用。當鎖不得不用的時候,可以減小鎖的粒度和持有的時間。
在第一種方法中,加鎖的對象是整個 map,協(xié)程 A 對 map 中的 key 進行修改操作,會導致其它協(xié)程無法對其它 key 進行讀寫操作。一種解決思路是將這個 map 分成 n 塊,每個塊之間的讀寫操作都互不干擾,從而降低沖突的可能性。
Go 比較知名的分片 map 的實現(xiàn)是 orcaman/concurrent-map,它的定義如下:
var SHARD_COUNT = 32 // 分成SHARD_COUNT個分片的map type ConcurrentMap []*ConcurrentMapShared // 通過RWMutex保護的線程安全的分片,包含一個map type ConcurrentMapShared struct { items map[string]interface{} sync.RWMutex // Read Write mutex, guards access to internal map. } // 創(chuàng)建并發(fā)map func New() ConcurrentMap { m := make(ConcurrentMap, SHARD_COUNT) for i := 0; i < SHARD_COUNT; i++ { m[i] = &ConcurrentMapShared{items: make(map[string]interface{})} } return m } // 根據(jù)key計算分片索引 func (m ConcurrentMap) GetShard(key string) *ConcurrentMapShared { return m[uint(fnv32(key))%uint(SHARD_COUNT)] }
ConcurrentMap 其實就是一個切片,切片的每個元素都是第一種方法中攜帶了讀寫鎖的 map。
這里面 GetShard 方法就是用來計算每一個 key 應該分配到哪個分片上。再來看一下 Set 和 Get 操作。
func (m ConcurrentMap) Set(key string, value interface{}) { // 根據(jù)key計算出對應的分片 shard := m.GetShard(key) shard.Lock() //對這個分片加鎖,執(zhí)行業(yè)務(wù)操作 shard.items[key] = value shard.Unlock() } func (m ConcurrentMap) Get(key string) (interface{}, bool) { // 根據(jù)key計算出對應的分片 shard := m.GetShard(key) shard.RLock() // 從這個分片讀取key的值 val, ok := shard.items[key] shard.RUnlock() return val, ok }
Get 和 Set 方法類似,都是根據(jù) key 用 GetShard 計算出分片索引,找到對應的 map 塊,執(zhí)行讀寫操作。
3、sync 中的 map
分片加鎖的思路是將大塊的數(shù)據(jù)切分成小塊的數(shù)據(jù),從而減少沖突導致鎖阻塞的可能性。如果在一些特殊的場景下,將讀寫數(shù)據(jù)分開,是不是能在進一步提升性能呢?
在內(nèi)置的 sync 包中(Go 1.9+)也有一個線程安全的 map,通過將讀寫分離的方式實現(xiàn)了某些特定場景下的性能提升。
其實在生產(chǎn)環(huán)境中,sync.map 用的很少,官方文檔推薦的兩種使用場景是:
a) when the entry for a given key is only ever written once but read many times, as in caches that only grow.
b) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys.
兩種場景都比較苛刻,要么是一寫多讀,要么是各個協(xié)程操作的 key 集合沒有交集(或者交集很少)。所以官方建議先對自己的場景做性能測評,如果確實能顯著提高性能,再使用 sync.map。
sync.map 的整體思路就是用兩個數(shù)據(jù)結(jié)構(gòu)(只讀的 read 和可寫的 dirty)盡量將讀寫操作分開,來減少鎖對性能的影響。
下面詳細看下 sync.map 的定義和增刪改查實現(xiàn)。
sync.map 數(shù)據(jù)結(jié)構(gòu)定義
type Map struct { mu Mutex // 基本上你可以把它看成一個安全的只讀的map // 它包含的元素其實也是通過原子操作更新的,但是已刪除的entry就需要加鎖操作了 read atomic.Value // readOnly // 包含需要加鎖才能訪問的元素 // 包括所有在read字段中但未被expunged(刪除)的元素以及新加的元素 dirty map[interface{}]*entry // 記錄從read中讀取miss的次數(shù),一旦miss數(shù)和dirty長度一樣了,就會把dirty提升為read,并把dirty置空 misses int } type readOnly struct { m map[interface{}]*entry amended bool // 當dirty中包含read沒有的數(shù)據(jù)時為true,比如新增一條數(shù)據(jù) } // expunged是用來標識此項已經(jīng)刪掉的指針 // 當map中的一個項目被刪除了,只是把它的值標記為expunged,以后才有機會真正刪除此項 var expunged = unsafe.Pointer(new(interface{})) // entry代表一個值 type entry struct { p unsafe.Pointer // *interface{} }
Map 的定義中,read 字段通過 atomic.Values 存儲被高頻讀的 readOnly 類型的數(shù)據(jù)。dirty 存儲
Store 方法
Store 方法用來設(shè)置一個鍵值對,或者更新一個鍵值對。
func (m *Map) Store(key, value interface{}) { read, _ := m.read.Load().(readOnly) // 如果read字段包含這個項,說明是更新,cas更新項目的值即可 if e, ok := read.m[key]; ok && e.tryStore(&value) { return } // read中不存在,或者cas更新失敗,就需要加鎖訪問dirty了 m.mu.Lock() read, _ = m.read.Load().(readOnly) if e, ok := read.m[key]; ok { // 雙檢查,看看read是否已經(jīng)存在了 if e.unexpungeLocked() { // 此項目先前已經(jīng)被刪除了,需要添加到 dirty 中 m.dirty[key] = e } e.storeLocked(&value) // 更新 } else if e, ok := m.dirty[key]; ok { // 如果dirty中有此項 e.storeLocked(&value) // 直接更新 } else { // 否則就是一個新的key if !read.amended { //如果dirty為nil // 需要創(chuàng)建dirty對象,并且標記read的amended為true, // 說明有元素它不包含而dirty包含 m.dirtyLocked() m.read.Store(readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) //將新值增加到dirty對象中 } m.mu.Unlock() } // tryStore利用 cas 操作來更新value。 // 更新之前會判斷這個鍵值對有沒有被打上刪除的標記 func (e *entry) tryStore(i *interface{}) bool { for { p := atomic.LoadPointer(&e.p) if p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) { return true } } } // 將值設(shè)置成 nil,表示沒有被刪除 func (e *entry) unexpungeLocked() (wasExpunged bool) { return atomic.CompareAndSwapPointer(&e.p, expunged, nil) } // 通過復制 read 生成 dirty func (m *Map) dirtyLocked() { if m.dirty != nil { return } read, _ := m.read.Load().(readOnly) m.dirty = make(map[interface{}]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { m.dirty[k] = e } } } // 標記刪除 func (e *entry) tryExpungeLocked() (isExpunged bool) { p := atomic.LoadPointer(&e.p) for p == nil { if atomic.CompareAndSwapPointer(&e.p, nil, expunged) { return true } p = atomic.LoadPointer(&e.p) } return p == expunged }
第2-6行,通過 cas 進行鍵值對更新,更新成功直接返回。
第8-28行,通過互斥鎖加鎖來處理處理新增鍵值對和更新失敗的場景(鍵值對被標記刪除)。
第11行,再次檢查 read 中是否已經(jīng)存在要 Store 的 key(雙檢查是因為之前檢查的時候沒有加鎖,中途可能有協(xié)程修改了 read)。
如果該鍵值對之前被標記刪除,先將這個鍵值對寫到 dirty 中,同時更新 read。
如果 dirty 中已經(jīng)有這一項了,直接更新 read。
如果是一個新的 key。dirty 為空的情況下通過復制 read 創(chuàng)建 dirty,不為空的情況下直接更新 dirty。
Load 方法
Load 方法比較簡單,先是從 read 中讀數(shù)據(jù),讀不到,再通過互斥鎖鎖從 dirty 中讀數(shù)據(jù)。
func (m *Map) Load(key interface{}) (value interface{}, ok bool) { // 首先從read處理 read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { // 如果不存在并且dirty不為nil(有新的元素) m.mu.Lock() // 雙檢查,看看read中現(xiàn)在是否存在此key read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended {//依然不存在,并且dirty不為nil e, ok = m.dirty[key]// 從dirty中讀取 // 不管dirty中存不存在,miss數(shù)都加1 m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } return e.load() //返回讀取的對象,e既可能是從read中獲得的,也可能是從dirty中獲得的 } func (m *Map) missLocked() { m.misses++ // misses計數(shù)加一 if m.misses < len(m.dirty) { // 如果沒達到閾值(dirty字段的長度),返回 return } m.read.Store(readOnly{m: m.dirty}) //把dirty字段的內(nèi)存提升為read字段 m.dirty = nil // 清空dirty m.misses = 0 // misses數(shù)重置為0 }
這里需要注意的是,如果出現(xiàn)多次從 read 中讀不到數(shù)據(jù),得到 dirty 中讀取的情況,就直接把 dirty 升級成 read,以提高 read 效率。
Delete 方法
下面是 Go1.13 中 Delete 的實現(xiàn)方式,如果 key 在 read 中,就將值置成 nil;如果在 dirty 中,直接刪除 key。
func (m *Map) Delete(key interface{}) { read, _ := m.read.Load().(readOnly) e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read, _ = m.read.Load().(readOnly) e, ok = read.m[key] if !ok && read.amended { // 說明可能在 delete(m.dirty, key) } m.mu.Unlock() } if ok { e.delete() } } func (e *entry) delete() (hadValue bool) { for { p := atomic.LoadPointer(&e.p) if p == nil || p == expunged { return false } if atomic.CompareAndSwapPointer(&e.p, p, nil) { return true } } }
補充說明一下,delete() 執(zhí)行完之后,e.p 變成 nil,下次 Store 的時候,執(zhí)行到 dirtyLocked() 這一步的時候,會被標記成 enpunged。因此在 read 中 nil 和 enpunged 都表示刪除狀態(tài)。
sync.map 總結(jié)
上面對源碼粗略的梳理了一遍,最后在總結(jié)一下 sync.map 的實現(xiàn)思路:
- 讀寫分離。讀(更新)相關(guān)的操作盡量通過不加鎖的 read 實現(xiàn),寫(新增)相關(guān)的操作通過 dirty 加鎖實現(xiàn)。
- 動態(tài)調(diào)整。新寫入的 key 都只存在 dirty 中,如果 dirty 中的 key 被多次讀取,dirty 就會上升成不需要加鎖的 read。
- 延遲刪除。Delete 只是把被刪除的 key 標記成 nil,新增 key-value 的時候,標記成 enpunged;dirty 上升成 read 的時候,標記刪除的 key 被批量移出 map。這樣的好處是 dirty 變成 read 之前,這些 key 都會命中 read,而 read 不需要加鎖,無論是讀還是更新,性能都很高。
總結(jié)了 sync.map 的設(shè)計思路后,我們就能理解官方文檔推薦的 sync.map 的兩種應用場景了。
三、總結(jié)
Go 內(nèi)置的 map 使用起來很方便,但是在并發(fā)頻繁的 Go 程序中很容易出現(xiàn)并發(fā)讀寫沖突導致的問題。本文介紹了三種常見的線程安全 map 的實現(xiàn)方式,分別是讀寫鎖、分片鎖和 sync.map。
較常使用的是前兩種,而在特定的場景下,sync.map 的性能會有更優(yōu)的表現(xiàn)。
到此這篇關(guān)于golang中三種線程安全的MAP小結(jié)的文章就介紹到這了,更多相關(guān)golang 線程安全的MAP內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang封裝一個執(zhí)行命令行的函數(shù)(return?stderr/stdout/exitcode)示例代碼
在?Go?語言中,您可以使用?os/exec?包來執(zhí)行外部命令,不通過調(diào)用?shell,并且能夠獲得進程的退出碼、標準輸出和標準錯誤輸出,下面給大家分享golang封裝一個執(zhí)行命令行的函數(shù)(return?stderr/stdout/exitcode)的方法,感興趣的朋友跟隨小編一起看看吧2024-06-06golang實現(xiàn)實時監(jiān)聽文件并自動切換目錄
這篇文章主要給大家介紹了golang實現(xiàn)實時監(jiān)聽文件,并自動切換目錄,文中通過代碼示例給大家介紹的非常詳細,對大家的學習或工作有一定的參考價值,需要的朋友可以參考下2023-12-12golang?gorm開發(fā)架構(gòu)及寫插件示例
這篇文章主要為大家介紹了golang?gorm開發(fā)架構(gòu)及寫插件的詳細示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2022-04-04