Golang中channel的原理解讀(推薦)
數(shù)據(jù)結(jié)構(gòu)
channel的數(shù)據(jù)結(jié)構(gòu)在$GOROOT/src/runtime/chan.go文件下:
type hchan struct {
qcount uint // 當(dāng)前隊(duì)列中剩余元素個(gè)數(shù)
dataqsiz uint // 環(huán)形隊(duì)列長(zhǎng)度,即可以存放的元素個(gè)數(shù)
buf unsafe.Pointer // 環(huán)形隊(duì)列指針
elemsize uint16 // 每個(gè)元素的大小
closed uint32 // 標(biāo)記是否關(guān)閉
elemtype *_type // 元素類(lèi)型
sendx uint // 隊(duì)列下標(biāo),指向元素寫(xiě)入時(shí)存放到隊(duì)列中的位置
recvx uint // 隊(duì)列下標(biāo),指向元素從隊(duì)列中讀出的位置
recvq waitq // 等待讀消息的groutine隊(duì)列
sendq waitq // 等待寫(xiě)消息的groutine隊(duì)列
lock mutex // 互斥鎖
}
chan內(nèi)部實(shí)現(xiàn)了一個(gè)環(huán)形隊(duì)列作為緩沖區(qū),隊(duì)列的長(zhǎng)度在創(chuàng)建chan時(shí)指定:

等待隊(duì)列(recvq/sendq)使用雙向鏈表 runtime.waitq 表示,鏈表中所有的元素都是 runtime.sudog結(jié)構(gòu):
type waitq struct {
first *sudog
last *sudog
}
type sudog struct {
g *g
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
創(chuàng)建channel
通常使用make(channel string, 0)的方式創(chuàng)建無(wú)緩存的channel,使用make(channel string, 10)創(chuàng)建有緩存的channel。
源碼:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 如果當(dāng)前 Channel 中不存在緩沖區(qū),那么就只會(huì)為 runtime.hchan 分配一段內(nèi)存空間;
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 如果當(dāng)前 Channel 中存儲(chǔ)的類(lèi)型不是指針類(lèi)型,會(huì)為當(dāng)前的 Channel 和底層的數(shù)組分配一塊連續(xù)的內(nèi)存空間;
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
//單獨(dú)為 runtime.hchan 和緩沖區(qū)分配內(nèi)存;
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
// 在函數(shù)的最后會(huì)統(tǒng)一更新elemsize、elemtype 和 dataqsiz 幾個(gè)字段;
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
channel讀寫(xiě)
寫(xiě)
- 當(dāng)有新數(shù)據(jù)來(lái)時(shí),首先判斷recvq中是否有g(shù)routine存在,如果recvq不為空,則說(shuō)明緩沖區(qū)為空,或者沒(méi)有緩沖區(qū),因?yàn)槿绻彌_區(qū)有數(shù)據(jù)會(huì)被recvq里面的groutine消費(fèi)。此時(shí)從recvq中拿出一個(gè)groutine并綁定數(shù)據(jù),喚醒該groutine執(zhí)行任務(wù),這個(gè)過(guò)程跳過(guò)了將數(shù)據(jù)寫(xiě)入緩沖區(qū)的過(guò)程。
- 如果緩沖區(qū)有數(shù)據(jù)并有空余位置,將數(shù)據(jù)放入緩沖區(qū)。
- 如果緩沖區(qū)有數(shù)據(jù)但沒(méi)有空余位置,當(dāng)前groutine綁定數(shù)據(jù)并放入sendx,進(jìn)入睡眠,等待被喚醒。

源碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
.....
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 如果Channel 沒(méi)有被關(guān)閉并且已經(jīng)有處于讀等待的 Goroutine,
// 那么從接收隊(duì)列 recvq 中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據(jù)
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 如果recvq為空且緩沖區(qū)中還有剩余空間
if c.qcount < c.dataqsiz {
// 計(jì)算出下一個(gè)可以存儲(chǔ)數(shù)據(jù)的位置,
qp := chanbuf(c, c.sendx)
// raceenabled: 是否啟用數(shù)據(jù)競(jìng)爭(zhēng)檢測(cè),在編譯時(shí)指定,默認(rèn)為false
if raceenabled {
// 發(fā)出數(shù)據(jù)競(jìng)爭(zhēng)警告
raceacquire(qp)
racerelease(qp)
}
// 將發(fā)送的數(shù)據(jù)拷貝到緩沖區(qū)中,產(chǎn)生內(nèi)存拷貝
typedmemmove(c.elemtype, qp, ep)
// 增加 sendx 索引
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 增加計(jì)數(shù)器
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// 將channel數(shù)據(jù)綁定到當(dāng)前groutine并使groutine休眠
// 獲取發(fā)送數(shù)據(jù)使用的 Goroutine
gp := getg()
// 獲取 runtime.sudog 結(jié)構(gòu)并設(shè)置這一次阻塞發(fā)送的相關(guān)信息,
// 例如發(fā)送的 Channel、是否在 select 中和待發(fā)送數(shù)據(jù)的內(nèi)存地址等
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 將剛剛創(chuàng)建并初始化的 mysg 加入發(fā)送等待隊(duì)列,并設(shè)置到當(dāng)前 Goroutine的waiting上,
// 表示 Goroutine 正在等待該sudog準(zhǔn)備就緒
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// 休眠groutine
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 保證傳入的數(shù)據(jù)不被GC
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
讀
- 如果sendx不為空且緩沖區(qū)不為空,從緩沖區(qū)頭部讀出數(shù)據(jù)并在當(dāng)前G執(zhí)行任務(wù),在sendx中拿出一個(gè)G,將其數(shù)據(jù)寫(xiě)入緩沖區(qū)尾部并喚醒該G。
- 如果sendx不為空且緩沖區(qū)為空,直接從sendx中拿出一個(gè)G,將G中數(shù)據(jù)取出并喚醒該G。
- 如果sendx為空且緩沖區(qū)不為空,則從緩沖區(qū)頭部拿出一個(gè)數(shù)據(jù)。
- 如果sendx為空且緩沖區(qū)為空,將該G放入recvq,進(jìn)入休眠,等待被喚醒。

源碼:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// block:這次接收是否阻塞
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
// 從一個(gè)空 Channel 接收數(shù)據(jù)時(shí)會(huì)直接讓出處理器的使用權(quán)
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// 如果channel為空并且未關(guān)閉,直接返回
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
// 手動(dòng)標(biāo)記清楚對(duì)象
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
//如果channel為空,并且已關(guān)閉,說(shuō)明對(duì)象不可達(dá)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// 手動(dòng)標(biāo)記清除
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果sendq不為空,直接消費(fèi),避免sendq --> queue --> recvx的過(guò)程
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 當(dāng) Channel 的緩沖區(qū)中已經(jīng)包含數(shù)據(jù)時(shí),從 Channel 中接收數(shù)據(jù)會(huì)直接從緩沖區(qū)中
// recvx 的索引位置中取出數(shù)據(jù)進(jìn)行處理
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 如果接收數(shù)據(jù)的內(nèi)存地址不為空,那么會(huì)使用 runtime.typedmemmove將緩沖區(qū)中的數(shù)據(jù)拷貝到內(nèi)存中
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 使用 runtime.typedmemclr清除隊(duì)列中的數(shù)據(jù)并完成收尾工作
typedmemclr(c.elemtype, qp)
c.recvx++
// recvx位置歸零
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount-- // 計(jì)數(shù)減一
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// 當(dāng) sendq不為空 并且緩沖區(qū)中也不存在任何數(shù)據(jù)時(shí),阻塞并休眠當(dāng)前groutine
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
到此這篇關(guān)于Golang中channel的原理解讀的文章就介紹到這了,更多相關(guān)Golang channel原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
文字解說(shuō)Golang Goroutine和線程的區(qū)別
goroutine 是 Go語(yǔ)言中的輕量級(jí)線程實(shí)現(xiàn),由 Go 運(yùn)行時(shí)(runtime)管理,使用每一個(gè) go 關(guān)鍵字將會(huì)額外開(kāi)啟一個(gè)新的協(xié)程 goroutine,今天通過(guò)本文給大家介紹下Golang Goroutine和線程的區(qū)別,感興趣的朋友一起看看吧2022-03-03
Golang?中判斷兩個(gè)結(jié)構(gòu)體相等的方法
這篇文章主要介紹了Golang?中如何判斷兩個(gè)結(jié)構(gòu)體相等,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
Go語(yǔ)言Goroutines?泄漏場(chǎng)景與防治解決分析
這篇文章主要為大家介紹了Go語(yǔ)言Goroutines?泄漏場(chǎng)景與防治解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12
Windows系統(tǒng)中搭建Go語(yǔ)言開(kāi)發(fā)環(huán)境圖文詳解
GoLand?是?JetBrains?公司推出的商業(yè)?Go?語(yǔ)言集成開(kāi)發(fā)環(huán)境(IDE),這篇文章主要介紹了Windows系統(tǒng)中搭建Go語(yǔ)言開(kāi)發(fā)環(huán)境詳解,需要的朋友可以參考下2022-10-10
一鍵定位Golang線上服務(wù)內(nèi)存泄露的秘籍
內(nèi)存泄露?別讓它拖垮你的Golang線上服務(wù)!快速掌握Go語(yǔ)言服務(wù)內(nèi)存泄漏排查秘籍,從此問(wèn)題無(wú)處遁形,一文讀懂如何精準(zhǔn)定位與有效解決Golang應(yīng)用中的內(nèi)存問(wèn)題,立即閱讀,讓性能飛升!2024-01-01
手把手教你如何在Goland中創(chuàng)建和運(yùn)行項(xiàng)目
歡迎來(lái)到本指南!我們將手把手地教您在Goland中如何創(chuàng)建、配置并運(yùn)行項(xiàng)目,通過(guò)簡(jiǎn)單的步驟,您將迅速上手這款強(qiáng)大的集成開(kāi)發(fā)環(huán)境(IDE),輕松實(shí)現(xiàn)您的編程夢(mèng)想,讓我們一起開(kāi)啟這段精彩的旅程吧!2024-02-02
在Go語(yǔ)言程序中使用gojson來(lái)解析JSON格式文件
這篇文章主要介紹了在Go語(yǔ)言程序中使用gojson來(lái)解析JSON格式文件的方法,Go是由Google開(kāi)發(fā)的高人氣新興編程語(yǔ)言,需要的朋友可以參考下2015-10-10

