欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang?channel底層實(shí)現(xiàn)過程解析(深度好文)

 更新時(shí)間:2024年07月06日 09:04:22   作者:golang架構(gòu)師k哥  
Go語言為了方便使用者,提供了簡單、安全的協(xié)程數(shù)據(jù)同步和通信機(jī)制,這篇文章主要介紹了Golang?channel底層是如何實(shí)現(xiàn)的,需要的朋友可以參考下

Hi 你好,我是k哥。大廠搬磚6年的后端程序員。

我們知道,Go語言為了方便使用者,提供了簡單、安全的協(xié)程數(shù)據(jù)同步和通信機(jī)制,channel。那我們知道channel底層是如何實(shí)現(xiàn)的嗎?今天k哥就來聊聊channel的底層實(shí)現(xiàn)原理。同時(shí),為了驗(yàn)證我們是否掌握了channel的實(shí)現(xiàn)原理,本文也收集了channel的高頻面試題,理解了原理,面試題自然不在話下。

1 原理

默認(rèn)情況下,讀寫未就緒的channel(讀沒有數(shù)據(jù)的channel,或者寫緩沖區(qū)已滿的channel)時(shí),協(xié)程會(huì)被阻塞。

但是當(dāng)讀寫channel操作和select搭配使用時(shí),即使channel未就緒,也可以執(zhí)行其它分支,當(dāng)前協(xié)程不會(huì)被阻塞。

ch := make(chan int)
select{
  case <- ch:
  default:
}

本文主要介紹channel的阻塞模式,和select搭配使用的非阻塞模式,后續(xù)會(huì)另起一篇介紹。

1.1 數(shù)據(jù)結(jié)構(gòu)

channel涉及到的核心數(shù)據(jù)結(jié)構(gòu)包含3個(gè)。

hchan

// channel
type hchan struct {
    // 循環(huán)隊(duì)列
    qcount   uint           // 通道中數(shù)據(jù)個(gè)數(shù)
    dataqsiz uint           // buf長度
    buf      unsafe.Pointer // 數(shù)組指針
    sendx    uint   // send index
    recvx    uint   // receive index
    elemsize uint16 // 元素大小
    elemtype *_type // 元素類型
    closed   uint32 // 通道關(guān)閉標(biāo)志
    recvq    waitq  // 由雙向鏈表實(shí)現(xiàn)的recv waiters隊(duì)列
    sendq    waitq  // 由雙向鏈表實(shí)現(xiàn)的send waiters隊(duì)列
    lock mutex
}

hchan是channel底層的數(shù)據(jù)結(jié)構(gòu),其核心是由數(shù)組實(shí)現(xiàn)的一個(gè)環(huán)形緩沖區(qū):

  • qcount 通道中數(shù)據(jù)個(gè)數(shù)
  • dataqsiz 數(shù)組長度
  • buf 指向數(shù)組的指針,數(shù)組中存儲(chǔ)往channel發(fā)送的數(shù)據(jù)
  • sendx 發(fā)送元素到數(shù)組的index
  • recvx 從數(shù)組中接收元素的index
  • elemsize channel中元素類型的大小
  • elemtype channel中的元素類型
  • closed 通道關(guān)閉標(biāo)志
  • recvq 因讀取channel而陷入阻塞的協(xié)程等待隊(duì)列
  • sendq 因發(fā)送channel而陷入阻塞的協(xié)程等待隊(duì)列
  • lock 鎖

waitq

// 等待隊(duì)列(雙向鏈表)
type waitq struct {
    first *sudog
    last  *sudog
}

waitq是因讀寫channel而陷入阻塞的協(xié)程等待隊(duì)列。

  • first 隊(duì)列頭部
  • last 隊(duì)列尾部

sudog

// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
type sudog struct {
    g *g // 等待send或recv的協(xié)程g
    next *sudog // 等待隊(duì)列下一個(gè)結(jié)點(diǎn)next
    prev *sudog // 等待隊(duì)列前一個(gè)結(jié)點(diǎn)prev
    elem unsafe.Pointer // data element (may point to stack)
    success bool // 標(biāo)記協(xié)程g被喚醒是因?yàn)閿?shù)據(jù)傳遞(true)還是channel被關(guān)閉(false)
    c        *hchan // channel
}

