GoLang channel底層代碼實(shí)現(xiàn)詳解
1.channel 簡介
Go語言有個(gè)很出名的話是“以通信的手段來共享內(nèi)存”,channel就是其最佳的體現(xiàn),channel提供一種機(jī)制,可以同步兩個(gè)并發(fā)執(zhí)行的函數(shù),還可以讓兩個(gè)函數(shù)通過互相傳遞特定類型的值來通信
channel有兩種初始化方式,分別是帶緩存的和不帶緩存的:
make(chan int) // 無緩存 chan make(chan int, 10) // 有緩存 chan
使用方式也很簡單:
c := make(chan int) defer close(c) go func(){ c <- 5 // send }() n := <- c // recv
十分簡潔的做到了不同協(xié)程的交互。
2.channel 內(nèi)部結(jié)構(gòu)
chan的實(shí)現(xiàn)在runtime/chan.go,是一個(gè)hchan的結(jié)構(gòu)體:
type hchan struct { qcount uint // 隊(duì)列中的數(shù)據(jù)個(gè)數(shù) dataqsiz uint // 環(huán)形隊(duì)列的大小,channel本身是一個(gè)環(huán)形隊(duì)列 buf unsafe.Pointer // 存放實(shí)際數(shù)據(jù)的指針,用unsafe.Pointer存放地址,為了避免gc elemsize uint16 closed uint32 // 標(biāo)識channel是否關(guān)閉 elemtype *_type // 數(shù)據(jù) 元素類型 sendx uint // send的 index recvx uint // recv 的 index recvq waitq // 阻塞在 recv 的隊(duì)列 sendq waitq // 阻塞在 send 的隊(duì)列 lock mutex // 鎖 }
可以看出,channel本身是一個(gè)環(huán)形緩沖區(qū),數(shù)據(jù)存放到堆上面,channel的同步是通過鎖實(shí)現(xiàn)的,并不是想象中的lock-free的方式,channel中有兩個(gè)隊(duì)列,一個(gè)是發(fā)送阻塞隊(duì)列,一個(gè)是接收阻塞隊(duì)列。當(dāng)向一個(gè)已滿的channel發(fā)送數(shù)據(jù)會被阻塞,此時(shí)發(fā)送協(xié)程會被添加到sendq中,同理,當(dāng)向一個(gè)空的channel接收數(shù)據(jù)時(shí),接收協(xié)程也會被阻塞,被置入recvq中。
waitq是一個(gè)鏈表,里面對g結(jié)構(gòu)做了一下簡單的封裝。
3.創(chuàng)建channel
當(dāng)我們在代碼里面通過make創(chuàng)建一個(gè)channel時(shí),實(shí)際調(diào)用的是下面這個(gè)函數(shù):
CALL runtime.makechan(SB)
makechan的實(shí)現(xiàn)如下所示:
func makechan(t *chantype, size int) *hchan { elem := t.elem // 判斷 元素類型的大小 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } // 判斷對齊限制 if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 判斷 size非負(fù) 和 是否大于 maxAlloc限制 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // 無緩沖區(qū),即 make沒設(shè)置大小 c = (*hchan)(mallocgc(hchanSize, nil, true)) c.buf = c.raceaddr() case elem.ptrdata == 0: // 數(shù)據(jù)類型不包含指針 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) default: // 如果包含指針 // Elements contain pointers. c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n") } return c }
根據(jù)上面的代碼,我們可以看到,創(chuàng)建channel分為三種情況:
1.第一種緩沖區(qū)大小為0,此時(shí)只需要分配hchansize大小的內(nèi)存就ok
2.第二種緩沖區(qū)大小不為0,且channel的類型不包含指針,此時(shí)buf為hchanSize+元素大小*元素個(gè)數(shù)的連續(xù)內(nèi)存
3.第三種緩沖區(qū)大小不為0,且channel的類型包含指針,則不能簡單的根據(jù)元素的大小去申請內(nèi)存,需要通過mallocgc去分配內(nèi)存
4.發(fā)送數(shù)據(jù)
發(fā)送數(shù)據(jù)會調(diào)用chan.go中的如下接口:
CALL runtime.chansend1(SB)
chansend1會調(diào)用chansend接口,chansend方法簽名如下:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
c是具體的channel,ep是發(fā)送的數(shù)據(jù),block為true表示阻塞的發(fā)送,一般向channel發(fā)送數(shù)據(jù)都是阻塞的,如果channel數(shù)據(jù)滿了,會一直阻塞在這里。但是在select中如果有case監(jiān)聽某個(gè)channel的發(fā)送,那么此時(shí)的block參數(shù)為false,后續(xù)分析select實(shí)現(xiàn)會講到。
select { case <-c: // 這里為非阻塞發(fā)送 // do some thing default: // do some thing }
chansend接口會對一些條件做判斷
如果向一個(gè)為nil的channel發(fā)送數(shù)據(jù),如果是阻塞發(fā)送會一直阻塞:
if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
首先會加鎖,保證原子性,如果向一個(gè)已關(guān)閉的channel發(fā)送數(shù)據(jù)就會panic。
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
如果此時(shí)recvq中有等待協(xié)程,就直接調(diào)用send函數(shù)將數(shù)據(jù)復(fù)制給接收方, 實(shí)現(xiàn)如下:
// sg 為接收者協(xié)程,ep為發(fā)送元素 func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { qp := chanbuf(c, c.recvx) raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
如果此時(shí)沒有等待協(xié)程,并且數(shù)據(jù)未滿的情況下,就將數(shù)據(jù)copy到環(huán)形緩沖區(qū)中,將位置后移一位。
if c.qcount < c.dataqsiz { // 如果 未滿 // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
如果此時(shí)環(huán)形緩沖區(qū)數(shù)據(jù)滿了,如果是阻塞發(fā)送,此時(shí)會把發(fā)送方放到sendq隊(duì)列中。
5.接收數(shù)據(jù)
接收數(shù)據(jù)會調(diào)用下面的接口:
CALL runtime.chanrecv1(SB)
chanrecv1會調(diào)用chanrecv接口,chanrecv方法簽名如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
c 指需要操作的channel,接收的數(shù)據(jù)會寫到ep中,block與send中的情況一樣,表示是阻塞接收還是非阻塞接收,非阻塞接收指在select中case 接收一個(gè)channel值:
select { case a := <-c: // 這里為非阻塞接收,沒有數(shù)據(jù)直接返回 // do some thing default: // do some thing }
首先chanrecv也會做一些參數(shù)校驗(yàn)
如果channel為nil并且是非阻塞模式,直接返回,如果是阻塞模式,永遠(yuǎn)等待
if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
隨后會加鎖,防止競爭讀寫
lock(&c.lock)
如果向一個(gè)已關(guān)閉的channel接收數(shù)據(jù),此時(shí)channel里面還有數(shù)據(jù),那么依然可以接收數(shù)據(jù),屬于正常接收數(shù)據(jù)情況。
如果向一個(gè)已關(guān)閉的channel接收數(shù)據(jù),此時(shí)channel里面沒有數(shù)據(jù),那么此時(shí)返回的是(true,false),表示有值返回,但不是我們需要的值:
if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) // 將 ep 指向的內(nèi)存塊置 0 } return true, false }
接收也分為三種情況:
如果此時(shí) sendq中有發(fā)送方在阻塞,此時(shí)會調(diào)用recv函數(shù):
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { if raceenabled { racesync(c, sg) } if ep != nil { recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) } // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
此時(shí)有發(fā)送方在等待,表示此時(shí)channel中數(shù)據(jù)已滿,這個(gè)時(shí)候會將channel頭部的數(shù)據(jù)copy到接收方,然后將發(fā)送方隊(duì)列頭部的發(fā)送者的數(shù)據(jù)copy到那個(gè)位置。這涉及到兩次copy操作。
第二種情況是如果沒有發(fā)送方等待,此時(shí)會把數(shù)據(jù)copy到channel中:
if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
第三種情況如果channel里面沒有數(shù)據(jù),如果是非阻塞接收直接返回false,如果是阻塞接收會將接收方協(xié)程放入channel的recvq中。
6.關(guān)閉channel
關(guān)閉channel時(shí)會調(diào)用如下接口:
func closechan(c *hchan)
首先會做一些數(shù)據(jù)校驗(yàn):
if c == nil { panic(plainError("close of nil channel")) } lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } if raceenabled { callerpc := getcallerpc() racewritepc(c.raceaddr(), callerpc, funcPC(closechan)) racerelease(c.raceaddr()) } c.closed = 1 //置關(guān)閉標(biāo)記位
如果向一個(gè)為nil的channel或者向一個(gè)已關(guān)閉的channel發(fā)起close操作就會panic。
隨后會喚醒所有在recvq或者sendq里面的協(xié)程:
var glist gList // release all readers for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } // release all writers (they will panic) for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } glist.push(gp) } unlock(&c.lock)
如果存在接收者,將接收數(shù)據(jù)通過typedmemclr置0。
如果存在發(fā)送者,將所有發(fā)送者panic。
7.總結(jié)
綜上分析,在使用channel有這么幾點(diǎn)要注意
1.確保所有數(shù)據(jù)發(fā)送完后再關(guān)閉channel,由發(fā)送方來關(guān)閉
2.不要重復(fù)關(guān)閉channel
3.不要向?yàn)閚il的channel里面發(fā)送值
4.不要向?yàn)閚il的channel里面接收值
5.接收數(shù)據(jù)時(shí),可以通過返回值判斷是否ok
n , ok := <- c if ok{ // do some thing }
這樣防止channel被關(guān)閉后返回了零值,對業(yè)務(wù)造成影響
到此這篇關(guān)于GoLang channel底層代碼實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)GoLang channel內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言RPC Authorization進(jìn)行簡單ip安全驗(yàn)證的方法
這篇文章主要介紹了Go語言RPC Authorization進(jìn)行簡單ip安全驗(yàn)證的方法,實(shí)例分析了Go語言進(jìn)行ip驗(yàn)證的技巧,需要的朋友可以參考下2015-03-03golang如何通過viper讀取config.yaml文件
這篇文章主要介紹了golang通過viper讀取config.yaml文件,圍繞golang讀取config.yaml文件的相關(guān)資料展開詳細(xì)內(nèi)容,需要的小伙伴可以參考一下2022-03-03golang中struct和interface的基礎(chǔ)使用教程
Go不同于一般的面向?qū)ο笳Z言,需要我們好好的學(xué)習(xí)研究,下面這篇文章主要給大家介紹了關(guān)于golang中struct和interface的基礎(chǔ)使用的相關(guān)資料,需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2018-03-03一文帶大家了解Go語言中的內(nèi)聯(lián)優(yōu)化
內(nèi)聯(lián)優(yōu)化是一種常見的編譯器優(yōu)化策略,通俗來講,就是把函數(shù)在它被調(diào)用的地方展開,這樣可以減少函數(shù)調(diào)用所帶來的開銷,本文主要為大家介紹了Go中內(nèi)聯(lián)優(yōu)化的具體使用,需要的可以參考下2023-05-05