欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go調(diào)度器學(xué)習(xí)之goroutine調(diào)度詳解

 更新時(shí)間:2023年03月30日 08:55:36   作者:IguoChan  
這篇文章主要為大家詳細(xì)介紹了Go調(diào)度器中g(shù)oroutine調(diào)度的相關(guān)知識(shí),文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下

0. 簡(jiǎn)介

上篇博客介紹了goroutine的創(chuàng)建、執(zhí)行和退出的過程,并且提及了在協(xié)程切換時(shí)涉及的調(diào)度循環(huán),本篇博客我們就來探究一下其他情形引起的協(xié)程調(diào)度。

1. 協(xié)程調(diào)度發(fā)生的時(shí)機(jī)

在以下情形中,goroutine可能會(huì)發(fā)生調(diào)度:

情形說明
go func(){}使用go關(guān)鍵字創(chuàng)建一個(gè)新的goroutine,調(diào)度器會(huì)考慮調(diào)度
GC由于GC也需要在系統(tǒng)線程M上執(zhí)行,且其中需要所有的goroutine都停止運(yùn)行,所以也會(huì)發(fā)生調(diào)度
系統(tǒng)調(diào)用發(fā)生系統(tǒng)的調(diào)用時(shí),會(huì)阻塞M,所以它會(huì)被調(diào)度走,同時(shí)新的goroutine也會(huì)被調(diào)度上來
同步內(nèi)存訪問mutex、channel等操作會(huì)使得goroutine阻塞,因此會(huì)被調(diào)度走,等條件滿足后,還會(huì)被調(diào)度上來繼續(xù)運(yùn)行

2. 創(chuàng)建協(xié)程時(shí)的調(diào)度

其中,使用go關(guān)鍵字創(chuàng)建協(xié)程時(shí)的調(diào)度分析,上篇博客做了初步的分析,特別是有關(guān)調(diào)度循環(huán)的分析,但是我們沒有具體分析,當(dāng)創(chuàng)建協(xié)程時(shí),系統(tǒng)是怎么發(fā)生調(diào)度的。

func newproc(fn *funcval) {
   gp := getg()
   pc := getcallerpc()
   systemstack(func() {
      newg := newproc1(fn, gp, pc)

      _p_ := getg().m.p.ptr()
      runqput(_p_, newg, true)

      if mainStarted {
         wakep()
      }
   })
}

我們還記得,go關(guān)鍵字在創(chuàng)建協(xié)程時(shí),Go的編譯器會(huì)將其轉(zhuǎn)換為runtime.newproc函數(shù),上篇我們?cè)敿?xì)分析了main goroutine的創(chuàng)建過程,在runtime.main函數(shù)中,全局變量mainStarted會(huì)被置為true,之后普通協(xié)程的創(chuàng)建,則會(huì)調(diào)用runtime.wakep函數(shù)嘗試喚醒空閑的P。

func wakep() {
   if atomic.Load(&sched.npidle) == 0 {
      return
   }
   // be conservative about spinning threads
   if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
      return
   }
   startm(nil, true)
}

wakep函數(shù)首先確認(rèn)是否有其他線程正在處于spinning狀態(tài),即M是否在找工作,如果沒有的話,則調(diào)用startm函數(shù)創(chuàng)建一個(gè)新的、或者喚醒一個(gè)處于睡眠狀態(tài)的工作線程出來工作。

