詳解Go channel管道的運行原理
前言
Go推薦通過通信來共享內(nèi)存,而channel就實現(xiàn)了這一理念。那channel是怎么運行的呢?
功能
舉個例子看下channel的使用效果:
package main
import (
"fmt"
"math/rand"
"time"
)
func write(c chan int) {
for {
num := rand.Intn(100)
c <- num
}
}
func read(c chan int) {
for {
num := <-c
fmt.Println("讀取管道的隨機(jī)數(shù):", num)
time.Sleep(time.Second)
}
}
func main() {
var c = make(chan int, 8)
go read(c)
for i := 0; i < 5; i++ {
go write(c)
}
time.Sleep(time.Minute)
}以上代碼新建了一個緩沖區(qū)為8的管道,然后開啟read和五個write讀寫協(xié)程。寫協(xié)程寫入一個隨機(jī)數(shù),讀協(xié)程每隔一秒讀取并打印,效果如下:

說明協(xié)程間可以通過管道來互相通信。接著了解下channel的結(jié)構(gòu)。
channel結(jié)構(gòu)
channel結(jié)構(gòu)體位于GOROOT/src/runtime/chan.go下的hchan,源碼如下:
type hchan struct {
qcount uint // 隊列中元素總數(shù)
dataqsiz uint // 環(huán)型隊列大小
buf unsafe.Pointer // 指向dataqsize的數(shù)組(即緩沖區(qū))
elemsize uint16
closed uint32
elemtype *_type // 元素類型
sendx uint // 發(fā)送到緩沖區(qū)的位置索引
recvx uint // 接收到緩沖區(qū)的位置索引
recvq waitq // 接收者隊列
sendq waitq // 發(fā)送者隊列
lock mutex // 鎖,用于保護(hù)channel數(shù)據(jù)
}其中發(fā)送者和接收者隊列是一個waitq類型,具體如下:
type waitq struct {
first *sudog
last *sudog
}waitq里有隊頭first,隊尾last的指針,指向sudog結(jié)構(gòu)體。
也就是說,waitq是一個列表隊列,隊列里每個元素都是一個sudog結(jié)構(gòu)體,sudog中包裝著一個協(xié)程。
解析一個hchan各部分結(jié)構(gòu):
- 頭部
type hchan struct {
qcount uint // 隊列中元素總數(shù)
dataqsiz uint // 環(huán)型隊列大小
buf unsafe.Pointer // 指向dataqsize的數(shù)組(即緩沖區(qū))
elemsize uint16
closed uint32
elemtype *_type // 元素類型
...
}這部分表示一個環(huán)型緩沖區(qū)。圖解如下:

- 尾部
type hchan struct {
...
sendx uint // 發(fā)送到緩沖區(qū)的位置索引
recvx uint // 接收到緩沖區(qū)的位置索引
recvq waitq // 接收者隊列
sendq waitq // 發(fā)送者隊列
...
}這部分把協(xié)程分為兩個身份,使用chan <- 語法的協(xié)程為發(fā)送者,使用<- chan 語法的協(xié)程為接收者,并放到各自隊列中。圖解如下:

結(jié)合示例代碼。運行結(jié)構(gòu)如下:

