go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理
使用場(chǎng)景
sync.Cond是go標(biāo)準(zhǔn)庫提供的一個(gè)條件變量,用于控制一組goroutine在滿足特定條件下被喚醒。
sync.Cond常用于一組goroutine等待,一個(gè)goroutine通知(事件發(fā)生)的場(chǎng)景。如果只有一個(gè)goroutine等待,一個(gè)goroutine通知(事件發(fā)生),使用Mutex或者Channel就可以實(shí)現(xiàn)。
可以用一個(gè)全局變量標(biāo)志特定條件condition,每個(gè)sync.Cond都必須要關(guān)聯(lián)一個(gè)互斥鎖(Mutex或者RWMutex),當(dāng)condition發(fā)生變更或者調(diào)用Wait時(shí),都必須加鎖,保證多個(gè)goroutine安全地訪問condition。
下面是go標(biāo)準(zhǔn)庫http中關(guān)于pipe的部分實(shí)現(xiàn),我們可以看到,pipe使用sync.Cond來控制管道中字節(jié)流的寫入和讀取,在pipe中數(shù)據(jù)可用并且字節(jié)流復(fù)制到pipe的緩沖區(qū)之前,所有的需要讀取該管道數(shù)據(jù)的goroutine都必須等待,直到數(shù)據(jù)準(zhǔn)備完成。
type pipe struct {
mu sync.Mutex
c sync.Cond // c.L lazily initialized to &p.mu
b pipeBuffer // nil when done reading
...
}
// Read waits until data is available and copies bytes
// from the buffer into p.
func (p *pipe) Read(d []byte) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.c.L == nil {
p.c.L = &p.mu
}
for {
...
if p.b != nil && p.b.Len() > 0 {
return p.b.Read(d)
}
...
p.c.Wait() // write未完成前調(diào)用Wait進(jìn)入等待
}
}
// Write copies bytes from p into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
func (p *pipe) Write(d []byte) (n int, err error) {
p.mu.Lock()
defer p.mu.Unlock()
if p.c.L == nil {
p.c.L = &p.mu
}
defer p.c.Signal() // 喚醒所有等待的goroutine
if p.err != nil {
return 0, errClosedPipeWrite
}
if p.breakErr != nil {
p.unread += len(d)
return len(d), nil // discard when there is no reader
}
return p.b.Write(d)
}
實(shí)現(xiàn)原理
type Cond struct {
noCopy noCopy // 用來保證結(jié)構(gòu)體無法在編譯期間拷貝
// L is held while observing or changing the condition
L Locker // 用來保證condition變更安全
notify notifyList // 待通知的goutine列表
checker copyChecker // 用于禁止運(yùn)行期間發(fā)生的拷貝
}
type notifyList struct {
wait uint32 // 正在等待的goroutine的ticket
notify uint32 // 已經(jīng)通知到的goroutine的ticket
lock uintptr // key field of the mutex
head unsafe.Pointer // 鏈表頭部
tail unsafe.Pointer // 鏈表尾部
}
copyChecker
copyChecker是一個(gè)指針類型,在創(chuàng)建時(shí),它的值指向自身地址,用于檢測(cè)該對(duì)象是否發(fā)生了拷貝。如果發(fā)生了拷貝,則直接panic。
// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
func (c *copyChecker) check() {
if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
!atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
uintptr(*c) != uintptr(unsafe.Pointer(c)) {
panic("sync.Cond is copied")
}
}
Wait
調(diào)用 Wait 會(huì)自動(dòng)釋放鎖 c.L,并掛起調(diào)用者所在的 goroutine,因此當(dāng)前協(xié)程會(huì)阻塞在 Wait 方法調(diào)用的地方。如果其他協(xié)程調(diào)用了 Signal 或 Broadcast 喚醒了該協(xié)程,那么 Wait 方法在結(jié)束阻塞時(shí),會(huì)重新給 c.L 加鎖,并且繼續(xù)執(zhí)行 Wait 后面的代碼。
對(duì)條件的檢查,使用了 for !condition() 而非 if,是因?yàn)楫?dāng)前協(xié)程被喚醒時(shí),條件不一定符合要求,需要再次 Wait 等待下次被喚醒。為了保險(xiǎn)起見,使用 for 能夠確保條件符合要求后,再執(zhí)行后續(xù)的代碼。
func (c *Cond) Wait() {
c.checker.check()
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}
- 檢查Cond是否被復(fù)制,如果被復(fù)制,直接panic;
- 調(diào)用runtime_notifyListAdd調(diào)用者添加到通知列表并解鎖,以便可以接收到通知,然后將返回的ticket傳入到runtime_notifyListWait來等待通知。
- 當(dāng)前goroutine會(huì)阻塞在wait調(diào)用的地方,直到其他goroutine調(diào)用Signal或Broadcast喚醒該協(xié)程。
func notifyListAdd(l *notifyList) uint32 {
return atomic.Xadd(&l.wait, 1) - 1
}
notifyListWait會(huì)將當(dāng)前goroutine追加到鏈表的尾端,同時(shí)調(diào)用goparkunlock讓當(dāng)前goroutine陷入休眠,該方法會(huì)直接讓出當(dāng)前處理器的使用權(quán)并等待調(diào)度器的喚醒。
func notifyListWait(l *notifyList, t uint32) {
s := acquireSudog()
s.g = getg()
s.ticket = t
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
releaseSudog(s)
}
Signal
Signal會(huì)喚醒隊(duì)列最前面的Goroutine。
func (c *Cond) Signal() {
c.checker.check()
runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
t := l.notify
atomic.Store(&l.notify, t+1)
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
if s.ticket == t {
n := s.next
if p != nil {
p.next = n
} else {
l.head = n
}
if n == nil {
l.tail = p
}
s.next = nil
readyWithTime(s, 4)
return
}
}
}
Broadcast
Broadcast會(huì)喚醒隊(duì)列中全部的goroutine。
func (c *Cond) Broadcast() {
c.checker.check()
runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, atomic.Load(&l.wait))
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}
}以上就是go并發(fā)編程sync.Cond使用場(chǎng)景及實(shí)現(xiàn)原理的詳細(xì)內(nèi)容,更多關(guān)于go并發(fā)編程sync.Cond的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
使用docker構(gòu)建golang線上部署環(huán)境的步驟詳解
這篇文章主要介紹了使用docker構(gòu)建golang線上部署環(huán)境的步驟,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-11-11
golang程序使用alpine編譯出最小arm鏡像實(shí)現(xiàn)
這篇文章主要為大家介紹了golang程序使用alpine編譯出最小arm鏡像,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12
Go語言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫詳解
這篇文章主要為大家介紹了Go語言實(shí)現(xiàn)的可讀性更高的并發(fā)神庫詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01
golang構(gòu)建工具M(jìn)akefile使用詳解
這篇文章主要為大家介紹了golang構(gòu)建工具M(jìn)akefile的使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-07-07
深入淺出Go:掌握基礎(chǔ)知識(shí)的關(guān)鍵要點(diǎn)
Go是一種開源的編程語言,由Google開發(fā),它具有簡潔、高效、并發(fā)性強(qiáng)的特點(diǎn),適用于構(gòu)建可靠的、高性能的軟件系統(tǒng),本文將介紹Go的基礎(chǔ)知識(shí),需要的朋友可以參考下2023-10-10
MacOS中 VSCode 安裝 GO 插件失敗問題的快速解決方法
這篇文章主要介紹了MacOS中 VSCode 安裝 GO 插件失敗問題的快速解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05

