Golang中channel的原理解讀(推薦)
數(shù)據結構
channel的數(shù)據結構在$GOROOT/src/runtime/chan.go
文件下:
type hchan struct { qcount uint // 當前隊列中剩余元素個數(shù) dataqsiz uint // 環(huán)形隊列長度,即可以存放的元素個數(shù) buf unsafe.Pointer // 環(huán)形隊列指針 elemsize uint16 // 每個元素的大小 closed uint32 // 標記是否關閉 elemtype *_type // 元素類型 sendx uint // 隊列下標,指向元素寫入時存放到隊列中的位置 recvx uint // 隊列下標,指向元素從隊列中讀出的位置 recvq waitq // 等待讀消息的groutine隊列 sendq waitq // 等待寫消息的groutine隊列 lock mutex // 互斥鎖 }
chan內部實現(xiàn)了一個環(huán)形隊列作為緩沖區(qū),隊列的長度在創(chuàng)建chan時指定:
等待隊列(recvq/sendq)使用雙向鏈表 runtime.waitq 表示,鏈表中所有的元素都是 runtime.sudog
結構:
type waitq struct { first *sudog last *sudog } type sudog struct { g *g next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) acquiretime int64 releasetime int64 ticket uint32 isSelect bool parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
創(chuàng)建channel
通常使用make(channel string, 0)
的方式創(chuàng)建無緩存的channel,使用make(channel string, 10)
創(chuàng)建有緩存的channel。
源碼:
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // 如果當前 Channel 中不存在緩沖區(qū),那么就只會為 runtime.hchan 分配一段內存空間; c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: // 如果當前 Channel 中存儲的類型不是指針類型,會為當前的 Channel 和底層的數(shù)組分配一塊連續(xù)的內存空間; c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: //單獨為 runtime.hchan 和緩沖區(qū)分配內存; c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) lockInit(&c.lock, lockRankHchan) // 在函數(shù)的最后會統(tǒng)一更新elemsize、elemtype 和 dataqsiz 幾個字段; if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
channel讀寫
寫
- 當有新數(shù)據來時,首先判斷recvq中是否有groutine存在,如果recvq不為空,則說明緩沖區(qū)為空,或者沒有緩沖區(qū),因為如果緩沖區(qū)有數(shù)據會被recvq里面的groutine消費。此時從recvq中拿出一個groutine并綁定數(shù)據,喚醒該groutine執(zhí)行任務,這個過程跳過了將數(shù)據寫入緩沖區(qū)的過程。
- 如果緩沖區(qū)有數(shù)據并有空余位置,將數(shù)據放入緩沖區(qū)。
- 如果緩沖區(qū)有數(shù)據但沒有空余位置,當前groutine綁定數(shù)據并放入sendx,進入睡眠,等待被喚醒。
源碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ..... lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 如果Channel 沒有被關閉并且已經有處于讀等待的 Goroutine, // 那么從接收隊列 recvq 中取出最先陷入等待的 Goroutine 并直接向它發(fā)送數(shù)據 if sg := c.recvq.dequeue(); sg != nil { send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // 如果recvq為空且緩沖區(qū)中還有剩余空間 if c.qcount < c.dataqsiz { // 計算出下一個可以存儲數(shù)據的位置, qp := chanbuf(c, c.sendx) // raceenabled: 是否啟用數(shù)據競爭檢測,在編譯時指定,默認為false if raceenabled { // 發(fā)出數(shù)據競爭警告 raceacquire(qp) racerelease(qp) } // 將發(fā)送的數(shù)據拷貝到緩沖區(qū)中,產生內存拷貝 typedmemmove(c.elemtype, qp, ep) // 增加 sendx 索引 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 增加計數(shù)器 c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // 將channel數(shù)據綁定到當前groutine并使groutine休眠 // 獲取發(fā)送數(shù)據使用的 Goroutine gp := getg() // 獲取 runtime.sudog 結構并設置這一次阻塞發(fā)送的相關信息, // 例如發(fā)送的 Channel、是否在 select 中和待發(fā)送數(shù)據的內存地址等 mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // 將剛剛創(chuàng)建并初始化的 mysg 加入發(fā)送等待隊列,并設置到當前 Goroutine的waiting上, // 表示 Goroutine 正在等待該sudog準備就緒 mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) // 休眠groutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) // 保證傳入的數(shù)據不被GC KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil releaseSudog(mysg) return true }
讀
- 如果sendx不為空且緩沖區(qū)不為空,從緩沖區(qū)頭部讀出數(shù)據并在當前G執(zhí)行任務,在sendx中拿出一個G,將其數(shù)據寫入緩沖區(qū)尾部并喚醒該G。
- 如果sendx不為空且緩沖區(qū)為空,直接從sendx中拿出一個G,將G中數(shù)據取出并喚醒該G。
- 如果sendx為空且緩沖區(qū)不為空,則從緩沖區(qū)頭部拿出一個數(shù)據。
- 如果sendx為空且緩沖區(qū)為空,將該G放入recvq,進入休眠,等待被喚醒。
源碼:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // block:這次接收是否阻塞 if debugChan { print("chanrecv: chan=", c, "\n") } if c == nil { if !block { return } // 從一個空 Channel 接收數(shù)據時會直接讓出處理器的使用權 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. if !block && empty(c) { // 如果channel為空并且未關閉,直接返回 if atomic.Load(&c.closed) == 0 { return } if empty(c) { // The channel is irreversibly closed and empty. if raceenabled { raceacquire(c.raceaddr()) } if ep != nil { // 手動標記清楚對象 typedmemclr(c.elemtype, ep) } return true, false } } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) //如果channel為空,并且已關閉,說明對象不可達 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { // 手動標記清除 typedmemclr(c.elemtype, ep) } return true, false } // 如果sendq不為空,直接消費,避免sendq --> queue --> recvx的過程 if sg := c.sendq.dequeue(); sg != nil { recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // 當 Channel 的緩沖區(qū)中已經包含數(shù)據時,從 Channel 中接收數(shù)據會直接從緩沖區(qū)中 // recvx 的索引位置中取出數(shù)據進行處理 if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } // 如果接收數(shù)據的內存地址不為空,那么會使用 runtime.typedmemmove將緩沖區(qū)中的數(shù)據拷貝到內存中 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 使用 runtime.typedmemclr清除隊列中的數(shù)據并完成收尾工作 typedmemclr(c.elemtype, qp) c.recvx++ // recvx位置歸零 if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- // 計數(shù)減一 unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // 當 sendq不為空 并且緩沖區(qū)中也不存在任何數(shù)據時,阻塞并休眠當前groutine gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil gp.activeStackChans = false if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
到此這篇關于Golang中channel的原理解讀的文章就介紹到這了,更多相關Golang channel原理內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
文字解說Golang Goroutine和線程的區(qū)別
goroutine 是 Go語言中的輕量級線程實現(xiàn),由 Go 運行時(runtime)管理,使用每一個 go 關鍵字將會額外開啟一個新的協(xié)程 goroutine,今天通過本文給大家介紹下Golang Goroutine和線程的區(qū)別,感興趣的朋友一起看看吧2022-03-03Windows系統(tǒng)中搭建Go語言開發(fā)環(huán)境圖文詳解
GoLand?是?JetBrains?公司推出的商業(yè)?Go?語言集成開發(fā)環(huán)境(IDE),這篇文章主要介紹了Windows系統(tǒng)中搭建Go語言開發(fā)環(huán)境詳解,需要的朋友可以參考下2022-10-10手把手教你如何在Goland中創(chuàng)建和運行項目
歡迎來到本指南!我們將手把手地教您在Goland中如何創(chuàng)建、配置并運行項目,通過簡單的步驟,您將迅速上手這款強大的集成開發(fā)環(huán)境(IDE),輕松實現(xiàn)您的編程夢想,讓我們一起開啟這段精彩的旅程吧!2024-02-02