詳解Go channel管道的運(yùn)行原理
前言
Go推薦通過(guò)通信來(lái)共享內(nèi)存,而channel就實(shí)現(xiàn)了這一理念。那channel是怎么運(yùn)行的呢?
功能
舉個(gè)例子看下channel的使用效果:
package main import ( "fmt" "math/rand" "time" ) func write(c chan int) { for { num := rand.Intn(100) c <- num } } func read(c chan int) { for { num := <-c fmt.Println("讀取管道的隨機(jī)數(shù):", num) time.Sleep(time.Second) } } func main() { var c = make(chan int, 8) go read(c) for i := 0; i < 5; i++ { go write(c) } time.Sleep(time.Minute) }
以上代碼新建了一個(gè)緩沖區(qū)為8的管道,然后開啟read和五個(gè)write讀寫協(xié)程。寫協(xié)程寫入一個(gè)隨機(jī)數(shù),讀協(xié)程每隔一秒讀取并打印,效果如下:
說(shuō)明協(xié)程間可以通過(guò)管道來(lái)互相通信。接著了解下channel的結(jié)構(gòu)。
channel結(jié)構(gòu)
channel結(jié)構(gòu)體位于GOROOT/src/runtime/chan.go
下的hchan,源碼如下:
type hchan struct { qcount uint // 隊(duì)列中元素總數(shù) dataqsiz uint // 環(huán)型隊(duì)列大小 buf unsafe.Pointer // 指向dataqsize的數(shù)組(即緩沖區(qū)) elemsize uint16 closed uint32 elemtype *_type // 元素類型 sendx uint // 發(fā)送到緩沖區(qū)的位置索引 recvx uint // 接收到緩沖區(qū)的位置索引 recvq waitq // 接收者隊(duì)列 sendq waitq // 發(fā)送者隊(duì)列 lock mutex // 鎖,用于保護(hù)channel數(shù)據(jù) }
其中發(fā)送者和接收者隊(duì)列是一個(gè)waitq類型,具體如下:
type waitq struct { first *sudog last *sudog }
waitq里有隊(duì)頭first
,隊(duì)尾last
的指針,指向sudog
結(jié)構(gòu)體。
也就是說(shuō),waitq是一個(gè)列表隊(duì)列,隊(duì)列里每個(gè)元素都是一個(gè)sudog結(jié)構(gòu)體,sudog中包裝著一個(gè)協(xié)程。
解析一個(gè)hchan
各部分結(jié)構(gòu):
- 頭部
type hchan struct { qcount uint // 隊(duì)列中元素總數(shù) dataqsiz uint // 環(huán)型隊(duì)列大小 buf unsafe.Pointer // 指向dataqsize的數(shù)組(即緩沖區(qū)) elemsize uint16 closed uint32 elemtype *_type // 元素類型 ... }
這部分表示一個(gè)環(huán)型緩沖區(qū)。圖解如下:
- 尾部
type hchan struct { ... sendx uint // 發(fā)送到緩沖區(qū)的位置索引 recvx uint // 接收到緩沖區(qū)的位置索引 recvq waitq // 接收者隊(duì)列 sendq waitq // 發(fā)送者隊(duì)列 ... }
這部分把協(xié)程分為兩個(gè)身份,使用chan <-
語(yǔ)法的協(xié)程為發(fā)送者,使用<- chan
語(yǔ)法的協(xié)程為接收者,并放到各自隊(duì)列中。圖解如下:
結(jié)合示例代碼。運(yùn)行結(jié)構(gòu)如下:
由于寫協(xié)程一直寫,讀協(xié)程每隔一秒才讀一次,因此很快將緩沖區(qū)寫滿了,這時(shí):
- 寫協(xié)程被裝入sudog進(jìn)行休眠等待
- 讀協(xié)程每隔一秒從緩沖區(qū)讀取數(shù)據(jù)
運(yùn)行原理
使用chan <-
為發(fā)送者,對(duì)發(fā)送者來(lái)說(shuō):
- 先查看是否有接收者,有則優(yōu)先喚醒并拷貝數(shù)據(jù)給接收者,然后結(jié)束
- 無(wú)接收者再查看緩沖區(qū),數(shù)據(jù)未滿則將數(shù)據(jù)放入緩沖區(qū),然后結(jié)束
- 緩沖區(qū)也滿了,則封裝成sudog,休眠等待
使用<- chan
為接收者,對(duì)接收者來(lái)說(shuō):
- 優(yōu)先接收緩沖區(qū)的值
- 再接收發(fā)送者的值
- 否則休眠等待
思考下:
有休眠的接收者,且緩沖區(qū)數(shù)據(jù)已滿的情況是否存在?為什么?
有休眠的發(fā)送者,且緩沖區(qū)為空的情況是否存在?為什么?
以上答案:
有休眠的接收者,緩沖區(qū)不會(huì)出現(xiàn)數(shù)據(jù)已滿情況。因?yàn)榻邮照咭菝?,得緩沖區(qū)沒數(shù)據(jù)才行。
有休眠的發(fā)送者,緩沖區(qū)不會(huì)出現(xiàn)為空情況。因?yàn)榘l(fā)送者要休眠,得緩沖區(qū)數(shù)據(jù)已滿才行。
源碼分析
使用chan <-
后,會(huì)調(diào)用GOROOT\src\runtime\chan.go
下的chansend1
方法
func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) }
然后調(diào)用chansend
方法
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 嘗試接收者隊(duì)列出隊(duì),若有接收者,則直接拷貝數(shù)據(jù)給接收者 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 判斷緩沖區(qū)是否還有空余 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) // 有的話獲得緩沖區(qū)要存放數(shù)據(jù)的地址 if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) // 將數(shù)據(jù)拷貝到緩沖區(qū)擴(kuò)容地址qp上 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } ... // 否則封裝成sodug休眠自己,加入發(fā)送者等待隊(duì)列 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // Signal to anyone trying to shrink our stack that we're about // to park on a channel. The window between when this G's status // changes and when we set gp.activeStackChans is not safe for // stack shrinking. gp.parkingOnChan.Store(true) // 主動(dòng)掛起 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) ... // 被喚醒后釋放sudog gp.waiting = nil gp.activeStackChans = false closed := !mysg.success gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) // 釋放sudog if closed { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } return true }
使用<- chan
后,會(huì)調(diào)用GOROOT\src\runtime\chan.go
下的chanrecv1
方法
func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
然后調(diào)用chanrecv
方法
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... lock(&c.lock) if c.closed != 0 { if c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // The channel has been closed, but the channel's buffer have data. } else { // 如果有發(fā)送者在休眠,則調(diào)用recv if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } } // 無(wú)發(fā)送者,但緩沖區(qū)有數(shù)據(jù) if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // 休眠自己 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // 封裝成sudog入隊(duì) gp.parkingOnChan.Store(true) // 主動(dòng)掛起 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // 被喚醒后釋放sudog if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } success := mysg.success gp.param = nil mysg.c = nil releaseSudog(mysg) return true, success }
當(dāng)有發(fā)送者,會(huì)調(diào)用recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { // 獲取緩沖區(qū)數(shù)據(jù)的位置 qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) racenotify(c, c.recvx, sg) } // copy data from queue to receiver if ep != nil { // 將緩沖區(qū)數(shù)據(jù)拷貝到 typedmemmove(c.elemtype, ep, qp) } // 將發(fā)送者的數(shù)據(jù)拷貝到緩沖區(qū) typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) sg.success = true if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) // 喚醒發(fā)送者協(xié)程 }
因此,接收者還是先接收緩沖區(qū)數(shù)據(jù),再接收發(fā)送者的數(shù)據(jù)。其實(shí)就是按隊(duì)列的先進(jìn)先出順序。
總結(jié)
留下兩個(gè)問(wèn)題:
發(fā)送者分別遇到無(wú)有休眠接收協(xié)程,有休眠接收協(xié)程,無(wú)接收協(xié)程且緩沖區(qū)沒滿,緩沖區(qū)滿了四種情況該如何處理?
接收者分別遇到無(wú)休眠發(fā)送協(xié)程且緩沖區(qū)為空,無(wú)發(fā)送協(xié)程且緩沖區(qū)有數(shù)據(jù),有休眠發(fā)送協(xié)程且緩沖區(qū)已滿,緩沖區(qū)滿了四種情況該如何處理?
以上就是詳解Go channel管道的運(yùn)行原理的詳細(xì)內(nèi)容,更多關(guān)于Go channel的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go 自定義package包設(shè)置與導(dǎo)入操作
這篇文章主要介紹了Go 自定義package包設(shè)置與導(dǎo)入操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-05-05go語(yǔ)言題解LeetCode1275找出井字棋的獲勝者示例
這篇文章主要為大家介紹了go語(yǔ)言題解LeetCode1275找出井字棋的獲勝者示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01Golang哈希算法實(shí)現(xiàn)配置文件的監(jiān)控功能詳解
這篇文章主要介紹了Golang哈希算法實(shí)現(xiàn)配置文件的監(jiān)控功能,哈希和加密類似,唯一區(qū)別是哈希是單項(xiàng)的,即哈希后的數(shù)據(jù)無(wú)法解密,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧2023-03-03