sudog是協(xié)程等待隊(duì)列的節(jié)點(diǎn):

  • g 因讀寫而陷入阻塞的協(xié)程
  • next 等待隊(duì)列下一個(gè)節(jié)點(diǎn)
  • prev 等待隊(duì)列前一個(gè)節(jié)點(diǎn)
  • elem 對(duì)于寫channel,表示需要發(fā)送到channel的數(shù)據(jù)指針;對(duì)于讀channel,表示需要被賦值的數(shù)據(jù)指針。
  • success 標(biāo)記協(xié)程被喚醒是因?yàn)閿?shù)據(jù)傳遞(true)還是channel被關(guān)閉(false)
  • c 指向channel的指針

1.2 通道創(chuàng)建

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // buf數(shù)組所需分配內(nèi)存大小
    mem := elem.size*uintptr(size)
    var c *hchan
    switch {
    case mem == 0:// Unbuffered channels,buf無需內(nèi)存分配
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0: // Buffered channels,通道元素類型非指針
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Buffered channels,通道元素類型是指針
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}

通道創(chuàng)建主要是分配內(nèi)存并構(gòu)建hchan對(duì)象。

1.3 通道寫入

3種異常情況處理

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 1.channel為nil
    if c == nil {
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    lock(&c.lock) //加鎖
    // 2.如果channel已關(guān)閉,直接panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // Block on the channel. 
    mysg := acquireSudog()
    c.sendq.enqueue(mysg) // 入sendq等待隊(duì)列
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    closed := !mysg.success // 協(xié)程被喚醒的原因是因?yàn)閿?shù)據(jù)傳遞還是通道被關(guān)閉
    // 3.因channel被關(guān)閉導(dǎo)致阻塞寫協(xié)程被喚醒并panic
    if closed {
        panic(plainError("send on closed channel"))
    }
}
  • 對(duì) nil channel寫入,會(huì)死鎖
  • 對(duì)被關(guān)閉的channel寫入,會(huì)panic
  • 對(duì)因?qū)懭攵萑胱枞膮f(xié)程,如果channel被關(guān)閉,阻塞協(xié)程會(huì)被喚醒并panic

寫時(shí)有阻塞讀協(xié)程

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock) //加鎖
    // 1、當(dāng)存在等待接收的Goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3) // 直接把正在發(fā)送的值發(fā)送給等待接收的Goroutine,并將此接收協(xié)程放入可調(diào)度隊(duì)列等待調(diào)度
        return true
    }
}
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 將ep寫入sg中的elem
    if sg.elem != nil {
         t:=c.elemtype
         dst := sg.elem
         // memmove copies n bytes from "from" to "to".
         memmove(dst, ep, t.size)
         sg.elem = nil // 數(shù)據(jù)已經(jīng)被寫入到<- c變量,因此sg.elem指針可以置空了
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    // 喚醒receiver協(xié)程gp
    goready(gp, skip+1)
}
// 喚醒receiver協(xié)程gp,將其放入可運(yùn)行隊(duì)列中等待調(diào)度執(zhí)行
func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}
// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    status := readgstatus(gp)
    // Mark runnable.
    _g_ := getg()
    mp := acquirem() // disable preemption because it can be holding p in a local var
    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next)
    wakep()
    releasem(mp)
}
  • 加鎖
  • 從阻塞讀協(xié)程隊(duì)列取出sudog節(jié)點(diǎn)
  • 在send方法中,調(diào)用memmove方法將數(shù)據(jù)拷貝給sudog.elem指向的變量。
  • goready方法喚醒接收到數(shù)據(jù)的阻塞讀協(xié)程g,將其放入?yún)f(xié)程可運(yùn)行隊(duì)列中等待調(diào)度
  • 解鎖

寫時(shí)無阻塞讀協(xié)程但環(huán)形緩沖區(qū)仍有空間

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock) //加鎖
    // 當(dāng)緩沖區(qū)未滿時(shí)
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx) // 獲取指向緩沖區(qū)數(shù)組中位于sendx位置的元素的指針
        typedmemmove(c.elemtype, qp, ep) // 將當(dāng)前發(fā)送的值拷貝到緩沖區(qū)
        c.sendx++ 
        if c.sendx == c.dataqsiz {
            c.sendx = 0 // 因?yàn)槭茄h(huán)隊(duì)列,sendx等于隊(duì)列長度時(shí)置為0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
}
  • 加鎖
  • 將數(shù)據(jù)放入環(huán)形緩沖區(qū)
  • 解鎖

