go?原子操作的方式及實(shí)現(xiàn)原理全面深入解析
在我們前面的一些介紹 sync
包相關(guān)的文章中,我們應(yīng)該也發(fā)現(xiàn)了,其中有不少地方使用了原子操作。 比如 sync.WaitGroup
、sync.Map
再到 sync.Pool
,這些結(jié)構(gòu)體的實(shí)現(xiàn)中都有原子操作的身影。 原子操作在并發(fā)編程中是一種非常重要的操作,它可以保證并發(fā)安全,而且效率也很高。 本文將會(huì)深入探討一下 go 中原子操作的原理、使用場(chǎng)景、用法等內(nèi)容。
什么是原子操作?
原子操作是變量級(jí)別的互斥鎖。
如果讓我用一句話來(lái)說(shuō)明什么是原子操作,那就是:原子操作是變量級(jí)別的互斥鎖。 簡(jiǎn)單來(lái)說(shuō),就是同一時(shí)刻,只能有一個(gè) CPU 對(duì)變量進(jìn)行讀或?qū)憽?當(dāng)我們想要對(duì)某個(gè)變量做并發(fā)安全的修改,除了使用官方提供的 Mutex
,還可以使用 sync/atomic
包的原子操作, 它能夠保證對(duì)變量的讀取或修改期間不被其他的協(xié)程所影響。
我們可以用下圖來(lái)表示:
說(shuō)明:在上圖中,我們有三個(gè) CPU 邏輯核,其中 CPU 1 正在對(duì)變量 v
做原子操作,這個(gè)時(shí)候 CPU 2 和 CPU 3 不能對(duì) v
做任何操作, 在 CPU 1 操作完成后,CPU 2 和 CPU 3 可以獲取到 v
的最新值。
從這個(gè)角度看,我們可以把 sync/atomic
包中的原子操作看成是變量級(jí)別的互斥鎖。 就是說(shuō),在 go 中,當(dāng)一個(gè)協(xié)程對(duì)變量做原子操作時(shí),其他協(xié)程不能對(duì)這個(gè)變量做任何操作,直到這個(gè)協(xié)程操作完成。
原子操作的使用場(chǎng)景是什么?
拿一個(gè)簡(jiǎn)單的例子來(lái)說(shuō)明一下原子操作的使用場(chǎng)景:
func TestAtomic(t *testing.T) { var sum = 0 var wg sync.WaitGroup wg.Add(1000) // 啟動(dòng) 1000 個(gè)協(xié)程,每個(gè)協(xié)程對(duì) sum 做加法操作 for i := 0; i < 1000; i++ { go func() { defer wg.Done() sum++ }() } // 等待所有的協(xié)程都執(zhí)行完畢 wg.Wait() fmt.Println(sum) // 這里輸出多少呢? }
我們可以在自己的電腦上運(yùn)行一下這段代碼,看看輸出的結(jié)果是多少。 不出意外的話,應(yīng)該每次可能都不一樣,而且應(yīng)該也不是 1000,這是為什么呢?
這是因?yàn)?,CPU 在對(duì) sum
做加法的時(shí)候,需要先將 sum
目前的值讀取到 CPU 的寄存器中,然后再進(jìn)行加法操作,最后再寫回到內(nèi)存中。 如果有兩個(gè) CPU 同時(shí)取了 sum
的值,然后都進(jìn)行了加法操作,然后都再寫回到內(nèi)存中,那么就會(huì)導(dǎo)致 sum
的值被覆蓋,從而導(dǎo)致結(jié)果不正確。
舉個(gè)例子,目前內(nèi)存中的 sum
為 1,然后兩個(gè) CPU 同時(shí)取了這個(gè) 1 來(lái)做加法,然后都得到了結(jié)果 2, 然后這兩個(gè) CPU 將各自的計(jì)算結(jié)果寫回到內(nèi)存中,那么內(nèi)存中的 sum
就變成了 2,而不是 3。
在這種場(chǎng)景下,我們可以使用原子操作來(lái)實(shí)現(xiàn)并發(fā)安全的加法操作:
func TestAtomic1(t *testing.T) { // 將 sum 的類型改成 int32,因?yàn)樵硬僮髦荒茚槍?duì) int32、int64、uint32、uint64、uintptr 這幾種類型 var sum int32 = 0 var wg sync.WaitGroup wg.Add(1000) // 啟動(dòng) 1000 個(gè)協(xié)程,每個(gè)協(xié)程對(duì) sum 做加法操作 for i := 0; i < 1000; i++ { go func() { defer wg.Done() // 將 sum++ 改成下面這樣 atomic.AddInt32(&sum, 1) }() } wg.Wait() fmt.Println(sum) // 輸出 1000 }
在上面這個(gè)例子中,我們每次執(zhí)行都能得到 1000 這個(gè)結(jié)果。
因?yàn)槭褂迷硬僮鞯臅r(shí)候,同一時(shí)刻只能有一個(gè) CPU 對(duì)變量進(jìn)行讀或?qū)?,所以就不?huì)出現(xiàn)上面的問(wèn)題了。
所以很多需要對(duì)變量做并發(fā)讀寫的地方,我們都可以考慮一下,是否可以使用原子操作來(lái)實(shí)現(xiàn)并發(fā)安全的操作(而不是使用互斥鎖,互斥鎖效率相比原子操作要低一些)。
原子操作的使用場(chǎng)景也是和互斥鎖類似的,但是不一樣的是,我們的鎖粒度只是一個(gè)變量而已。也就是說(shuō),當(dāng)我們不允許多個(gè) CPU 同時(shí)對(duì)變量進(jìn)行讀寫的時(shí)候(保證變量同一時(shí)刻只能一個(gè) CPU 操作),就可以使用原子操作。
原子操作是怎么實(shí)現(xiàn)的?
看完上面原子操作的介紹,有沒(méi)有覺(jué)得原子操作很神奇,居然有這么好用的東西。那它到底是怎么實(shí)現(xiàn)的呢?
一般情況下,原子操作的實(shí)現(xiàn)需要特殊的 CPU 指令或者系統(tǒng)調(diào)用。 這些指令或者系統(tǒng)調(diào)用可以保證在執(zhí)行期間不會(huì)被其他操作或事件中斷,從而保證操作的原子性。
例如,在 x86 架構(gòu)的 CPU 中,可以使用 LOCK
前綴來(lái)實(shí)現(xiàn)原子操作。 LOCK
前綴可以與其他指令一起使用,用于鎖定內(nèi)存總線,防止其他 CPU 訪問(wèn)同一內(nèi)存地址,從而實(shí)現(xiàn)原子操作。 在使用 LOCK
前綴的指令執(zhí)行期間,CPU 會(huì)將當(dāng)前處理器緩存中的數(shù)據(jù)寫回到內(nèi)存中,并鎖定該內(nèi)存地址, 防止其他 CPU 修改該地址的數(shù)據(jù)(所以原子操作總是可以讀取到最新的數(shù)據(jù))。 一旦當(dāng)前 CPU 對(duì)該地址的操作完成,CPU 會(huì)釋放該內(nèi)存地址的鎖定,其他 CPU 才能繼續(xù)對(duì)該地址進(jìn)行訪問(wèn)。
x86 LOCK 的時(shí)候發(fā)生了什么
我們?cè)賮?lái)捋一下上面的內(nèi)容,看看 LOCK
前綴是如何實(shí)現(xiàn)原子操作的:
- CPU 會(huì)將當(dāng)前處理器緩存中的數(shù)據(jù)寫回到內(nèi)存中。(因此我們總能讀取到最新的數(shù)據(jù))
- 然后鎖定該內(nèi)存地址,防止其他 CPU 修改該地址的數(shù)據(jù)。
- 一旦當(dāng)前 CPU 對(duì)該地址的操作完成,CPU 會(huì)釋放該內(nèi)存地址的鎖定,其他 CPU 才能繼續(xù)對(duì)該地址進(jìn)行訪問(wèn)。
其他架構(gòu)的 CPU 可能會(huì)略有不同,但是原理是一樣的。
原子操作有什么特征?
- 不會(huì)被中斷:原子操作是一個(gè)不可分割的操作,要么全部執(zhí)行,要么全部不執(zhí)行,不會(huì)出現(xiàn)中間狀態(tài)。這是保證原子性的基本前提。同時(shí),原子操作過(guò)程中不會(huì)有上下文切換的過(guò)程。
- 操作對(duì)象是共享變量:原子操作通常是對(duì)共享變量進(jìn)行的,也就是說(shuō),多個(gè)協(xié)程可以同時(shí)訪問(wèn)這個(gè)變量,因此需要采用原子操作來(lái)保證數(shù)據(jù)的一致性和正確性。
- 并發(fā)安全:原子操作是并發(fā)安全的,可以保證多個(gè)協(xié)程同時(shí)進(jìn)行操作時(shí)不會(huì)出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題(雖然說(shuō)是同時(shí),但是實(shí)際上在操作那個(gè)變量的時(shí)候是互斥的)。
- 無(wú)需加鎖:原子操作不需要使用互斥鎖來(lái)保證數(shù)據(jù)的一致性和正確性,因此可以避免互斥鎖的使用帶來(lái)的性能損失。
- 適用場(chǎng)景比較局限:原子操作適用于操作單個(gè)變量,如果需要同時(shí)并發(fā)讀寫多個(gè)變量,可能需要考慮使用互斥鎖。
go 里面有哪些原子操作?
在 go 中,主要有以下幾種原子操作:Add
、CompareAndSwap
、Load
、Store
、Swap
。
增減(Add)
- 用于進(jìn)行增加或減少的原子操作,函數(shù)名以
Add
為前綴,后綴針對(duì)特定類型的名稱。 - 原子增被操作的類型只能是數(shù)值類型,即
int32
、int64
、uint32
、uint64
、uintptr
- 原子增減函數(shù)的第一個(gè)參數(shù)為原值,第二個(gè)參數(shù)是要增減多少。
- 方法:
func AddInt32(addr *int32, delta int32) (new int32) func AddInt64(addr *int64, delta int64) (new int64) func AddUint32(addr *uint32, delta uint32) (new uint32) func AddUint64(addr *uint64, delta uint64) (new uint64) func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
int32
和 int64
的第二個(gè)參數(shù)可以是負(fù)數(shù),這樣就可以做原子減法了。
比較并交換(CompareAndSwap)
也就是我們常見(jiàn)的 CAS
,在 CAS
操作中,會(huì)需要拿舊的值跟 old
比較,如果相等,就將 new
賦值給 addr
。 如果不相等,則不做任何操作。最后返回一個(gè) bool
值,表示是否成功 swap
。
也就是說(shuō),這個(gè)操作可能是不成功的。這很正常,在并發(fā)環(huán)境下,多個(gè)協(xié)程對(duì)同一個(gè)變量進(jìn)行操作,肯定會(huì)存在競(jìng)爭(zhēng)的情況。 在這種情況下,偶爾的失敗是正常的,我們只需要在失敗的時(shí)候,重新嘗試即可。 因?yàn)樵硬僮餍枰臅r(shí)間往往是比較短的,因此在失敗的時(shí)候,我們可以通過(guò)自旋的方式來(lái)再次進(jìn)行嘗試。
在這種情況下,如果不自旋,那就需要將這個(gè)協(xié)程掛起,等待其他協(xié)程完成操作,然后再次嘗試。這個(gè)過(guò)程相比自旋可能會(huì)更加耗時(shí)。 因?yàn)楹苡锌赡苓@次原子操作不成功,下一次就成功了。如果我們每次都將協(xié)程掛起,那么效率就會(huì)大大降低。
for
+ 原子操作的方式,在 go 的 sync
包中很多地方都有使用,比如 sync.Map
,sync.Pool
等。 這也是使用原子操作時(shí)一個(gè)非常常見(jiàn)的使用模式。
CompareAndSwap
的功能:
- 用于比較并交換的原子操作,函數(shù)名以
CompareAndSwap
為前綴,后綴針對(duì)特定類型的名稱。 - 原子比較并交換被操作的類型可以是數(shù)值類型或指針類型,即
int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
- 原子比較并交換函數(shù)的第一個(gè)參數(shù)為原值指針,第二個(gè)參數(shù)是要比較的值,第三個(gè)參數(shù)是要交換的值。
- 方法:
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool) func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool) func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool) func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool) func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool) func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
載入(Load)
原子性的讀取操作接受一個(gè)對(duì)應(yīng)類型的指針值,返回該指針指向的值。原子性讀取意味著讀取值的同時(shí),當(dāng)前計(jì)算機(jī)的任何 CPU 都不會(huì)進(jìn)行針對(duì)值的讀寫操作。
如果不使用原子 Load
,當(dāng)使用 v := value
這種賦值方式為變量 v
賦值時(shí),讀取到的 value
可能不是最新的,因?yàn)樵谧x取操作時(shí)其他協(xié)程對(duì)它的讀寫操作可能會(huì)同時(shí)發(fā)生。
Load 操作有下面這些:
func LoadInt32(addr *int32) (val int32) func LoadInt64(addr *int64) (val int64) func LoadUint32(addr *uint32) (val uint32) func LoadUint64(addr *uint64) (val uint64) func LoadUintptr(addr *uintptr) (val uintptr) func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
存儲(chǔ)(Store)
Store
可以將 val
值保存到 *addr
中,Store
操作是原子性的,因此在執(zhí)行 Store
操作時(shí),當(dāng)前計(jì)算機(jī)的任何 CPU 都不會(huì)進(jìn)行針對(duì) *addr
的讀寫操作。
- 原子性存儲(chǔ)會(huì)將
val
值保存到*addr
中。 - 與讀操作對(duì)應(yīng)的寫入操作,
sync/atomic
提供了與原子值載入Load
函數(shù)相對(duì)應(yīng)的原子值存儲(chǔ)Store
函數(shù),原子性存儲(chǔ)函數(shù)均以Store
為前綴。
Store
操作有下面這些:
func StoreInt32(addr *int32, val int32) func StoreInt64(addr *int64, val int64) func StoreUint32(addr *uint32, val uint32) func StoreUint64(addr *uint64, val uint64) func StoreUintptr(addr *uintpre, val uintptr) func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
交換(Swap)
Swap
跟 Store
有點(diǎn)類似,但是它會(huì)返回 *addr
的舊值。
func SwapInt32(addr *int32, new int32) (old int32) func SwapInt64(addr *int64, new int64) (old int64) func SwapUint32(addr *uint32, new uint32) (old uint32) func SwapUint64(addr *uint64, new uint64) (old uint64) func SwapUintptr(addr *uintptr, new uintptr) (old uintptr) func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
原子操作任意類型的值 - atomic.Value
從上一節(jié)中,我們知道了在 go 中原子操作可以操作 int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
這些類型的值。 但是在實(shí)際開(kāi)發(fā)中,我們的類型還有很多,比如 string
、struct
等等,那這些類型的值如何進(jìn)行原子操作呢?答案是使用 atomic.Value
。
atomic.Value
是一個(gè)結(jié)構(gòu)體,它的內(nèi)部有一個(gè) any
類型的字段,存儲(chǔ)了我們要原子操作的值,也就是一個(gè)任意類型的值。
atomic.Value
支持以下操作:
Load
:原子性的讀取Value
中的值。Store
:原子性的存儲(chǔ)一個(gè)值到Value
中。Swap
:原子性的交換Value
中的值,返回舊值。CompareAndSwap
:原子性的比較并交換Value
中的值,如果舊值和old
相等,則將new
存入Value
中,返回true
,否則返回false
。
atomic.Value
的這些操作跟上面講到的那些操作其實(shí)差不多,只不過(guò) atomic.Value
可以操作任意類型的值。 那 atomic.Value
是如何實(shí)現(xiàn)的呢?
atomic.Value 源碼分析
atomic.Value
是一個(gè)結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體只有一個(gè)字段:
// Value 提供一致類型值的原子加載和存儲(chǔ)。 type Value struct { v any }
Load - 讀取
Load
返回由最近的 Store
設(shè)置的值。如果還沒(méi)有 Store
過(guò)任何值,則返回 nil
。
// Load 返回由最近的 Store 設(shè)置的值。 func (v *Value) Load() (val any) { // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // 判斷 atomic.Value 的類型 typ := LoadPointer(&vp.typ) // 第一次 Store 還沒(méi)有完成,直接返回 nil if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) { // firstStoreInProgress 是一個(gè)特殊的變量,存儲(chǔ)到 typ 中用來(lái)表示第一次 Store 還沒(méi)有完成 return nil } // 獲取 atomic.Value 的值 data := LoadPointer(&vp.data) // 將 val 轉(zhuǎn)換為 efaceWords 類型 vlp := (*efaceWords)(unsafe.Pointer(&val)) // 分別賦值給 val 的 typ 和 data vlp.typ = typ vlp.data = data return }
在 atomic.Value
的源碼中,我們都可以看到 efaceWords
的身影,它實(shí)際上代表的是 interface{}/any
類型:
// 表示一個(gè) interface{}/any 類型 type efaceWords struct { typ unsafe.Pointer data unsafe.Pointer }
看到這里我們會(huì)不會(huì)覺(jué)得很困惑,直接返回 val
不就可以了嗎?為什么要將 val
轉(zhuǎn)換為 efaceWords
類型呢?
這是因?yàn)?go 中的原子操作只能操作 int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
這些類型的值, 不支持 interface{}
類型,但是如果了解 interface{}
底層結(jié)構(gòu)的話,我們就知道 interface{}
底層其實(shí)就是一個(gè)結(jié)構(gòu)體, 它有兩個(gè)字段,一個(gè)是 type
,一個(gè)是 data
,type
用來(lái)存儲(chǔ) interface{}
的類型,data
用來(lái)存儲(chǔ) interface{}
的值。 而且這兩個(gè)字段都是 unsafe.Pointer
類型的,所以其實(shí)我們可以對(duì) interface{}
的 type
和 data
分別進(jìn)行原子操作, 這樣最終其實(shí)也可以達(dá)到了原子操作 interface{}
的目的了,是不是非常地巧妙呢?
Store - 存儲(chǔ)
Store
將 Value
的值設(shè)置為 val
。對(duì)給定值的所有存儲(chǔ)調(diào)用必須使用相同具體類型的值。不一致類型的存儲(chǔ)會(huì)發(fā)生恐慌,Store(nil)
也會(huì) panic
。
// Store 將 Value 的值設(shè)置為 val。 func (v *Value) Store(val any) { // 不能存儲(chǔ) nil 值 if val == nil { panic("sync/atomic: store of nil value into Value") } // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // val 轉(zhuǎn)換為 efaceWords vlp := (*efaceWords)(unsafe.Pointer(&val)) // 自旋進(jìn)行原子操作,這個(gè)過(guò)程不會(huì)很久,開(kāi)銷相比互斥鎖小 for { // LoadPointer 可以保證獲取到的是最新的 typ := LoadPointer(&vp.typ) // 第一次 store 的時(shí)候 typ 還是 nil,說(shuō)明是第一次 store if typ == nil { // 嘗試開(kāi)始第一次 Store。 // 禁用搶占,以便其他 goroutines 可以自旋等待完成。 // (如果允許搶占,那么其他 goroutine 自旋等待的時(shí)間可能會(huì)比較長(zhǎng),因?yàn)榭赡軙?huì)需要進(jìn)行協(xié)程調(diào)度。) runtime_procPin() // 搶占失敗,意味著有其他 goroutine 成功 store 了,允許搶占,再次嘗試 Store // 這也是一個(gè)原子操作。 if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { runtime_procUnpin() continue } // 完成第一次 store // 因?yàn)橛?firstStoreInProgress 標(biāo)識(shí)的保護(hù),所以下面的兩個(gè)原子操作是安全的。 StorePointer(&vp.data, vlp.data) // 存儲(chǔ)值(原子操作) StorePointer(&vp.typ, vlp.typ) // 存儲(chǔ)類型(原子操作) runtime_procUnpin() // 允許搶占 return } // 另外一個(gè) goroutine 正在進(jìn)行第一次 Store。自旋等待。 if typ == unsafe.Pointer(&firstStoreInProgress) { continue } // 第一次 Store 已經(jīng)完成了,下面不是第一次 Store 了。 // 需要檢查當(dāng)前 Store 的類型跟第一次 Store 的類型是否一致,不一致就 panic。 if typ != vlp.typ { panic("sync/atomic: store of inconsistently typed value into Value") } // 后續(xù)的 Store 只需要 Store 值部分就可以了。 // 因?yàn)?atomic.Value 只能保存一種類型的值。 StorePointer(&vp.data, vlp.data) return } }
在 Store
中,有以下幾個(gè)注意的點(diǎn):
- 使用
firstStoreInProgress
來(lái)確保第一次Store
的時(shí)候,只有一個(gè)goroutine
可以進(jìn)行Store
操作,其他的goroutine
需要自旋等待。如果沒(méi)有這個(gè)保護(hù),那么存儲(chǔ)typ
和data
的時(shí)候就會(huì)出現(xiàn)競(jìng)爭(zhēng)(因?yàn)樾枰獌蓚€(gè)原子操作),導(dǎo)致數(shù)據(jù)不一致。在這里其實(shí)可以將firstStoreInProgress
看作是一個(gè)互斥鎖。 - 在進(jìn)行第一次
Store
的時(shí)候,會(huì)將當(dāng)前的 goroutine 和P
綁定,這樣拿到firstStoreInProgress
鎖的協(xié)程就可以盡快地完成第一次Store
操作,這樣一來(lái),其他的協(xié)程也不用等待太久。 - 在第一次
Store
的時(shí)候,會(huì)有兩個(gè)原子操作,分別存儲(chǔ)類型和值,但是因?yàn)橛?firstStoreInProgress
的保護(hù),所以這兩個(gè)原子操作本質(zhì)上是對(duì)interface{}
的一個(gè)原子存儲(chǔ)操作。 - 其他協(xié)程在看到有
firstStoreInProgress
標(biāo)識(shí)的時(shí)候,就會(huì)自旋等待,直到第一次Store
完成。 - 在后續(xù)的
Store
操作中,只需要存儲(chǔ)值就可以了,因?yàn)?atomic.Value
只能保存一種類型的值。
Swap - 交換
Swap
將 Value
的值設(shè)置為 new
并返回舊值。對(duì)給定值的所有交換調(diào)用必須使用相同具體類型的值。同時(shí),不一致類型的交換會(huì)發(fā)生恐慌,Swap(nil)
也會(huì) panic
。
// Swap 將 Value 的值設(shè)置為 new 并返回舊值。 func (v *Value) Swap(new any) (old any) { // 不能存儲(chǔ) nil 值 if new == nil { panic("sync/atomic: swap of nil value into Value") } // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // new 轉(zhuǎn)換為 efaceWords np := (*efaceWords)(unsafe.Pointer(&new)) // 自旋進(jìn)行原子操作,這個(gè)過(guò)程不會(huì)很久,開(kāi)銷相比互斥鎖小 for { // 下面這部分代碼跟 Store 一樣,不細(xì)說(shuō)了。 // 這部分代碼是進(jìn)行第一次存儲(chǔ)的代碼。 typ := LoadPointer(&vp.typ) if typ == nil { runtime_procPin() if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { runtime_procUnpin() continue } StorePointer(&vp.data, np.data) StorePointer(&vp.typ, np.typ) runtime_procUnpin() return nil } if typ == unsafe.Pointer(&firstStoreInProgress) { continue } if typ != np.typ { panic("sync/atomic: swap of inconsistently typed value into Value") } // ---- 下面是 Swap 的特有邏輯 ---- // op 是返回值 op := (*efaceWords)(unsafe.Pointer(&old)) // 返回舊的值 op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data) return old } }
CompareAndSwap - 比較并交換
CompareAndSwap
將 Value
的值與 old
比較,如果相等則設(shè)置為 new
并返回 true
,否則返回 false
。 對(duì)給定值的所有比較和交換調(diào)用必須使用相同具體類型的值。同時(shí),不一致類型的比較和交換會(huì)發(fā)生恐慌,CompareAndSwap(nil, nil)
也會(huì) panic
。
// CompareAndSwap 比較并交換。 func (v *Value) CompareAndSwap(old, new any) (swapped bool) { // 注意:old 是可以為 nil 的,new 不能為 nil。 // old 是 nil 表示是第一次進(jìn)行 Store 操作。 if new == nil { panic("sync/atomic: compare and swap of nil value into Value") } // atomic.Value 轉(zhuǎn)換為 efaceWords vp := (*efaceWords)(unsafe.Pointer(v)) // new 轉(zhuǎn)換為 efaceWords np := (*efaceWords)(unsafe.Pointer(&new)) // old 轉(zhuǎn)換為 efaceWords op := (*efaceWords)(unsafe.Pointer(&old)) // old 和 new 類型必須一致,且不能為 nil if op.typ != nil && np.typ != op.typ { panic("sync/atomic: compare and swap of inconsistently typed values") } // 自旋進(jìn)行原子操作,這個(gè)過(guò)程不會(huì)很久,開(kāi)銷相比互斥鎖小 for { // LoadPointer 可以保證獲取到的 typ 是最新的 typ := LoadPointer(&vp.typ) if typ == nil { // atomic.Value 是 nil,還沒(méi) Store 過(guò) // 準(zhǔn)備進(jìn)行第一次 Store,但是傳遞進(jìn)來(lái)的 old 不是 nil,compare 這一步就失敗了。直接返回 false if old != nil { return false } // 下面這部分代碼跟 Store 一樣,不細(xì)說(shuō)了。 // 這部分代碼是進(jìn)行第一次存儲(chǔ)的代碼。 runtime_procPin() if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) { runtime_procUnpin() continue } StorePointer(&vp.data, np.data) StorePointer(&vp.typ, np.typ) runtime_procUnpin() return true } if typ == unsafe.Pointer(&firstStoreInProgress) { continue } if typ != np.typ { panic("sync/atomic: compare and swap of inconsistently typed value into Value") } // 通過(guò)運(yùn)行時(shí)相等性檢查比較舊版本和當(dāng)前版本。 // 這允許對(duì)值類型進(jìn)行比較,這是包函數(shù)所沒(méi)有的。 // 下面的 CompareAndSwapPointer 僅確保 vp.data 自 LoadPointer 以來(lái)沒(méi)有更改。 data := LoadPointer(&vp.data) var i any (*efaceWords)(unsafe.Pointer(&i)).typ = typ (*efaceWords)(unsafe.Pointer(&i)).data = data if i != old { // atomic.Value 跟 old 不相等 return false } // 只做 val 部分的 cas 操作 return CompareAndSwapPointer(&vp.data, data, np.data) } }
這里需要特別說(shuō)明的只有最后那個(gè)比較相等的判斷,也就是 data := LoadPointer(&vp.data)
以及往后的幾行代碼。 在開(kāi)發(fā) atomic.Value
第一版的時(shí)候,那個(gè)開(kāi)發(fā)者其實(shí)是將這幾行寫成 CompareAndSwapPointer(&vp.data, old.data, np.data)
這種形式的。 但是在舊的寫法中,會(huì)存在一個(gè)問(wèn)題,如果我們做 CAS
操作的時(shí)候,如果傳遞的參數(shù) old
是一個(gè)結(jié)構(gòu)體的值這種類型,那么這個(gè)結(jié)構(gòu)體的值是會(huì)被拷貝一份的, 同時(shí)再會(huì)被轉(zhuǎn)換為 interface{}/any
類型,這個(gè)過(guò)程中,其實(shí)參數(shù)的 old
的 data
部分指針指向的內(nèi)存跟 vp.data
指向的內(nèi)存是不一樣的。 這樣的話,CAS
操作就會(huì)失敗,這個(gè)時(shí)候就會(huì)返回 false
,但是我們本意是要比較它的值,出現(xiàn)這種結(jié)果顯然不是我們想要的。
將值作為 interface{}
參數(shù)使用的時(shí)候,會(huì)存在一個(gè)將值轉(zhuǎn)換為 interface{}
的過(guò)程。具體我們可以看看 interface{}
的實(shí)現(xiàn)原理。
所以,在上面的實(shí)現(xiàn)中,會(huì)將舊值的 typ
和 data
賦值給一個(gè) any
類型的變量, 然后使用 i != old
這種方式進(jìn)行判斷,這樣就可以實(shí)現(xiàn)在比較的時(shí)候,比較的是值,而不是由值轉(zhuǎn)換為 interface{}
后的指針。
其他原子類型
我們現(xiàn)在知道了,atomic.Value
可以對(duì)任意類型做原子操作。 而對(duì)于其他的原子類型,比如 int32
、int64
、uint32
、uint64
、uintptr
、unsafe.Pointer
等, 其實(shí)在 go 中也提供了包裝的類型,讓我們可以以對(duì)象的方式來(lái)操作這些類型。
對(duì)應(yīng)的類型如下:
atomic.Bool
:這個(gè)比較特別,但底層實(shí)際上是一個(gè)uint32
類型的值。我們對(duì)atomic.Bool
做原子操作的時(shí)候,實(shí)際上是對(duì)uint32
做原子操作。atomic.Int32
:int32
類型的包裝類型atomic.Int64
:int64
類型的包裝類型atomic.Uint32
:uint32
類型的包裝類型atomic.Uint64
:uint64
類型的包裝類型atomic.Uintptr
:uintptr
類型的包裝類型atomic.Pointer
:unsafe.Pointer
類型的包裝類型
這幾種類型的實(shí)現(xiàn)的代碼基本一樣,除了類型不一樣,我們可以看看 atomic.Int32
的實(shí)現(xiàn):
// An Int32 is an atomic int32. The zero value is zero. type Int32 struct { _ noCopy v int32 } // Load atomically loads and returns the value stored in x. func (x *Int32) Load() int32 { return LoadInt32(&x.v) } // Store atomically stores val into x. func (x *Int32) Store(val int32) { StoreInt32(&x.v, val) } // Swap atomically stores new into x and returns the previous value. func (x *Int32) Swap(new int32) (old int32) { return SwapInt32(&x.v, new) } // CompareAndSwap executes the compare-and-swap operation for x. func (x *Int32) CompareAndSwap(old, new int32) (swapped bool) { return CompareAndSwapInt32(&x.v, old, new) }
可以看到,atomic.Int32
的實(shí)現(xiàn)都是基于 atomic
包中 int32
類型相關(guān)的原子操作函數(shù)來(lái)實(shí)現(xiàn)的。
原子操作與互斥鎖比較
那我們有了互斥鎖,為什么還要有原子操作呢?我們進(jìn)行比較一下就知道了:
原子操作 | 互斥鎖 | |
---|---|---|
保護(hù)的范圍 | 變量 | 代碼塊 |
保護(hù)的粒度 | 小 | 大 |
性能 | 高 | 低 |
如何實(shí)現(xiàn)的 | 硬件指令 | 軟件層面實(shí)現(xiàn),邏輯較多 |
如果我們只需要對(duì)某一個(gè)變量做并發(fā)讀寫,那么使用原子操作就可以了,因?yàn)樵硬僮鞯男阅鼙然コ怄i高很多。 但是如果我們需要對(duì)多個(gè)變量做并發(fā)讀寫,那么就需要用到互斥鎖了,這種場(chǎng)景往往是在一段代碼中對(duì)不同變量做讀寫。
性能比較
我們前面這個(gè)表格提到了原子操作與互斥鎖性能上有差異,我們寫幾行代碼來(lái)進(jìn)行比較一下:
// 系統(tǒng)信息 cpu: Intel(R) Core(TM) i7-8700 CPU @ 3.20GHz // 10.13 ns/op func BenchmarkMutex(b *testing.B) { var mu sync.Mutex for i := 0; i < b.N; i++ { mu.Lock() mu.Unlock() } } // 5.849 ns/op func BenchmarkAtomic(b *testing.B) { var sum atomic.Uint64 for i := 0; i < b.N; i++ { sum.Add(uint64(1)) } }
在對(duì) Mutex
的性能測(cè)試中,我只是寫了簡(jiǎn)單的 Lock()
和 UnLock()
操作,因?yàn)檫@種比較才算是對(duì) Mutex
本身的測(cè)試,而在 Atomic
的性能測(cè)試中,對(duì) sum
做原子累加的操作。最終結(jié)果是,使用 Atomic
的操作耗時(shí)大概比 Mutex
少了 40%
以上。
在實(shí)際開(kāi)發(fā)中,Mutex
保護(hù)的臨界區(qū)內(nèi)往往有更多操作,也就意味著 Mutex
鎖需要耗費(fèi)更長(zhǎng)的時(shí)間才能釋放,也就是會(huì)需要耗費(fèi)比上面這個(gè) 40%
還要多的時(shí)間另外一個(gè)協(xié)程才能獲取到 Mutex
鎖。
go 的 sync 包中的原子操作
在文章的開(kāi)頭,我們就說(shuō)了,在 go 的 sync.Map
和 sync.Pool
中都有用到了原子操作,本節(jié)就來(lái)看一看這些操作。
sync.Map 中的原子操作
在 sync.Map
中使用到了一個(gè) entry
結(jié)構(gòu)體,這個(gè)結(jié)構(gòu)體中大部分操作都是原子操作,我們可以看看它下面這兩個(gè)方法的定義:
// 刪除 entry func (e *entry) delete() (value any, ok bool) { for { p := e.p.Load() // 已經(jīng)被刪除了,不需要再刪除 if p == nil || p == expunged { return nil, false } // 刪除成功 if e.p.CompareAndSwap(p, nil) { return *p, true } } } // 如果條目尚未刪除,trySwap 將交換一個(gè)值。 func (e *entry) trySwap(i *any) (*any, bool) { for { p := e.p.Load() // 已經(jīng)被刪除了 if p == expunged { return nil, false } // swap 成功 if e.p.CompareAndSwap(p, i) { return p, true } } }
我們可以看到一個(gè)非常典型的特征就是 for
+ CompareAndSwap
的組合,這個(gè)組合在 entry
中出現(xiàn)了很多次。
如果我們也需要對(duì)變量做并發(fā)讀寫,也可以嘗試一下這種 for + CompareAndSwap 的組合。
sync.WaitGroup 中的原子操作
在 sync.WaitGroup
中有一個(gè)類型為 atomic.Uint64
的 state
字段,這個(gè)變量是用來(lái)記錄 WaitGroup
的狀態(tài)的。 在實(shí)際使用中,它的高 32 位用來(lái)記錄 WaitGroup
的計(jì)數(shù)器,低 32 位用來(lái)記錄 WaitGroup
的 Waiter
的數(shù)量,也就是等待條件變量滿足的協(xié)程數(shù)量。
如果不使用一個(gè)變量來(lái)記錄這兩個(gè)值,那么我們就需要使用兩個(gè)變量來(lái)記錄,這樣就會(huì)導(dǎo)致我們需要對(duì)兩個(gè)變量做并發(fā)讀寫, 在這種情況下,我們就需要使用互斥鎖來(lái)保護(hù)這兩個(gè)變量,這樣就會(huì)導(dǎo)致性能的下降。
而使用一個(gè)變量來(lái)記錄這兩個(gè)值,我們就可以使用原子操作來(lái)保護(hù)這個(gè)變量,這樣就可以保證并發(fā)讀寫的安全性,同時(shí)也能得到更好的性能:
// WaitGroup 的 Add 函數(shù):高 32 位加上 delta state := wg.state.Add(uint64(delta) << 32) // WaitGroup 的 Wait 函數(shù):低 32 位加 1 // 等待者的數(shù)量加 1 wg.state.CompareAndSwap(state, state+1)
CAS 操作有失敗必然有成功
當(dāng)然這里是指指向同一行 CAS
代碼的時(shí)候(也就是有競(jìng)爭(zhēng)的時(shí)候),如果是指向不同行 CAS
代碼的時(shí)候,那么就不一定了。 比如下面這個(gè)例子,我們把前面計(jì)算 sum
的例子改一改,改成用 CAS
操作來(lái)完成:
func TestCas(t *testing.T) { var sum int32 = 0 var wg sync.WaitGroup wg.Add(1000) for i := 0; i < 1000; i++ { go func() { defer wg.Done() // 這一行是有可能會(huì)失敗的 atomic.CompareAndSwapInt32(&sum, sum, sum+1) }() } wg.Wait() fmt.Println(sum) // 不是 1000 }
在這個(gè)例子中,我們把 atomic.AddInt32(&sum, 1)
改成了 atomic.CompareAndSwapInt32(&sum, sum, sum+1)
, 這樣就會(huì)導(dǎo)致有可能會(huì)有多個(gè) goroutine 同時(shí)執(zhí)行到 atomic.CompareAndSwapInt32(&sum, sum, sum+1)
這一行代碼, 這樣肯定會(huì)有不同的 goroutine 同時(shí)拿到一個(gè)相同的 sum
的舊值,那么在這種情況下,就會(huì)導(dǎo)致 CAS
操作失敗。 也就是說(shuō),將 sum
替換為 sum + 1
的操作可能會(huì)失敗。
失敗意味著什么呢?意味著另外一個(gè)協(xié)程序先把 sum
的值加 1 了,這個(gè)時(shí)候其實(shí)我們不應(yīng)該在舊的 sum
上加 1 了, 而是應(yīng)該在最新的 sum
上加上 1,那我們應(yīng)該怎么做呢?我們可以在 CAS
操作失敗的時(shí)候,重新獲取 sum
的值, 然后再次嘗試 CAS
操作,直到成功為止:
func TestCas(t *testing.T) { var sum int32 = 0 var wg sync.WaitGroup wg.Add(1000) for i := 0; i < 1000; i++ { go func() { defer wg.Done() // cas 失敗的時(shí)候,重新獲取 sum 的值進(jìn)行計(jì)算。 // cas 成功則返回。 for { if atomic.CompareAndSwapInt32(&sum, sum, sum+1) { return } } }() } wg.Wait() fmt.Println(sum) }
總結(jié)
原子操作是并發(fā)編程中非常重要的一個(gè)概念,它可以保證并發(fā)讀寫的安全性,同時(shí)也能得到更好的性能。
最后,總結(jié)一下本文講到的內(nèi)容:
- 原子操作是更加底層的操作,它保護(hù)的是單個(gè)變量,而互斥鎖可以保護(hù)一個(gè)代碼片段,它們的使用場(chǎng)景是不一樣的。
- 原子操作需要通過(guò) CPU 指令來(lái)實(shí)現(xiàn),而互斥鎖是在軟件層面實(shí)現(xiàn)的。
- go 里面的原子操作有以下這些:
Add
:原子增減CompareAndSwap
:原子比較并交換Load
:原子讀取Store
:原子寫入Swap
:原子交換
- go 里面所有類型都能使用原子操作,只是不同類型的原子操作使用的函數(shù)不太一樣。
atomic.Value
可以用來(lái)原子操作任意類型的變量。- go 里面有些底層實(shí)現(xiàn)也使用了原子操作,比如:
sync.WaitGroup
:使用原子操作來(lái)保證計(jì)數(shù)器和等待者數(shù)量的并發(fā)讀寫安全性。sync.Map
:entry
結(jié)構(gòu)體中基本所有操作都有原子操作的身影。
- 原子操作有失敗必然有成功(說(shuō)的是同一行
CAS
操作),如果CAS
操作失敗了,那么我們可以重新獲取舊值,然后再次嘗試CAS
操作,直到成功為止。
總的來(lái)說(shuō),原子操作本身其實(shí)沒(méi)有太復(fù)雜的邏輯,我們理解了它的原理之后,就可以很容易的使用它了。
作者:eleven26
鏈接:https://juejin.cn/post/7214060626519719997
來(lái)源:稀土掘金
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。
以上就是go 原子操作的方式及實(shí)現(xiàn)原理深入解析的詳細(xì)內(nèi)容,更多關(guān)于go 原子操作方式原理的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語(yǔ)言fmt庫(kù)詳解與應(yīng)用實(shí)例(格式化輸入輸出功能)
fmt庫(kù)是Go語(yǔ)言中一個(gè)強(qiáng)大而靈活的庫(kù),提供了豐富的格式化輸入輸出功能,通過(guò)本文的介紹和實(shí)例演示,相信你對(duì)fmt庫(kù)的使用有了更深的理解,感興趣的朋友一起看看吧2023-10-10golang實(shí)現(xiàn)一個(gè)簡(jiǎn)單的websocket聊天室功能
這篇文章主要介紹了golang實(shí)現(xiàn)一個(gè)簡(jiǎn)單的websocket聊天室功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-10-10golang實(shí)現(xiàn)多協(xié)程下載文件(支持?jǐn)帱c(diǎn)續(xù)傳)
本文主要介紹了golang實(shí)現(xiàn)多協(xié)程下載文件,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-11-11go語(yǔ)言數(shù)據(jù)類型之字符串string
這篇文章介紹了go語(yǔ)言數(shù)據(jù)類型之字符串string,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-07-07Golang處理gRPC請(qǐng)求/響應(yīng)元數(shù)據(jù)的示例代碼
前段時(shí)間實(shí)現(xiàn)內(nèi)部gRPC框架時(shí),為了實(shí)現(xiàn)在服務(wù)端攔截器中打印請(qǐng)求及響應(yīng)的頭部信息,便查閱了部分關(guān)于元數(shù)據(jù)的資料,因?yàn)橹形木W(wǎng)絡(luò)上對(duì)于該領(lǐng)域的信息較少,于是在這做了一些簡(jiǎn)單的總結(jié),需要的朋友可以參考下2024-03-03Go?并發(fā)編程協(xié)程及調(diào)度機(jī)制詳情
這篇文章主要介紹了Go并發(fā)編程協(xié)程及調(diào)度機(jī)制詳情,協(xié)程是Go語(yǔ)言最大的特色之一,goroutine的實(shí)現(xiàn)其實(shí)是通過(guò)協(xié)程,更多相關(guān)內(nèi)容需要的朋友可以參考一下2022-09-09