golang中單機(jī)鎖的具體實(shí)現(xiàn)詳解
1、鎖的概念引入
首先,為什么需要鎖?
在并發(fā)編程中,多個(gè)線程或進(jìn)程可能同時(shí)訪問和修改同一個(gè)共享資源(例如變量、數(shù)據(jù)結(jié)構(gòu)、文件)等,若不引入合適的同步機(jī)制,會(huì)引發(fā)以下問題:
- 數(shù)據(jù)競爭:多個(gè)線程同時(shí)修改一個(gè)資源,最終的結(jié)果跟線程的執(zhí)行順序有關(guān),結(jié)果是不可預(yù)測的。
- 數(shù)據(jù)不一致:一個(gè)線程在修改資源,而另一個(gè)線程讀取了未修改完的數(shù)據(jù),從而導(dǎo)致讀取了錯(cuò)誤的數(shù)據(jù)。
- 資源競爭:多線程競爭同一個(gè)資源,浪費(fèi)系統(tǒng)的性能。
因此,我們需要一把鎖,來保證同一時(shí)間只有一個(gè)人能寫數(shù)據(jù),確保共享資源在并發(fā)訪問下的正確性和一致性。
在這里,引入兩種常見的并發(fā)控制處理機(jī)制,即樂觀鎖與悲觀鎖:
樂觀鎖:假定在并發(fā)操作中,資源的搶占并不是很激烈,數(shù)據(jù)被修改的可能性不是很大,那這時(shí)候就不需要對(duì)共享資源區(qū)進(jìn)行加鎖再操作,而是先修改了數(shù)據(jù),最終來判斷數(shù)據(jù)有沒有被修改,沒有被修改則提交修改指,否則重試。
悲觀鎖:與樂觀鎖相反,它假設(shè)場景的資源競爭激烈,對(duì)共享資源區(qū)的訪問必須要求持有鎖。
針對(duì)不同的場景需要采取因地制宜的策略,比較樂觀鎖與悲觀所,它們的優(yōu)缺點(diǎn)顯而易見:
策略 | 優(yōu)點(diǎn) | 缺點(diǎn) |
---|---|---|
樂觀鎖 | 不需要實(shí)際上鎖,性能高 | 若沖突時(shí),需要重新進(jìn)行操作,多次重試可能會(huì)導(dǎo)致性能下降明顯 |
悲觀鎖 | 訪問數(shù)據(jù)一定需要持有鎖,保證并發(fā)場景下的數(shù)據(jù)正確性 | 加鎖期間,其他等待鎖的線程需要被阻塞,性能低 |
2、Sync.Mutex
Go對(duì)單機(jī)鎖的實(shí)現(xiàn),考慮了實(shí)際環(huán)境中協(xié)程對(duì)資源競爭程度的變化,制定了一套鎖升級(jí)的過程。具體方案如下:
- 首先采取樂觀的態(tài)度,Goroutine會(huì)保持自旋態(tài),通過CAS操作嘗試獲取鎖。
- 當(dāng)多次獲取失敗,將會(huì)由樂觀態(tài)度轉(zhuǎn)入悲觀態(tài)度,判定當(dāng)前并發(fā)資源競爭程度劇烈,進(jìn)入阻塞態(tài)等待被喚醒。
從樂觀轉(zhuǎn)向悲觀的判定規(guī)則如下,滿足其中之一即發(fā)生轉(zhuǎn)變:
- Goroutine自旋嘗試次數(shù)超過4次
- 當(dāng)前P的執(zhí)行隊(duì)列中存在等待被執(zhí)行的G(避免自旋影響GMP調(diào)度性能)
- CPU是單核的(其他Goroutine執(zhí)行不了,自旋無意義)
除此之外,為了防止被阻塞的協(xié)程等待過長時(shí)間也沒有獲取到鎖,導(dǎo)致用戶的整體體驗(yàn)下降,引入了饑餓的概念:
- 饑餓態(tài):若Goroutine被阻塞等待的時(shí)間>1ms,則這個(gè)協(xié)程被視為處于饑餓狀態(tài)
- 饑餓模式:表示當(dāng)前鎖是否處于特定的模式,在該模式下,鎖的交接是公平的,按順序交給等待最久的協(xié)程。
饑餓模式與正常模式的轉(zhuǎn)變規(guī)則如下:
普通模式->饑餓模式:存在阻塞的協(xié)程,阻塞時(shí)間超過1ms
饑餓模式->普通模式:阻塞隊(duì)列清空,亦或者獲得鎖的協(xié)程的等待時(shí)間小于1ms,則恢復(fù)
接下來步入源碼,觀看具體的實(shí)現(xiàn)。
2.1、數(shù)據(jù)結(jié)構(gòu)
位于包sync/mutex.go中,對(duì)鎖的定義如下:
type Mutex struct { state int32 sema uint32 }
- state:標(biāo)識(shí)目前鎖的狀態(tài)信息,包括了是否處于饑餓模式、是否存在喚醒的阻塞協(xié)程、是否上鎖、以及處于等待鎖的協(xié)程個(gè)數(shù)有多少。
- seme:用于阻塞和喚醒協(xié)程的信號(hào)量。
將state看作一個(gè)二進(jìn)制字符串,它存儲(chǔ)信息的規(guī)則如下:
- 第一位標(biāo)識(shí)是否處于上鎖,0表示否,1表示上鎖(mutexLocked)
- 第二位標(biāo)識(shí)是否存在喚醒的阻塞協(xié)程(mutexWoken)
- 第三位標(biāo)識(shí)是否處于饑餓模式(mutexStarving)
- 從第四位開始,記錄了處于阻塞態(tài)的協(xié)程個(gè)數(shù)
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 //饑餓閾值 )
2.2、獲取鎖Lock()
func (m *Mutex) Lock() { if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } m.lockSlow() }
嘗試直接通過CAS操作直接獲取鎖,若成功則返回,否則說明鎖被獲取,步入LockSlow。
2.3、LockSlow()
源碼較長,進(jìn)行拆分講解:
var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state
(1)定義了基本的常量,含義如下:
- waitStartTime:記錄當(dāng)前協(xié)程等待的時(shí)間,只有被阻塞才會(huì)使用
- awoke:標(biāo)識(shí)當(dāng)前協(xié)程是否被Unlock喚醒
- iter:記錄當(dāng)前協(xié)程自旋嘗試次數(shù)
- old:記錄舊的鎖的狀態(tài)信息
for { //處于上鎖狀態(tài),并且不處于饑餓狀態(tài)中,并且當(dāng)前的協(xié)程允許繼續(xù)自旋下去 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } //... }
(2)進(jìn)入嘗試獲取鎖的循環(huán)中,兩個(gè)if表示:
- 若鎖處于上鎖狀態(tài),并且不處于饑餓狀態(tài)中,并且當(dāng)前的協(xié)程允許繼續(xù)自旋下去(非單核CPU、自旋次數(shù)<=4、調(diào)度器P的本地隊(duì)列不存在等待執(zhí)行的G),則步入:
- 若當(dāng)前協(xié)程并非從等待隊(duì)列喚醒、并且不存在被喚醒的等待協(xié)程、并且存在位于阻塞的協(xié)程、則嘗試設(shè)置mutexWoken標(biāo)識(shí)為1,若成功:
- 標(biāo)識(shí)當(dāng)前的協(xié)程為被喚醒的協(xié)程。(雖然并非實(shí)際從阻塞中喚醒)
- 告訴P,當(dāng)前的協(xié)程處于自旋態(tài)
- 更新
iter
計(jì)數(shù)器,與old
記錄的當(dāng)前鎖的狀態(tài)信息,進(jìn)行下一次重試循環(huán)
- 若當(dāng)前協(xié)程并非從等待隊(duì)列喚醒、并且不存在被喚醒的等待協(xié)程、并且存在位于阻塞的協(xié)程、則嘗試設(shè)置mutexWoken標(biāo)識(shí)為1,若成功:
這里存在的唯一疑惑為,為什么要將awoke標(biāo)識(shí)為true?
首先,因?yàn)楫?dāng)前鎖并非處于饑餓模式,因此當(dāng)前的搶占鎖的模式是不公平的,若當(dāng)前鎖的阻塞隊(duì)列還沒有被喚醒的協(xié)程,那就要求不要喚醒了,嘗試讓當(dāng)前正在嘗試的協(xié)程獲取到鎖,避免喚醒協(xié)程進(jìn)行資源競爭。
for { //... new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { new &^= mutexWoken } //... }
(3)進(jìn)行狀態(tài)更新:
當(dāng)協(xié)程從步驟2走出來時(shí),只能說明它位于以下兩個(gè)狀態(tài)之一:
- 旋不動(dòng)了,或者鎖進(jìn)入饑餓模式了,鎖要讓給別人了,總之是獲取不到鎖了(悲觀)。
- 鎖被釋放了。
不論如何,都需要進(jìn)行一些狀態(tài)的更新,為接下來的打算做準(zhǔn)備。
用new存儲(chǔ)一個(gè)鎖即將要進(jìn)入的新狀態(tài)信息,更新規(guī)則:
- 若鎖不處于饑餓模式:說明鎖可能被釋放了,也可能是自旋次數(shù)過多,不管接下來是否能拿到鎖,鎖都會(huì)被某一個(gè)協(xié)程獲取,因此置
mutexLocked
為1。 - 若鎖可能處于饑餓狀態(tài),或者鎖沒有被釋放:那說明自己是搶不到鎖了,即將進(jìn)入阻塞態(tài),阻塞協(xié)程計(jì)數(shù)器+1。
- 若當(dāng)前的協(xié)程是被喚醒的,并且已經(jīng)處在饑餓態(tài)中而且鎖仍然鎖著:鎖進(jìn)入絕對(duì)公平的饑餓模式。
- 若當(dāng)前協(xié)程是被喚醒的:清除
mutexWoken
標(biāo)識(shí)位,因?yàn)榻酉聛?strong>可能需要有協(xié)程被喚醒(饑餓模式)。
雖然更新的有點(diǎn)多,但是可以歸納為:
- 若鎖釋放了,那就標(biāo)識(shí)一下接下來鎖要被獲取即可。
- 若鎖沒有釋放,并給當(dāng)前協(xié)程等待了很久,那鎖就進(jìn)入饑餓狀態(tài),接下來需要有阻塞協(xié)程被喚醒。
(4)嘗試更新信息:
if atomic.CompareAndSwapInt32(&m.state, old, new) { //... } else { old = m.state }
接下來嘗試將new更新進(jìn)state,若更新失敗,說明當(dāng)前有另一個(gè)協(xié)程介入了,為了防止數(shù)據(jù)的一致性丟失,要全部重來一次。
(5)狀態(tài)更新成功,具體判斷是要沉睡還是獲取鎖成功:
步入步驟4的if主支中,此時(shí)有兩個(gè)狀態(tài):
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } //... } else { //... }
因?yàn)楫?dāng)前狀態(tài),可能是鎖釋放了,檢查鎖更新前是否已經(jīng)被釋放了并且不是饑餓模式,若是那說明獲取鎖成功了,函數(shù)結(jié)束了。
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 2) //.... } else { //... }
否則,說明當(dāng)前協(xié)程要進(jìn)入阻塞態(tài)了,記錄一下開始阻塞的時(shí)間,用于醒來是判斷是否饑餓。然后進(jìn)入阻塞沉睡中。
(6)若步驟5進(jìn)入阻塞,則被喚醒后:
if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 2) //喚醒 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state //若鎖處于饑餓模式 if old&mutexStarving != 0 { //鎖的異常處理 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } //將要更新的信號(hào)量 delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 //.... } else { //... }
從阻塞中喚醒,首先計(jì)算一些協(xié)程的阻塞時(shí)間,以及當(dāng)前的最新鎖狀態(tài)。
若鎖處于饑餓模式:那么當(dāng)前協(xié)程將直接獲取鎖,當(dāng)前協(xié)程是因?yàn)轲囸I模式被喚醒的,不存在其他協(xié)程搶占鎖。于是更新信號(hào)量,將記錄阻塞協(xié)程數(shù)-1,將鎖的上鎖態(tài)置1。若當(dāng)前從饑餓模式喚醒的協(xié)程,等待時(shí)間已經(jīng)不到1ms了或者是最后一個(gè)等待的協(xié)程,那么將將鎖從饑餓模式轉(zhuǎn)化為正常模式。至此,獲取成功,退出函數(shù)。
否則,只是普通的隨機(jī)喚醒,于是開始嘗試進(jìn)行搶占,回到步驟1。
2.4、釋放鎖Unlock()
func (m *Mutex) Unlock() { //直接釋放鎖 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { m.unlockSlow(new) } }
通過原子操作,直接將鎖的mutexLocked標(biāo)識(shí)置為0。若置0后,鎖的狀態(tài)不為0,那就說明存在需要獲取鎖的協(xié)程,步入unlockSlow。
2.5、unlockSlow()
func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 2) return } old = m.state } } else { runtime_Semrelease(&m.sema, true, 2) } }
(1)首先進(jìn)行了異常狀態(tài)處理,若釋放了一個(gè)已經(jīng)釋放了到鎖,那么直接fatal,程序終止。
if (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }
(2)若鎖不處于饑餓狀態(tài):
- 若此時(shí)的等待協(xié)程數(shù)量為0,或者鎖被上鎖了、含有被喚醒的協(xié)程、鎖處于饑餓模式:都說明有新的協(xié)程介入了流程,已經(jīng)完成了交接,可以直接退出
- 喚醒一個(gè)處于阻塞態(tài)的協(xié)程。
否則,處于饑餓狀態(tài),喚醒等待最久的協(xié)程。
3、Sync.RWMutex
對(duì)于共享資源區(qū)的操作,可以劃分為讀與寫兩大類。假設(shè)在一個(gè)場景中,對(duì)共享資源區(qū)繼續(xù)讀的操作遠(yuǎn)大于寫的操作,如果每個(gè)協(xié)程的讀操作都需要獲取互斥鎖,這帶來的性能損耗是非常大的。
RWMutex是一個(gè)可以運(yùn)用在讀操作>寫操作中的提高性能的鎖,可以將它視為由一個(gè)讀鎖與一個(gè)寫鎖構(gòu)成。其運(yùn)作規(guī)則具體如下:
- 讀鎖允許多個(gè)讀協(xié)程同時(shí)讀取共享資源區(qū),若有協(xié)程需要修改資源區(qū)的數(shù)據(jù),那么它需要被阻塞。
- 寫鎖具有嚴(yán)格的排他性,當(dāng)共享資源區(qū)被上了寫鎖時(shí),任何其他goroutine都不得訪問。
- 可見在最壞的情況下,所有的協(xié)程都是需要寫操作時(shí),讀寫鎖會(huì)退化成普通的Mutex。
3.1、數(shù)據(jù)結(jié)構(gòu)
type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount atomic.Int32 // number of pending readers readerWait atomic.Int32 // number of departing readers } const rwmutexMaxReaders = 1 << 30 //最大的讀協(xié)程數(shù)量
- w:一個(gè)互斥的寫鎖
- writerSem:關(guān)聯(lián)被阻塞的寫協(xié)程的信號(hào)量
- readerSem:關(guān)聯(lián)被阻塞的讀協(xié)程的信號(hào)量
- readerCount:正常情況下,記錄正在讀取的協(xié)程數(shù)量;但若當(dāng)前是寫協(xié)程正在持有鎖,那么實(shí)際記錄讀協(xié)程的數(shù)量為readerCount - rwmutexMaxReader
- readerWait:記錄釋放下一個(gè)寫協(xié)程,還需要等待讀協(xié)程完成的數(shù)量
3.2、讀鎖流程RLock()
func (rw *RWMutex) RLock() { if rw.readerCount.Add(1) < 0 { // A writer is pending, wait for it. runtime_SemacquireRWMutexR(&rw.readerSem, false, 0) } }
對(duì)readerCount+1,表示新加入一個(gè)讀協(xié)程。若結(jié)果<0,說明當(dāng)前鎖正在被寫協(xié)程占據(jù),令當(dāng)前的讀協(xié)程阻塞。
3.3、讀釋放鎖流程RUnlock()
func (rw *RWMutex) RUnlock() { if r := rw.readerCount.Add(-1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } }
對(duì)readerCount-1,表示減少一個(gè)讀協(xié)程。若結(jié)果<0,說明當(dāng)前鎖正在被寫協(xié)程占據(jù),步入runlockslow。
3.4、rUnlockSlow()
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() fatal("sync: RUnlock of unlocked RWMutex") } if rw.readerWait.Add(-1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
首先進(jìn)行錯(cuò)誤處理,若發(fā)現(xiàn)當(dāng)前協(xié)程為占用過讀鎖,或者讀流程的協(xié)程數(shù)量上限,系統(tǒng)出現(xiàn)異常,fatal。
否則,對(duì)readerWait-1,若結(jié)果為0,說明當(dāng)前協(xié)程是最后一個(gè)介入讀鎖流程的協(xié)程,此時(shí)需要釋放一個(gè)寫鎖。
3.5、寫鎖流程Lock()
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && rw.readerWait.Add(r) != 0 { runtime_SemacquireRWMutex(&rw.writerSem, false, 0) } }
首先嘗試獲取寫鎖,若獲取成功,需要將readerCount-最大讀協(xié)程數(shù),表示現(xiàn)在鎖被讀協(xié)程占據(jù)。
r表示處于讀流程的協(xié)程數(shù)量,若r不為0,那么就將readerWait加上r,等這些讀協(xié)程都讀取完畢,再去寫。將這個(gè)寫協(xié)程阻塞。(讀寫鎖并非讀、寫公平,讀協(xié)程優(yōu)先。)
3.6、寫釋放鎖流程Unlock()
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := rw.readerCount.Add(rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() fatal("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
重新將readerCount置為正常指,表示釋放了寫鎖。若讀協(xié)程超過最大上限,則異常。
然后喚醒所有阻塞的讀協(xié)程。(讀協(xié)程優(yōu)先)
解鎖。
以上就是golang中單機(jī)鎖的具體實(shí)現(xiàn)詳解的詳細(xì)內(nèi)容,更多關(guān)于go單機(jī)鎖的資料請(qǐng)關(guān)注腳本之家其
相關(guān)文章
實(shí)時(shí)通信的服務(wù)器推送機(jī)制 EventSource(SSE) 簡介附go實(shí)現(xiàn)示例代碼
EventSource是一種非常有用的 API,適用于許多實(shí)時(shí)應(yīng)用場景,它提供了一種簡單而可靠的方式來建立服務(wù)器推送連接,并實(shí)現(xiàn)實(shí)時(shí)更新和通知,這篇文章主要介紹了實(shí)時(shí)通信的服務(wù)器推送機(jī)制 EventSource(SSE)簡介附go實(shí)現(xiàn)示例,需要的朋友可以參考下2024-03-03golang中使用proto3協(xié)議導(dǎo)致的空值字段不顯示的問題處理方案
這篇文章主要介紹了golang中使用proto3協(xié)議導(dǎo)致的空值字段不顯示的問題處理方案,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10Go 語言中的 http.FileSystem詳細(xì)解析
在本文中,我們深入探討了 Go 語言中的 http.FileSystem 接口,并介紹了它的基本原理、使用方法以及實(shí)際應(yīng)用場景,感興趣的朋友跟隨小編一起看看吧2024-03-03