欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go channel發(fā)送方和接收方如何相互阻塞等待源碼解讀

 更新時間:2023年12月18日 10:42:08   作者:菜皮日記  
這篇文章主要為大家介紹了Go channel發(fā)送方和接收方如何相互阻塞等待源碼解讀,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

并發(fā)編程的可見性

在 Go 官網(wǎng)上的內(nèi)存模型一文中,介紹了在 Go 并發(fā)編程下數(shù)據(jù)可見性問題,可見性是并發(fā)編程中一個重要概念,指的是在哪些條件下,可以保證一個線程中讀取某個變量時,可以觀察到另一個線程對該變量的寫入后的值,Go 語言中的 goroutine 也適用。

一般來說可見性屬于偏硬件和底層,因為涉及到多核 CPU 的 cache 讀寫和同步問題,開發(fā)者不需要關(guān)心細節(jié),高級編程語言要么屏蔽掉了這些細節(jié),要么會給出一些保證,承諾在確定的條件下就會得到確定的結(jié)果。

Go channel 有一個特性是在一個無緩沖的 channel 上發(fā)送和接收必須等待對方準備好,才可以執(zhí)行,否則會被阻塞。實際上這就是一個同步保證,那么這個同步保證是如何實現(xiàn)的?下面看看官方文章中是如何解釋的。

先 send 后 receive

文中對 channel 的描述有幾個原則,第一個是

A send on a channel is synchronized before the completion of the corresponding receive from that channel.

意思是:在一個 channel 上的發(fā)送操作應(yīng)該發(fā)生在對應(yīng)的接收操作完成之前。說人話就是:要先發(fā)送數(shù)據(jù),然后才能接收數(shù)據(jù),否則就會阻塞。這也比較符合一般的認知。

并用下面一段代碼舉例說明,這段代碼確保一定會輸出 "hello, world”。

var c = make(chan int, 10)
var a string

func f() {
    a = "hello, world"
    c <- 0
}

func main() {
    go f()
    <-c
    print(a)
}

f 函數(shù)負責給變量 a 賦值,main 函數(shù)負責打印變量 a。main 函數(shù)阻塞等待在 <- c 處,直到 f 函數(shù)對 a 賦值之后并寫入數(shù)據(jù)到 c 中,main 函數(shù)才被喚醒繼續(xù)執(zhí)行,所以此時打印 a 必然會得到結(jié)果。

先 receive 后 send?

而下面這段描述有點反直覺

A receive from an unbuffered channel is synchronized before the completion of the corresponding send on that channel.

意思是在無緩沖 channel 上的接收操作發(fā)生在對應(yīng)的發(fā)送操作完成之前,說人話就是:要先接收數(shù)據(jù),之后才可以發(fā)送數(shù)據(jù),否則就會阻塞。這句話看上去與第一條相悖,因為第一條強調(diào)發(fā)送操作要在接收完成之前發(fā)生,而這一條強調(diào)接收操作要在發(fā)送完成之前發(fā)生,這樣相互等待對方的情況,不會陷入死鎖狀態(tài)嗎?

下面的示例代碼與前一個類似,區(qū)別是將 c 換成了無緩沖 channel,并把 c 的寫入和讀取調(diào)換了位置,這段代碼同樣可以保證輸出 "hello, world”。

var c = make(chan int)
var a string

func f() {
    a = "hello, world"
    <-c
}

func main() {
    go f()
    c <- 0
    print(a)
}

這兩段話到底是什么意思?為什么要相互等待但又不會死鎖?

接下來看看 runtime/chan.go 中是怎么實現(xiàn) channel 的發(fā)送和接收的。

channel 的結(jié)構(gòu)

首先看看 channel 的數(shù)據(jù)結(jié)構(gòu)

type hchan struct {
    qcount   uint           // 緩沖區(qū)元素數(shù)量
    dataqsiz uint           // 緩沖區(qū)大小
    buf      unsafe.Pointer // 緩沖區(qū)起始指針
    elemsize uint16
    closed   uint32
    elemtype *_type
    sendx    uint   // 下一次發(fā)送的元素在隊列中的索引
    recvx    uint   // 下一個接收的元素在隊列中的索引
    recvq    waitq  // 當隊列無數(shù)據(jù)時,receiver 阻塞等待的隊列
    sendq    waitq  // 當隊列無空間時,sender 阻塞等待的隊列

    lock mutex
}

channel 內(nèi)部實現(xiàn)了一個環(huán)形隊列,通過 qcount dataqsiz buf sendx recvx 幾個部分組成。

