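Learning the Go Scheduler: goroutine Scheduling in Detail
0. Introduction
The previous post covered how a goroutine is created, run, and terminated, and mentioned the scheduling loop involved in switching between goroutines. This post looks at the other situations that trigger goroutine scheduling.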
1. When goroutine scheduling happens
A goroutine may be scheduled in the following situations:
Situation | Description |
---|---|
go func(){} | Creating a new goroutine with the go keyword gives the scheduler a chance to schedule |
GC | GC work also runs on system threads (M), and some of its phases require all goroutines to stop, so scheduling happens here as well |
System call | A blocking system call blocks the M, so the goroutine is scheduled away and another goroutine is scheduled in |
Synchronized memory access | Operations on a mutex, a channel, and so on can block the goroutine; it is scheduled away and scheduled back in once the condition is satisfied |
2. Scheduling when a goroutine is created
The previous post gave a first analysis of what happens when the go keyword creates a goroutine, in particular the scheduling loop, but it did not look at how creating a goroutine actually triggers scheduling.
    func newproc(fn *funcval) {
        gp := getg()
        pc := getcallerpc()
        systemstack(func() {
            newg := newproc1(fn, gp, pc)

            _p_ := getg().m.p.ptr()
            runqput(_p_, newg, true)

            if mainStarted {
                wakep()
            }
        })
    }
Recall that the Go compiler turns a go statement into a call to runtime.newproc. The previous post walked through the creation of the main goroutine in detail: runtime.main sets the global variable mainStarted to true, so every ordinary goroutine created after that also calls runtime.wakep, which tries to put an idle P to work.
    func wakep() {
        if atomic.Load(&sched.npidle) == 0 {
            return
        }
        // be conservative about spinning threads
        if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
            return
        }
        startm(nil, true)
    }
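wakep first checks whether any P is idle and returns immediately if none is. It then checks whether another thread is already spinning, that is, whether an M is already looking for work. Only if no M is spinning does it call startm, which wakes a sleeping worker thread or creates a new one.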
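The checks wakep performs can be modelled outside the runtime. The following is a minimal sketch and not the runtime's code: toySched, setIdle, and the started counter are hypothetical names introduced for illustration, and the real startm is replaced by a counter increment.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toySched is a hypothetical stand-in for the runtime's global sched struct.
type toySched struct {
	npidle     int32 // number of idle Ps
	nmspinning int32 // number of spinning Ms
	started    int   // how often the stubbed startm would have been called
}

// setIdle sets the number of idle Ps (illustration helper, not a runtime API).
func (s *toySched) setIdle(n int32) { atomic.StoreInt32(&s.npidle, n) }

// wakep mirrors the checks in the excerpt above and reports whether it
// would have gone on to call startm.
func (s *toySched) wakep() bool {
	if atomic.LoadInt32(&s.npidle) == 0 {
		return false // no idle P: nothing could run the new work anyway
	}
	// Only one caller may move nmspinning from 0 to 1.
	if atomic.LoadInt32(&s.nmspinning) != 0 || !atomic.CompareAndSwapInt32(&s.nmspinning, 0, 1) {
		return false // an M is already looking for work
	}
	s.started++ // stands in for startm(nil, true); single-threaded toy
	return true
}

func main() {
	s := &toySched{}
	fmt.Println(s.wakep()) // false: no idle P
	s.setIdle(2)
	fmt.Println(s.wakep()) // true: an idle P exists and no M is spinning
	fmt.Println(s.wakep()) // false: the previous call already marked an M as spinning
}
```

The compare-and-swap is what keeps a burst of goroutine creations from starting one thread each: only the caller that moves the spinning count from 0 to 1 goes on to start an M.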
    func startm(_p_ *p, spinning bool) {
        // Disable preemption.
        //
        // Every owned P must have an owner that will eventually stop it in the
        // event of a GC stop request. startm takes transient ownership of a P
        // (either from argument or pidleget below) and transfers ownership to
        // a started M, which will be responsible for performing the stop.
        //
        // Preemption must be disabled during this transient ownership,
        // otherwise the P this is running on may enter GC stop while still
        // holding the transient P, leaving that P in limbo and deadlocking the
        // STW.
        //
        // Callers passing a non-nil P must already be in non-preemptible
        // context, otherwise such preemption could occur on function entry to
        // startm. Callers passing a nil P may be preemptible, so we must
        // disable preemption before acquiring a P from pidleget below.
        mp := acquirem() // disable preemption for the duration of the hand-off
        lock(&sched.lock)
        if _p_ == nil { // no P was passed in, so take one from the idle list
            _p_ = pidleget()
            if _p_ == nil { // no idle P either: return
                unlock(&sched.lock)
                if spinning {
                    // The caller incremented nmspinning, but there are no idle Ps,
                    // so it's okay to just undo the increment and give up.
                    if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                        throw("startm: negative nmspinning")
                    }
                }
                releasem(mp)
                return
            }
        }
        nmp := mget() // we hold a P, so try to take an idle M
        if nmp == nil { // no idle M: create one with newm, then return
            // No M is available, we must drop sched.lock and call newm.
            // However, we already own a P to assign to the M.
            //
            // Once sched.lock is released, another G (e.g., in a syscall),
            // could find no idle P while checkdead finds a runnable G but
            // no running M's because this new M hasn't started yet, thus
            // throwing in an apparent deadlock.
            //
            // Avoid this situation by pre-allocating the ID for the new M,
            // thus marking it as 'running' before we drop sched.lock. This
            // new M will eventually run the scheduler to execute any
            // queued G's.
            id := mReserveID()
            unlock(&sched.lock)

            var fn func()
            if spinning {
                // The caller incremented nmspinning, so set m.spinning in the new M.
                fn = mspinning
            }
            newm(fn, _p_, id)
            // Ownership transfer of _p_ committed by start in newm.
            // Preemption is now safe.
            releasem(mp)
            return
        }
        unlock(&sched.lock)
        if nmp.spinning {
            throw("startm: m is spinning")
        }
        if nmp.nextp != 0 {
            throw("startm: m has p")
        }
        if spinning && !runqempty(_p_) {
            throw("startm: p has runnable gs")
        }
        // The caller incremented nmspinning, so set m.spinning in the new M.
        nmp.spinning = spinning
        nmp.nextp.set(_p_)
        notewakeup(&nmp.park) // an idle M exists: wake it up
        // Ownership transfer of _p_ committed by wakeup. Preemption is now
        // safe.
        releasem(mp)
    }
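startm first checks whether there is an idle P and returns immediately if there is none. If there is one, it looks for an idle M: if none exists it creates a new M, otherwise it wakes the idle M. In short, wakep exists to keep as many Ps, and therefore as much CPU, busy as possible.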
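The branch order in startm can be sketched as a small model. This is an illustration under stated assumptions: toyPool and its fields are hypothetical, Ps and Ms are represented by names only, and locking, spinning bookkeeping, and preemption are left out.

```go
package main

import "fmt"

// toyPool is a hypothetical model of the scheduler's idle lists.
type toyPool struct {
	idleP []string // idle Ps, by name
	idleM []string // sleeping Ms, by name
	nextM int      // counter used to name newly created Ms
}

// startm follows the branch order described above and returns a
// description of the action taken.
func (s *toyPool) startm() string {
	if len(s.idleP) == 0 {
		return "no idle P: give up"
	}
	p := s.idleP[0]
	s.idleP = s.idleP[1:]

	if len(s.idleM) == 0 {
		s.nextM++ // stands in for newm
		return fmt.Sprintf("created M%d for %s", s.nextM, p)
	}
	m := s.idleM[0]
	s.idleM = s.idleM[1:]
	return fmt.Sprintf("woke %s for %s", m, p) // stands in for notewakeup
}

func main() {
	s := &toyPool{idleP: []string{"P1", "P2"}, idleM: []string{"M7"}}
	fmt.Println(s.startm()) // woke M7 for P1
	fmt.Println(s.startm()) // created M1 for P2
	fmt.Println(s.startm()) // no idle P: give up
}
```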
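This is a good point to recall the rules from the previous post for picking the next goroutine to run:
- every 61 scheduling ticks, take a goroutine from the global queue;
- otherwise, take a goroutine from the current P's local queue first;
- if nothing has been found yet, steal goroutines from the run queues of other Ps.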
Stealing from other Ps' queues is done inside findrunnable. The function is long, so the excerpt below drops the less important code:
    func findrunnable() (gp *g, inheritTime bool) {
        _g_ := getg()

    top:
        _p_ := _g_.m.p.ptr()
        ...
        // local runq
        // look in the local queue once more
        if gp, inheritTime := runqget(_p_); gp != nil {
            return gp, inheritTime
        }

        // global runq
        // then check the global queue
        if sched.runqsize != 0 {
            lock(&sched.lock)
            gp := globrunqget(_p_, 0)
            unlock(&sched.lock)
            if gp != nil {
                return gp, false
            }
        }
        ...
        // Spinning Ms: steal work from other Ps.
        //
        // Limit the number of spinning Ms to half the number of busy Ps.
        // This is necessary to prevent excessive CPU consumption when
        // GOMAXPROCS>>1 but the program parallelism is low.
        procs := uint32(gomaxprocs)
        if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
            if !_g_.m.spinning {
                _g_.m.spinning = true
                atomic.Xadd(&sched.nmspinning, 1)
            }

            gp, inheritTime, tnow, w, newWork := stealWork(now) // call stealWork to steal a goroutine
            now = tnow
            if gp != nil {
                // Successfully stole.
                return gp, inheritTime
            }
            if newWork {
                // There may be new timer or GC work; restart to
                // discover.
                goto top
            }
            if w != 0 && (pollUntil == 0 || w < pollUntil) {
                // Earlier timer to wait for.
                pollUntil = w
            }
        }
        ...
        // return P and block
        // Stealing failed: unbind the M from its P, put the P on the idle list, then go to sleep.
        lock(&sched.lock)
        if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
            unlock(&sched.lock)
            goto top
        }
        if sched.runqsize != 0 {
            gp := globrunqget(_p_, 0)
            unlock(&sched.lock)
            return gp, false
        }
        if releasep() != _p_ {
            throw("findrunnable: wrong p")
        }
        pidleput(_p_)
        unlock(&sched.lock)
        ...
        _g_.m.spinning = false // the M is about to sleep, so it is no longer spinning
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
        ...
        stopm() // sleep
        goto top
    }
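As the code shows, a worker thread keeps trying to find a runnable goroutine and goes to sleep only when it really cannot find one. Note that a worker thread M that is trying to steal goroutines from other Ps' local queues is said to be in the spinning state. This ties back to wakep above: if some M is already spinning, wakep does nothing, because that M will find the new work by itself; otherwise startm wakes a sleeping M, and creates a new M only if no idle M exists.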
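The lookup order described above (the 61-tick fairness check, the local queue, the global queue, then stealing) can be sketched as a single-threaded toy. Everything here is an assumption made for illustration: toyP, toySched, pop, and next are hypothetical names, goroutines are plain strings, victims are visited in slice order rather than in the runtime's randomized order, and timers, netpoll, and GC work are ignored.

```go
package main

import "fmt"

// toyP and toySched are hypothetical stand-ins for the runtime's P and
// global scheduler state.
type toyP struct {
	schedtick int
	runq      []string
}

type toySched struct {
	global []string
	ps     []*toyP
}

// pop removes and returns the head of q, if any.
func pop(q *[]string) (string, bool) {
	if len(*q) == 0 {
		return "", false
	}
	g := (*q)[0]
	*q = (*q)[1:]
	return g, true
}

// next returns the goroutine p should run next and where it came from.
func (s *toySched) next(p *toyP) (string, string) {
	p.schedtick++
	if p.schedtick%61 == 0 { // fairness: look at the global queue first
		if g, ok := pop(&s.global); ok {
			return g, "global (every 61st tick)"
		}
	}
	if g, ok := pop(&p.runq); ok {
		return g, "local"
	}
	if g, ok := pop(&s.global); ok {
		return g, "global"
	}
	for _, victim := range s.ps {
		if victim == p || len(victim.runq) == 0 {
			continue
		}
		k := (len(victim.runq) + 1) / 2 // take about half of the victim's queue
		stolen := victim.runq[:k]
		victim.runq = victim.runq[k:]
		p.runq = append(p.runq, stolen[1:]...)
		return stolen[0], "stolen"
	}
	return "", "none (the M would release its P and sleep)"
}

func main() {
	p0 := &toyP{runq: []string{"a"}}
	p1 := &toyP{runq: []string{"b", "c", "d"}}
	s := &toySched{global: []string{"x"}, ps: []*toyP{p0, p1}}
	for i := 0; i < 6; i++ {
		g, from := s.next(p0)
		fmt.Printf("%q from %s\n", g, from)
	}
}
```

In this run p0 drains its own queue, then the global queue, and only then steals from p1; the final call finds nothing, which is the point where a real M would park in stopm.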
We will not analyze the stealing algorithm stealWork here; interested readers can read it themselves. Next, let's look at how stopm puts a thread to sleep.
    func stopm() {
        _g_ := getg()

        if _g_.m.locks != 0 {
            throw("stopm holding locks")
        }
        if _g_.m.p != 0 {
            throw("stopm holding p")
        }
        if _g_.m.spinning {
            throw("stopm spinning")
        }

        lock(&sched.lock)
        mput(_g_.m) // put the m on the sched.midle idle list
        unlock(&sched.lock)
        mPark()
        // Woken up: startm stored the P it is handing over in m.nextp
        // (nmp.nextp.set(_p_)) before calling notewakeup, so bind to that P now.
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }

    func mPark() {
        gp := getg()
        notesleep(&gp.m.park) // go to sleep
        noteclear(&gp.m.park)
    }
As shown, stopm puts the m onto the scheduler's idle-thread list and then goes to sleep through notesleep. A note is the Go runtime's one-shot sleep and wakeup mechanism: a thread sleeps with notesleep, and another thread wakes it with notewakeup.
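The one-shot behaviour of a note can be imitated at user level. The sketch below is only an analogy, assuming a channel that is closed on wakeup; the runtime itself builds notes on futexes or semaphores depending on the platform, and the type note with its sleep, wakeup, and clear methods is a hypothetical name chosen to echo notesleep, notewakeup, and noteclear.

```go
package main

import (
	"fmt"
	"sync"
)

// note is a user-level imitation of a one-shot sleep/wakeup flag.
type note struct {
	mu sync.Mutex
	ch chan struct{} // closed once wakeup has been called
}

func newNote() *note { return &note{ch: make(chan struct{})} }

// sleep blocks until wakeup has been called; a wakeup that happened
// earlier is not lost.
func (n *note) sleep() {
	n.mu.Lock()
	ch := n.ch
	n.mu.Unlock()
	<-ch
}

// wakeup releases the sleeper. Calling it twice without clear is a bug.
func (n *note) wakeup() {
	n.mu.Lock()
	defer n.mu.Unlock()
	select {
	case <-n.ch:
		panic("note: double wakeup")
	default:
		close(n.ch)
	}
}

// clear re-arms the note so that it can be slept on again.
func (n *note) clear() {
	n.mu.Lock()
	n.ch = make(chan struct{})
	n.mu.Unlock()
}

func main() {
	park := newNote()
	done := make(chan struct{})
	go func() { // plays the role of the M in mPark
		park.sleep()
		park.clear()
		fmt.Println("worker: woken up")
		close(done)
	}()
	park.wakeup() // plays the role of startm calling notewakeup
	<-done
}
```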
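Summary
That was a lot of detail, so let's walk through a small example to tie the logic together (the creation and execution of the main thread was covered in the previous post and is not repeated here). When the main thread creates a goroutine, wakep is triggered. It may then wake an idle worker thread (when the first non-main goroutine is created there is no idle worker thread yet), create a new worker thread, or do nothing.
A newly created worker thread also starts executing at mstart (take care to distinguish mstart from startm). In schedule it tries to get a goroutine; if both the global and local queues are empty, it tries to steal from other Ps, and if stealing fails it goes to sleep.
A woken worker thread resumes from the point where it went to sleep and looks for work again, stealing if necessary.
Once the thread has obtained a goroutine, it runs it, and later re-enters the scheduling loop for one reason or another.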
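3. Voluntary suspension
In Go, many situations cause a goroutine to block, that is, to suspend itself voluntarily. It is scheduled away and scheduled back in once the condition it is waiting for is met. Channel reads and writes are one example; we use a blocking channel receive to show how a voluntarily suspended goroutine is scheduled.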
3.1 Suspending a goroutine
As with the map covered earlier, a channel receive comes in two forms:
    v := <-ch
    v, ok := <-ch
These correspond to the functions chanrecv1 and chanrecv2:
    //go:nosplit
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
    }

    //go:nosplit
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
    }
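From the user's side, the difference between the two forms shows up once a channel is closed. The following small example uses only standard language features; the helper name recvTwice is made up for the illustration.

```go
package main

import "fmt"

// recvTwice receives twice from a closed channel that still holds one
// buffered value, using the two-value form both times.
func recvTwice() (int, bool, int, bool) {
	ch := make(chan int, 1)
	ch <- 42
	close(ch)
	v1, ok1 := <-ch // the buffered value is still delivered
	v2, ok2 := <-ch // closed and drained: zero value, ok is false
	return v1, ok1, v2, ok2
}

func main() {
	ch := make(chan int, 1)
	ch <- 7
	v := <-ch // single-value form
	fmt.Println(v) // 7

	fmt.Println(recvTwice()) // 42 true 0 false
}
```

Neither receive in this example blocks, because the channel always has a value or is closed; the blocking path is the one analyzed next.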
Both functions end up calling chanrecv:
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        ...
        c.recvq.enqueue(mysg) // put this goroutine on the channel's receive wait queue

        atomic.Store8(&gp.parkingOnChan, 1)
        // suspend this goroutine
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
        ...
    }
chanrecv first checks whether the channel has data to read. If so, it reads the data and returns. If not, it puts the goroutine on the channel's receive wait queue (recvq) and calls gopark to suspend the current goroutine.
    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
        if reason != waitReasonSleep {
            checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
        }
        mp := acquirem()
        gp := mp.curg
        status := readgstatus(gp)
        if status != _Grunning && status != _Gscanrunning {
            throw("gopark: bad g status")
        }
        mp.waitlock = lock
        mp.waitunlockf = unlockf
        gp.waitreason = reason
        mp.waittraceev = traceEv
        mp.waittraceskip = traceskip
        releasem(mp)
        // can't do anything that might move the G between Ms here.
        mcall(park_m)
    }
gopark uses mcall (analyzed earlier: it saves the current goroutine's context and switches to the g0 stack to call the function passed as its argument) to run park_m:
    // park continuation on g0.
    func park_m(gp *g) {
        _g_ := getg()

        if trace.enabled {
            traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
        }

        casgstatus(gp, _Grunning, _Gwaiting)
        dropg()

        if fn := _g_.m.waitunlockf; fn != nil {
            ok := fn(gp, _g_.m.waitlock)
            _g_.m.waitunlockf = nil
            _g_.m.waitlock = nil
            if !ok {
                if trace.enabled {
                    traceGoUnpark(gp, 2)
                }
                casgstatus(gp, _Gwaiting, _Grunnable)
                execute(gp, true) // Schedule it back, never returns.
            }
        }
        schedule()
    }
park_m first sets the current goroutine's status to _Gwaiting (it is waiting for another goroutine to write to the channel), then calls dropg to detach the g from the m, and finally calls schedule to enter the scheduling loop.
At this point the goroutine has been voluntarily suspended.
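The parked state can be observed from an ordinary program: a full stack dump from runtime.Stack labels each goroutine with its wait reason, and a goroutine blocked in a receive shows up as "chan receive". The sketch below relies on that dump format; the helper waitForState and its polling loop are assumptions made for the demonstration, not part of any API.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"time"
)

// waitForState polls a dump of all goroutine stacks until some goroutine
// is reported in the given state, checking roughly once per millisecond.
func waitForState(state string, tries int) bool {
	buf := make([]byte, 1<<16)
	for i := 0; i < tries; i++ {
		n := runtime.Stack(buf, true) // true: include all goroutines
		if strings.Contains(string(buf[:n]), state) {
			return true
		}
		time.Sleep(time.Millisecond)
	}
	return false
}

func main() {
	ch := make(chan int)
	done := make(chan struct{})
	go func() {
		v := <-ch // blocks: chanrecv parks this goroutine via gopark
		fmt.Println("received", v)
		close(done)
	}()

	fmt.Println("receiver parked:", waitForState("[chan receive]", 1000))
	ch <- 1 // the send finds the waiting receiver and readies it
	<-done
}
```

The polling is needed because starting a goroutine does not guarantee that it has already reached the receive when main continues.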
3.2 Waking a goroutine
Continuing the example, here is what happens when another goroutine sends data on the channel:
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        ...
        if sg := c.recvq.dequeue(); sg != nil {
            // Found a waiting receiver. We pass the value we want to send
            // directly to the receiver, bypassing the channel buffer (if any).
            send(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true
        }
        ...
    }

    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        ...
        goready(gp, skip+1)
    }
The channel send path is similar to the receive path. When it finds a waiter in the receive queue, it calls send, which in turn calls goready to wake the goroutine:
    func goready(gp *g, traceskip int) {
        systemstack(func() {
            ready(gp, traceskip, true)
        })
    }

    func ready(gp *g, traceskip int, next bool) {
        if trace.enabled {
            traceGoUnpark(gp, traceskip)
        }

        status := readgstatus(gp)

        // Mark runnable.
        _g_ := getg()
        mp := acquirem() // disable preemption because it can be holding p in a local var
        if status&^_Gscan != _Gwaiting {
            dumpgstatus(gp)
            throw("bad g->status in ready")
        }

        // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
        casgstatus(gp, _Gwaiting, _Grunnable)
        runqput(_g_.m.p.ptr(), gp, next)
        wakep()
        releasem(mp)
    }
Like goroutine creation, ready triggers wakep to check whether an idle P should be woken to run work. Before that, the woken goroutine is placed in the P's local queue as the next goroutine to run (the runnext slot), so that it runs promptly.
With that, the suspended goroutine has been woken up.
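The effect of passing next=true to runqput can be illustrated with a toy queue. This sketch assumes a simplified single-threaded P: toyP is a hypothetical type, the real run queue is a fixed-size lock-free ring with overflow to the global queue, and goroutines are represented by names.

```go
package main

import "fmt"

// toyP is a hypothetical model of a P's local run queue.
type toyP struct {
	runnext string   // goroutine to run next; "" means the slot is empty
	runq    []string // ordinary FIFO queue
}

// runqput enqueues g. With next=true, g takes the runnext slot and the
// goroutine that was there is moved to the tail of the queue.
func (p *toyP) runqput(g string, next bool) {
	if next {
		old := p.runnext
		p.runnext = g
		if old == "" {
			return
		}
		g = old // the previous runnext falls back to the regular queue
	}
	p.runq = append(p.runq, g)
}

// runqget returns runnext if it is set, otherwise the head of the queue.
func (p *toyP) runqget() (string, bool) {
	if p.runnext != "" {
		g := p.runnext
		p.runnext = ""
		return g, true
	}
	if len(p.runq) == 0 {
		return "", false
	}
	g := p.runq[0]
	p.runq = p.runq[1:]
	return g, true
}

func main() {
	p := &toyP{}
	p.runqput("a", false)
	p.runqput("b", false)
	p.runqput("woken1", true) // e.g. readied by a channel send
	p.runqput("woken2", true) // displaces woken1 to the tail
	for g, ok := p.runqget(); ok; g, ok = p.runqget() {
		fmt.Println(g) // woken2, a, b, woken1
	}
}
```

A goroutine that was just readied therefore jumps ahead of everything already queued on that P, unless another goroutine is readied before it gets to run.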
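4. Summary
This post analyzed the scheduling that happens when a goroutine is created, and used channel reads and writes as an example of scheduling through voluntary suspension. Scheduling triggered by system calls and GC is more involved and is left for later posts.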