寫時(shí)無阻塞讀協(xié)程且環(huán)形緩沖區(qū)無空間

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock) //加鎖
    // Block on the channel. 
    // 將當(dāng)前的Goroutine打包成一個(gè)sudog節(jié)點(diǎn),并加入到阻塞寫隊(duì)列sendq里
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    c.sendq.enqueue(mysg) // 入sendq等待隊(duì)列
    // 調(diào)用gopark將當(dāng)前Goroutine設(shè)置為等待狀態(tài)并解鎖,進(jìn)入休眠等待被喚醒,觸發(fā)協(xié)程調(diào)度
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // 被喚醒之后執(zhí)行清理工作并釋放sudog結(jié)構(gòu)體
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success // gp被喚醒的原因是因?yàn)閿?shù)據(jù)傳遞還是通道被關(guān)閉
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    // 因關(guān)閉被喚醒則panic
    if closed {
        panic(plainError("send on closed channel"))
    }
    // 數(shù)據(jù)成功傳遞
    return true
}
  • 加鎖。
  • 將當(dāng)前協(xié)程gp封裝成sudog節(jié)點(diǎn),并加入channel的阻塞寫隊(duì)列sendq。
  • 調(diào)用gopark將當(dāng)前協(xié)程設(shè)置為等待狀態(tài)并解鎖,觸發(fā)調(diào)度其它協(xié)程運(yùn)行。
  • 因數(shù)據(jù)被讀或者channel被關(guān)閉,協(xié)程從park中被喚醒,清理sudog結(jié)構(gòu)。
  • 因channel被關(guān)閉導(dǎo)致協(xié)程喚醒,panic
  • 返回

整體寫流程

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 1.channel為nil
    if c == nil {
        // 當(dāng)前Goroutine阻塞掛起
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // 2.加鎖
    lock(&c.lock) 
    // 3.如果channel已關(guān)閉,直接panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    // 4、存在阻塞讀協(xié)程
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3) // 直接把正在發(fā)送的值發(fā)送給等待接收的Goroutine,并將此接收協(xié)程放入可調(diào)度隊(duì)列等待調(diào)度
        return true
    }
    // 5、緩沖區(qū)未滿時(shí)
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx) // 獲取指向緩沖區(qū)數(shù)組中位于sendx位置的元素的指針
        typedmemmove(c.elemtype, qp, ep) // 將當(dāng)前發(fā)送的值拷貝到緩沖區(qū)
        c.sendx++ 
        if c.sendx == c.dataqsiz {
            c.sendx = 0 // 因?yàn)槭茄h(huán)隊(duì)列,sendx等于隊(duì)列長度時(shí)置為0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
    // Block on the channel. 
    // 6、將當(dāng)前協(xié)程打包成一個(gè)sudog結(jié)構(gòu)體,并加入到channel的阻塞寫隊(duì)列sendq
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg) // 入sendq等待隊(duì)列
    atomic.Store8(&gp.parkingOnChan, 1)
    // 7.調(diào)用gopark將當(dāng)前協(xié)程設(shè)置為等待狀態(tài)并解鎖,進(jìn)入休眠,等待被喚醒,并觸發(fā)協(xié)程調(diào)度
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // 8. 被喚醒之后執(zhí)行清理工作并釋放sudog結(jié)構(gòu)體
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success // g被喚醒的原因是因?yàn)閿?shù)據(jù)傳遞還是通道被關(guān)閉
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    // 9.因關(guān)閉被喚醒則panic
    if closed {
        panic(plainError("send on closed channel"))
    }
    // 10.數(shù)據(jù)成功傳遞
    return true
}
  • channel為nil檢查。為空則死鎖。
  • 加鎖
  • 如果channel已關(guān)閉,直接panic。
  • 當(dāng)存在阻塞讀協(xié)程,直接把數(shù)據(jù)發(fā)送給讀協(xié)程,喚醒并將其放入?yún)f(xié)程可運(yùn)行隊(duì)列中等待調(diào)度運(yùn)行。
  • 當(dāng)緩沖區(qū)未滿時(shí),將當(dāng)前發(fā)送的數(shù)據(jù)拷貝到緩沖區(qū)。
  • 當(dāng)既沒有阻塞讀協(xié)程,緩沖區(qū)也沒有剩余空間時(shí),將協(xié)程加入阻塞寫隊(duì)列sendq。
  • 調(diào)用gopark將當(dāng)前協(xié)程設(shè)置為等待狀態(tài),進(jìn)入休眠等待被喚醒,觸發(fā)協(xié)程調(diào)度。
  • 被喚醒之后執(zhí)行清理工作并釋放sudog結(jié)構(gòu)體
  • 喚醒之后檢查,因channel被關(guān)閉導(dǎo)致協(xié)程喚醒則panic。
  • 返回。

