欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

GoLang channel底層代碼實(shí)現(xiàn)詳解

 更新時(shí)間:2022年10月12日 08:57:48   作者:~龐貝  
Channel和goroutine的結(jié)合是Go并發(fā)編程的大殺器。而Channel的實(shí)際應(yīng)用也經(jīng)常讓人眼前一亮,通過與select,cancel,timer等結(jié)合,它能實(shí)現(xiàn)各種各樣的功能。接下來,我們就要梳理一下GoLang channel底層代碼實(shí)現(xiàn)

1.channel 簡介

Go語言有個(gè)很出名的話是“以通信的手段來共享內(nèi)存”,channel就是其最佳的體現(xiàn),channel提供一種機(jī)制,可以同步兩個(gè)并發(fā)執(zhí)行的函數(shù),還可以讓兩個(gè)函數(shù)通過互相傳遞特定類型的值來通信

channel有兩種初始化方式,分別是帶緩存的和不帶緩存的:

make(chan int)   // 無緩存 chan
make(chan int, 10)  // 有緩存 chan

使用方式也很簡單:

c := make(chan int)
defer close(c)
go func(){
  c <- 5   // send
}()
n := <- c   // recv

十分簡潔的做到了不同協(xié)程的交互。

2.channel 內(nèi)部結(jié)構(gòu)

chan的實(shí)現(xiàn)在runtime/chan.go,是一個(gè)hchan的結(jié)構(gòu)體:

type hchan struct {
  qcount   uint           // 隊(duì)列中的數(shù)據(jù)個(gè)數(shù)
  dataqsiz uint           // 環(huán)形隊(duì)列的大小,channel本身是一個(gè)環(huán)形隊(duì)列
  buf      unsafe.Pointer // 存放實(shí)際數(shù)據(jù)的指針,用unsafe.Pointer存放地址,為了避免gc
  elemsize uint16 
  closed   uint32 // 標(biāo)識channel是否關(guān)閉
  elemtype *_type // 數(shù)據(jù) 元素類型
  sendx    uint   // send的 index
  recvx    uint   // recv 的 index
  recvq    waitq  // 阻塞在 recv 的隊(duì)列
  sendq    waitq  // 阻塞在 send 的隊(duì)列
  lock mutex  // 鎖 
}

可以看出,channel本身是一個(gè)環(huán)形緩沖區(qū),數(shù)據(jù)存放到堆上面,channel的同步是通過鎖實(shí)現(xiàn)的,并不是想象中的lock-free的方式,channel中有兩個(gè)隊(duì)列,一個(gè)是發(fā)送阻塞隊(duì)列,一個(gè)是接收阻塞隊(duì)列。當(dāng)向一個(gè)已滿的channel發(fā)送數(shù)據(jù)會被阻塞,此時(shí)發(fā)送協(xié)程會被添加到sendq中,同理,當(dāng)向一個(gè)空的channel接收數(shù)據(jù)時(shí),接收協(xié)程也會被阻塞,被置入recvq中。

waitq是一個(gè)鏈表,里面對g結(jié)構(gòu)做了一下簡單的封裝。

3.創(chuàng)建channel

當(dāng)我們在代碼里面通過make創(chuàng)建一個(gè)channel時(shí),實(shí)際調(diào)用的是下面這個(gè)函數(shù):

CALL runtime.makechan(SB)

makechan的實(shí)現(xiàn)如下所示:

func makechan(t *chantype, size int) *hchan {
  elem := t.elem
  // 判斷 元素類型的大小
  if elem.size >= 1<<16 {
    throw("makechan: invalid channel element type")
  }
  // 判斷對齊限制
  if hchanSize%maxAlign != 0 || elem.align > maxAlign {
    throw("makechan: bad alignment")
  }
  // 判斷 size非負(fù) 和 是否大于 maxAlloc限制
  mem, overflow := math.MulUintptr(elem.size, uintptr(size))
  if overflow || mem > maxAlloc-hchanSize || size < 0 {
    panic(plainError("makechan: size out of range"))
  }
  var c *hchan
  switch {
  case mem == 0: // 無緩沖區(qū),即 make沒設(shè)置大小
    c = (*hchan)(mallocgc(hchanSize, nil, true)) 
    c.buf = c.raceaddr()
  case elem.ptrdata == 0:  // 數(shù)據(jù)類型不包含指針
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:  // 如果包含指針
    // Elements contain pointers.
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }
  c.elemsize = uint16(elem.size)
  c.elemtype = elem
  c.dataqsiz = uint(size)
  if debugChan {
    print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
  }
  return c
}

