golang中單機鎖的具體實現(xiàn)詳解
1、鎖的概念引入
首先,為什么需要鎖?
在并發(fā)編程中,多個線程或進程可能同時訪問和修改同一個共享資源(例如變量、數(shù)據(jù)結(jié)構(gòu)、文件)等,若不引入合適的同步機制,會引發(fā)以下問題:
- 數(shù)據(jù)競爭:多個線程同時修改一個資源,最終的結(jié)果跟線程的執(zhí)行順序有關(guān),結(jié)果是不可預(yù)測的。
- 數(shù)據(jù)不一致:一個線程在修改資源,而另一個線程讀取了未修改完的數(shù)據(jù),從而導(dǎo)致讀取了錯誤的數(shù)據(jù)。
- 資源競爭:多線程競爭同一個資源,浪費系統(tǒng)的性能。
因此,我們需要一把鎖,來保證同一時間只有一個人能寫數(shù)據(jù),確保共享資源在并發(fā)訪問下的正確性和一致性。
在這里,引入兩種常見的并發(fā)控制處理機制,即樂觀鎖與悲觀鎖:
樂觀鎖:假定在并發(fā)操作中,資源的搶占并不是很激烈,數(shù)據(jù)被修改的可能性不是很大,那這時候就不需要對共享資源區(qū)進行加鎖再操作,而是先修改了數(shù)據(jù),最終來判斷數(shù)據(jù)有沒有被修改,沒有被修改則提交修改指,否則重試。
悲觀鎖:與樂觀鎖相反,它假設(shè)場景的資源競爭激烈,對共享資源區(qū)的訪問必須要求持有鎖。
針對不同的場景需要采取因地制宜的策略,比較樂觀鎖與悲觀所,它們的優(yōu)缺點顯而易見:
策略 | 優(yōu)點 | 缺點 |
---|---|---|
樂觀鎖 | 不需要實際上鎖,性能高 | 若沖突時,需要重新進行操作,多次重試可能會導(dǎo)致性能下降明顯 |
悲觀鎖 | 訪問數(shù)據(jù)一定需要持有鎖,保證并發(fā)場景下的數(shù)據(jù)正確性 | 加鎖期間,其他等待鎖的線程需要被阻塞,性能低 |
2、Sync.Mutex
Go對單機鎖的實現(xiàn),考慮了實際環(huán)境中協(xié)程對資源競爭程度的變化,制定了一套鎖升級的過程。具體方案如下:
- 首先采取樂觀的態(tài)度,Goroutine會保持自旋態(tài),通過CAS操作嘗試獲取鎖。
- 當多次獲取失敗,將會由樂觀態(tài)度轉(zhuǎn)入悲觀態(tài)度,判定當前并發(fā)資源競爭程度劇烈,進入阻塞態(tài)等待被喚醒。
從樂觀轉(zhuǎn)向悲觀的判定規(guī)則如下,滿足其中之一即發(fā)生轉(zhuǎn)變:
- Goroutine自旋嘗試次數(shù)超過4次
- 當前P的執(zhí)行隊列中存在等待被執(zhí)行的G(避免自旋影響GMP調(diào)度性能)
- CPU是單核的(其他Goroutine執(zhí)行不了,自旋無意義)
除此之外,為了防止被阻塞的協(xié)程等待過長時間也沒有獲取到鎖,導(dǎo)致用戶的整體體驗下降,引入了饑餓的概念:
- 饑餓態(tài):若Goroutine被阻塞等待的時間>1ms,則這個協(xié)程被視為處于饑餓狀態(tài)
- 饑餓模式:表示當前鎖是否處于特定的模式,在該模式下,鎖的交接是公平的,按順序交給等待最久的協(xié)程。
饑餓模式與正常模式的轉(zhuǎn)變規(guī)則如下:
普通模式->饑餓模式:存在阻塞的協(xié)程,阻塞時間超過1ms
饑餓模式->普通模式:阻塞隊列清空,亦或者獲得鎖的協(xié)程的等待時間小于1ms,則恢復(fù)
接下來步入源碼,觀看具體的實現(xiàn)。
2.1、數(shù)據(jù)結(jié)構(gòu)
位于包sync/mutex.go中,對鎖的定義如下:
type Mutex struct { state int32 sema uint32 }
- state:標識目前鎖的狀態(tài)信息,包括了是否處于饑餓模式、是否存在喚醒的阻塞協(xié)程、是否上鎖、以及處于等待鎖的協(xié)程個數(shù)有多少。
- seme:用于阻塞和喚醒協(xié)程的信號量。
將state看作一個二進制字符串,它存儲信息的規(guī)則如下:
- 第一位標識是否處于上鎖,0表示否,1表示上鎖(mutexLocked)
- 第二位標識是否存在喚醒的阻塞協(xié)程(mutexWoken)
- 第三位標識是否處于饑餓模式(mutexStarving)
- 從第四位開始,記錄了處于阻塞態(tài)的協(xié)程個數(shù)
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 //饑餓閾值 )
2.2、獲取鎖Lock()
func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } m.lockSlow() }
嘗試直接通過CAS操作直接獲取鎖,若成功則返回,否則說明鎖被獲取,步入LockSlow。
2.3、LockSlow()
源碼較長,進行拆分講解:
var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state
(1)定義了基本的常量,含義如下:
- waitStartTime:記錄當前協(xié)程等待的時間,只有被阻塞才會使用
- awoke:標識當前協(xié)程是否被Unlock喚醒
- iter:記錄當前協(xié)程自旋嘗試次數(shù)
- old:記錄舊的鎖的狀態(tài)信息
for { //處于上鎖狀態(tài),并且不處于饑餓狀態(tài)中,并且當前的協(xié)程允許繼續(xù)自旋下去 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } //... }
(2)進入嘗試獲取鎖的循環(huán)中,兩個if表示:
- 若鎖處于上鎖狀態(tài),并且不處于饑餓狀態(tài)中,并且當前的協(xié)程允許繼續(xù)自旋下去(非單核CPU、自旋次數(shù)<=4、調(diào)度器P的本地隊列不存在等待執(zhí)行的G),則步入:
- 若當前協(xié)程并非從等待隊列喚醒、并且不存在被喚醒的等待協(xié)程、并且存在位于阻塞的協(xié)程、則嘗試設(shè)置mutexWoken標識為1,若成功:
- 標識當前的協(xié)程為被喚醒的協(xié)程。(雖然并非實際從阻塞中喚醒)
- 告訴P,當前的協(xié)程處于自旋態(tài)
- 更新
iter
計數(shù)器,與old
記錄的當前鎖的狀態(tài)信息,進行下一次重試循環(huán)
- 若當前協(xié)程并非從等待隊列喚醒、并且不存在被喚醒的等待協(xié)程、并且存在位于阻塞的協(xié)程、則嘗試設(shè)置mutexWoken標識為1,若成功:
這里存在的唯一疑惑為,為什么要將awoke標識為true?
首先,因為當前鎖并非處于饑餓模式,因此當前的搶占鎖的模式是不公平的,若當前鎖的阻塞隊列還沒有被喚醒的協(xié)程,那就要求不要喚醒了,嘗試讓當前正在嘗試的協(xié)程獲取到鎖,避免喚醒協(xié)程進行資源競爭。
for { //... new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { new &^= mutexWoken } //... }
(3)進行狀態(tài)更新:
當協(xié)程從步驟2走出來時,只能說明它位于以下兩個狀態(tài)之一:
- 旋不動了,或者鎖進入饑餓模式了,鎖要讓給別人了,總之是獲取不到鎖了(悲觀)。
- 鎖被釋放了。
不論如何,都需要進行一些狀態(tài)的更新,為接下來的打算做準備。
用new存儲一個鎖即將要進入的新狀態(tài)信息,更新規(guī)則:
- 若鎖不處于饑餓模式:說明鎖可能被釋放了,也可能是自旋次數(shù)過多,不管接下來是否能拿到鎖,鎖都會被某一個協(xié)程獲取,因此置
mutexLocked
為1。 - 若鎖可能處于饑餓狀態(tài),或者鎖沒有被釋放:那說明自己是搶不到鎖了,即將進入阻塞態(tài),阻塞協(xié)程計數(shù)器+1。
- 若當前的協(xié)程是被喚醒的,并且已經(jīng)處在饑餓態(tài)中而且鎖仍然鎖著:鎖進入絕對公平的饑餓模式。
- 若當前協(xié)程是被喚醒的:清除
mutexWoken
標識位,因為接下來可能需要有協(xié)程被喚醒(饑餓模式)。
雖然更新的有點多,但是可以歸納為:
- 若鎖釋放了,那就標識一下接下來鎖要被獲取即可。
- 若鎖沒有釋放,并給當前協(xié)程等待了很久,那鎖就進入饑餓狀態(tài),接下來需要有阻塞協(xié)程被喚醒。
(4)嘗試更新信息:
if atomic.CompareAndSwapInt32(&m.state, old, new) { //... } else { old = m.state }
接下來嘗試將new更新進state,若更新失敗,說明當前有另一個協(xié)程介入了,為了防止數(shù)據(jù)的一致性丟失,要全部重來一次。
(5)狀態(tài)更新成功,具體判斷是要沉睡還是獲取鎖成功:
步入步驟4的if主支中,此時有兩個狀態(tài):
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } //... } else { //... }
因為當前狀態(tài),可能是鎖釋放了,檢查鎖更新前是否已經(jīng)被釋放了并且不是饑餓模式,若是那說明獲取鎖成功了,函數(shù)結(jié)束了。
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 2) //.... } else { //... }
否則,說明當前協(xié)程要進入阻塞態(tài)了,記錄一下開始阻塞的時間,用于醒來是判斷是否饑餓。然后進入阻塞沉睡中。
(6)若步驟5進入阻塞,則被喚醒后:
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 2) //喚醒 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state //若鎖處于饑餓模式 if old&mutexStarving != 0 { //鎖的異常處理 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } //將要更新的信號量 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 //.... } else { //... }
從阻塞中喚醒,首先計算一些協(xié)程的阻塞時間,以及當前的最新鎖狀態(tài)。
若鎖處于饑餓模式:那么當前協(xié)程將直接獲取鎖,當前協(xié)程是因為饑餓模式被喚醒的,不存在其他協(xié)程搶占鎖。于是更新信號量,將記錄阻塞協(xié)程數(shù)-1,將鎖的上鎖態(tài)置1。若當前從饑餓模式喚醒的協(xié)程,等待時間已經(jīng)不到1ms了或者是最后一個等待的協(xié)程,那么將將鎖從饑餓模式轉(zhuǎn)化為正常模式。至此,獲取成功,退出函數(shù)。
否則,只是普通的隨機喚醒,于是開始嘗試進行搶占,回到步驟1。
2.4、釋放鎖Unlock()
func (m *Mutex) Unlock() { //直接釋放鎖 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } }
通過原子操作,直接將鎖的mutexLocked標識置為0。若置0后,鎖的狀態(tài)不為0,那就說明存在需要獲取鎖的協(xié)程,步入unlockSlow。
2.5、unlockSlow()
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 2) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 2) } }
(1)首先進行了異常狀態(tài)處理,若釋放了一個已經(jīng)釋放了到鎖,那么直接fatal,程序終止。
if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }
(2)若鎖不處于饑餓狀態(tài):
- 若此時的等待協(xié)程數(shù)量為0,或者鎖被上鎖了、含有被喚醒的協(xié)程、鎖處于饑餓模式:都說明有新的協(xié)程介入了流程,已經(jīng)完成了交接,可以直接退出
- 喚醒一個處于阻塞態(tài)的協(xié)程。
否則,處于饑餓狀態(tài),喚醒等待最久的協(xié)程。
3、Sync.RWMutex
對于共享資源區(qū)的操作,可以劃分為讀與寫兩大類。假設(shè)在一個場景中,對共享資源區(qū)繼續(xù)讀的操作遠大于寫的操作,如果每個協(xié)程的讀操作都需要獲取互斥鎖,這帶來的性能損耗是非常大的。
RWMutex是一個可以運用在讀操作>寫操作中的提高性能的鎖,可以將它視為由一個讀鎖與一個寫鎖構(gòu)成。其運作規(guī)則具體如下:
- 讀鎖允許多個讀協(xié)程同時讀取共享資源區(qū),若有協(xié)程需要修改資源區(qū)的數(shù)據(jù),那么它需要被阻塞。
- 寫鎖具有嚴格的排他性,當共享資源區(qū)被上了寫鎖時,任何其他goroutine都不得訪問。
- 可見在最壞的情況下,所有的協(xié)程都是需要寫操作時,讀寫鎖會退化成普通的Mutex。
3.1、數(shù)據(jù)結(jié)構(gòu)
type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount atomic.Int32 // number of pending readers readerWait atomic.Int32 // number of departing readers } const rwmutexMaxReaders = 1 << 30 //最大的讀協(xié)程數(shù)量
- w:一個互斥的寫鎖
- writerSem:關(guān)聯(lián)被阻塞的寫協(xié)程的信號量
- readerSem:關(guān)聯(lián)被阻塞的讀協(xié)程的信號量
- readerCount:正常情況下,記錄正在讀取的協(xié)程數(shù)量;但若當前是寫協(xié)程正在持有鎖,那么實際記錄讀協(xié)程的數(shù)量為readerCount - rwmutexMaxReader
- readerWait:記錄釋放下一個寫協(xié)程,還需要等待讀協(xié)程完成的數(shù)量
3.2、讀鎖流程RLock()
func (rw *RWMutex) RLock() { if rw.readerCount.Add(1) < 0 { // A writer is pending, wait for it. runtime_SemacquireRWMutexR(&rw.readerSem, false, 0) } }
對readerCount+1,表示新加入一個讀協(xié)程。若結(jié)果<0,說明當前鎖正在被寫協(xié)程占據(jù),令當前的讀協(xié)程阻塞。
3.3、讀釋放鎖流程RUnlock()
func (rw *RWMutex) RUnlock() { if r := rw.readerCount.Add(-1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } }
對readerCount-1,表示減少一個讀協(xié)程。若結(jié)果<0,說明當前鎖正在被寫協(xié)程占據(jù),步入runlockslow。
3.4、rUnlockSlow()
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() fatal("sync: RUnlock of unlocked RWMutex") } if rw.readerWait.Add(-1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
首先進行錯誤處理,若發(fā)現(xiàn)當前協(xié)程為占用過讀鎖,或者讀流程的協(xié)程數(shù)量上限,系統(tǒng)出現(xiàn)異常,fatal。
否則,對readerWait-1,若結(jié)果為0,說明當前協(xié)程是最后一個介入讀鎖流程的協(xié)程,此時需要釋放一個寫鎖。
3.5、寫鎖流程Lock()
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && rw.readerWait.Add(r) != 0 { runtime_SemacquireRWMutex(&rw.writerSem, false, 0) } }
首先嘗試獲取寫鎖,若獲取成功,需要將readerCount-最大讀協(xié)程數(shù),表示現(xiàn)在鎖被讀協(xié)程占據(jù)。
r表示處于讀流程的協(xié)程數(shù)量,若r不為0,那么就將readerWait加上r,等這些讀協(xié)程都讀取完畢,再去寫。將這個寫協(xié)程阻塞。(讀寫鎖并非讀、寫公平,讀協(xié)程優(yōu)先。)
3.6、寫釋放鎖流程Unlock()
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := rw.readerCount.Add(rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() fatal("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
重新將readerCount置為正常指,表示釋放了寫鎖。若讀協(xié)程超過最大上限,則異常。
然后喚醒所有阻塞的讀協(xié)程。(讀協(xié)程優(yōu)先)
解鎖。
以上就是golang中單機鎖的具體實現(xiàn)詳解的詳細內(nèi)容,更多關(guān)于go單機鎖的資料請關(guān)注腳本之家其
相關(guān)文章
實時通信的服務(wù)器推送機制 EventSource(SSE) 簡介附go實現(xiàn)示例代碼
EventSource是一種非常有用的 API,適用于許多實時應(yīng)用場景,它提供了一種簡單而可靠的方式來建立服務(wù)器推送連接,并實現(xiàn)實時更新和通知,這篇文章主要介紹了實時通信的服務(wù)器推送機制 EventSource(SSE)簡介附go實現(xiàn)示例,需要的朋友可以參考下2024-03-03golang中使用proto3協(xié)議導(dǎo)致的空值字段不顯示的問題處理方案
這篇文章主要介紹了golang中使用proto3協(xié)議導(dǎo)致的空值字段不顯示的問題處理方案,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧2019-10-10