func startm(_p_ *p, spinning bool) {
   // Disable preemption.
   //
   // Every owned P must have an owner that will eventually stop it in the
   // event of a GC stop request. startm takes transient ownership of a P
   // (either from argument or pidleget below) and transfers ownership to
   // a started M, which will be responsible for performing the stop.
   //
   // Preemption must be disabled during this transient ownership,
   // otherwise the P this is running on may enter GC stop while still
   // holding the transient P, leaving that P in limbo and deadlocking the
   // STW.
   //
   // Callers passing a non-nil P must already be in non-preemptible
   // context, otherwise such preemption could occur on function entry to
   // startm. Callers passing a nil P may be preemptible, so we must
   // disable preemption before acquiring a P from pidleget below.
   mp := acquirem()  // 保證在此期間不會(huì)發(fā)生棧擴(kuò)展
   lock(&sched.lock)
   if _p_ == nil {   // 沒有指定p,那么需要從空閑隊(duì)列中取一個(gè)p
      _p_ = pidleget()
      if _p_ == nil {// 如果沒有空閑的p,直接返回
         unlock(&sched.lock)
         if spinning {
            // The caller incremented nmspinning, but there are no idle Ps,
            // so it's okay to just undo the increment and give up.
            if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
               throw("startm: negative nmspinning")
            }
         }
         releasem(mp)
         return
      }
   }
   nmp := mget()  // 如果有空閑的p,那么取出一個(gè)空閑的m
   if nmp == nil {// 如果沒有空閑的m,那么調(diào)用newm創(chuàng)建一個(gè),然后返回
      // No M is available, we must drop sched.lock and call newm.
      // However, we already own a P to assign to the M.
      //
      // Once sched.lock is released, another G (e.g., in a syscall),
      // could find no idle P while checkdead finds a runnable G but
      // no running M's because this new M hasn't started yet, thus
      // throwing in an apparent deadlock.
      //
      // Avoid this situation by pre-allocating the ID for the new M,
      // thus marking it as 'running' before we drop sched.lock. This
      // new M will eventually run the scheduler to execute any
      // queued G's.
      id := mReserveID()
      unlock(&sched.lock)

      var fn func()
      if spinning {
         // The caller incremented nmspinning, so set m.spinning in the new M.
         fn = mspinning
      }
      newm(fn, _p_, id)
      // Ownership transfer of _p_ committed by start in newm.
      // Preemption is now safe.
      releasem(mp)
      return
   }
   unlock(&sched.lock)
   if nmp.spinning {
      throw("startm: m is spinning")
   }
   if nmp.nextp != 0 {
      throw("startm: m has p")
   }
   if spinning && !runqempty(_p_) {
      throw("startm: p has runnable gs")
   }
   // The caller incremented nmspinning, so set m.spinning in the new M.
   nmp.spinning = spinning
   nmp.nextp.set(_p_)
   notewakeup(&nmp.park) // 如果有空閑的m,則喚醒這個(gè)m
   // Ownership transfer of _p_ committed by wakeup. Preemption is now
   // safe.
   releasem(mp)
}

startm函數(shù)首先判斷是否有空閑的P,如果沒有則直接返回;如果有,則判斷是否有空閑的M,如果沒有,則新建一個(gè);如果有空閑的M,則喚醒這個(gè)M。說白了,wakep函數(shù)就是為了更大程度的利用P,利用CPU資源。

說到這里,我們就需要重溫一下上篇博客講到的,調(diào)度中獲取goroutine的規(guī)則是:

  • 每調(diào)度61次就需要從全局隊(duì)列中獲取goroutine;
  • 其次優(yōu)先從本P所在隊(duì)列中獲取goroutine;
  • 如果還沒有獲取到,則從其他P的運(yùn)行隊(duì)列中竊取goroutine

其中,從其他P隊(duì)列中竊取goroutine,調(diào)用的是findrunnable函數(shù),這個(gè)函數(shù)很長(zhǎng),為了簡(jiǎn)化說明,我們刪除一些不是很重要的代碼:

func findrunnable() (gp *g, inheritTime bool) {
   _g_ := getg()

top:
   _p_ := _g_.m.p.ptr()
   ...

   // local runq
   // 再從本地隊(duì)列找找
   if gp, inheritTime := runqget(_p_); gp != nil {
      return gp, inheritTime
   }

   // global runq
   // 再看看全局隊(duì)列
   if sched.runqsize != 0 {
      lock(&sched.lock)
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      if gp != nil {
         return gp, false
      }
   }

   ...

   // Spinning Ms: steal work from other Ps.
   //
   // Limit the number of spinning Ms to half the number of busy Ps.
   // This is necessary to prevent excessive CPU consumption when
   // GOMAXPROCS>>1 but the program parallelism is low.
   procs := uint32(gomaxprocs)
   if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
      if !_g_.m.spinning {
         _g_.m.spinning = true
         atomic.Xadd(&sched.nmspinning, 1)
      }

      gp, inheritTime, tnow, w, newWork := stealWork(now) // 調(diào)用stealWork盜取goroutine
      now = tnow
      if gp != nil {
         // Successfully stole.
         return gp, inheritTime
      }
      if newWork {
         // There may be new timer or GC work; restart to
         // discover.
         goto top
      }
      if w != 0 && (pollUntil == 0 || w < pollUntil) {
         // Earlier timer to wait for.
         pollUntil = w
      }
   }

   ...

   // return P and block
   // 上面的竊取沒有成功,那么解除m和p的綁定,摒棄娥江p放到空閑隊(duì)列,然后去休眠
   lock(&sched.lock)
   if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
      unlock(&sched.lock)
      goto top
   }
   if sched.runqsize != 0 {
      gp := globrunqget(_p_, 0)
      unlock(&sched.lock)
      return gp, false
   }
   if releasep() != _p_ {
      throw("findrunnable: wrong p")
   }
   pidleput(_p_)
   unlock(&sched.lock)

   ...
      _g_.m.spinning = false // m即將睡眠,狀態(tài)不再是spinning
      if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
         throw("findrunnable: negative nmspinning")
      }

   ...
   stopm() // 休眠
   goto top
}