根據(jù)上面的代碼,我們可以看到,創(chuàng)建channel分為三種情況:

1.第一種緩沖區(qū)大小為0,此時(shí)只需要分配hchansize大小的內(nèi)存就ok

2.第二種緩沖區(qū)大小不為0,且channel的類型不包含指針,此時(shí)buf為hchanSize+元素大小*元素個(gè)數(shù)的連續(xù)內(nèi)存

3.第三種緩沖區(qū)大小不為0,且channel的類型包含指針,則不能簡單的根據(jù)元素的大小去申請內(nèi)存,需要通過mallocgc去分配內(nèi)存

4.發(fā)送數(shù)據(jù)

發(fā)送數(shù)據(jù)會調(diào)用chan.go中的如下接口:

CALL runtime.chansend1(SB)

chansend1會調(diào)用chansend接口,chansend方法簽名如下:

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

c是具體的channel,ep是發(fā)送的數(shù)據(jù),block為true表示阻塞的發(fā)送,一般向channel發(fā)送數(shù)據(jù)都是阻塞的,如果channel數(shù)據(jù)滿了,會一直阻塞在這里。但是在select中如果有case監(jiān)聽某個(gè)channel的發(fā)送,那么此時(shí)的block參數(shù)為false,后續(xù)分析select實(shí)現(xiàn)會講到。

select {
    case <-c:  // 這里為非阻塞發(fā)送
        // do some thing
    default:
        // do some thing
}

chansend接口會對一些條件做判斷

如果向一個(gè)為nil的channel發(fā)送數(shù)據(jù),如果是阻塞發(fā)送會一直阻塞:

 if c == nil {
    if !block {
      return false
    }
    gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }

首先會加鎖,保證原子性,如果向一個(gè)已關(guān)閉的channel發(fā)送數(shù)據(jù)就會panic。

lock(&c.lock)
if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("send on closed channel"))
  }

如果此時(shí)recvq中有等待協(xié)程,就直接調(diào)用send函數(shù)將數(shù)據(jù)復(fù)制給接收方, 實(shí)現(xiàn)如下:

// sg 為接收者協(xié)程,ep為發(fā)送元素
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if raceenabled {
    if c.dataqsiz == 0 {
      racesync(c, sg)
    } else {
      qp := chanbuf(c, c.recvx)
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
        c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
  }
  if sg.elem != nil {
    sendDirect(c.elemtype, sg, ep)
    sg.elem = nil
  }
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

如果此時(shí)沒有等待協(xié)程,并且數(shù)據(jù)未滿的情況下,就將數(shù)據(jù)copy到環(huán)形緩沖區(qū)中,將位置后移一位。

if c.qcount < c.dataqsiz {  // 如果 未滿
    // Space is available in the channel buffer. Enqueue the element to send.
    qp := chanbuf(c, c.sendx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }

如果此時(shí)環(huán)形緩沖區(qū)數(shù)據(jù)滿了,如果是阻塞發(fā)送,此時(shí)會把發(fā)送方放到sendq隊(duì)列中。

5.接收數(shù)據(jù)

接收數(shù)據(jù)會調(diào)用下面的接口:

CALL runtime.chanrecv1(SB)

chanrecv1會調(diào)用chanrecv接口,chanrecv方法簽名如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

c 指需要操作的channel,接收的數(shù)據(jù)會寫到ep中,block與send中的情況一樣,表示是阻塞接收還是非阻塞接收,非阻塞接收指在select中case 接收一個(gè)channel值:

select {
    case a := <-c:  // 這里為非阻塞接收,沒有數(shù)據(jù)直接返回
        // do some thing
    default:
        // do some thing
}

首先chanrecv也會做一些參數(shù)校驗(yàn)

如果channel為nil并且是非阻塞模式,直接返回,如果是阻塞模式,永遠(yuǎn)等待

 if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
    throw("unreachable")
  }

隨后會加鎖,防止競爭讀寫

lock(&c.lock)

如果向一個(gè)已關(guān)閉的channel接收數(shù)據(jù),此時(shí)channel里面還有數(shù)據(jù),那么依然可以接收數(shù)據(jù),屬于正常接收數(shù)據(jù)情況。

如果向一個(gè)已關(guān)閉的channel接收數(shù)據(jù),此時(shí)channel里面沒有數(shù)據(jù),那么此時(shí)返回的是(true,false),表示有值返回,但不是我們需要的值:

 if c.closed != 0 && c.qcount == 0 {
    if raceenabled {
      raceacquire(c.raceaddr())
    }
    unlock(&c.lock)
    if ep != nil {
      typedmemclr(c.elemtype, ep)  // 將 ep 指向的內(nèi)存塊置 0
    }
    return true, false
  }

接收也分為三種情況:

如果此時(shí) sendq中有發(fā)送方在阻塞,此時(shí)會調(diào)用recv函數(shù):

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if raceenabled {
      racesync(c, sg)
    }
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
      raceacquireg(sg.g, qp)
      racereleaseg(sg.g, qp)
    }
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  if sg.releasetime != 0 {
    sg.releasetime = cputicks()
  }
  goready(gp, skip+1)
}

此時(shí)有發(fā)送方在等待,表示此時(shí)channel中數(shù)據(jù)已滿,這個(gè)時(shí)候會將channel頭部的數(shù)據(jù)copy到接收方,然后將發(fā)送方隊(duì)列頭部的發(fā)送者的數(shù)據(jù)copy到那個(gè)位置。這涉及到兩次copy操作。

第二種情況是如果沒有發(fā)送方等待,此時(shí)會把數(shù)據(jù)copy到channel中:

if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if raceenabled {
      raceacquire(qp)
      racerelease(qp)
    }
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }

第三種情況如果channel里面沒有數(shù)據(jù),如果是非阻塞接收直接返回false,如果是阻塞接收會將接收方協(xié)程放入channel的recvq中。

6.關(guān)閉channel

關(guān)閉channel時(shí)會調(diào)用如下接口:

func closechan(c *hchan) 

首先會做一些數(shù)據(jù)校驗(yàn):

if c == nil {
    panic(plainError("close of nil channel"))
  }
  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
  if raceenabled {
    callerpc := getcallerpc()
    racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
    racerelease(c.raceaddr())
  }
  c.closed = 1   //置關(guān)閉標(biāo)記位

如果向一個(gè)為nil的channel或者向一個(gè)已關(guān)閉的channel發(fā)起close操作就會panic。

隨后會喚醒所有在recvq或者sendq里面的協(xié)程:

var glist gList
  // release all readers
  for {
    sg := c.recvq.dequeue()
    if sg == nil {
      break
    }
    if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
    }
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  // release all writers (they will panic)
  for {
    sg := c.sendq.dequeue()
    if sg == nil {
      break
    }
    sg.elem = nil
    if sg.releasetime != 0 {
      sg.releasetime = cputicks()
    }
    gp := sg.g
    gp.param = nil
    if raceenabled {
      raceacquireg(gp, c.raceaddr())
    }
    glist.push(gp)
  }
  unlock(&c.lock)

如果存在接收者,將接收數(shù)據(jù)通過typedmemclr置0。

如果存在發(fā)送者,將所有發(fā)送者panic。

7.總結(jié)

綜上分析,在使用channel有這么幾點(diǎn)要注意

1.確保所有數(shù)據(jù)發(fā)送完后再關(guān)閉channel,由發(fā)送方來關(guān)閉

