go語言中的map如何解決散列性能下降
寫在文章開頭
近期對go語言的map進行深入了解和探究,其中關(guān)于map解決大量沖突的擴容操作設(shè)計的十分巧妙,所以筆者特地整理了這篇文章來探討問題。

hmap擴容詳解
為什么需要擴容
go語言中map是由無數(shù)個bucket構(gòu)成,假設(shè)某個哈希對應的bucket空間已滿,則需要創(chuàng)建一個新的bmap存儲鍵值對,無數(shù)個bmap通過overflow指針進行關(guān)聯(lián)。 以下圖為例,假設(shè)我們需要查找key-111元素,就需要經(jīng)過以下幾個步驟:
- 通過哈希運算定位到
bucket[1]。 - 基于哈希值高位得到一個值
tophash。 - 最終遍歷當前
bucket[1]中的所有數(shù)組,終于在第二個溢出桶找到key-111。

很明顯,如果極端情況下因為有限的桶導致大量的沖突就很可能使map元素定位的時間復雜度退化為O(n),所以我們需要重新計算哈希值以及對桶進行擴容,從而解決極端的哈希沖突場景。
hmap擴容過程
默認情況下,map的進行擴容需要符合以下兩大條件之一:
- 未處于擴容且鍵值對數(shù)超過bucket數(shù)以及當前負載系統(tǒng)超過6.5。
- 未發(fā)生擴容且溢出桶數(shù)量大于bucket數(shù)。
假設(shè)我們當前hmap如下,所有key的算法都是通過哈希值的低3位和B(011)進行取模運算,因為符合上述某個條件之一觸發(fā)了,需要進行擴容。

一般情況擴容都是以原有bucket數(shù)*2,所以新的bucket數(shù)組長度為16,創(chuàng)建新的數(shù)組空間后,hmap的bucket指針該指向這個新的數(shù)組,而原有bucket則交由oldbucket管理。 也因為數(shù)組長度有8變?yōu)?code>16,所以記錄底數(shù)的變量B也由原來的3變?yōu)?(2^3變?yōu)?^4),因為原有的數(shù)組還有兩個空閑的溢出桶,所以新的數(shù)組也會創(chuàng)建同等數(shù)量的溢出交由管理空閑一處的指針extra.nextOverflow管理。

自此我們完成了擴容操作,go語言中的map為了避免擴容后大量遷移導致的計算耗時,對于舊有的bucket元素采用漸進式再哈希的方式進行遷移。 所以后續(xù)我們在進行相同的寫入操作時,若發(fā)現(xiàn)這個桶正處于擴容狀態(tài),那么它就會計算當前桶中每個元素的新位置,然后一次性進行驅(qū)逐。
以下圖為例,假設(shè)此時我們要修改key-111的鍵值對的值,在進行修改的過程中,map就會通過哈希定位到舊的bucket的key-111的值,然后進行修改,完成后基于全新的哈希算法(由%3改為%4)