1.4 通道讀

2種異常情況處理

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 1.channel為nil
    if c == nil {
        // 否則,當(dāng)前Goroutine阻塞掛起
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    lock(&c.lock)
    // 2.如果channel已關(guān)閉,并且緩沖區(qū)無元素,返回(true,false)
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                //根據(jù)channel元素的類型清理ep對(duì)應(yīng)地址的內(nèi)存,即ep接收了channel元素類型的零值
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }
}
  • channel未初始化,讀操作會(huì)死鎖
  • channel已關(guān)閉且緩沖區(qū)無數(shù)據(jù),給讀變量賦零值。

讀時(shí)有阻塞寫協(xié)程

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    // Just found waiting sender with not closed.
    // 等待發(fā)送的隊(duì)列sendq里存在Goroutine
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        // 如果無緩沖區(qū),那么直接從sender接收數(shù)據(jù);否則,從buf隊(duì)列的頭部接收數(shù)據(jù),并把sender的數(shù)據(jù)加到buf隊(duì)列的尾部
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true // 接收成功
    }
}
// recv processes a receive operation on a full channel c.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // channel無緩沖區(qū),直接從sender讀
    if c.dataqsiz == 0 {
        if ep != nil {
            // copy data from sender
            t := c.elemtype
            src := sg.elem
            typeBitsBulkBarrier(t, uintptr(ep), uintptr(src), t.size)
            memmove(dst, src, t.size)
        }
    } else {
        // 從隊(duì)列讀,sender再寫入隊(duì)列
        qp := chanbuf(c, c.recvx)
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    // 喚醒sender隊(duì)列協(xié)程sg
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    // 喚醒協(xié)程
    goready(gp, skip+1)
}
  • 加鎖
  • 從阻塞寫隊(duì)列取出sudog節(jié)點(diǎn)
  • 假如channel為無緩沖區(qū)通道,則直接讀取sudog對(duì)應(yīng)寫協(xié)程數(shù)據(jù),喚醒寫協(xié)程。
  • 假如channel為緩沖區(qū)通道,從channel緩沖區(qū)頭部(recvx)讀數(shù)據(jù),將sudog對(duì)應(yīng)寫協(xié)程數(shù)據(jù),寫入緩沖區(qū)尾部(sendx),喚醒寫協(xié)程。
  • 解鎖

讀時(shí)無阻塞寫協(xié)程且緩沖區(qū)有數(shù)據(jù)

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    // 緩沖區(qū)buf中有元素,直接從buf拷貝元素到當(dāng)前協(xié)程(在已關(guān)閉的情況下,隊(duì)列有數(shù)據(jù)依然會(huì)讀)
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)// 將從buf中取出的元素拷貝到當(dāng)前協(xié)程
        }
        typedmemclr(c.elemtype, qp) // 同時(shí)將取出的數(shù)據(jù)所在的內(nèi)存清空
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true // 接收成功
    }
}
  • 加鎖
  • 從環(huán)形緩沖區(qū)讀數(shù)據(jù)。在channel已關(guān)閉的情況下,緩沖區(qū)有數(shù)據(jù)依然可以被讀。
  • 解鎖

讀時(shí)無阻塞寫協(xié)程且緩沖區(qū)無數(shù)據(jù)

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    lock(&c.lock)
    // no sender available: block on this channel.
    // 阻塞模式,獲取當(dāng)前Goroutine,打包一個(gè)sudog,并加入到channel的接收隊(duì)列recvq里
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg) // 入接收隊(duì)列recvq
    // 掛起當(dāng)前Goroutine,設(shè)置為_Gwaiting狀態(tài),進(jìn)入休眠等待被喚醒
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    // 因通道關(guān)閉或者讀到數(shù)據(jù)被喚醒
    gp.waiting = nil
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success // 10.返回成功
}
  • 加鎖。
  • 將當(dāng)前協(xié)程gp封裝成sudog節(jié)點(diǎn),加入channel的阻塞讀隊(duì)列recvq。
  • 調(diào)用gopark將當(dāng)前協(xié)程設(shè)置為等待狀態(tài)并解鎖,觸發(fā)調(diào)度其它協(xié)程運(yùn)行。
  • 因讀到數(shù)據(jù)或者channel被關(guān)閉,協(xié)程從park中被喚醒,清理sudog結(jié)構(gòu)。
  • 返回

