詳解Go語言如何實現(xiàn)并發(fā)安全的map
go語言提供的數(shù)據(jù)類型中,只有channel是并發(fā)安全的,基礎(chǔ)map并不是并發(fā)安全的。以下三種方案實現(xiàn)了并發(fā)安全的map。
方案一:讀寫鎖+map
實現(xiàn)原理
給map添加一把讀寫鎖,讀操作加讀鎖進(jìn)行讀?。惶砑?,更新,刪除,遍歷,獲取長度這些操作加寫鎖后在進(jìn)行操作。
代碼實現(xiàn)
以下代碼是并發(fā)map的實現(xiàn)演示:
type RWMap struct { sync.RWMutex m map[any]any } func NewGRWMap() *RWMap { return &RWMap{ m: make(map[any]any), } } func (m *RWMap) Get(k int) (any, bool) { m.RLock() defer m.RUnlock() v, existed := m.m[k] return v, existed } func (m *RWMap) Set(k any, v any) { m.Lock() defer m.Unlock() m.m[k] = v } func (m *RWMap) Delete(k any) { m.Lock() defer m.Unlock() delete(m.m, k) } func (m *RWMap) Len() int { m.RLock() defer m.RUnlock() return len(m.m) } func (m *RWMap) Each(f func(k, v any) bool) { m.RLock() defer m.RUnlock() for k, v := range m.m { if !f(k, v) { return } } }
上述代碼在讀的時候加了個讀鎖,這個讀鎖在sync.RWMutex中并沒有使用鎖,只是將 readerCount
這個字段+1。增刪改是加了寫鎖,寫鎖在sync.RWMutex中每次都需要加鎖。 以上可知,讀寫鎖的加鎖力度很大,當(dāng)需要讀多寫少的情況下可以使用讀寫鎖加map實現(xiàn)并發(fā)安全。
方案二:分片加鎖
實現(xiàn)原理
分片加鎖的原理就如其名字一樣,將一個map分成多個片段(一個片段是一個map),每個片段有自己的鎖。
代碼實現(xiàn)
concurrent-map提供了一種高性能的解決方案:通過對內(nèi)部map
進(jìn)行分片,降低鎖粒度,從而達(dá)到最少的鎖等待時間(鎖沖突)
以下是分片加鎖中重要的數(shù)據(jù)類型的結(jié)構(gòu)體:
var SHARD_COUNT = 32 type ConcurrentMap[K comparable, V any] struct { shards []*ConcurrentMapShared[K, V] sharding func(key K) uint32 } type ConcurrentMapShared[K comparable, V any] struct { items map[K]V sync.RWMutex }
結(jié)構(gòu)如下圖所示:
上述代碼使用泛型實現(xiàn)了不同類型的map[comparable]any
。調(diào)用NewWithCustomShardingFunction
函數(shù),傳入泛型的類型參數(shù)和哈希函數(shù),就可以得到一個并發(fā)安全的map了。
func create[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { m := ConcurrentMap[K, V]{ sharding: sharding, shards: make([]*ConcurrentMapShared[K, V], SHARD_COUNT), } for i := 0; i < SHARD_COUNT; i++ { m.shards[i] = &ConcurrentMapShared[K, V]{items: make(map[K]V)} } return m } func New[V any]() ConcurrentMap[string, V] { return create[string, V](fnv32) } func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] { return create[K, V](sharding) }
添加過程
func (m ConcurrentMap[K, V]) Set(key K, value V) { shard := m.GetShard(key) shard.Lock() shard.items[key] = value shard.Unlock() } func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] { return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)] }
大致流程如上圖所示:
- 調(diào)用sharding得到哈希值。
- 對哈希值取模于切片長度,得到對應(yīng)的分片map。
- 對map加寫鎖進(jìn)行操作。
其他流程大致一樣,大致都是先找“坑”,再進(jìn)行讀寫操作。也可以將分片加鎖方案簡單的理解為是對讀寫加鎖的方案的升級。
方案三:sync.Map
我們先看一下1.21版本的sync.Map的結(jié)構(gòu)體:
type Map struct { mu Mutex read atomic.Pointer[readOnly] dirty map[any]*entry misses int } type readOnly struct { m map[any]*entry amended bool } type entry struct { p atomic.Pointer[any] }
這個結(jié)構(gòu)的關(guān)系如下圖所示:
應(yīng)對特殊場景的 sync.Map
在官方文檔中指出,在以下兩個場景中使用sync.Map,會比使用讀寫鎖+map,的性能好:
- 只會增長的緩存系統(tǒng)中,一個key只寫入一次而被讀很多次;
- 多個goroutine為不相交的讀,寫和重寫的鍵值對。
sync.Map的操作流程
讀Load()
func (x *Pointer[T]) Load() *T { return (*T)(LoadPointer(&x.v)) } func (m *Map) loadReadOnly() readOnly { if p := m.read.Load(); p != nil { return *p } return readOnly{} } func (m *Map) Load(key any) (value any, ok bool) { read := m.loadReadOnly() e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read = m.loadReadOnly() e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] m.missLocked() } m.mu.Unlock() } if !ok { return nil, false } return e.load() } func (e *entry) load() (value any, ok bool) { p := e.p.Load() if p == nil || p == expunged { return nil, false } return *p, true } func (m *Map) missLocked() { m.misses++ if m.misses < len(m.dirty) { return } m.read.Store(&readOnly{m: m.dirty}) m.dirty = nil m.misses = 0 }
讀流程:
先去read中讀,有數(shù)據(jù)直接讀取e.load()
結(jié)束;沒有加鎖,去dirty中讀,e換成dirty的,調(diào)用m.missLocked()
,判斷dirty是否存在這個key,不存在return nil, false
;存在e.load()
.
!ok && read.amended
這個判斷是在read不存在key并且dirty存在read中沒有的數(shù)據(jù)時為true。 m.missLocked()
記錄miss的次數(shù),當(dāng)miss的次數(shù)大于m.dirty
的長度時將dirty數(shù)據(jù)給read,dirty清空,miss重置為0。
寫Store()
func (m *Map) Store(key, value any) { _, _ = m.Swap(key, value) } func (m *Map) Swap(key, value any) (previous any, loaded bool) { read := m.loadReadOnly() if e, ok := read.m[key]; ok { if v, ok := e.trySwap(&value); ok { if v == nil { return nil, false } return *v, true } } m.mu.Lock() read = m.loadReadOnly() if e, ok := read.m[key]; ok { if e.unexpungeLocked() { // The entry was previously expunged, which implies that there is a // non-nil dirty map and this entry is not in it. m.dirty[key] = e } if v := e.swapLocked(&value); v != nil { loaded = true previous = *v } } else if e, ok := m.dirty[key]; ok { if v := e.swapLocked(&value); v != nil { loaded = true previous = *v } } else { if !read.amended { // We're adding the first new key to the dirty map. // Make sure it is allocated and mark the read-only map as incomplete. m.dirtyLocked() m.read.Store(&readOnly{m: read.m, amended: true}) } m.dirty[key] = newEntry(value) } m.mu.Unlock() return previous, loaded } func (m *Map) dirtyLocked() { if m.dirty != nil { return } read := m.loadReadOnly() m.dirty = make(map[any]*entry, len(read.m)) for k, e := range read.m { if !e.tryExpungeLocked() { m.dirty[k] = e } } }
寫的流程:
先去read看key是否存在;存在:如果key的value值為expunged
,返回false,走dirty操作;否則,使用cas原子操作直接賦值,結(jié)束流程。
返回false,走dirty操作:先加鎖,再走一次read,看是否存在key。
- read存在,使用
e.unexpungeLocked()
使用cas將entry設(shè)置為nil,若cas成功,將dirty中的entry設(shè)置為nil。使用cas設(shè)置value。 - read不存在,dirty存在,使用cas設(shè)置value。
- 以上都不滿足(read,dirty都不存在),判斷read中是否缺少數(shù)據(jù),缺少時給dirty添加key-value;不缺少時調(diào)用
m.dirtyLocked()
,將read中的數(shù)據(jù)更新到dirty中,將其中刪除的數(shù)據(jù)設(shè)置為expunged
,之后將read的amended
設(shè)置為true,最后給dirty添加key-value。
解鎖,結(jié)束。
刪Delete()
func (m *Map) Delete(key any) { m.LoadAndDelete(key) } func (m *Map) LoadAndDelete(key any) (value any, loaded bool) { read := m.loadReadOnly() e, ok := read.m[key] if !ok && read.amended { m.mu.Lock() read = m.loadReadOnly() e, ok = read.m[key] if !ok && read.amended { e, ok = m.dirty[key] delete(m.dirty, key) m.missLocked() } m.mu.Unlock() } if ok { return e.delete() } return nil, false } func (e *entry) delete() (value any, ok bool) { for { p := e.p.Load() if p == nil || p == expunged { return nil, false } if e.p.CompareAndSwap(p, nil) { return *p, true } } }
刪除流程:
先看read中是否存在,存在,直接調(diào)用e.delete()
結(jié)束
不存在,且read中缺少數(shù)據(jù),加鎖,再次查看read,存在:解鎖,調(diào)用e.delete()
結(jié)束;不存在:刪除dirty中的key,再調(diào)用m.missLocked(),解鎖,若dirty中存在并刪除了,還需要調(diào)用e.delete()
,若dirty不存在key,return結(jié)束。
遍歷Range()
func (m *Map) Range(f func(key, value any) bool) { read := m.loadReadOnly() if read.amended { m.mu.Lock() read = m.loadReadOnly() if read.amended { read = readOnly{m: m.dirty} m.read.Store(&read) m.dirty = nil m.misses = 0 } m.mu.Unlock() } for k, e := range read.m { v, ok := e.load() if !ok { continue } if !f(k, v) { break } } }
遍歷流程: 獲取read,若read的數(shù)據(jù)全,遍歷read,若數(shù)據(jù)不全,加鎖,將dirty數(shù)據(jù)更新到read中,并將dirty值為nil,misses置0,再遍歷read。
sync.Map安全并發(fā)實現(xiàn)
sync.Map在實現(xiàn)并發(fā)問題的同時提升性能的幾個優(yōu)化:
- 用空間換時間,使用兩個map,一個無鎖,一個有鎖,減少加鎖對性能的影響。
- 優(yōu)先從無鎖的map中讀取,更新,刪除。
- 動態(tài)調(diào)整,miss次數(shù)多之后,將加鎖map數(shù)據(jù)給無鎖map。
- 延遲刪除,刪除一個鍵值只是進(jìn)行了軟刪除,在動態(tài)調(diào)整時會進(jìn)行硬刪除。
- double check,訪問有鎖map,加鎖后再次檢查無鎖map,是否有數(shù)據(jù)。
到此這篇關(guān)于詳解Go語言如何實現(xiàn)并發(fā)安全的map的文章就介紹到這了,更多相關(guān)Go實現(xiàn)并發(fā)安全map內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言中使用 buffered channel 實現(xiàn)線程安全的 pool
這篇文章主要介紹了Go語言中使用 buffered channel 實現(xiàn)線程安全的 pool,因為Go語言自帶的sync.Pool并不是很好用,所以自己實現(xiàn)了一線程安全的 pool,需要的朋友可以參考下2014-10-10go gin+token(JWT)驗證實現(xiàn)登陸驗證
本文主要介紹了go gin+token(JWT)驗證實現(xiàn)登陸驗證,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12