源碼印證
我們現(xiàn)在通過一段代碼調(diào)試來了解一下go語言中的map時機的工作流程:
func main() {
m := make(map[int]string)
for i := 0; i < 9; i++ {
m[i] = "xiaoming"
}
}
上述這段代碼經(jīng)過編譯之后,實際調(diào)用的是mapassign_fast64這個方法:
0x0092 00146 (F:\github\awesomeProject\main.go:10) CALL runtime.mapassign_fast64(SB)
這里筆者就貼出這段代碼的核心片段,從核心代碼可以看出當map符合進行擴容的條件時會調(diào)用hashGrow進行擴容,再通過goto again到again對應代碼段進行驅(qū)逐操作:
func mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer {
// ......
again:
bucket := hash & bucketMask(h.B)
if h.growing() {
growWork_fast64(t, h, bucket)
}
b := (*bmap)(add(h.buckets, bucket*uintptr(t.bucketsize)))
var insertb *bmap
var inserti uintptr
var insertk unsafe.Pointer
// ......
//如果map未進行擴容,且當前map負載超過最大值(默認6.5)或者溢出桶數(shù)量超過了bucket數(shù)量則進行擴容
if !h.growing() && (overLoadFactor(h.count+1, h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
goto again // Growing the table invalidates everything, so try again
}
// ......
}
hashGrow的擴容邏輯和上圖的流程差不多,讀者可以基于筆者給出的核心注釋進行了解:
func hashGrow(t *maptype, h *hmap) {
//默認B的值是+1
bigger := uint8(1)
//oldbuckets 記錄現(xiàn)在的bucket
oldbuckets := h.buckets
//創(chuàng)建一個新的bucket和溢出桶
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger, nil)
//更新b、oldbuckets 指針指向原有bucket、buckets 指向新創(chuàng)建的bucket
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
h.nevacuate = 0
h.noverflow = 0
//如果原有的bucket還有空閑溢出桶,則記錄到h.extra.oldoverflow指針上
if h.extra != nil && h.extra.overflow != nil {
// Promote current overflow buckets to the old generation.
if h.extra.oldoverflow != nil {
throw("oldoverflow is not nil")
}
h.extra.oldoverflow = h.extra.overflow
h.extra.overflow = nil
}
//如果剛剛有新創(chuàng)建的溢出桶,則用h.extra.nextOverflow指針進行管理
if nextOverflow != nil {
if h.extra == nil {
h.extra = new(mapextra)
}
h.extra.nextOverflow = nextOverflow
}
}
進行修改操作時會觸發(fā)的again代碼段的growWork_fast64方法其內(nèi)部就涉及了驅(qū)逐操作方法evacuate_fast64方法,因為有了上文的圖解,所以對于這段代碼的解讀就很容易了,我們直接查看evacuate_fast64的核心注釋即可了解驅(qū)逐操作的含義:
func growWork_fast64(t *maptype, h *hmap, bucket uintptr) {
//將對應哈希的oldbucket的元素驅(qū)逐到新bucket上
evacuate_fast64(t, h, bucket&h.oldbucketmask())
//......
}
func evacuate_fast64(t *maptype, h *hmap, oldbucket uintptr) {
//獲取舊的bucket的指針
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
//......
//通過[2]evacDst數(shù)組的0索引記錄oldbucket的bucket、keys、values指針,1記錄new buckets的bucket、keys、values指針
if !evacuated(b) {
var xy [2]evacDst
x := &xy[0]
//記錄oldbucket的bucket、keys、values指針
x.b = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
x.k = add(unsafe.Pointer(x.b), dataOffset)
x.e = add(x.k, bucketCnt*8)
//new buckets的bucket、keys、values指針
if !h.sameSizeGrow() {
// Only calculate y pointers if we're growing bigger.
// Otherwise GC can see bad pointers.
y := &xy[1]
y.b = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
y.k = add(unsafe.Pointer(y.b), dataOffset)
y.e = add(y.k, bucketCnt*8)
}
//循環(huán)遍歷非空的tophash進行驅(qū)逐操作
for ; b != nil; b = b.overflow(t) {
//獲取在old bucket上的地址
k := add(unsafe.Pointer(b), dataOffset)
e := add(k, bucketCnt*8)
for i := 0; i < bucketCnt; i, k, e = i+1, add(k, 8), add(e, uintptr(t.elemsize)) {
//計算在舊的bucket的tophash以定位鍵值對
top := b.tophash[i]
if isEmpty(top) {
b.tophash[i] = evacuatedEmpty
continue
}
if top < minTopHash {
throw("bad map state")
}
//計算在新的bucket上的hash值
var useY uint8
if !h.sameSizeGrow() {
// Compute hash to make our evacuation decision (whether we need
// to send this key/elem to bucket x or bucket y).
hash := t.hasher(k, uintptr(h.hash0))
if hash&newbit != 0 {
useY = 1
}
}
b.tophash[i] = evacuatedX + useY // evacuatedX + 1 == evacuatedY, enforced in makemap
//獲取新的bucket的指針地址
dst := &xy[useY] // evacuation destination
//到新bucke的tophash數(shù)組上的位置記錄這個tophash
dst.b.tophash[dst.i&(bucketCnt-1)] = top // mask dst.i as an optimization, to avoid a bounds check
//將舊的bucket的key復制到新bucket上
if t.key.ptrdata != 0 && writeBarrier.enabled {
if goarch.PtrSize == 8 {
// 通過指針操作,將舊的key復制到新的bucket的key上
*(*unsafe.Pointer)(dst.k) = *(*unsafe.Pointer)(k)
} else {
// There are three ways to squeeze at least one 32 bit pointer into 64 bits.
// Give up and call typedmemmove.
typedmemmove(t.key, dst.k, k)
}
} else {
*(*uint64)(dst.k) = *(*uint64)(k)
}
//復制value到新bucket的位置上
typedmemmove(t.elem, dst.e, e)
//....
}
}
// 完成后刪除舊的bucket的鍵值對,輔助GC
if h.flags&oldIterator == 0 && t.bucket.ptrdata != 0 {
b := add(h.oldbuckets, oldbucket*uintptr(t.bucketsize))
// Preserve b.tophash because the evacuation
// state is maintained there.
ptr := add(b, dataOffset)
n := uintptr(t.bucketsize) - dataOffset
memclrHasPointers(ptr, n)
}
}
if oldbucket == h.nevacuate {
advanceEvacuationMark(h, t, newbit)
}
}
經(jīng)過這段代碼之后,我們的舊的bucket上的桶的數(shù)據(jù)也就都驅(qū)逐完成了,隨后跳出這些代碼段,再次回到mapassign_fast64的賦值操作的代碼找到合適key的位置設(shè)置鍵值對。
如下所示,可以看到核心步驟大致是:
- for循環(huán)定位到哈希算法得出的
tophash對應key的位置。 - 如果找到空位置則跳出循環(huán)進行鍵值對設(shè)置,并更新
count值。 - 若找到和當前key相等的位置,直接跳到
done代碼段設(shè)置value。
func mapassign_fast64(t *maptype, h *hmap, key uint64) unsafe.Pointer {
//......
//定位到key的指針
bucketloop:
for {
for i := uintptr(0); i < bucketCnt; i++ {
//若定位到的bucket為nil,則說明這個位置沒有被使用過,則進入該分支定位指針
if isEmpty(b.tophash[i]) {
if insertb == nil {
insertb = b
inserti = i
}
//如果找到當前tophash為0值,說明這個位置沒有被用過,直接退出循環(huán),到外部直接賦值
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
//獲取k判斷和當前key值是否一致,若不一致的continue,反之記錄直接到done代碼段,直接設(shè)置value的值
k := *((*uint64)(add(unsafe.Pointer(b), dataOffset+i*8)))
if k != key {
continue
}
insertb = b
inserti = i
goto done
}
//.....
}
//定位到key位置設(shè)置key
insertk = add(unsafe.Pointer(insertb), dataOffset+inserti*8)
// store new key at insert position
*(*uint64)(insertk) = key
//元素數(shù)量+1
h.count++
done:
//設(shè)置value
elem := add(unsafe.Pointer(insertb), dataOffset+bucketCnt*8+inserti*uintptr(t.elemsize))
if h.flags&hashWriting == 0 {
fatal("concurrent map writes")
}
h.flags &^= hashWriting
return elem
}
擴容未完成時如何讀
因為map是在操作時進行驅(qū)逐操作的,所以在讀取時需要會按照以下步驟執(zhí)行:
- 定位到當前
hash對應bucket設(shè)置給指針b。 - 查看當前
hash對應oldbucket是否發(fā)生驅(qū)逐操作,若未發(fā)生則說明當前的值在oldbucket上,將指針b指向oldbucket。 - 到b指針的
bucket上查到對應的key,若找到則返回value。
對應的核心代碼如下,讀者可參照注釋了解大體流程:
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
//......
//定位hash值
hash := t.hasher(key, uintptr(h.hash0))
//先定位到bucket指針
m := bucketMask(h.B)
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
//在基于hash到oldbucket定位舊的bucket指針,若未發(fā)生驅(qū)逐則上b指針指向oldbucket
if c := h.oldbuckets; c != nil {
if !h.sameSizeGrow() {
// There used to be half as many buckets; mask down one more power of two.
m >>= 1
}
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
//若未發(fā)生驅(qū)逐則上b指針指向oldbucket
if !evacuated(oldb) {
b = oldb
}
}
top := tophash(hash)
bucketloop:
//循環(huán)定位鍵值對
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
//......
//如果找到的key和我們要查詢的key相等則直接返回
if t.key.equal(key, k) {
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
if t.indirectelem() {
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
return unsafe.Pointer(&zeroVal[0])
}
小結(jié)
本文通過圖解+代碼的形式了解go語言中的map如何通過擴容解決散列性能退化,以及在擴容未完成期間如何實現(xiàn)元素的快速定位查詢,希望對你有幫助。
以上就是go語言中的map如何解決散列性能下降的詳細內(nèi)容,更多關(guān)于go map散列性能下降的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用golang獲取linux上文件的訪問/創(chuàng)建/修改時間