整體讀流程

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 1.channel為nil
    if c == nil {
        // 否則,當(dāng)前Goroutine阻塞掛起
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    // 2.加鎖
    lock(&c.lock)
    // 3.如果channel已關(guān)閉,并且緩沖區(qū)無元素,返回(true,false)
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                //根據(jù)channel元素的類型清理ep對(duì)應(yīng)地址的內(nèi)存,即ep接收了channel元素類型的零值
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
        // The channel has been closed, but the channel's buffer have data.
    } else {
        // Just found waiting sender with not closed.
        // 4.存在阻塞寫協(xié)程
        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            // 如果無緩沖區(qū),那么直接從sender接收數(shù)據(jù);否則,從buf隊(duì)列的頭部接收數(shù)據(jù),并把sender的數(shù)據(jù)加到buf隊(duì)列的尾部
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true // 接收成功
        }
    }
    // 5.緩沖區(qū)buf中有元素,直接從buf拷貝元素到當(dāng)前協(xié)程(在已關(guān)閉的情況下,隊(duì)列有數(shù)據(jù)依然會(huì)讀)
    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)// 將從buf中取出的元素拷貝到當(dāng)前協(xié)程
        }
        typedmemclr(c.elemtype, qp) // 同時(shí)將取出的數(shù)據(jù)所在的內(nèi)存清空
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true // 接收成功
    }
    // no sender available: block on this channel.
    // 6.獲取當(dāng)前Goroutine,封裝成sudog節(jié)點(diǎn),加入channel阻塞讀隊(duì)列recvq
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg) // 入接收隊(duì)列recvq
    atomic.Store8(&gp.parkingOnChan, 1)
    // 7.掛起當(dāng)前Goroutine,設(shè)置為_Gwaiting狀態(tài),進(jìn)入休眠等待被喚醒
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    // 8.因通道關(guān)閉或者可讀被喚醒
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    // 9.返回
    return true, success 
}

通道讀流程如下:

  • channel為nil檢查。空則死鎖。
  • 加鎖。
  • 如果channel已關(guān)閉,并且緩沖區(qū)無數(shù)據(jù),讀變量賦零值,返回。
  • 當(dāng)存在阻塞寫協(xié)程,如果緩沖區(qū)已滿,則直接從sender接收數(shù)據(jù);否則,從環(huán)形緩沖區(qū)頭部接收數(shù)據(jù),并把sender的數(shù)據(jù)加到環(huán)形緩沖區(qū)尾部。喚醒sender,將其放入?yún)f(xié)程可運(yùn)行隊(duì)列中等待調(diào)度運(yùn)行,返回。
  • 如果緩沖區(qū)中有數(shù)據(jù),直接從緩沖區(qū)拷貝數(shù)據(jù)到當(dāng)前協(xié)程,返回。
  • 當(dāng)既沒有阻塞寫協(xié)程,緩沖區(qū)也沒有數(shù)據(jù)時(shí),將協(xié)程加入阻塞讀隊(duì)列recvq。
  • 調(diào)用gopark將當(dāng)前協(xié)程設(shè)置為等待狀態(tài),進(jìn)入休眠等待被喚醒,觸發(fā)協(xié)程調(diào)度。
  • 因通道關(guān)閉或者可讀被喚醒。
  • 返回。

1.5 通道關(guān)閉

func closechan(c *hchan) {
    // // 1.channel為nil則panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    lock(&c.lock)
    // 2.已關(guān)閉的channel再次關(guān)閉則panic
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    // 設(shè)置關(guān)閉標(biāo)記
    c.closed = 1
    var glist gList
    // 遍歷recvq和sendq中的協(xié)程放入glist
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        glist.push(gp)
    }
    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        glist.push(gp)
    }
    unlock(&c.lock)
    // 3.將glist中所有Goroutine的狀態(tài)置為_Grunnable,等待調(diào)度器進(jìn)行調(diào)度
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
  • channel為nil檢查。為空則panic
  • 已關(guān)閉channel再次被關(guān)閉,panic
  • 將sendq和recvq所有Goroutine的狀態(tài)置為_Grunnable,放入?yún)f(xié)程調(diào)度隊(duì)列等待調(diào)度器調(diào)度