從上面的代碼可以看出,工作線程會(huì)反復(fù)嘗試尋找運(yùn)行的goroutine,實(shí)在找不到的情況下才會(huì)進(jìn)入到睡眠。需要注意的是,工作線程M從其他P的本地隊(duì)列中盜取goroutine時(shí)的狀態(tài)稱之為自旋(spinning)狀態(tài),而前面講到wakep調(diào)用startm函數(shù),也是優(yōu)先從自旋狀態(tài)的M中選取,實(shí)在沒有才去喚醒休眠的M,再?zèng)]有就創(chuàng)建新的M。

竊取算法stealWork我們就不分析了,有興趣的同學(xué)可以看看。下面具體分析下stopm是怎么實(shí)現(xiàn)線程睡眠的。

func stopm() {
   _g_ := getg()

   if _g_.m.locks != 0 {
      throw("stopm holding locks")
   }
   if _g_.m.p != 0 {
      throw("stopm holding p")
   }
   if _g_.m.spinning {
      throw("stopm spinning")
   }

   lock(&sched.lock)
   mput(_g_.m)         // 把m放到sched.midle空閑隊(duì)列
   unlock(&sched.lock)
   mPark()
   acquirep(_g_.m.nextp.ptr()) // 綁定這個(gè)m和其下一個(gè)p,這里沒有看懂為啥這么操作
   _g_.m.nextp = 0
}


func mPark() {
   gp := getg()
   notesleep(&gp.m.park) // 進(jìn)入睡眠狀態(tài)
   noteclear(&gp.m.park)
}

可以看出,stopm主要是將m對(duì)象放到調(diào)度器的空閑線程隊(duì)列,然后通過notesleep進(jìn)入睡眠狀態(tài)。notego runtime實(shí)現(xiàn)的一次性睡眠和喚醒機(jī)制,通過notesleep進(jìn)入睡眠狀態(tài),然后另一個(gè)線程可以通過notewakeup喚醒這個(gè)線程。

小結(jié)

上面巴拉巴拉講了那么多,看的人有點(diǎn)頭暈,我們接下來講一個(gè)很小的例子梳理一下以上的邏輯(主線程的創(chuàng)建和執(zhí)行在上一篇博客中詳細(xì)敘述過,這里不再贅述),主線程創(chuàng)建了一個(gè)goroutine,這時(shí)候會(huì)觸發(fā)wakep,接下來可能會(huì)喚醒空閑的工作線程(如果是第一個(gè)非main goroutine,就沒有空閑的工作線程),或者創(chuàng)建一個(gè)新的工作線程,或者什么都不做。

如果是創(chuàng)建一個(gè)新的工作線程,那么其開啟執(zhí)行的點(diǎn)也是mstart函數(shù)(注意區(qū)分mstartstartm),然后在schedule函數(shù)中會(huì)嘗試去獲取goroutine,如果全局和本地的goroutine隊(duì)列都沒有,則會(huì)去其他的P上竊取goroutine,如果竊取不成功,則會(huì)休眠。

如果是去喚醒工作協(xié)程,喚醒后會(huì)在休眠的地方開始,重新進(jìn)行竊取。

竊取到工作協(xié)程后,就會(huì)去執(zhí)行,然后就會(huì)因?yàn)楦鞣N原因重新開始調(diào)度循環(huán)。

3. 主動(dòng)掛起

Go中,有很多種情形會(huì)導(dǎo)致goroutine阻塞,即其主動(dòng)掛起,然后被調(diào)度走,等滿足其運(yùn)行條件時(shí),還會(huì)被調(diào)度上來繼續(xù)運(yùn)行。比如channel的讀寫,我們以通道的阻塞讀為例,來介紹goroutine的主動(dòng)掛起的調(diào)度方式。

