深入理解Golang channel的應(yīng)用
前言
channel是用于 goroutine 之間的同步、通信的數(shù)據(jù)結(jié)構(gòu)
channel 的底層是通過(guò) mutex 來(lái)控制并發(fā)的,但它為程序員提供了更高一層次的抽象,封裝了更多的功能,這樣并發(fā)編程變得更加容易和安全,得以讓程序員把注意力留到業(yè)務(wù)上去,提升開(kāi)發(fā)效率
channel的用途包括但不限于以下幾點(diǎn):
- 協(xié)程間通信,同步
- 定時(shí)任務(wù):和timer結(jié)合
- 解耦生產(chǎn)方和消費(fèi)方,實(shí)現(xiàn)阻塞隊(duì)列
- 控制并發(fā)數(shù)
本文將介紹channel的底層原理,包括數(shù)據(jù)結(jié)構(gòu),channel的創(chuàng)建,發(fā)送,接收,關(guān)閉的實(shí)現(xiàn)邏輯
整體結(jié)構(gòu)
Go channel的數(shù)據(jù)結(jié)構(gòu)如下所示:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}
qcount:已經(jīng)存儲(chǔ)了多少個(gè)元素
dataqsie:最多存儲(chǔ)多少個(gè)元素,即緩沖區(qū)容量
buf:指向緩沖區(qū)的位置,實(shí)際上是一個(gè)數(shù)組
elemsize:每個(gè)元素占多大空間
closed:channel能夠關(guān)閉,這里記錄其關(guān)閉狀態(tài)
elemtype:保存數(shù)據(jù)的類(lèi)型信息,用于go運(yùn)行時(shí)使用
sendx,recvx:
- 記錄下一個(gè)要發(fā)送到的位置,下一次從哪里還是接收
- 這里用數(shù)組模擬隊(duì)列,這兩個(gè)變量即表示隊(duì)列的隊(duì)頭,隊(duì)尾
- 因此channel的緩沖也被稱(chēng)為環(huán)形緩沖區(qū)
recvq,sendq:
當(dāng)發(fā)送個(gè)接收不能立即完成時(shí),需要讓協(xié)程在channel上等待,所以有兩個(gè)等待隊(duì)列,分別針對(duì)接收和發(fā)送
lock:channel支持協(xié)程間并發(fā)訪問(wèn),因此需要一把鎖來(lái)保護(hù)
創(chuàng)建
創(chuàng)建channel會(huì)被編譯器編譯為調(diào)用makechan函數(shù)
// 無(wú)緩沖通道 ch1 := make(chan int) // 有緩沖通道 ch2 := make(chan int, 10)
會(huì)根據(jù)創(chuàng)建的是帶緩存,還是無(wú)緩沖,決定第二個(gè)參數(shù)size的值
可以看出,創(chuàng)建出來(lái)的是hchan指針,這樣就能在函數(shù)間直接傳遞 channel,而不用傳遞 channel 的指針
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// mem:緩沖區(qū)大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError( "makechan: size out of range" ))
}
var c *hchan
switch {
// 緩沖區(qū)大小為空,只申請(qǐng)hchanSize大小的內(nèi)存
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
// 元素類(lèi)型不包含指針,一次性分配hchanSize+mem大小的內(nèi)存
case elem.ptrdata == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
// 否則就是帶緩存,且有指針,分配兩次內(nèi)存
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 保存元素類(lèi)型,元素大小,容量
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
發(fā)送
執(zhí)行以下代碼時(shí):
ch <- 3
編譯器會(huì)轉(zhuǎn)化為對(duì)chansend的調(diào)用
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果channel是空
if c == nil {
// 非阻塞,直接返回
if !block {
return false
}
// 否則阻塞當(dāng)前協(xié)程
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw( "unreachable" )
}
// 非阻塞,沒(méi)有關(guān)閉,且容量滿了,無(wú)法發(fā)送,直接返回
if !block && c.closed == 0 && full(c) {
return false
}
// 加鎖
lock(&c.lock)
// 如果已經(jīng)關(guān)閉,無(wú)法發(fā)送,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError( "send on closed channel" ))
}
// 從接收隊(duì)列彈出一個(gè)協(xié)程的包裝結(jié)構(gòu)sudog
if sg := c.recvq.dequeue(); sg != nil {
// 如果能彈出,即有等到接收的協(xié)程,說(shuō)明:
// 該channel要么是無(wú)緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待
// 將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 緩沖區(qū)還有空間
if c.qcount < c.dataqsiz {
// qp:計(jì)算要發(fā)送到的位置的地址
qp := chanbuf(c, c.sendx)
// 將數(shù)據(jù)從ep拷貝到qp
typedmemmove(c.elemtype, qp, ep)
// 待發(fā)送位置移動(dòng)
c.sendx++
// 由于是數(shù)組模擬隊(duì)列,sendx到頂了需要?dú)w零
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 緩沖區(qū)數(shù)量++
c.qcount++
unlock(&c.lock)
return true
}
// 往下就是緩沖區(qū)無(wú)數(shù)據(jù),也沒(méi)有等到接收協(xié)程的情況了
// 如果是非阻塞模式,直接返回
if !block {
unlock(&c.lock)
return false
}
// 將當(dāng)前協(xié)程包裝成sudog,阻塞到channel上
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 當(dāng)前協(xié)程進(jìn)入發(fā)送等待隊(duì)列
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// 被喚醒后從這里開(kāi)始執(zhí)行
KeepAlive(ep)
if mysg != gp.waiting {
throw( "G waiting list is corrupted" )
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
// 被喚醒后發(fā)現(xiàn)channel關(guān)閉了,panic
if closed {
if c.closed == 0 {
throw( "chansend: spurious wakeup" )
}
panic(plainError( "send on closed channel" ))
}
return true
}
整體流程為:
如果當(dāng)前操作為非阻塞,channel沒(méi)有關(guān)閉,且容量滿了,無(wú)法發(fā)送,直接返回
從接收隊(duì)列彈出一個(gè)協(xié)程的包裝結(jié)構(gòu)sudog,如果能彈出,即有等到接收的協(xié)程,說(shuō)明:
- 該channel要么是無(wú)緩沖,要么緩沖區(qū)為空,不然不可能有協(xié)程在等待
- 將要發(fā)送的數(shù)據(jù)拷貝到該協(xié)程的接收指針上,返回
- 這里直接從發(fā)送者拷貝到接收者的內(nèi)存,而不是先把數(shù)據(jù)拷貝到緩沖區(qū),再?gòu)木彌_區(qū)拷貝到接收者,節(jié)約了一次內(nèi)存拷貝
否則看看緩沖區(qū)還有空間,如果有,將數(shù)據(jù)拷貝到緩沖區(qū)上,也返回
接下來(lái)就是既沒(méi)有接收者等待,緩沖區(qū)也為空的情況,就需要將當(dāng)前協(xié)程包裝成sudog,阻塞到channel上
將協(xié)程阻塞到channel的等待隊(duì)列時(shí),將其包裝成了sudog結(jié)構(gòu):
type sudog struct {
// 協(xié)程
g *g
// 前一個(gè),后一個(gè)指針
next *sudog
prev *sudog
// 等到發(fā)送的數(shù)據(jù)在哪,等待從哪個(gè)位置接收數(shù)據(jù)
elem unsafe.Pointer
acquiretime int64
releasetime int64
ticket uint32
isSelect bool
success bool
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
// 在哪個(gè)channel上等待
c *hchan // channel
}
其目的是:
- g本身沒(méi)有存儲(chǔ)前一個(gè),后一個(gè)指針,需要用sudog結(jié)構(gòu)包裝才能加入隊(duì)列
- elem字段存儲(chǔ)等到發(fā)送的數(shù)據(jù)在哪,等待從哪個(gè)位置接收數(shù)據(jù),用于從數(shù)據(jù)能從協(xié)程到協(xié)程的直接拷貝
來(lái)看看一些子函數(shù):
1.判斷channel是否是滿的
func full(c *hchan) bool {
// 無(wú)緩沖
if c.dataqsiz == 0 {
// 并且沒(méi)有其他協(xié)程在等待
return c.recvq.first == nil
}
// 有緩沖,但容量裝滿了
return c.qcount == c.dataqsiz
}
2.send方法:
/**
c:要操作的channel
sg:彈出的接收者協(xié)程
ep:要發(fā)送的數(shù)據(jù)在的位置
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果接收者指針不為空,直接把數(shù)據(jù)從ep拷貝到sg.elem
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒該接收者協(xié)程
goready(gp, skip+1)
}
接收
從channel中接收數(shù)據(jù)有幾種寫(xiě)法:
- 帶不帶ok
- 接不接收返回值
根據(jù)帶不帶ok,決定用下面哪個(gè)方法
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
根據(jù)接不接收返回值,決定elem是不是nil
最終都會(huì)調(diào)用chanrecv方法:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// 如果channel為nil,根據(jù)參數(shù)中是否阻塞來(lái)決定是否阻塞
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw( "unreachable" )
}
// 非阻塞,并且channel為空
if !block && empty(c) {
// 如果還沒(méi)關(guān)閉,直接返回
if atomic.Load(&c.closed) == 0 {
return
}
// 否則已經(jīng)關(guān)閉,
// 如果為空,返回該類(lèi)型的零值
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
lock(&c.lock)
// 同樣,如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒(méi)有元素,返回該類(lèi)型零值
if c.closed != 0 && c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果有發(fā)送者正在阻塞,說(shuō)明:
// 1.無(wú)緩沖
// 2.有緩沖,但緩沖區(qū)滿了。因?yàn)橹挥芯彌_區(qū)滿了,才可能有發(fā)送者在等待
if sg := c.sendq.dequeue(); sg != nil {
// 將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將sg的數(shù)據(jù)拷貝到緩沖區(qū),該函數(shù)詳細(xì)流程可看下文
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 如果緩存區(qū)有數(shù)據(jù),
if c.qcount > 0 {
// qp為緩沖區(qū)中下一次接收的位置
qp := chanbuf(c, c.recvx)
// 將數(shù)據(jù)從qp拷貝到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 接下來(lái)就是既沒(méi)有發(fā)送者在等待,也緩沖區(qū)也沒(méi)數(shù)據(jù)
if !block {
unlock(&c.lock)
return false, false
}
// 將當(dāng)前協(xié)程包裝成sudog,阻塞到channel中
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// 記錄接收地址
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// 從這里喚醒
if mysg != gp.waiting {
throw( "G waiting list is corrupted" )
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
接收流程如為:
如果channel為nil,根據(jù)參數(shù)中是否阻塞來(lái)決定是否阻塞
如果channel已經(jīng)關(guān)閉,且緩沖區(qū)沒(méi)有元素,返回該類(lèi)型零值
如果有發(fā)送者正在阻塞,說(shuō)明:
- 要么是無(wú)緩沖
- 有緩沖,但緩沖區(qū)滿了。因?yàn)?strong>只有緩沖區(qū)滿了,才可能有發(fā)送者在等待
- 將數(shù)據(jù)從緩沖區(qū)拷貝到ep,再將發(fā)送者的數(shù)據(jù)拷貝到緩沖區(qū),并喚該發(fā)送者
如果緩存區(qū)有數(shù)據(jù), 則從緩沖區(qū)將數(shù)據(jù)復(fù)制到ep,返回
接下來(lái)就是既沒(méi)有發(fā)送者在等待,也緩沖區(qū)也沒(méi)數(shù)據(jù)的情況:
將當(dāng)前協(xié)程包裝成sudog,阻塞到channel中
來(lái)看其中的子函數(shù)recv():
/**
c:操作的channel
sg:阻塞的發(fā)送協(xié)程
ep:接收者接收數(shù)據(jù)的地址
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是無(wú)緩沖channel,直接將數(shù)據(jù)從發(fā)送者sg拷貝到ep
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
// 接下來(lái)是有緩沖,且緩沖區(qū)滿的情況
} else {
// qp為channel緩沖區(qū)中,接收者下一次接收的地址
qp := chanbuf(c, c.recvx)
// 將數(shù)據(jù)從qp拷貝到ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 將發(fā)送者的數(shù)據(jù)從sg.elem拷貝到qp
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 由于一接收已發(fā)送,緩沖區(qū)還是滿的,因此 c.sendx = c.recvx
c.sendx = c.recvx
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒發(fā)送者
goready(gp, skip+1)
}
關(guān)閉
func closechan(c *hchan) {
// 不能關(guān)閉空channel
if c == nil {
panic(plainError( "close of nil channel" ))
}
lock(&c.lock)
// 不能重復(fù)關(guān)閉
if c.closed != 0 {
unlock(&c.lock)
panic(plainError( "close of closed channel" ))
}
// 修改關(guān)閉狀態(tài)
c.closed = 1
var glist gList
// 釋放所有的接收者協(xié)程,并為它們賦予零值
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
// 釋放所有的發(fā)送者協(xié)程
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
glist.push(gp)
}
unlock(&c.lock)
// 執(zhí)行喚醒操作
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
關(guān)閉的流程比較簡(jiǎn)單,可以看出:
不能關(guān)閉空channel,不能重復(fù)關(guān)閉channel
先上一把大鎖,接著把所有掛在這個(gè) channel 上的 sender 和 receiver 全都連成一個(gè) sudog 鏈表,再解鎖。最后,再將所有的 sudog 全都喚醒:
接收者:會(huì)收到該類(lèi)型的零值
這里返回零值沒(méi)有問(wèn)題,因?yàn)橹赃@些接收者會(huì)阻塞,就是因?yàn)榫彌_區(qū)沒(méi)有數(shù)據(jù),因此channel關(guān)閉后該接收者收到零值也符合邏輯
發(fā)送者:會(huì)被喚醒,然后panic
因此不能在有多個(gè)sender的時(shí)候貿(mào)然關(guān)閉channel
以上就是深入理解Golang channel的應(yīng)用的詳細(xì)內(nèi)容,更多關(guān)于Golang channel的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang?Fasthttp選擇使用slice而非map?存儲(chǔ)請(qǐng)求數(shù)據(jù)原理探索
本文將從簡(jiǎn)單到復(fù)雜,逐步剖析為什么?Fasthttp?選擇使用?slice?而非?map,并通過(guò)代碼示例解釋這一選擇背后高性能的原因,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-02-02
解決Golang json序列化字符串時(shí)多了\的情況
這篇文章主要介紹了解決Golang json序列化字符串時(shí)多了\的情況,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12
在Mac中搭建go語(yǔ)言開(kāi)發(fā)環(huán)境的操作步驟
go語(yǔ)言在開(kāi)發(fā)效率和運(yùn)行效率中的優(yōu)勢(shì)讓很多人青睞,所以有傾向打算轉(zhuǎn)向go語(yǔ)言的開(kāi)發(fā)。下面介紹在Mac中g(shù)olang的開(kāi)發(fā)環(huán)境配置。有需要的可以參考借鑒。2016-08-08
Go語(yǔ)言實(shí)現(xiàn)牛頓法求平方根函數(shù)的案例
這篇文章主要介紹了Go語(yǔ)言實(shí)現(xiàn)牛頓法求平方根函數(shù)的案例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12
Go語(yǔ)言實(shí)現(xiàn)對(duì)稱(chēng)加密和非對(duì)稱(chēng)加密的示例代碼
本文主要介紹了Go語(yǔ)言實(shí)現(xiàn)對(duì)稱(chēng)加密和非對(duì)稱(chēng)加密的示例代碼,通過(guò)實(shí)際代碼示例展示了如何在Go中實(shí)現(xiàn)這兩種加密方式,具有一定的參考價(jià)值,感興趣的可以了解一下2024-01-01
基于go實(shí)例網(wǎng)絡(luò)存儲(chǔ)協(xié)議詳解
這篇文章主要為大家介紹了基于go實(shí)例網(wǎng)絡(luò)存儲(chǔ)協(xié)議詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-03-03