2 高頻面試題

channel 的底層實(shí)現(xiàn)原理 (數(shù)據(jù)結(jié)構(gòu))

nil、關(guān)閉的 channel、有數(shù)據(jù)的 channel,再進(jìn)行讀、寫、關(guān)閉會(huì)怎么樣?(各類變種題型)

有緩沖channel和無緩沖channel的區(qū)別

原文鏈接https://reurl.cc/Wx26jD

到此這篇關(guān)于Golang channel底層是如何實(shí)現(xiàn)的?(深度好文)的文章就介紹到這了,更多相關(guān)Golang channel內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Go語言實(shí)現(xiàn)單端口轉(zhuǎn)發(fā)到多個(gè)端口

    Go語言實(shí)現(xiàn)單端口轉(zhuǎn)發(fā)到多個(gè)端口

    這篇文章主要為大家詳細(xì)介紹了Go語言實(shí)現(xiàn)單端口轉(zhuǎn)發(fā)到多個(gè)端口,文中的示例代碼講解詳細(xì),具有一定的參考價(jià)值,對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的小伙伴可以了解下
    2024-02-02
  • Golang 使用Map實(shí)現(xiàn)去重與set的功能操作

    Golang 使用Map實(shí)現(xiàn)去重與set的功能操作

    這篇文章主要介紹了Golang 使用 Map 實(shí)現(xiàn)去重與 set 的功能操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • 詳解Go?將在下個(gè)版本支持新型排序算法pdqsort

    詳解Go?將在下個(gè)版本支持新型排序算法pdqsort

    這篇文章主要介紹了Go?將在下個(gè)版本支持新型排序算法:pdqsort,而就Go支持pdqsort算法,在HN上引起了不少的討論,有人表示,我們研究排序算法這么久了,很驚訝我們還能想出能產(chǎn)生實(shí)際改進(jìn)的優(yōu)化方案。對(duì)此,你怎么看,快快上手體驗(yàn)一下吧
    2022-04-04
  • GOLANG使用Context管理關(guān)聯(lián)goroutine的方法

    GOLANG使用Context管理關(guān)聯(lián)goroutine的方法

    這篇文章主要介紹了GOLANG使用Context管理關(guān)聯(lián)goroutine的方法,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-01-01
  • Golang Map value不可尋址使用指針類型代替示例詳解

    Golang Map value不可尋址使用指針類型代替示例詳解

    這篇文章主要為大家介紹了Golang Map value不可尋址使用指針類型代替示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-11-11
  • Go語言封裝HTTP請(qǐng)求的Curl工具包詳解

    Go語言封裝HTTP請(qǐng)求的Curl工具包詳解

    在 Go 語言開發(fā)中,與 HTTP 服務(wù)進(jìn)行交互是非常常見的需求,本文將分享一個(gè)用 Go 語言封裝的 Curl 工具包,它提供了簡潔易用的接口來進(jìn)行 HTTP 請(qǐng)求,需要的可以了解下
    2025-03-03
  • 使用Go語言簡單模擬Python的生成器

    使用Go語言簡單模擬Python的生成器

    這篇文章主要介紹了使用Go語言簡單模擬Python的生成器,Python的generator是非??岬墓δ?用Go實(shí)現(xiàn)的代碼也較為簡潔,需要的朋友可以參考下
    2015-08-08
  • Go語言中strings和strconv包示例代碼詳解

    Go語言中strings和strconv包示例代碼詳解

    這篇文章主要介紹了Go語言中strings和strconv包示例代碼詳解 ,非常不錯(cuò),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2018-11-11
  • golang使用go mod導(dǎo)入本地包和第三方包的方式

    golang使用go mod導(dǎo)入本地包和第三方包的方式

    這篇文章主要介紹了golang使用go mod導(dǎo)入本地包和第三方包的方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2024-01-01
  • go 協(xié)程返回值處理操作

    go 協(xié)程返回值處理操作

    這篇文章主要介紹了go 協(xié)程返回值處理操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12

最新評(píng)論