go RWMutex的實(shí)現(xiàn)示例
Overview
go 里面的 rwlock 是 write preferred 的,可以避免寫(xiě)鎖饑餓。
讀鎖和寫(xiě)鎖按照先來(lái)后到的規(guī)則持有鎖,一旦有協(xié)程持有了寫(xiě)鎖,后面的協(xié)程只能在寫(xiě)鎖被釋放后才能得到讀鎖。
同樣,一旦有 >= 1 個(gè)協(xié)程寫(xiě)到了讀鎖,只有等這些讀鎖全部釋放后,后面的協(xié)程才能拿到寫(xiě)鎖。
下面了解一下 Go 的 RWMutex 是如何實(shí)現(xiàn)的吧,下面的代碼取自 go1.17.2/src/sync/rwmutex.go,并刪減了 race 相關(guān)的代碼。
PS: rwmutex 的代碼挺短的,其實(shí)讀源碼也沒(méi)那么可怕...
RWMutex 的結(jié)構(gòu)
RWMutex 總體上是通過(guò): 普通鎖和條件變量來(lái)實(shí)現(xiàn)的
type RWMutex struct { w Mutex // held if there are pending writers writerSem uint32 // semaphore for writers to wait for completing readers readerSem uint32 // semaphore for readers to wait for completing writers readerCount int32 // number of pending readers readerWait int32 // number of departing readers }
Lock
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
Unlock
const rwmutexMaxReaders = 1 << 30 func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
RLock
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
RUnlock
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } } func (rw *RWMutex) rUnlockSlow(r int32) { // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
Q1: 多個(gè)協(xié)程并發(fā)拿讀鎖,如何保證這些讀鎖協(xié)程都不會(huì)被阻塞?
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
拿讀鎖時(shí),僅僅會(huì)增加 readerCount,因此讀鎖之間是可以正常并發(fā)的
Q2: 多個(gè)協(xié)程并發(fā)拿寫(xiě)鎖,如何保證只會(huì)有一個(gè)協(xié)程拿到寫(xiě)鎖?
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
拿寫(xiě)鎖時(shí),會(huì)獲取 w.Lock,自然能保證同一時(shí)間只會(huì)有一把寫(xiě)鎖
Q3: 在讀鎖被拿到的情況下,新協(xié)程拿寫(xiě)鎖,如果保證寫(xiě)鎖現(xiàn)成會(huì)被阻塞?
func (rw *RWMutex) Lock() { // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } }
假設(shè)此時(shí)有 5 個(gè)協(xié)程拿到讀鎖,則 readerCount = 5,假設(shè) rwmutexMaxReaders = 100。
此時(shí)有一個(gè)新的協(xié)程 w1 想要拿寫(xiě)鎖。
在執(zhí)行
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
后, rw.readerCount = -95,r = 5。
在執(zhí)行
atomic.AddInt32(&rw.readerWait, r)
后,rw.readerWait = 5。
readerWait
記錄了在獲取寫(xiě)鎖的這一瞬間有多少個(gè)協(xié)程持有讀鎖。這一瞬間之后,就算有新的協(xié)程嘗試獲取讀鎖,也只會(huì)增加 readerCount ,而不會(huì)動(dòng)到 readerWait。
之后執(zhí)行 runtime_SemacquireMutex() 睡在了 writerSem 這個(gè)信號(hào)量上面。
Q4: 在讀鎖被拿到的情況下,新協(xié)程拿寫(xiě)鎖被阻塞,當(dāng)舊有的讀鎖協(xié)程全部釋放,如何喚醒等待的寫(xiě)鎖協(xié)程
func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } } func (rw *RWMutex) rUnlockSlow(r int32) { // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } }
繼續(xù)上一步的場(chǎng)景,每當(dāng)執(zhí)行 RUnlock 時(shí),readerCount 都會(huì)減去1。當(dāng) readerCount 為負(fù)數(shù)時(shí),意味著有協(xié)程正在持有或者正在等待持有寫(xiě)鎖。
之前的五個(gè)讀協(xié)程中的四個(gè),每次 RUnlock() 之后,readerCount = -95 - 4 = -99,readerWait = 5 - 4 = 1。
當(dāng)最后一個(gè)讀協(xié)程調(diào)用 RUnlock() 之后,readerCount 變成了 -100,readerWait 變成 0,此時(shí)會(huì)喚醒在 writerSem 上沉睡的協(xié)程 w1。
Q5: 在寫(xiě)鎖被拿到的情況下,新協(xié)程拿讀鎖,如何讓新協(xié)程被阻塞?
func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } }
繼續(xù)上面的場(chǎng)景,readerCount = -100 + 1 = -99 < 0。
新的讀協(xié)程 r1 被沉睡在 readerSem 下面。
假設(shè)此時(shí)再來(lái)一個(gè)讀協(xié)程 r2,則 readerCount = -98,依舊沉睡。
Q6: 在寫(xiě)鎖被拿到的情況下,新協(xié)程拿讀鎖,寫(xiě)鎖協(xié)程釋放,如何喚醒等待的讀鎖協(xié)程?
繼續(xù)上面的場(chǎng)景,此時(shí)協(xié)程 w1 釋放寫(xiě)鎖
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
在執(zhí)行
atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
后,r = readerCount = -98 + 100 = 2,代表此時(shí)有兩個(gè)讀協(xié)程 r1 和 r2 在等待
ps: 如果此時(shí)有一些新的協(xié)程想要拿讀鎖,他會(huì)因?yàn)?readerCount = 2 + 1 = 3 > 0 而順利執(zhí)行下去,不會(huì)被阻塞
之后 for 循環(huán)執(zhí)行兩次,將協(xié)程 r1 和 協(xié)程 r2 都喚醒了。
Q7: 在寫(xiě)鎖被拿到的情況下,有兩個(gè)協(xié)程分別去搶讀鎖和寫(xiě)鎖,當(dāng)寫(xiě)鎖被釋放時(shí),這兩個(gè)協(xié)程誰(shuí)會(huì)勝利?
func (rw *RWMutex) Unlock() { // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() }
由于是先喚醒讀鎖,再調(diào)用 w.Unlock() ,因此肯定是讀協(xié)程先勝利!
認(rèn)為寫(xiě)的比較巧妙的兩個(gè)點(diǎn)
readerCount 與 rwmutexMaxReaders 的糾纏
通過(guò)
readerCount + rwmutexMaxReaders
以及readerCount - rwmutexMaxReaders
這兩個(gè)操作可以得知當(dāng)前是否有協(xié)程等待/持有寫(xiě)鎖以及當(dāng)前等待/持有讀鎖的協(xié)程數(shù)量readerCount 與 readerWait 的糾纏
在 Lock() 時(shí)直接將 readerCount 的值賦給 readerWait,在 readerWait = 0 而非 readerCount = 0 是喚醒寫(xiě)協(xié)程,可以避免在 Lock() 后來(lái)達(dá)到的讀協(xié)程先于寫(xiě)協(xié)程被執(zhí)行。
到此這篇關(guān)于go RWMutex的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)go RWMutex內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
如何利用Go語(yǔ)言實(shí)現(xiàn)LRU?Cache
這篇文章主要介紹了如何利用Go語(yǔ)言實(shí)現(xiàn)LRU?Cache,LRU是Least?Recently?Used的縮寫(xiě),是一種操作系統(tǒng)中常用的頁(yè)面置換算法,下面我們一起進(jìn)入文章了解更多內(nèi)容吧,需要的朋友可以參考一下2022-03-03Go中的 panic / recover 簡(jiǎn)介與實(shí)踐記錄
這篇文章主要介紹了Go中的 panic / recover 簡(jiǎn)介與實(shí)踐,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04記一次go語(yǔ)言使用time.Duration類型踩過(guò)的坑
本文主要介紹了記一次go語(yǔ)言使用time.Duration類型踩過(guò)的坑,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01golang jsoniter extension 處理動(dòng)態(tài)字段的實(shí)現(xiàn)方法
這篇文章主要介紹了golang jsoniter extension 處理動(dòng)態(tài)字段的實(shí)現(xiàn)方法,我們使用實(shí)例級(jí)別的 extension, 而非全局,可以針對(duì)不同業(yè)務(wù)邏輯有所區(qū)分,jsoniter 包提供了比較完善的定制能力,通過(guò)例子可以感受一下擴(kuò)展性,需要的朋友可以參考下2023-04-04golang中拿slice當(dāng)queue和拿list當(dāng)queue使用分析
這篇文章主要為大家介紹了golang?中拿slice當(dāng)queue和拿list當(dāng)queue使用分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08Go的固定時(shí)長(zhǎng)定時(shí)器和周期性時(shí)長(zhǎng)定時(shí)器
本文主要介紹了Go的固定時(shí)長(zhǎng)定時(shí)器和周期性時(shí)長(zhǎng)定時(shí)器,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08GO語(yǔ)言實(shí)現(xiàn)列出目錄和遍歷目錄的方法
這篇文章主要介紹了GO語(yǔ)言實(shí)現(xiàn)列出目錄和遍歷目錄的方法,涉及ioutil.ReadDir()與filepath.Walk()的應(yīng)用,是非常實(shí)用的技巧,需要的朋友可以參考下2014-12-12基于Go和PHP語(yǔ)言實(shí)現(xiàn)爬樓梯算法的思路詳解
這篇文章主要介紹了Go和PHP 實(shí)現(xiàn)爬樓梯算法,本文通過(guò)動(dòng)態(tài)規(guī)劃和斐波那契數(shù)列兩種解決思路給大家講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05