GoLang channel底層代碼實現(xiàn)詳解
1.channel 簡介
Go語言有個很出名的話是“以通信的手段來共享內(nèi)存”,channel就是其最佳的體現(xiàn),channel提供一種機(jī)制,可以同步兩個并發(fā)執(zhí)行的函數(shù),還可以讓兩個函數(shù)通過互相傳遞特定類型的值來通信
channel有兩種初始化方式,分別是帶緩存的和不帶緩存的:
make(chan int) // 無緩存 chan make(chan int, 10) // 有緩存 chan
使用方式也很簡單:
c := make(chan int)
defer close(c)
go func(){
c <- 5 // send
}()
n := <- c // recv
十分簡潔的做到了不同協(xié)程的交互。
2.channel 內(nèi)部結(jié)構(gòu)
chan的實現(xiàn)在runtime/chan.go,是一個hchan的結(jié)構(gòu)體:
type hchan struct {
qcount uint // 隊列中的數(shù)據(jù)個數(shù)
dataqsiz uint // 環(huán)形隊列的大小,channel本身是一個環(huán)形隊列
buf unsafe.Pointer // 存放實際數(shù)據(jù)的指針,用unsafe.Pointer存放地址,為了避免gc
elemsize uint16
closed uint32 // 標(biāo)識channel是否關(guān)閉
elemtype *_type // 數(shù)據(jù) 元素類型
sendx uint // send的 index
recvx uint // recv 的 index
recvq waitq // 阻塞在 recv 的隊列
sendq waitq // 阻塞在 send 的隊列
lock mutex // 鎖
}
可以看出,channel本身是一個環(huán)形緩沖區(qū),數(shù)據(jù)存放到堆上面,channel的同步是通過鎖實現(xiàn)的,并不是想象中的lock-free的方式,channel中有兩個隊列,一個是發(fā)送阻塞隊列,一個是接收阻塞隊列。當(dāng)向一個已滿的channel發(fā)送數(shù)據(jù)會被阻塞,此時發(fā)送協(xié)程會被添加到sendq中,同理,當(dāng)向一個空的channel接收數(shù)據(jù)時,接收協(xié)程也會被阻塞,被置入recvq中。
waitq是一個鏈表,里面對g結(jié)構(gòu)做了一下簡單的封裝。
3.創(chuàng)建channel
當(dāng)我們在代碼里面通過make創(chuàng)建一個channel時,實際調(diào)用的是下面這個函數(shù):
CALL runtime.makechan(SB)
makechan的實現(xiàn)如下所示:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 判斷 元素類型的大小
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判斷對齊限制
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 判斷 size非負(fù) 和 是否大于 maxAlloc限制
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0: // 無緩沖區(qū),即 make沒設(shè)置大小
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: // 數(shù)據(jù)類型不包含指針
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 如果包含指針
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}根據(jù)上面的代碼,我們可以看到,創(chuàng)建channel分為三種情況:
1.第一種緩沖區(qū)大小為0,此時只需要分配hchansize大小的內(nèi)存就ok
2.第二種緩沖區(qū)大小不為0,且channel的類型不包含指針,此時buf為hchanSize+元素大小*元素個數(shù)的連續(xù)內(nèi)存
3.第三種緩沖區(qū)大小不為0,且channel的類型包含指針,則不能簡單的根據(jù)元素的大小去申請內(nèi)存,需要通過mallocgc去分配內(nèi)存
4.發(fā)送數(shù)據(jù)
發(fā)送數(shù)據(jù)會調(diào)用chan.go中的如下接口:
CALL runtime.chansend1(SB)
chansend1會調(diào)用chansend接口,chansend方法簽名如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
c是具體的channel,ep是發(fā)送的數(shù)據(jù),block為true表示阻塞的發(fā)送,一般向channel發(fā)送數(shù)據(jù)都是阻塞的,如果channel數(shù)據(jù)滿了,會一直阻塞在這里。但是在select中如果有case監(jiān)聽某個channel的發(fā)送,那么此時的block參數(shù)為false,后續(xù)分析select實現(xiàn)會講到。
select {
case <-c: // 這里為非阻塞發(fā)送
// do some thing
default:
// do some thing
}
chansend接口會對一些條件做判斷
如果向一個為nil的channel發(fā)送數(shù)據(jù),如果是阻塞發(fā)送會一直阻塞:
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
首先會加鎖,保證原子性,如果向一個已關(guān)閉的channel發(fā)送數(shù)據(jù)就會panic。
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
如果此時recvq中有等待協(xié)程,就直接調(diào)用send函數(shù)將數(shù)據(jù)復(fù)制給接收方, 實現(xiàn)如下:
// sg 為接收者協(xié)程,ep為發(fā)送元素
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
如果此時沒有等待協(xié)程,并且數(shù)據(jù)未滿的情況下,就將數(shù)據(jù)copy到環(huán)形緩沖區(qū)中,將位置后移一位。
if c.qcount < c.dataqsiz { // 如果 未滿
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
如果此時環(huán)形緩沖區(qū)數(shù)據(jù)滿了,如果是阻塞發(fā)送,此時會把發(fā)送方放到sendq隊列中。
5.接收數(shù)據(jù)
接收數(shù)據(jù)會調(diào)用下面的接口:
CALL runtime.chanrecv1(SB)
chanrecv1會調(diào)用chanrecv接口,chanrecv方法簽名如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
c 指需要操作的channel,接收的數(shù)據(jù)會寫到ep中,block與send中的情況一樣,表示是阻塞接收還是非阻塞接收,非阻塞接收指在select中case 接收一個channel值:
select {
case a := <-c: // 這里為非阻塞接收,沒有數(shù)據(jù)直接返回
// do some thing
default:
// do some thing
}
首先chanrecv也會做一些參數(shù)校驗
如果channel為nil并且是非阻塞模式,直接返回,如果是阻塞模式,永遠(yuǎn)等待
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
隨后會加鎖,防止競爭讀寫
lock(&c.lock)
如果向一個已關(guān)閉的channel接收數(shù)據(jù),此時channel里面還有數(shù)據(jù),那么依然可以接收數(shù)據(jù),屬于正常接收數(shù)據(jù)情況。
如果向一個已關(guān)閉的channel接收數(shù)據(jù),此時channel里面沒有數(shù)據(jù),那么此時返回的是(true,false),表示有值返回,但不是我們需要的值:
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // 將 ep 指向的內(nèi)存塊置 0
}
return true, false
}
接收也分為三種情況:
如果此時 sendq中有發(fā)送方在阻塞,此時會調(diào)用recv函數(shù):
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
此時有發(fā)送方在等待,表示此時channel中數(shù)據(jù)已滿,這個時候會將channel頭部的數(shù)據(jù)copy到接收方,然后將發(fā)送方隊列頭部的發(fā)送者的數(shù)據(jù)copy到那個位置。這涉及到兩次copy操作。
第二種情況是如果沒有發(fā)送方等待,此時會把數(shù)據(jù)copy到channel中:
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
第三種情況如果channel里面沒有數(shù)據(jù),如果是非阻塞接收直接返回false,如果是阻塞接收會將接收方協(xié)程放入channel的recvq中。
6.關(guān)閉channel
關(guān)閉channel時會調(diào)用如下接口:
func closechan(c *hchan)
首先會做一些數(shù)據(jù)校驗:
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1 //置關(guān)閉標(biāo)記位如果向一個為nil的channel或者向一個已關(guān)閉的channel發(fā)起close操作就會panic。
隨后會喚醒所有在recvq或者sendq里面的協(xié)程:
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)如果存在接收者,將接收數(shù)據(jù)通過typedmemclr置0。
如果存在發(fā)送者,將所有發(fā)送者panic。
7.總結(jié)
綜上分析,在使用channel有這么幾點要注意
1.確保所有數(shù)據(jù)發(fā)送完后再關(guān)閉channel,由發(fā)送方來關(guān)閉
2.不要重復(fù)關(guān)閉channel
3.不要向為nil的channel里面發(fā)送值
4.不要向為nil的channel里面接收值
5.接收數(shù)據(jù)時,可以通過返回值判斷是否ok
n , ok := <- c
if ok{
// do some thing
}
這樣防止channel被關(guān)閉后返回了零值,對業(yè)務(wù)造成影響
到此這篇關(guān)于GoLang channel底層代碼實現(xiàn)詳解的文章就介紹到這了,更多相關(guān)GoLang channel內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言RPC Authorization進(jìn)行簡單ip安全驗證的方法
這篇文章主要介紹了Go語言RPC Authorization進(jìn)行簡單ip安全驗證的方法,實例分析了Go語言進(jìn)行ip驗證的技巧,需要的朋友可以參考下2015-03-03
golang如何通過viper讀取config.yaml文件
這篇文章主要介紹了golang通過viper讀取config.yaml文件,圍繞golang讀取config.yaml文件的相關(guān)資料展開詳細(xì)內(nèi)容,需要的小伙伴可以參考一下2022-03-03
golang中struct和interface的基礎(chǔ)使用教程
Go不同于一般的面向?qū)ο笳Z言,需要我們好好的學(xué)習(xí)研究,下面這篇文章主要給大家介紹了關(guān)于golang中struct和interface的基礎(chǔ)使用的相關(guān)資料,需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-03-03
一文帶大家了解Go語言中的內(nèi)聯(lián)優(yōu)化
內(nèi)聯(lián)優(yōu)化是一種常見的編譯器優(yōu)化策略,通俗來講,就是把函數(shù)在它被調(diào)用的地方展開,這樣可以減少函數(shù)調(diào)用所帶來的開銷,本文主要為大家介紹了Go中內(nèi)聯(lián)優(yōu)化的具體使用,需要的可以參考下2023-05-05

