Go語言atomic.Value如何不加鎖保證數(shù)據(jù)線程安全?
引言
很多人可能沒有注意過,在 Go(甚至是大部分語言)中,一條普通的賦值語句其實不是一個原子操作。例如,在32位機器上寫int64類型的變量就會有中間狀態(tài),它會被拆成兩次寫操作(匯編的MOV指令)——寫低 32 位和寫高 32 位。32機器上對int64進行賦值
如果一個線程剛寫完低32位,還沒來得及寫高32位時,另一個線程讀取了這個變量,那它得到的就是一個毫無邏輯的中間變量,這很有可能使我們的程序出現(xiàn)Bug。
這還只是一個基礎類型,如果我們對一個結構體進行賦值,那它出現(xiàn)并發(fā)問題的概率就更高了。很可能寫線程剛寫完一小半的字段,讀線程就來讀取這個變量,那么就只能讀到僅修改了一部分的值。這顯然破壞了變量的完整性,讀出來的值也是完全錯誤的。
面對這種多線程下變量的讀寫問題,Go給出的解決方案是atomic.Value,它使得我們可以不依賴于不保證兼容性的unsafe.Pointer類型,同時又能將任意數(shù)據(jù)類型的讀寫操作封裝成原子性操作。
atomic.Value的使用方式
atomic.Value類型對外提供了兩個讀寫方法:
v.Store(c)- 寫操作,將原始的變量c存放到一個atomic.Value類型的v里。c := v.Load()- 讀操作,從內(nèi)存中線程安全的v中讀取上一步存放的內(nèi)容。
下面是一個簡單的例子演示atomic.Value的用法。
type Rectangle struct {
length int
width int
}
var rect atomic.Value
func update(width, length int) {
rectLocal := new(Rectangle)
rectLocal.width = width
rectLocal.length = length
rect.Store(rectLocal)
}
func main() {
wg := sync.WaitGroup{}
wg.Add(10)
// 10 個協(xié)程并發(fā)更新
for i := 0; i < 10; i++ {
go func(i int) {
defer wg.Done()
update(i, i+5)
}(i)
}
wg.Wait()
r := rect.Load().(*Rectangle)
fmt.Printf("rect.width=%d\nrect.length=%d\n", r.width, r.length)
}你可能會好奇,為什么atomic.Value在不加鎖的情況下就提供了讀寫變量的線程安全保證,接下來我們就一起看看其內(nèi)部實現(xiàn)。
atomic.Value的內(nèi)部實現(xiàn)
atomic.Value被設計用來存儲任意類型的數(shù)據(jù),所以它內(nèi)部的字段是一個interface{}類型。
// A Value provides an atomic load and store of a consistently typed value.
// The zero value for a Value returns nil from Load.
// Once Store has been called, a Value must not be copied.
//
// A Value must not be copied after first use.
type Value struct {
v interface{}
}除了Value外,atomic包內(nèi)部定義了一個ifaceWords類型,這其實是interface{}的內(nèi)部表示 (runtime.eface),它的作用是將interface{}類型分解,得到其原始類型(typ)和真正的值(data)。
// ifaceWords is interface{} internal representation.
type ifaceWords struct {
typ unsafe.Pointer
data unsafe.Pointer
}寫入線程安全的保證
直接來看代碼
// Store sets the value of the Value to x.
// All calls to Store for a given Value must use values of the same concrete type.
// Store of an inconsistent type panics, as does Store(nil).
func (v *Value) Store(val interface{}) {
if val == nil {
panic("sync/atomic: store of nil value into Value")
}
// 通過unsafe.Pointer將現(xiàn)有的(v)和要寫入的值(val) 分別轉(zhuǎn)成ifaceWords類型。
// 這樣我們下一步就可以得到這兩個interface{}的原始類型(typ)和真正的值(data)。
vp := (*ifaceWords)(unsafe.Pointer(v))
vlp := (*ifaceWords)(unsafe.Pointer(&val))
for {
typ := LoadPointer(&vp.typ)
if typ == nil {
// Attempt to start first store.
// Disable preemption so that other goroutines can use
// active spin wait to wait for completion; and so that
// GC does not see the fake type accidentally.
runtime_procPin()
if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
runtime_procUnpin()
continue
}
// Complete first store.
StorePointer(&vp.data, vlp.data)
StorePointer(&vp.typ, vlp.typ)
runtime_procUnpin()
return
}
if uintptr(typ) == ^uintptr(0) {
// First store in progress. Wait.
// Since we disable preemption around the first store,
// we can wait with active spinning.
continue
}
// First store completed. Check type and overwrite data.
if typ != vlp.typ {
panic("sync/atomic: store of inconsistently typed value into Value")
}
StorePointer(&vp.data, vlp.data)
return
}
}大概的邏輯:
- 開始就是一個無限 for 循環(huán)。配合
CompareAndSwap使用,可以達到樂觀鎖的效果。 - 通過
LoadPointer這個原子操作拿到當前Value中存儲的類型。下面根據(jù)這個類型的不同,分3種情況處理。
第一次寫入
一個
atomic.Value實例被初始化后,它的typ和data字段會被設置為指針的零值 nil,所以先判斷如果typ是否為nil,如果是那就證明這個Value實例還未被寫入過數(shù)據(jù)。那之后就是一段初始寫入的操作:runtime_procPin()這是runtime中的一段函數(shù),一方面它禁止了調(diào)度器對當前 goroutine 的搶占(preemption),使得它在執(zhí)行當前邏輯的時候不被其他goroutine打斷,以便可以盡快地完成工作。另一方面,在禁止搶占期間,GC 線程也無法被啟用,這樣可以防止 GC 線程看到一個莫名其妙的指向^uintptr(0)的類型(這是賦值過程中的中間狀態(tài))。1)使用
CAS操作,先嘗試將typ設置為^uintptr(0)這個中間狀態(tài)。如果失敗,則證明已經(jīng)有別的線程搶先完成了賦值操作,那它就解除搶占鎖,然后重新回到 for 循環(huán)第一步。2)如果設置成功,那證明當前線程搶到了這個"樂觀鎖”,它可以安全的把
v設為傳入的新值了。注意,這里是先寫data字段,然后再寫typ字段。因為我們是以typ字段的值作為寫入完成與否的判斷依據(jù)的第一次寫入還未完成
如果看到
typ字段還是^uintptr(0)這個中間類型,證明剛剛的第一次寫入還沒有完成,所以它會繼續(xù)循環(huán),一直等到第一次寫入完成。第一次寫入已完成
首先檢查上一次寫入的類型與這一次要寫入的類型是否一致,如果不一致則拋出異常。反之,則直接把這一次要寫入的值寫入到
data字段。
這個邏輯的主要思想就是,為了完成多個字段的原子性寫入,我們可以抓住其中的一個字段,以它的狀態(tài)來標志整個原子寫入的狀態(tài)。
讀?。↙oad)操作
先上代碼:
// Load returns the value set by the most recent Store.
// It returns nil if there has been no call to Store for this Value.
func (v *Value) Load() (val interface{}) {
vp := (*ifaceWords)(unsafe.Pointer(v))
typ := LoadPointer(&vp.typ)
if typ == nil || uintptr(typ) == ^uintptr(0) {
// First store not yet completed.
return nil
}
data := LoadPointer(&vp.data)
vlp := (*ifaceWords)(unsafe.Pointer(&val))
vlp.typ = typ
vlp.data = data
return
}讀取相對就簡單很多了,它有兩個分支:
- 如果當前的
typ是 nil 或者^uintptr(0),那就證明第一次寫入還沒有開始,或者還沒完成,那就直接返回 nil (不對外暴露中間狀態(tài))。 - 否則,根據(jù)當前看到的
typ和data構造出一個新的interface{}返回出去。
總結
本文由淺入深的介紹了atomic.Value的使用姿勢,以及內(nèi)部實現(xiàn)。另外,原子操作由底層硬件支持,對于一個變量更新的保護,原子操作通常會更有效率,并且更能利用計算機多核的優(yōu)勢,如果要更新的是一個復合對象,則應當使用atomic.Value封裝好的實現(xiàn)。
而我們做并發(fā)同步控制常用到的Mutex鎖,則是由操作系統(tǒng)的調(diào)度器實現(xiàn),鎖應當用來保護一段邏輯。
以上就是Go語言atomic.Value如何不加鎖保證數(shù)據(jù)線程安全?的詳細內(nèi)容,更多關于Go語言atomic.Value如何不加鎖保證數(shù)據(jù)線程安全?的資料請關注腳本之家其它相關文章!