另外 channel 還維護了兩個等待隊列,如果在執(zhí)行 <-c receive 操作時,此時 channel 不滿足接收條件,receiver 會進入 recvq 等待隊列;同樣的如果執(zhí)行 c<- send 操作時,此時 channel 不滿足發(fā)送條件,sender 會進入 sendq 等待隊列。

具體看代碼:

var c = make(chan int)
var a string

func f() {
    a = "hello, world"

    x := <-c    // 3
    fmt.Println("\nx:", x)
}

func main() {
    go f()      // 1
    c <- 123456 // 2

    print(a)
}

send 具體干了什么

當 main 函數(shù)執(zhí)行到 c<-123456 是,會執(zhí)行 runtime/chan.go 中的 chansend 函數(shù),該函數(shù)首先會判斷當前 channel c 的等待接收隊列是否有阻塞的 receiver

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代碼...

  // 是否有等待的 receiver 存在
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

  // ...省略部分代碼...
}

如果有等待的 receiver 則彈出隊列,調(diào)用 send 函數(shù),其中 sg 就表示 receiver,sg.elem 表示將數(shù)據(jù)接收到哪里去,這個地址也就對應(yīng)示例代碼中的變量 x 的地址。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  // ...省略部分代碼...

    if sg.elem != nil {
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }

  // ...省略部分代碼...
  // 將 goroutine 置為可執(zhí)行狀態(tài)
}

sendDirect 函數(shù)就是直接從 src 里面將數(shù)據(jù)復制到 dst 中。

// 直接拷貝數(shù)據(jù)
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

回到 chansend 函數(shù),如果沒有等待的 receiver,那么會查看當前 buf 中是否有空間,如果有空間,則數(shù)據(jù)緩存到 buf 中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代碼...

  // 將數(shù)據(jù)緩存到 buf 中
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

  // ...省略部分代碼...
}

如果也沒有 buf 空間,那么就將 sender 本身放入到 sendq 等待隊列中。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  // ...省略部分代碼...

  // 進入 sendq 等待隊列
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)

  // ...省略部分代碼...
}

總結(jié)起來 send 操作分三部分:

  • 如果當前 channel 上有等待的 receiver,則直接 copy 數(shù)據(jù)過去
  • 否則如果當前 buf 有空閑空間,則將數(shù)據(jù)存在 buf 中
  • 否則將 sender 本身加入到 sendq 等待隊列中

receive 具體干了什么

相應(yīng)的與發(fā)送類似,執(zhí)行到示例代碼中第 (3) 步接收數(shù)據(jù)時,會調(diào)用 runtime/chan.go 中的 chanrecv 函數(shù)來處理接收,同樣是先看 sender 等待隊列是否有阻塞的 sender

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代碼...

  // 從等待的 sender 取一個出來
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }
  // ...省略部分代碼...
}

如果有的等待的 sender,那么將 sender 取出來,并復制數(shù)據(jù)。

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // ...省略部分代碼...
    if ep != nil {
        // copy data from sender
        recvDirect(c.elemtype, sg, ep)
    }
  // ...省略部分代碼...
}

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

如果沒有等待的 sender,那么看 buf 中有沒有緩存的數(shù)據(jù)

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代碼...
    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }
  // ...省略部分代碼...
}

最后如果也沒有 buf 數(shù)據(jù),那么久把自己加入到 receiver 等待隊列中 recvq

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // ...省略部分代碼...

    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)

  // ...省略部分代碼...
}

總結(jié)起來 receive 操作分三部分:

  • 如果當前 channel 上有等待的 sender,則直接 copy 數(shù)據(jù)過去
  • 否則如果當前 buf 有緩存的數(shù)據(jù),則將讀取該數(shù)據(jù)
  • 否則將 receiver 本身加入到 recvq 等待隊列中

小結(jié)

這樣一來就能夠理解前面的兩個原則了,在一個無緩沖的 channel 中,無論是 sender 先執(zhí)行,還是 receiver 先執(zhí)行,都會因為找不到對方,并且沒有 buf 空間的情況下,將自己加入到等待隊列;當對方開始執(zhí)行時就會檢查到已經(jīng)有對端正在阻塞,進而拷貝數(shù)據(jù),并喚醒阻塞的對象最終走完整個流程。

有一種說法是:sender 必須在 receiver 準備好才能執(zhí)行,否則就會阻塞;而 receiver 必須在 sender 準備好才能執(zhí)行,否則就會阻塞;這個說法沒錯,但是太籠統(tǒng)了,什么叫準備好?怎么算是準備好?這是比較模糊的。而看過 send 和 receive 的流程之后,就更能理解整個過程了。

為什么要有無緩沖 channel

