GoLang channel底層代碼實(shí)現(xiàn)詳解
1.channel 簡(jiǎn)介
Go語(yǔ)言有個(gè)很出名的話是“以通信的手段來(lái)共享內(nèi)存”,channel就是其最佳的體現(xiàn),channel提供一種機(jī)制,可以同步兩個(gè)并發(fā)執(zhí)行的函數(shù),還可以讓兩個(gè)函數(shù)通過(guò)互相傳遞特定類(lèi)型的值來(lái)通信
channel有兩種初始化方式,分別是帶緩存的和不帶緩存的:
make(chan int) // 無(wú)緩存 chan make(chan int, 10) // 有緩存 chan
使用方式也很簡(jiǎn)單:
c := make(chan int)
defer close(c)
go func(){
c <- 5 // send
}()
n := <- c // recv
十分簡(jiǎn)潔的做到了不同協(xié)程的交互。
2.channel 內(nèi)部結(jié)構(gòu)
chan的實(shí)現(xiàn)在runtime/chan.go,是一個(gè)hchan的結(jié)構(gòu)體:
type hchan struct {
qcount uint // 隊(duì)列中的數(shù)據(jù)個(gè)數(shù)
dataqsiz uint // 環(huán)形隊(duì)列的大小,channel本身是一個(gè)環(huán)形隊(duì)列
buf unsafe.Pointer // 存放實(shí)際數(shù)據(jù)的指針,用unsafe.Pointer存放地址,為了避免gc
elemsize uint16
closed uint32 // 標(biāo)識(shí)channel是否關(guān)閉
elemtype *_type // 數(shù)據(jù) 元素類(lèi)型
sendx uint // send的 index
recvx uint // recv 的 index
recvq waitq // 阻塞在 recv 的隊(duì)列
sendq waitq // 阻塞在 send 的隊(duì)列
lock mutex // 鎖
}
可以看出,channel本身是一個(gè)環(huán)形緩沖區(qū),數(shù)據(jù)存放到堆上面,channel的同步是通過(guò)鎖實(shí)現(xiàn)的,并不是想象中的lock-free的方式,channel中有兩個(gè)隊(duì)列,一個(gè)是發(fā)送阻塞隊(duì)列,一個(gè)是接收阻塞隊(duì)列。當(dāng)向一個(gè)已滿的channel發(fā)送數(shù)據(jù)會(huì)被阻塞,此時(shí)發(fā)送協(xié)程會(huì)被添加到sendq中,同理,當(dāng)向一個(gè)空的channel接收數(shù)據(jù)時(shí),接收協(xié)程也會(huì)被阻塞,被置入recvq中。
waitq是一個(gè)鏈表,里面對(duì)g結(jié)構(gòu)做了一下簡(jiǎn)單的封裝。
3.創(chuàng)建channel
當(dāng)我們?cè)诖a里面通過(guò)make創(chuàng)建一個(gè)channel時(shí),實(shí)際調(diào)用的是下面這個(gè)函數(shù):
CALL runtime.makechan(SB)
makechan的實(shí)現(xiàn)如下所示:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 判斷 元素類(lèi)型的大小
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 判斷對(duì)齊限制
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 判斷 size非負(fù) 和 是否大于 maxAlloc限制
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0: // 無(wú)緩沖區(qū),即 make沒(méi)設(shè)置大小
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0: // 數(shù)據(jù)類(lèi)型不包含指針
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default: // 如果包含指針
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}根據(jù)上面的代碼,我們可以看到,創(chuàng)建channel分為三種情況:
1.第一種緩沖區(qū)大小為0,此時(shí)只需要分配hchansize大小的內(nèi)存就ok
2.第二種緩沖區(qū)大小不為0,且channel的類(lèi)型不包含指針,此時(shí)buf為hchanSize+元素大小*元素個(gè)數(shù)的連續(xù)內(nèi)存
3.第三種緩沖區(qū)大小不為0,且channel的類(lèi)型包含指針,則不能簡(jiǎn)單的根據(jù)元素的大小去申請(qǐng)內(nèi)存,需要通過(guò)mallocgc去分配內(nèi)存
4.發(fā)送數(shù)據(jù)
發(fā)送數(shù)據(jù)會(huì)調(diào)用chan.go中的如下接口:
CALL runtime.chansend1(SB)
chansend1會(huì)調(diào)用chansend接口,chansend方法簽名如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
c是具體的channel,ep是發(fā)送的數(shù)據(jù),block為true表示阻塞的發(fā)送,一般向channel發(fā)送數(shù)據(jù)都是阻塞的,如果channel數(shù)據(jù)滿了,會(huì)一直阻塞在這里。但是在select中如果有case監(jiān)聽(tīng)某個(gè)channel的發(fā)送,那么此時(shí)的block參數(shù)為false,后續(xù)分析select實(shí)現(xiàn)會(huì)講到。
select {
case <-c: // 這里為非阻塞發(fā)送
// do some thing
default:
// do some thing
}
chansend接口會(huì)對(duì)一些條件做判斷
如果向一個(gè)為nil的channel發(fā)送數(shù)據(jù),如果是阻塞發(fā)送會(huì)一直阻塞:
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
首先會(huì)加鎖,保證原子性,如果向一個(gè)已關(guān)閉的channel發(fā)送數(shù)據(jù)就會(huì)panic。
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
如果此時(shí)recvq中有等待協(xié)程,就直接調(diào)用send函數(shù)將數(shù)據(jù)復(fù)制給接收方, 實(shí)現(xiàn)如下:
// sg 為接收者協(xié)程,ep為發(fā)送元素
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
如果此時(shí)沒(méi)有等待協(xié)程,并且數(shù)據(jù)未滿的情況下,就將數(shù)據(jù)copy到環(huán)形緩沖區(qū)中,將位置后移一位。
if c.qcount < c.dataqsiz { // 如果 未滿
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
如果此時(shí)環(huán)形緩沖區(qū)數(shù)據(jù)滿了,如果是阻塞發(fā)送,此時(shí)會(huì)把發(fā)送方放到sendq隊(duì)列中。
5.接收數(shù)據(jù)
接收數(shù)據(jù)會(huì)調(diào)用下面的接口:
CALL runtime.chanrecv1(SB)
chanrecv1會(huì)調(diào)用chanrecv接口,chanrecv方法簽名如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
c 指需要操作的channel,接收的數(shù)據(jù)會(huì)寫(xiě)到ep中,block與send中的情況一樣,表示是阻塞接收還是非阻塞接收,非阻塞接收指在select中case 接收一個(gè)channel值:
select {
case a := <-c: // 這里為非阻塞接收,沒(méi)有數(shù)據(jù)直接返回
// do some thing
default:
// do some thing
}
首先chanrecv也會(huì)做一些參數(shù)校驗(yàn)
如果channel為nil并且是非阻塞模式,直接返回,如果是阻塞模式,永遠(yuǎn)等待
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
隨后會(huì)加鎖,防止競(jìng)爭(zhēng)讀寫(xiě)
lock(&c.lock)
如果向一個(gè)已關(guān)閉的channel接收數(shù)據(jù),此時(shí)channel里面還有數(shù)據(jù),那么依然可以接收數(shù)據(jù),屬于正常接收數(shù)據(jù)情況。
如果向一個(gè)已關(guān)閉的channel接收數(shù)據(jù),此時(shí)channel里面沒(méi)有數(shù)據(jù),那么此時(shí)返回的是(true,false),表示有值返回,但不是我們需要的值:
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep) // 將 ep 指向的內(nèi)存塊置 0
}
return true, false
}
接收也分為三種情況:
如果此時(shí) sendq中有發(fā)送方在阻塞,此時(shí)會(huì)調(diào)用recv函數(shù):
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
此時(shí)有發(fā)送方在等待,表示此時(shí)channel中數(shù)據(jù)已滿,這個(gè)時(shí)候會(huì)將channel頭部的數(shù)據(jù)copy到接收方,然后將發(fā)送方隊(duì)列頭部的發(fā)送者的數(shù)據(jù)copy到那個(gè)位置。這涉及到兩次copy操作。
第二種情況是如果沒(méi)有發(fā)送方等待,此時(shí)會(huì)把數(shù)據(jù)copy到channel中:
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
第三種情況如果channel里面沒(méi)有數(shù)據(jù),如果是非阻塞接收直接返回false,如果是阻塞接收會(huì)將接收方協(xié)程放入channel的recvq中。
6.關(guān)閉channel
關(guān)閉channel時(shí)會(huì)調(diào)用如下接口:
func closechan(c *hchan)
首先會(huì)做一些數(shù)據(jù)校驗(yàn):
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1 //置關(guān)閉標(biāo)記位如果向一個(gè)為nil的channel或者向一個(gè)已關(guān)閉的channel發(fā)起close操作就會(huì)panic。
隨后會(huì)喚醒所有在recvq或者sendq里面的協(xié)程:
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)如果存在接收者,將接收數(shù)據(jù)通過(guò)typedmemclr置0。
如果存在發(fā)送者,將所有發(fā)送者panic。
7.總結(jié)
綜上分析,在使用channel有這么幾點(diǎn)要注意
1.確保所有數(shù)據(jù)發(fā)送完后再關(guān)閉channel,由發(fā)送方來(lái)關(guān)閉
2.不要重復(fù)關(guān)閉channel
3.不要向?yàn)閚il的channel里面發(fā)送值
4.不要向?yàn)閚il的channel里面接收值
5.接收數(shù)據(jù)時(shí),可以通過(guò)返回值判斷是否ok
n , ok := <- c
if ok{
// do some thing
}
這樣防止channel被關(guān)閉后返回了零值,對(duì)業(yè)務(wù)造成影響
到此這篇關(guān)于GoLang channel底層代碼實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)GoLang channel內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語(yǔ)言RPC Authorization進(jìn)行簡(jiǎn)單ip安全驗(yàn)證的方法
這篇文章主要介紹了Go語(yǔ)言RPC Authorization進(jìn)行簡(jiǎn)單ip安全驗(yàn)證的方法,實(shí)例分析了Go語(yǔ)言進(jìn)行ip驗(yàn)證的技巧,需要的朋友可以參考下2015-03-03
golang如何通過(guò)viper讀取config.yaml文件
這篇文章主要介紹了golang通過(guò)viper讀取config.yaml文件,圍繞golang讀取config.yaml文件的相關(guān)資料展開(kāi)詳細(xì)內(nèi)容,需要的小伙伴可以參考一下2022-03-03
golang中struct和interface的基礎(chǔ)使用教程
Go不同于一般的面向?qū)ο笳Z(yǔ)言,需要我們好好的學(xué)習(xí)研究,下面這篇文章主要給大家介紹了關(guān)于golang中struct和interface的基礎(chǔ)使用的相關(guān)資料,需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2018-03-03
一文帶大家了解Go語(yǔ)言中的內(nèi)聯(lián)優(yōu)化
內(nèi)聯(lián)優(yōu)化是一種常見(jiàn)的編譯器優(yōu)化策略,通俗來(lái)講,就是把函數(shù)在它被調(diào)用的地方展開(kāi),這樣可以減少函數(shù)調(diào)用所帶來(lái)的開(kāi)銷(xiāo),本文主要為大家介紹了Go中內(nèi)聯(lián)優(yōu)化的具體使用,需要的可以參考下2023-05-05
使用GO實(shí)現(xiàn)Paxos共識(shí)算法的方法
這篇文章主要介紹了使用GO實(shí)現(xiàn)Paxos共識(shí)算法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作,具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-09-09

