Go channel發(fā)送方和接收方如何相互阻塞等待源碼解讀
并發(fā)編程的可見性
在 Go 官網(wǎng)上的內(nèi)存模型一文中,介紹了在 Go 并發(fā)編程下數(shù)據(jù)可見性問題,可見性是并發(fā)編程中一個重要概念,指的是在哪些條件下,可以保證一個線程中讀取某個變量時,可以觀察到另一個線程對該變量的寫入后的值,Go 語言中的 goroutine 也適用。
一般來說可見性屬于偏硬件和底層,因為涉及到多核 CPU 的 cache 讀寫和同步問題,開發(fā)者不需要關(guān)心細節(jié),高級編程語言要么屏蔽掉了這些細節(jié),要么會給出一些保證,承諾在確定的條件下就會得到確定的結(jié)果。
Go channel 有一個特性是在一個無緩沖的 channel 上發(fā)送和接收必須等待對方準備好,才可以執(zhí)行,否則會被阻塞。實際上這就是一個同步保證,那么這個同步保證是如何實現(xiàn)的?下面看看官方文章中是如何解釋的。
先 send 后 receive
文中對 channel 的描述有幾個原則,第一個是
A send on a channel is synchronized before the completion of the corresponding receive from that channel.
意思是:在一個 channel 上的發(fā)送操作應(yīng)該發(fā)生在對應(yīng)的接收操作完成之前。說人話就是:要先發(fā)送數(shù)據(jù),然后才能接收數(shù)據(jù),否則就會阻塞。這也比較符合一般的認知。
并用下面一段代碼舉例說明,這段代碼確保一定會輸出 "hello, world”。
var c = make(chan int, 10) var a string func f() { a = "hello, world" c <- 0 } func main() { go f() <-c print(a) }
f
函數(shù)負責給變量 a
賦值,main
函數(shù)負責打印變量 a
。main
函數(shù)阻塞等待在 <- c
處,直到 f
函數(shù)對 a
賦值之后并寫入數(shù)據(jù)到 c
中,main
函數(shù)才被喚醒繼續(xù)執(zhí)行,所以此時打印 a
必然會得到結(jié)果。
先 receive 后 send?
而下面這段描述有點反直覺
A receive from an unbuffered channel is synchronized before the completion of the corresponding send on that channel.
意思是在無緩沖 channel 上的接收操作發(fā)生在對應(yīng)的發(fā)送操作完成之前,說人話就是:要先接收數(shù)據(jù),之后才可以發(fā)送數(shù)據(jù),否則就會阻塞。這句話看上去與第一條相悖,因為第一條強調(diào)發(fā)送操作要在接收完成之前發(fā)生,而這一條強調(diào)接收操作要在發(fā)送完成之前發(fā)生,這樣相互等待對方的情況,不會陷入死鎖狀態(tài)嗎?
下面的示例代碼與前一個類似,區(qū)別是將 c 換成了無緩沖 channel,并把 c 的寫入和讀取調(diào)換了位置,這段代碼同樣可以保證輸出 "hello, world”。
var c = make(chan int) var a string func f() { a = "hello, world" <-c } func main() { go f() c <- 0 print(a) }
這兩段話到底是什么意思?為什么要相互等待但又不會死鎖?
接下來看看 runtime/chan.go 中是怎么實現(xiàn) channel 的發(fā)送和接收的。
channel 的結(jié)構(gòu)
首先看看 channel 的數(shù)據(jù)結(jié)構(gòu)
type hchan struct { qcount uint // 緩沖區(qū)元素數(shù)量 dataqsiz uint // 緩沖區(qū)大小 buf unsafe.Pointer // 緩沖區(qū)起始指針 elemsize uint16 closed uint32 elemtype *_type sendx uint // 下一次發(fā)送的元素在隊列中的索引 recvx uint // 下一個接收的元素在隊列中的索引 recvq waitq // 當隊列無數(shù)據(jù)時,receiver 阻塞等待的隊列 sendq waitq // 當隊列無空間時,sender 阻塞等待的隊列 lock mutex }
channel 內(nèi)部實現(xiàn)了一個環(huán)形隊列,通過 qcount
dataqsiz
buf
sendx
recvx
幾個部分組成。
另外 channel 還維護了兩個等待隊列,如果在執(zhí)行 <-c
receive 操作時,此時 channel 不滿足接收條件,receiver 會進入 recvq 等待隊列;同樣的如果執(zhí)行 c<-
send 操作時,此時 channel 不滿足發(fā)送條件,sender 會進入 sendq 等待隊列。
具體看代碼:
var c = make(chan int) var a string func f() { a = "hello, world" x := <-c // 3 fmt.Println("\nx:", x) } func main() { go f() // 1 c <- 123456 // 2 print(a) }
send 具體干了什么
當 main
函數(shù)執(zhí)行到 c<-123456
是,會執(zhí)行 runtime/chan.go 中的 chansend
函數(shù),該函數(shù)首先會判斷當前 channel c 的等待接收隊列是否有阻塞的 receiver
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ...省略部分代碼... // 是否有等待的 receiver 存在 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // ...省略部分代碼... }
如果有等待的 receiver 則彈出隊列,調(diào)用 send
函數(shù),其中 sg
就表示 receiver
,sg.elem
表示將數(shù)據(jù)接收到哪里去,這個地址也就對應(yīng)示例代碼中的變量 x 的地址。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // ...省略部分代碼... if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } // ...省略部分代碼... // 將 goroutine 置為可執(zhí)行狀態(tài) }
sendDirect
函數(shù)就是直接從 src
里面將數(shù)據(jù)復制到 dst
中。
// 直接拷貝數(shù)據(jù) func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
回到 chansend
函數(shù),如果沒有等待的 receiver
,那么會查看當前 buf 中是否有空間,如果有空間,則數(shù)據(jù)緩存到 buf 中。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ...省略部分代碼... // 將數(shù)據(jù)緩存到 buf 中 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { racenotify(c, c.sendx, nil) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } // ...省略部分代碼... }
如果也沒有 buf 空間,那么就將 sender 本身放入到 sendq 等待隊列中。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // ...省略部分代碼... // 進入 sendq 等待隊列 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // ...省略部分代碼... }
總結(jié)起來 send 操作分三部分:
- 如果當前 channel 上有等待的 receiver,則直接 copy 數(shù)據(jù)過去
- 否則如果當前 buf 有空閑空間,則將數(shù)據(jù)存在 buf 中
- 否則將 sender 本身加入到 sendq 等待隊列中
receive 具體干了什么
相應(yīng)的與發(fā)送類似,執(zhí)行到示例代碼中第 (3) 步接收數(shù)據(jù)時,會調(diào)用 runtime/chan.go 中的 chanrecv
函數(shù)來處理接收,同樣是先看 sender 等待隊列是否有阻塞的 sender
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // ...省略部分代碼... // 從等待的 sender 取一個出來 if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // ...省略部分代碼... }
如果有的等待的 sender,那么將 sender 取出來,并復制數(shù)據(jù)。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // ...省略部分代碼... if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } // ...省略部分代碼... } func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) { src := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) }
如果沒有等待的 sender,那么看 buf 中有沒有緩存的數(shù)據(jù)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // ...省略部分代碼... if c.qcount > 0 { qp := chanbuf(c, c.recvx) if raceenabled { racenotify(c, c.recvx, nil) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } // ...省略部分代碼... }
最后如果也沒有 buf 數(shù)據(jù),那么久把自己加入到 receiver 等待隊列中 recvq
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // ...省略部分代碼... gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) // ...省略部分代碼... }
總結(jié)起來 receive 操作分三部分:
- 如果當前 channel 上有等待的 sender,則直接 copy 數(shù)據(jù)過去
- 否則如果當前 buf 有緩存的數(shù)據(jù),則將讀取該數(shù)據(jù)
- 否則將 receiver 本身加入到 recvq 等待隊列中
小結(jié)
這樣一來就能夠理解前面的兩個原則了,在一個無緩沖的 channel 中,無論是 sender 先執(zhí)行,還是 receiver 先執(zhí)行,都會因為找不到對方,并且沒有 buf 空間的情況下,將自己加入到等待隊列;當對方開始執(zhí)行時就會檢查到已經(jīng)有對端正在阻塞,進而拷貝數(shù)據(jù),并喚醒阻塞的對象最終走完整個流程。
有一種說法是:sender 必須在 receiver 準備好才能執(zhí)行,否則就會阻塞;而 receiver 必須在 sender 準備好才能執(zhí)行,否則就會阻塞;這個說法沒錯,但是太籠統(tǒng)了,什么叫準備好?怎么算是準備好?這是比較模糊的。而看過 send 和 receive 的流程之后,就更能理解整個過程了。
為什么要有無緩沖 channel
實際上兩個 goroutine 相互等待對方到達某個狀態(tài)的效果,非常類似操作系統(tǒng)中的一種同步機制:屏障 barrier,同步屏障要求只有當所有進程都到達屏障后,才能一起執(zhí)行下一狀態(tài),否則就阻塞在屏障處。
回到 channel 操作,即 sender 和 receiver 無論誰先執(zhí)行,都必須等待對方也已經(jīng)執(zhí)行,兩者才可以繼續(xù)執(zhí)行。就像一塊電路板串聯(lián)有兩個開關(guān),要想電路聯(lián)通,必須兩個開關(guān)都被打開才可以,而不管哪一個先打開,都必須等待另一個開關(guān)也打開,之后電流才可以接通電路也才聯(lián)通。
可以將無緩沖 channel 看做是一種同步屏障,同步屏障能夠讓多個 goroutine 都達到某種狀態(tài)之后才可以繼續(xù)執(zhí)行,這是帶緩沖 channel 無法做到的。另外在無緩沖 channel 數(shù)據(jù)的交換更加簡單快速,因為不需要維護緩存 buf,實現(xiàn)邏輯也更簡單,運行更可靠。
以上就是Go channel發(fā)送方和接收方如何相互阻塞等待源碼解讀的詳細內(nèi)容,更多關(guān)于Go channel相互阻塞等待的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
go語言中的udp協(xié)議及TCP通訊實現(xiàn)示例
這篇文章主要為大家介紹了go語言中的udp協(xié)議及TCP通訊的實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪2022-04-04golang接口實現(xiàn)調(diào)用修改(值接收者指針接收者)場景詳解
這篇文章主要為大家介紹了golang接口實現(xiàn)調(diào)用修改值接收者指針接收者示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-08-08Go語言resty http包調(diào)用jenkins api實例
這篇文章主要為大家介紹了Go語言resty http包調(diào)用jenkins api實例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06Go語言并發(fā)編程之控制并發(fā)數(shù)量實現(xiàn)實例
這篇文章主要為大家介紹了Go語言并發(fā)編程之控制并發(fā)數(shù)量實例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2024-01-01