實際上兩個 goroutine 相互等待對方到達某個狀態(tài)的效果,非常類似操作系統(tǒng)中的一種同步機制:屏障 barrier,同步屏障要求只有當所有進程都到達屏障后,才能一起執(zhí)行下一狀態(tài),否則就阻塞在屏障處。

回到 channel 操作,即 sender 和 receiver 無論誰先執(zhí)行,都必須等待對方也已經(jīng)執(zhí)行,兩者才可以繼續(xù)執(zhí)行。就像一塊電路板串聯(lián)有兩個開關(guān),要想電路聯(lián)通,必須兩個開關(guān)都被打開才可以,而不管哪一個先打開,都必須等待另一個開關(guān)也打開,之后電流才可以接通電路也才聯(lián)通。

可以將無緩沖 channel 看做是一種同步屏障,同步屏障能夠讓多個 goroutine 都達到某種狀態(tài)之后才可以繼續(xù)執(zhí)行,這是帶緩沖 channel 無法做到的。另外在無緩沖 channel 數(shù)據(jù)的交換更加簡單快速,因為不需要維護緩存 buf,實現(xiàn)邏輯也更簡單,運行更可靠。

以上就是Go channel發(fā)送方和接收方如何相互阻塞等待源碼解讀的詳細內(nèi)容,更多關(guān)于Go channel相互阻塞等待的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • go語言中的udp協(xié)議及TCP通訊實現(xiàn)示例

    go語言中的udp協(xié)議及TCP通訊實現(xiàn)示例

    這篇文章主要為大家介紹了go語言中的udp協(xié)議及TCP通訊的實現(xiàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪
    2022-04-04
  • 詳解如何在Go服務(wù)中做鏈路追蹤

    詳解如何在Go服務(wù)中做鏈路追蹤

    使用 Go 語言開發(fā)微服務(wù)的時候,需要追蹤每一個請求的訪問鏈路,本文主要介紹了如何在Go 服務(wù)中做鏈路追蹤,感興趣的可以了解一下
    2021-09-09
  • golang接口實現(xiàn)調(diào)用修改(值接收者指針接收者)場景詳解

    golang接口實現(xiàn)調(diào)用修改(值接收者指針接收者)場景詳解

    這篇文章主要為大家介紹了golang接口實現(xiàn)調(diào)用修改值接收者指針接收者示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • golang配制高性能sql.DB的使用

    golang配制高性能sql.DB的使用

    本文主要講述SetMaxOpenConns(),?SetMaxIdleConns()?和?SetConnMaxLifetime()方法,?您可以使用它們來配置sql.DB的行為并改變其性能,感興趣的可以了解一下
    2021-12-12
  • go程序執(zhí)行交叉編譯的流程步驟

    go程序執(zhí)行交叉編譯的流程步驟

    go程序可用通過交叉編譯的方式在一個平臺輸出多個平臺可運行的二進制包,本文給大家詳細介紹了go程序執(zhí)行交叉編譯的流程步驟,文中有詳細的代碼示例供大家參考,需要的朋友可以參考下
    2024-07-07
  • Go語言resty http包調(diào)用jenkins api實例

    Go語言resty http包調(diào)用jenkins api實例

    這篇文章主要為大家介紹了Go語言resty http包調(diào)用jenkins api實例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-06-06
  • Go語言并發(fā)編程之控制并發(fā)數(shù)量實現(xiàn)實例

    Go語言并發(fā)編程之控制并發(fā)數(shù)量實現(xiàn)實例

    這篇文章主要為大家介紹了Go語言并發(fā)編程之控制并發(fā)數(shù)量實例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2024-01-01
  • 關(guān)于升級go1.18的goland問題詳解

    關(guān)于升級go1.18的goland問題詳解

    作為一個go語言程序員,覺得自己有義務(wù)為go新手開一條更簡單便捷的上手之路,下面這篇文章主要給大家介紹了關(guān)于升級go1.18的goland問題的相關(guān)資料,需要的朋友可以參考下
    2022-11-11
  • 十個Go map面試??紗栴}合集

    十個Go map面試??紗栴}合集

    go面試中,map相關(guān)知識點問的比較多,這篇文章主要為大家整理歸納了10個??嫉膯栴},文中的示例代碼講解詳細,希望對大家有一定的幫助
    2023-07-07
  • Go秒爬博客園100頁新聞

    Go秒爬博客園100頁新聞

    利用go語言的協(xié)程并發(fā)優(yōu)勢爬取網(wǎng)頁速度相當之快,博客園100頁新聞標題只需一秒即可全部爬取,跟著小編一起去看看如何實現(xiàn)的,希望大家可以從中受益
    2018-09-09

最新評論