3.1 協(xié)程掛起

和前面介紹的Map一樣,channel的讀也有以下兩種讀取方式:

v := <- ch
v, ok := <- ch

分別對(duì)應(yīng)以下chanrecv1chanrecv2函數(shù):

//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
   chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
   _, received = chanrecv(c, elem, true)
   return
}

無論是哪個(gè)函數(shù),最終調(diào)用的都是chanrecv函數(shù):

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   ...
   
   c.recvq.enqueue(mysg) // 將這個(gè)goroutine放到channel的recv的queue中
   
   atomic.Store8(&gp.parkingOnChan, 1)
   // 掛起這個(gè)goroutine
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

   ...
}

chanrecv會(huì)先判斷channel是否有數(shù)據(jù)可讀,如果有則直接讀取并返回,如果沒有則將這個(gè)goroutine放到channelrecvqueue中,然后調(diào)用gopark函數(shù)將當(dāng)前goroutine掛起并阻塞。

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
   if reason != waitReasonSleep {
      checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
   }
   mp := acquirem()
   gp := mp.curg
   status := readgstatus(gp)
   if status != _Grunning && status != _Gscanrunning {
      throw("gopark: bad g status")
   }
   mp.waitlock = lock
   mp.waitunlockf = unlockf
   gp.waitreason = reason
   mp.waittraceev = traceEv
   mp.waittraceskip = traceskip
   releasem(mp)
   // can't do anything that might move the G between Ms here.
   mcall(park_m)
}

gopark函數(shù)則使用mcall函數(shù)(前面分析過,主要作用是保存當(dāng)前goroutine現(xiàn)場(chǎng),然后切換到g0棧去調(diào)用作為參數(shù)傳入的函數(shù))取執(zhí)行park_m函數(shù):

// park continuation on g0.
func park_m(gp *g) {
   _g_ := getg()

   if trace.enabled {
      traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
   }

   casgstatus(gp, _Grunning, _Gwaiting)
   dropg()

   if fn := _g_.m.waitunlockf; fn != nil {
      ok := fn(gp, _g_.m.waitlock)
      _g_.m.waitunlockf = nil
      _g_.m.waitlock = nil
      if !ok {
         if trace.enabled {
            traceGoUnpark(gp, 2)
         }
         casgstatus(gp, _Gwaiting, _Grunnable)
         execute(gp, true) // Schedule it back, never returns.
      }
   }
   schedule()
}

park_m首先把當(dāng)前goroutine的狀態(tài)設(shè)置為_Gwaiting(因?yàn)樗诘却渌?code>goroutine往channel里面寫數(shù)據(jù)),然后調(diào)用dropg函數(shù)解除gm之間的關(guān)系,最后通過調(diào)用schedule函數(shù)進(jìn)入調(diào)度循環(huán)。

至此,一個(gè)goroutine就被主動(dòng)掛起了。

3.2 協(xié)程喚醒

我們繼續(xù)以上例子,當(dāng)另一個(gè)goroutine對(duì)這個(gè)channel發(fā)送數(shù)據(jù)的時(shí)候

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   ...

   if sg := c.recvq.dequeue(); sg != nil {
      // Found a waiting receiver. We pass the value we want to send
      // directly to the receiver, bypassing the channel buffer (if any).
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   ...
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   ...
   goready(gp, skip+1)
}

channel的發(fā)送流程和讀取類似,當(dāng)檢查到接收隊(duì)列中有等待著時(shí),會(huì)調(diào)用send函數(shù)然后調(diào)用goready喚醒協(xié)程:

func goready(gp *g, traceskip int) {
   systemstack(func() {
      ready(gp, traceskip, true)
   })
}

func ready(gp *g, traceskip int, next bool) {
   if trace.enabled {
      traceGoUnpark(gp, traceskip)
   }

   status := readgstatus(gp)

   // Mark runnable.
   _g_ := getg()
   mp := acquirem() // disable preemption because it can be holding p in a local var
   if status&^_Gscan != _Gwaiting {
      dumpgstatus(gp)
      throw("bad g->status in ready")
   }

   // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
   casgstatus(gp, _Gwaiting, _Grunnable)
   runqput(_g_.m.p.ptr(), gp, next)
   wakep()
   releasem(mp)
}