2.不要重復(fù)關(guān)閉channel

3.不要向?yàn)閚il的channel里面發(fā)送值

4.不要向?yàn)閚il的channel里面接收值

5.接收數(shù)據(jù)時(shí),可以通過返回值判斷是否ok

n , ok := <- c
if ok{
  // do some thing
}

這樣防止channel被關(guān)閉后返回了零值,對業(yè)務(wù)造成影響

到此這篇關(guān)于GoLang channel底層代碼實(shí)現(xiàn)詳解的文章就介紹到這了,更多相關(guān)GoLang channel內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Go語言RPC Authorization進(jìn)行簡單ip安全驗(yàn)證的方法

    Go語言RPC Authorization進(jìn)行簡單ip安全驗(yàn)證的方法

    這篇文章主要介紹了Go語言RPC Authorization進(jìn)行簡單ip安全驗(yàn)證的方法,實(shí)例分析了Go語言進(jìn)行ip驗(yàn)證的技巧,需要的朋友可以參考下
    2015-03-03
  • golang簡單讀寫文件示例

    golang簡單讀寫文件示例

    這篇文章主要介紹了golang簡單讀寫文件的方法,實(shí)例分析了Go簡單文件讀取與寫入操作的相關(guān)技巧,需要的朋友可以參考下
    2016-07-07
  • golang如何通過viper讀取config.yaml文件

    golang如何通過viper讀取config.yaml文件

    這篇文章主要介紹了golang通過viper讀取config.yaml文件,圍繞golang讀取config.yaml文件的相關(guān)資料展開詳細(xì)內(nèi)容,需要的小伙伴可以參考一下
    2022-03-03
  • golang中struct和interface的基礎(chǔ)使用教程

    golang中struct和interface的基礎(chǔ)使用教程

    Go不同于一般的面向?qū)ο笳Z言,需要我們好好的學(xué)習(xí)研究,下面這篇文章主要給大家介紹了關(guān)于golang中struct和interface的基礎(chǔ)使用的相關(guān)資料,需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。
    2018-03-03
  • 使用go讀取gzip格式的壓縮包的操作

    使用go讀取gzip格式的壓縮包的操作

    這篇文章主要介紹了使用go讀取gzip格式的壓縮包的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • 一文帶大家了解Go語言中的內(nèi)聯(lián)優(yōu)化

    一文帶大家了解Go語言中的內(nèi)聯(lián)優(yōu)化

    內(nèi)聯(lián)優(yōu)化是一種常見的編譯器優(yōu)化策略,通俗來講,就是把函數(shù)在它被調(diào)用的地方展開,這樣可以減少函數(shù)調(diào)用所帶來的開銷,本文主要為大家介紹了Go中內(nèi)聯(lián)優(yōu)化的具體使用,需要的可以參考下
    2023-05-05
  • 使用GO實(shí)現(xiàn)Paxos共識算法的方法

    使用GO實(shí)現(xiàn)Paxos共識算法的方法

    這篇文章主要介紹了使用GO實(shí)現(xiàn)Paxos共識算法,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作,具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2020-09-09
  • golang之JWT實(shí)現(xiàn)的示例代碼

    golang之JWT實(shí)現(xiàn)的示例代碼

    這篇文章主要介紹了golang之JWT實(shí)現(xiàn)的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • GO語言操作Elasticsearch示例分享

    GO語言操作Elasticsearch示例分享

    這篇文章主要介紹了GO語言操作Elasticsearch示例分享的相關(guān)資料,需要的朋友可以參考下
    2023-01-01
  • 詳解如何使用Go的Viper來解析配置信息

    詳解如何使用Go的Viper來解析配置信息

    Viper庫為Golang語言開發(fā)者提供了對不同數(shù)據(jù)源和不同格式的配置文件的讀取,是Go項(xiàng)目讀取配置的神器,我們今天就來講講如何使用Viper來解析配置信息,文中通過代碼示例講解非常詳細(xì),需要的朋友可以參考下
    2024-01-01

最新評論