Go語言讀寫鎖RWMutex的源碼分析
前言
在前面兩篇文章中 初見 Go Mutex 、Go Mutex 源碼詳解,我們學(xué)習(xí)了 Go語言 中的 Mutex
,它是一把互斥鎖,每次只允許一個(gè) goroutine
進(jìn)入臨界區(qū),可以保證臨界區(qū)資源的狀態(tài)正確性。但是有的情況下,并不是所有 goroutine
都會(huì)修改臨界區(qū)狀態(tài),可能只是讀取臨界區(qū)的數(shù)據(jù),如果此時(shí)還是需要每個(gè) goroutine
拿到鎖依次進(jìn)入的話,效率就有些低下了。例如房間里面有一幅畫,有人想修改,有人只是想看一下,完全可以放要看的一部分人進(jìn)去,等他們看完再讓修改的人進(jìn)去修改,這樣既提高了效率,也保證了臨界區(qū)資源的安全。看和修改,對(duì)應(yīng)的就是讀和寫,本篇文章我們就一起來學(xué)習(xí)下 Go語言 中的讀寫鎖sync.RWMutex
。
說明:本文中的示例,均是基于Go1.17 64位機(jī)器
RWMutex 總覽
RWMutex
是一個(gè)讀/寫互斥鎖,在某一時(shí)刻只能由任意數(shù)量的 reader
持有 或者 一個(gè) writer 持有。也就是說,要么放行任意數(shù)量的 reader,多個(gè) reader 可以并行讀;要么放行一個(gè) writer,多個(gè) writer 需要串行寫。
RWMutex 對(duì)外暴露的方法有五個(gè):
- RLock():讀操作獲取鎖,如果鎖已經(jīng)被
writer
占用,會(huì)一直阻塞直到writer
釋放鎖;否則直接獲得鎖; - RUnlock():讀操作完畢之后釋放鎖;
- Lock():寫操作獲取鎖,如果鎖已經(jīng)被
reader
或者writer
占用,會(huì)一直阻塞直到獲取到鎖;否則直接獲得鎖; - Unlock():寫操作完畢之后釋放鎖;
- RLocker():返回讀操作的
Locker
對(duì)象,該對(duì)象的Lock()
方法對(duì)應(yīng)RWMutex
的RLock()
,Unlock()
方法對(duì)應(yīng)RWMutex
的RUnlock()
方法。
一旦涉及到多個(gè) reader 和 writer ,就需要考慮優(yōu)先級(jí)問題,是 reader 優(yōu)先還是 writer 優(yōu)先:
- reader優(yōu)先:只要有 reader 要進(jìn)行讀操作,writer 就一直等待,直到?jīng)]有 reader 到來。這種方式做到了讀操作的并發(fā),但是如果 reader 持續(xù)到來,會(huì)導(dǎo)致 writer 饑餓,一直不能進(jìn)行寫操作;
- writer優(yōu)先:只要有 writer 要進(jìn)行寫操作,reader 就一直等待,直到?jīng)]有 writer 到來。這種方式提高了寫操作的優(yōu)先級(jí),但是如果 writer 持續(xù)到來,會(huì)導(dǎo)致 reader 饑餓,一直不能進(jìn)行讀操作;
- 沒有優(yōu)先級(jí):按照先來先到的順序,沒有誰比誰更優(yōu)先,這種相對(duì)來說會(huì)更公平。
我們先來看下 RWMutex 的運(yùn)行機(jī)制,就可以知道它的優(yōu)先級(jí)是什么了。
可以想象 RWMutex 有兩個(gè)隊(duì)伍,一個(gè)是包含 所有reader 和你獲得準(zhǔn)入權(quán)writer 的 隊(duì)列A,一個(gè)是還沒有獲得準(zhǔn)入權(quán) writer 的 隊(duì)列B。
- 隊(duì)列 A 最多只允許有 一個(gè)writer,如果有其他 writer,需要在 隊(duì)列B 等待;
- 當(dāng)一個(gè) writer 到了 隊(duì)列A 后,只允許它 之前的reader 執(zhí)行讀操作,新來的 reader 需要在 隊(duì)列A 后面排隊(duì);
- 當(dāng)前面的 reader 執(zhí)行完讀操作之后,writer 執(zhí)行寫操作;
- writer 執(zhí)行完寫操作后,讓 后面的reader 執(zhí)行讀操作,再喚醒隊(duì)列B 的一個(gè) writer 到 隊(duì)列A 后面排隊(duì)。
初始時(shí)刻 隊(duì)列A 中 writer W1
前面有三個(gè) reader,后面有兩個(gè) reader,隊(duì)列B中有兩個(gè) writer
RWMutex運(yùn)行示例:初始時(shí)刻
并發(fā)讀 多個(gè) reader 可以同時(shí)獲取到讀鎖,進(jìn)入臨界區(qū)進(jìn)行讀操作;writer W1
在 隊(duì)列A 中等待,同時(shí)又來了兩個(gè) reader,直接在 隊(duì)列A 后面排隊(duì)
RWMutex運(yùn)行示例:并發(fā)讀
寫操作 W1
前面所有的 reader 完成后,W1
獲得鎖,進(jìn)入臨界區(qū)操作
RWMutex運(yùn)行示例:寫操作
獲得準(zhǔn)入權(quán) W1
完成寫操作退出,先讓后面排隊(duì)的 reader 進(jìn)行讀操作,然后從 隊(duì)列B 中喚醒 W2
到 隊(duì)列A 排隊(duì)。W2
從 隊(duì)列B 到 隊(duì)列A 的過程中,R8
先到了 隊(duì)列A,因此 R8
可以執(zhí)行讀操作。R9
、R10
、R11
在 W2
之后到的,所以在后面排隊(duì);新來的 W4
直接在隊(duì)列B 排隊(duì)。
RWMutex運(yùn)行示例:獲得準(zhǔn)入權(quán)
從上面的示例可以看出,RWMutex
可以看作是沒有優(yōu)先級(jí),按照先來先到的順序去執(zhí)行,只不過是 多個(gè)reader 可以 并行 去執(zhí)行罷了。
深入源碼
數(shù)據(jù)結(jié)構(gòu)
type?RWMutex?struct?{ ?w???????????Mutex??//?控制?writer?在?隊(duì)列B?排隊(duì) ?writerSem???uint32?//?寫信號(hào)量,用于等待前面的?reader?完成讀操作 ?readerSem???uint32?//?讀信號(hào)量,用于等待前面的?writer?完成寫操作 ?readerCount?int32??//?reader?的總數(shù)量,同時(shí)也指示是否有?writer?在隊(duì)列A?中等待 ?readerWait??int32??//?隊(duì)列A?中?writer?前面?reader?的數(shù)量 } //?允許最大的?reader?數(shù)量 const?rwmutexMaxReaders?=?1?<<?30
上述中的幾個(gè)變量,比較特殊的是 readerCount
,不僅表示當(dāng)前 所有reader
的數(shù)量,同時(shí)表示是否有 writer
在隊(duì)列A中等待。當(dāng) readerCount
變?yōu)?nbsp;負(fù)數(shù)
時(shí),就代表有 writer 在隊(duì)列A 中等待了。
- 當(dāng)有 writer 進(jìn)入 隊(duì)列A 后,會(huì)將
readerCount
變?yōu)樨?fù)數(shù),即readerCount = readerCount - rwmutexMaxReaders
,同時(shí)利用readerWait
變量記錄它前面有多少個(gè) reader; - 如果有新來的 reader,發(fā)現(xiàn)
readerCount
是負(fù)數(shù),就會(huì)直接去后面排隊(duì); - writer 前面的 reader 在釋放鎖時(shí),會(huì)將
readerCount
和readerWait
都減一,當(dāng) readerWait==0 時(shí),表示 writer 前面的所有 reader 都執(zhí)行完了,可以讓 writer 執(zhí)行寫操作了; - writer 執(zhí)行寫操作完畢后,會(huì)將 readerCount 再變回正數(shù),
readerCount = readerCount + rwmutexMaxReaders
。
舉例:假設(shè)當(dāng)前有兩個(gè) reader,readerCount = 2;允許最大的reader 數(shù)量為 10
- 當(dāng) writer 進(jìn)入隊(duì)列A 時(shí),readerCount = readerCount - rwmutexMaxReaders = -8,readerWait = readerCount = 2
- 如果再來 3 個(gè)reader,readerCount = readerCount + 3 = -5
- 獲得讀鎖的兩個(gè)reader 執(zhí)行完后,readerCount = readerCount - 2 = -7,readerWait = readerWait-2 =0,writer 獲得鎖
- writer 執(zhí)行完后,readerCount = readerCount + rwmutexMaxReaders = 3,當(dāng)前有 3個(gè) reader
RLock()
reader 執(zhí)行讀操作之前,需要調(diào)用 RLock() 獲取鎖
func?(rw?*RWMutex)?RLock()?{ ? ??//?reader?加鎖,將?readerCount?加一,表示多了個(gè)?reader ??if?atomic.AddInt32(&rw.readerCount,?1)?<?0?{ ?? ????//?如果?readerCount<0,說明有?writer?在自己前面等待,排隊(duì)等待讀信號(hào)量 ??runtime_SemacquireMutex(&rw.readerSem,?false,?0) ?} }
RUnlock()
reader 執(zhí)行完讀操作后,調(diào)用 RUnlock() 釋放鎖
func?(rw?*RWMutex)?RUnlock()?{ ??//?reader?釋放鎖,將?readerCount?減一,表示少了個(gè)?reader ?if?r?:=?atomic.AddInt32(&rw.readerCount,?-1);?r?<?0?{ ?? ????//?如果readerCount<0,說明有?writer?在自己后面等待,看是否要讓?writer?運(yùn)行 ??rw.rUnlockSlow(r) ?} }
func?(rw?*RWMutex)?rUnlockSlow(r?int32)?{ ??//?將?readerWait?減一,表示前面的?reader?少了一個(gè) ?if?atomic.AddInt32(&rw.readerWait,?-1)?==?0?{ ???? ??//?如果?readerWait?變?yōu)榱?,那么自己就是最后一個(gè)完成的?reader ????//?釋放寫信號(hào)量,讓 writer 運(yùn)行 ??runtime_Semrelease(&rw.writerSem,?false,?1) ?} }
Lock()
writer 執(zhí)行寫操作之前,調(diào)用 Lock() 獲取鎖
func?(rw?*RWMutex)?Lock()?{ ???//?利用互斥鎖,如果前面有 writer,那么就需要等待互斥鎖,即在隊(duì)列B 中排隊(duì)等待;如果沒有,可以直接進(jìn)入?隊(duì)列A 排隊(duì) ???rw.w.Lock() ??//?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?將?readerCount?變成了負(fù)數(shù) ??//?再加?rwmutexMaxReaders,相當(dāng)于?r?=?readerCount,r?就是?writer?前面的?reader?數(shù)量 ???r?:=?atomic.AddInt32(&rw.readerCount,?-rwmutexMaxReaders)?+?rwmutexMaxReaders ??? ???//?如果?r!=?0?,表示自己前面有?reader,那么令?readerWait?=?r,要等前面的?reader?運(yùn)行完 ???if?r?!=?0?&&?atomic.AddInt32(&rw.readerWait,?r)?!=?0?{ ??????runtime_SemacquireMutex(&rw.writerSem,?false,?0) ???} }
Lock()
和 RUnlock()
是會(huì)并發(fā)進(jìn)行的:
- 如果 Lock() 將 readerCount 變?yōu)樨?fù)數(shù)后,假設(shè) r=3,表示加入的那一刻前面有三個(gè) reader,還沒有賦值 readerWait CPU 就被強(qiáng)占了,readerWait = 0;
- 假設(shè)此時(shí)三個(gè) reader 的 RUnlock() 會(huì)進(jìn)入到 rUnlockSlow() 邏輯,每個(gè) reader 都將 readerWait 減一, readerWait 會(huì)變成負(fù)數(shù),此時(shí)不符合喚醒 writer 的條件;
- 三個(gè) reader 運(yùn)行完之后,此時(shí) readerWait = -3, Lock() 運(yùn)行到 atomic.AddInt32(&rw.readerWait, r) = -3+3 =0,也不會(huì)休眠,直接獲取到鎖,因?yàn)榍懊娴?reader 都運(yùn)行完了。
這就是為什么 rUnlockSlow()
要判斷 atomic.AddInt32(&rw.readerWait, -1) == 0
以及 Lock()
要判斷 atomic.AddInt32(&rw.readerWait, r) != 0
的原因。
Unlock()
writer 執(zhí)行寫操作之后,調(diào)用 Lock() 釋放鎖
func?(rw?*RWMutex)?Unlock()?{ ??//?將?readerCount?變?yōu)檎龜?shù),表示當(dāng)前沒有?writer?在隊(duì)列A?等待了 ?r?:=?atomic.AddInt32(&rw.readerCount,?rwmutexMaxReaders) ??//?將自己后面等待的?reader?喚醒,可以進(jìn)行讀操作了 ?for?i?:=?0;?i?<?int(r);?i++?{ ??runtime_Semrelease(&rw.readerSem,?false,?0) ?} ? ??//?釋放互斥鎖,如果隊(duì)列B有writer,相當(dāng)于喚醒一個(gè)來隊(duì)列A 排隊(duì) ?rw.w.Unlock() }
writer 對(duì) readerCount 一加一減,不會(huì)改變整體狀態(tài),只是用正負(fù)來表示是否有 writer 在等待。當(dāng)然,如果在 writer 將 readerCount變?yōu)樨?fù)數(shù)后,來了很多 reader,將 readerCount 變?yōu)榱苏龜?shù),此時(shí)reader 在 writer 沒有釋放鎖的時(shí)候就獲取到鎖了,是有問題的。但是 rwmutexMaxReaders 非常大,可以不考慮這個(gè)問題。
常見問題
不可復(fù)制
和 Mutex 一樣,RWMutex 也是不可復(fù)制。不能復(fù)制的原因和互斥鎖一樣。一旦讀寫鎖被使用,它的字段就會(huì)記錄它當(dāng)前的一些狀態(tài)。這個(gè)時(shí)候你去復(fù)制這把鎖,就會(huì)把它的狀態(tài)也給復(fù)制過來。但是,原來的鎖在釋放的時(shí)候,并不會(huì)修改你復(fù)制出來的這個(gè)讀寫鎖,這就會(huì)導(dǎo)致復(fù)制出來的讀寫鎖的狀態(tài)不對(duì),可能永遠(yuǎn)無法釋放鎖。
不可重入
不可重入的原因是,獲得鎖之后,還沒釋放鎖,又申請(qǐng)鎖,這樣有可能造成死鎖。比如 reader A 獲取到了讀鎖,writer B 等待 reader A 釋放鎖,reader 還沒釋放鎖又申請(qǐng)了一把鎖,但是這把鎖申請(qǐng)不成功,他需要等待 writer B。這就形成了一個(gè)循環(huán)等待的死鎖。
加鎖和釋放鎖一定要成對(duì)出現(xiàn),不能忘記釋放鎖,也不能解鎖一個(gè)未加鎖的鎖。
實(shí)戰(zhàn)一下
Go 中的 map 是不支持 并發(fā)寫的,我們可以利用 讀寫鎖 RWMutex 來實(shí)現(xiàn)并發(fā)安全的 map。在讀多寫少的情況下,使用 RWMutex 要比 Mutex 性能高。
package?main import?( ?"fmt" ?"math/rand" ?"sync" ?"time" ) type?ConcurrentMap?struct?{ ?m?????sync.RWMutex ?items?map[string]interface{} } func?(c?*ConcurrentMap)?Add(key?string,?value?interface{})?{ ?c.m.Lock() ?defer?c.m.Unlock() ?c.items[key]?=?value } func?(c?*ConcurrentMap)?Remove(key?string)?{ ?c.m.Lock() ?defer?c.m.Unlock() ?delete(c.items,?key) } func?(c?*ConcurrentMap)?Get(key?string)?interface{}?{ ?c.m.RLock() ?defer?c.m.RUnlock() ?return?c.items[key] } func?NewConcurrentMap()?ConcurrentMap?{ ?return?ConcurrentMap{ ??items:?make(map[string]interface{}), ?} } func?main()?{ ?m?:=?NewConcurrentMap() ?var?wait?sync.WaitGroup ?wait.Add(10000) ?for?i?:=?0;?i?<?10000;?i++?{ ??key?:=?fmt.Sprintf("%d",?rand.Intn(10)) ??value?:=?fmt.Sprintf("%d",?rand.Intn(100)) ??if?i%100?==?0?{ ???go?func()?{ ????defer?wait.Done() ????m.Add(key,?value) ???}() ??}?else?{ ???go?func()?{ ????defer?wait.Done() ????fmt.Println(m.Get(key)) ????time.Sleep(time.Millisecond?*?10) ???}() ??} ?} ?wait.Wait() }
以上就是Go語言讀寫鎖RWMutex的源碼分析的詳細(xì)內(nèi)容,更多關(guān)于Go語言讀寫鎖RWMutex的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解Go語言實(shí)現(xiàn)線性查找算法和二分查找算法
線性查找又稱順序查找,它是查找算法中最簡單的一種。二分查找,也稱折半查找,相比于線性查找,它是一種效率較高的算法。本文將用Go語言實(shí)現(xiàn)這兩個(gè)查找算法,需要的可以了解一下2022-12-12

golang調(diào)用shell命令(實(shí)時(shí)輸出,終止)

go數(shù)據(jù)結(jié)構(gòu)和算法BitMap原理及實(shí)現(xiàn)示例

Golang使用Zookeeper實(shí)現(xiàn)分布式鎖

jenkins配置golang?代碼工程自動(dòng)發(fā)布的實(shí)現(xiàn)方法