這里發(fā)現(xiàn),ready函數(shù)和創(chuàng)建協(xié)程時(shí)一樣,會(huì)觸發(fā)wakep來檢查是否需要喚醒空閑P來執(zhí)行。而在此之前,這個(gè)被喚醒的goroutine會(huì)放到P的本地隊(duì)列的下一個(gè)執(zhí)行goroutine,以提升時(shí)效性。

到這里,一個(gè)被掛起的協(xié)程也就被喚醒了。

4. 小結(jié)

本文我們分析了創(chuàng)建協(xié)程時(shí)發(fā)生的調(diào)度,也介紹了以channel讀寫為例子的主動(dòng)掛起似的調(diào)度。而系統(tǒng)調(diào)用和GC觸發(fā)的調(diào)度比較復(fù)雜,我們放在后面介紹。

以上就是Go調(diào)度器學(xué)習(xí)之goroutine調(diào)度詳解的詳細(xì)內(nèi)容,更多關(guān)于Go goroutine調(diào)度的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Go?語言進(jìn)階freecache源碼學(xué)習(xí)教程

    Go?語言進(jìn)階freecache源碼學(xué)習(xí)教程

    這篇文章主要為大家介紹了Go?語言進(jìn)階freecache源碼學(xué)習(xí)教程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • goland安裝1.7版本報(bào)錯(cuò)Unpacked?SDK?is?corrupted解決

    goland安裝1.7版本報(bào)錯(cuò)Unpacked?SDK?is?corrupted解決

    這篇文章主要為大家介紹了goland安裝1.7版本報(bào)錯(cuò)Unpacked?SDK?is?corrupted解決,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-11-11
  • GO語言中回調(diào)函數(shù)的使用

    GO語言中回調(diào)函數(shù)的使用

    本文主要介紹了GO語言中回調(diào)函數(shù)的使用,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-03-03
  • Golang中for循環(huán)的用法示例詳解

    Golang中for循環(huán)的用法示例詳解

    for循環(huán)就是讓一段代碼循環(huán)的執(zhí)行,接下來通過本文給大家講解Golang中for循環(huán)的用法,代碼簡(jiǎn)單易懂,對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2022-12-12
  • go開源項(xiàng)目用戶名密碼驗(yàn)證的邏輯鬼才寫法

    go開源項(xiàng)目用戶名密碼驗(yàn)證的邏輯鬼才寫法

    這篇文章主要為大家介紹了go開源項(xiàng)目中發(fā)現(xiàn)的一個(gè)邏輯鬼才寫法,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-07-07
  • go語言中的二維切片賦值

    go語言中的二維切片賦值

    這篇文章主要介紹了go語言中的二維切片賦值操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • Go語言題解LeetCode1051高度檢查器示例詳解

    Go語言題解LeetCode1051高度檢查器示例詳解

    這篇文章主要為大家介紹了Go語言題解LeetCode1051高度檢查器示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-12-12
  • 詳解Golang如何監(jiān)聽某個(gè)函數(shù)的開始執(zhí)行和執(zhí)行結(jié)束

    詳解Golang如何監(jiān)聽某個(gè)函數(shù)的開始執(zhí)行和執(zhí)行結(jié)束

    這篇文章主要為大家詳細(xì)介紹了Golang如何監(jiān)聽某個(gè)函數(shù)的開始執(zhí)行和執(zhí)行結(jié)束,文中的示例代碼講解詳細(xì),有需要的小伙伴可以參考一下
    2024-02-02
  • Golang與其他語言不同的九個(gè)特性

    Golang與其他語言不同的九個(gè)特性

    近來關(guān)于對(duì)Golang的討論有很多,七牛的幾個(gè)大牛們也斷定Go語言在未來將會(huì)快速發(fā)展,并且很可能會(huì)取代Java成為互聯(lián)網(wǎng)時(shí)代最受歡迎的編程語言。本文將帶你了解它不同于其他語言的九個(gè)特性
    2021-09-09
  • Go語言如何利用Mutex保障數(shù)據(jù)讀寫正確

    Go語言如何利用Mutex保障數(shù)據(jù)讀寫正確

    這篇文章主要介紹了互斥鎖的實(shí)現(xiàn)機(jī)制,以及?Go?標(biāo)準(zhǔn)庫的互斥鎖?Mutex?的基本使用方法,文中的示例代碼講解詳細(xì),需要的小伙伴可以參考一下
    2023-05-05

最新評(píng)論