由于寫協(xié)程一直寫,讀協(xié)程每隔一秒才讀一次,因此很快將緩沖區(qū)寫滿了,這時:
- 寫協(xié)程被裝入sudog進(jìn)行休眠等待
- 讀協(xié)程每隔一秒從緩沖區(qū)讀取數(shù)據(jù)
運行原理
使用chan <- 為發(fā)送者,對發(fā)送者來說:
- 先查看是否有接收者,有則優(yōu)先喚醒并拷貝數(shù)據(jù)給接收者,然后結(jié)束
- 無接收者再查看緩沖區(qū),數(shù)據(jù)未滿則將數(shù)據(jù)放入緩沖區(qū),然后結(jié)束
- 緩沖區(qū)也滿了,則封裝成sudog,休眠等待
使用<- chan 為接收者,對接收者來說:
- 優(yōu)先接收緩沖區(qū)的值
- 再接收發(fā)送者的值
- 否則休眠等待
思考下:
有休眠的接收者,且緩沖區(qū)數(shù)據(jù)已滿的情況是否存在?為什么?
有休眠的發(fā)送者,且緩沖區(qū)為空的情況是否存在?為什么?
以上答案:
有休眠的接收者,緩沖區(qū)不會出現(xiàn)數(shù)據(jù)已滿情況。因為接收者要休眠,得緩沖區(qū)沒數(shù)據(jù)才行。
有休眠的發(fā)送者,緩沖區(qū)不會出現(xiàn)為空情況。因為發(fā)送者要休眠,得緩沖區(qū)數(shù)據(jù)已滿才行。
源碼分析
使用chan <-后,會調(diào)用GOROOT\src\runtime\chan.go下的chansend1方法
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}然后調(diào)用chansend方法
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 嘗試接收者隊列出隊,若有接收者,則直接拷貝數(shù)據(jù)給接收者
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 判斷緩沖區(qū)是否還有空余
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx) // 有的話獲得緩沖區(qū)要存放數(shù)據(jù)的地址
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep) // 將數(shù)據(jù)拷貝到緩沖區(qū)擴(kuò)容地址qp上
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
// 否則封裝成sodug休眠自己,加入發(fā)送者等待隊列
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
// 主動掛起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
...
// 被喚醒后釋放sudog
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg) // 釋放sudog
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}使用<- chan后,會調(diào)用GOROOT\src\runtime\chan.go下的chanrecv1方法
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}然后調(diào)用chanrecv方法
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// 如果有發(fā)送者在休眠,則調(diào)用recv
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
// 無發(fā)送者,但緩沖區(qū)有數(shù)據(jù)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 休眠自己
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg) // 封裝成sudog入隊
gp.parkingOnChan.Store(true)
// 主動掛起
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 被喚醒后釋放sudog
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}當(dāng)有發(fā)送者,會調(diào)用recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// 獲取緩沖區(qū)數(shù)據(jù)的位置
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
// 將緩沖區(qū)數(shù)據(jù)拷貝到
typedmemmove(c.elemtype, ep, qp)
}
// 將發(fā)送者的數(shù)據(jù)拷貝到緩沖區(qū)
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) // 喚醒發(fā)送者協(xié)程
}因此,接收者還是先接收緩沖區(qū)數(shù)據(jù),再接收發(fā)送者的數(shù)據(jù)。其實就是按隊列的先進(jìn)先出順序。
總結(jié)
留下兩個問題:
發(fā)送者分別遇到無有休眠接收協(xié)程,有休眠接收協(xié)程,無接收協(xié)程且緩沖區(qū)沒滿,緩沖區(qū)滿了四種情況該如何處理?
接收者分別遇到無休眠發(fā)送協(xié)程且緩沖區(qū)為空,無發(fā)送協(xié)程且緩沖區(qū)有數(shù)據(jù),有休眠發(fā)送協(xié)程且緩沖區(qū)已滿,緩沖區(qū)滿了四種情況該如何處理?
以上就是詳解Go channel管道的運行原理的詳細(xì)內(nèi)容,更多關(guān)于Go channel的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go 自定義package包設(shè)置與導(dǎo)入操作
這篇文章主要介紹了Go 自定義package包設(shè)置與導(dǎo)入操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05
Golang哈希算法實現(xiàn)配置文件的監(jiān)控功能詳解
這篇文章主要介紹了Golang哈希算法實現(xiàn)配置文件的監(jiān)控功能,哈希和加密類似,唯一區(qū)別是哈希是單項的,即哈希后的數(shù)據(jù)無法解密,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧2023-03-03

