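Learning the Go Scheduler: goroutine Scheduling in Detail
0. Introduction
The previous post covered how a goroutine is created, run, and torn down, and touched on the scheduling loop that drives goroutine switches. This post looks at the other situations that trigger goroutine scheduling.
1. When goroutine scheduling happens
A goroutine may be scheduled in any of the following situations: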
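| Situation | Description |
|---|---|
| go func(){} | The go keyword creates a new goroutine, and the scheduler considers scheduling it |
| GC | GC work also runs on system threads (M) and at certain points requires every goroutine to stop, so scheduling occurs |
| System call | A system call blocks the M, so the calling goroutine is scheduled off and another goroutine is scheduled on |
| Synchronized memory access | Operations on mutexes, channels, and the like can block a goroutine; it is scheduled off and, once the condition is satisfied, scheduled back on to continue running |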
2. Scheduling when a goroutine is created
The previous post gave a first look at what happens when a goroutine is created with the go keyword, especially the scheduling loop, but it did not examine exactly how creating a goroutine triggers scheduling.
func newproc(fn *funcval) {
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, gp, pc)
		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)
		if mainStarted {
			wakep()
		}
	})
}
Recall that the Go compiler turns a go statement into a call to runtime.newproc. The previous post walked through the creation of the main goroutine in detail. In runtime.main the global variable mainStarted is set to true, so every ordinary goroutine created afterwards also calls runtime.wakep, which tries to put an idle P to work.
func wakep() {
	if atomic.Load(&sched.npidle) == 0 {
		return
	}
	// be conservative about spinning threads
	if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
		return
	}
	startm(nil, true)
}
wakep first checks whether any P is idle and returns if there is none. It then checks whether another thread is already spinning, that is, whether some M is already looking for work. Only if no M is spinning does it call startm, which either wakes a sleeping worker thread or creates a new one.
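The two early returns in wakep can be modeled outside the runtime. The sketch below is a simplified, single-threaded illustration only: schedState and shouldStartM are hypothetical names, and the plain field update stands in for the atomic load and CAS used by the real code.

```go
package main

import "fmt"

// schedState is a hypothetical stand-in for the two counters that wakep
// reads from the runtime's global sched structure.
type schedState struct {
	npidle     int32 // number of idle Ps
	nmspinning int32 // number of spinning Ms
}

// shouldStartM mirrors wakep's two early returns and reports whether
// wakep would go on to call startm.
func shouldStartM(s *schedState) bool {
	if s.npidle == 0 {
		return false // no idle P: nothing could run the extra work
	}
	if s.nmspinning != 0 {
		return false // some M is already looking for work
	}
	s.nmspinning = 1 // plays the role of atomic.Cas(&sched.nmspinning, 0, 1)
	return true
}

func main() {
	fmt.Println(shouldStartM(&schedState{npidle: 0, nmspinning: 0}))
	fmt.Println(shouldStartM(&schedState{npidle: 2, nmspinning: 1}))
	fmt.Println(shouldStartM(&schedState{npidle: 2, nmspinning: 0}))
}
```

Only the last call reports true: there is an idle P and nobody is spinning yet.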
func startm(_p_ *p, spinning bool) {
	// Disable preemption.
	//
	// Every owned P must have an owner that will eventually stop it in the
	// event of a GC stop request. startm takes transient ownership of a P
	// (either from argument or pidleget below) and transfers ownership to
	// a started M, which will be responsible for performing the stop.
	//
	// Preemption must be disabled during this transient ownership,
	// otherwise the P this is running on may enter GC stop while still
	// holding the transient P, leaving that P in limbo and deadlocking the
	// STW.
	//
	// Callers passing a non-nil P must already be in non-preemptible
	// context, otherwise such preemption could occur on function entry to
	// startm. Callers passing a nil P may be preemptible, so we must
	// disable preemption before acquiring a P from pidleget below.
	mp := acquirem() // disable preemption for the duration of this call (see above)
	lock(&sched.lock)
	if _p_ == nil { // no P was passed in, so take one from the idle list
		_p_ = pidleget()
		if _p_ == nil { // no idle P either: return
			unlock(&sched.lock)
			if spinning {
				// The caller incremented nmspinning, but there are no idle Ps,
				// so it's okay to just undo the increment and give up.
				if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
					throw("startm: negative nmspinning")
				}
			}
			releasem(mp)
			return
		}
	}
	nmp := mget() // we have a P; now try to take an idle M
	if nmp == nil { // no idle M: create one with newm, then return
		// No M is available, we must drop sched.lock and call newm.
		// However, we already own a P to assign to the M.
		//
		// Once sched.lock is released, another G (e.g., in a syscall),
		// could find no idle P while checkdead finds a runnable G but
		// no running M's because this new M hasn't started yet, thus
		// throwing in an apparent deadlock.
		//
		// Avoid this situation by pre-allocating the ID for the new M,
		// thus marking it as 'running' before we drop sched.lock. This
		// new M will eventually run the scheduler to execute any
		// queued G's.
		id := mReserveID()
		unlock(&sched.lock)
		var fn func()
		if spinning {
			// The caller incremented nmspinning, so set m.spinning in the new M.
			fn = mspinning
		}
		newm(fn, _p_, id)
		// Ownership transfer of _p_ committed by start in newm.
		// Preemption is now safe.
		releasem(mp)
		return
	}
	unlock(&sched.lock)
	if nmp.spinning {
		throw("startm: m is spinning")
	}
	if nmp.nextp != 0 {
		throw("startm: m has p")
	}
	if spinning && !runqempty(_p_) {
		throw("startm: p has runnable gs")
	}
	// The caller incremented nmspinning, so set m.spinning in the new M.
	nmp.spinning = spinning
	nmp.nextp.set(_p_)
	notewakeup(&nmp.park) // an idle M exists: wake it
	// Ownership transfer of _p_ committed by wakeup. Preemption is now
	// safe.
	releasem(mp)
}
startm first checks whether there is an idle P and returns if there is none. If there is one, it looks for an idle M: if none exists it creates a new M, otherwise it wakes the idle one. In short, wakep exists to keep as many Ps, and therefore as much CPU, busy as possible.
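The decision tree in startm can be condensed into a few lines. The following is a hedged sketch rather than runtime code: startmAction and its parameters are hypothetical, and the counts replace the idle lists that the runtime manipulates under sched.lock.

```go
package main

import "fmt"

// startmAction summarizes which branch startm would take.
// pGiven says whether the caller passed a non-nil P; idlePs and idleMs are
// the sizes of the idle P and idle M lists (hypothetical inputs).
func startmAction(pGiven bool, idlePs, idleMs int) string {
	if !pGiven && idlePs == 0 {
		return "give up" // no P to hand to an M
	}
	if idleMs == 0 {
		return "create M" // the newm path
	}
	return "wake M" // the notewakeup path
}

func main() {
	fmt.Println(startmAction(false, 0, 3))
	fmt.Println(startmAction(false, 1, 0))
	fmt.Println(startmAction(false, 1, 2))
}
```

The three calls exercise the three outcomes in the order the prose lists them.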
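At this point it is worth recalling the rules from the previous post for how the scheduler picks a goroutine:
- on every 61st scheduling tick, take a goroutine from the global queue first;
- otherwise, take one from the current P's local queue;
- if that also finds nothing, steal one from another P's run queue.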
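The lookup order above can be illustrated with a toy model. This is a sketch under simplifying assumptions, not the runtime's algorithm: proc, scheduler, and next are hypothetical names, queues are plain slices without locking, and a steal takes a single goroutine.

```go
package main

import "fmt"

// proc is a toy P with a scheduling tick counter and a local run queue.
type proc struct {
	schedtick int
	runq      []string
}

// scheduler holds the global queue and all toy Ps.
type scheduler struct {
	globalq []string
	procs   []*proc
}

// pop removes and returns the head of a queue, if any.
func pop(q *[]string) (string, bool) {
	if len(*q) == 0 {
		return "", false
	}
	g := (*q)[0]
	*q = (*q)[1:]
	return g, true
}

// next picks the next goroutine for self following the order in the list.
func (s *scheduler) next(self *proc) (string, bool) {
	defer func() { self.schedtick++ }()
	if self.schedtick%61 == 0 { // periodic fairness check of the global queue
		if g, ok := pop(&s.globalq); ok {
			return g, true
		}
	}
	if g, ok := pop(&self.runq); ok { // local queue
		return g, true
	}
	if g, ok := pop(&s.globalq); ok { // global queue
		return g, true
	}
	for _, other := range s.procs { // steal from another P
		if other == self {
			continue
		}
		if g, ok := pop(&other.runq); ok {
			return g, true
		}
	}
	return "", false
}

func main() {
	p0 := &proc{schedtick: 1, runq: []string{"local-1"}}
	p1 := &proc{runq: []string{"remote-1"}}
	s := &scheduler{globalq: []string{"global-1"}, procs: []*proc{p0, p1}}
	for i := 0; i < 3; i++ {
		g, _ := s.next(p0)
		fmt.Println(g)
	}
}
```

Starting p0 at tick 1 keeps the first lookup away from the every-61st-tick branch, so the three lookups come from the local queue, the global queue, and the other P in that order.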
Stealing goroutines from other Ps is driven by findrunnable. The function is long, so the listing below omits the less important code:
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	...
	// local runq
	// try the local queue again
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}
	// global runq
	// then check the global queue
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}
	...
	// Spinning Ms: steal work from other Ps.
	//
	// Limit the number of spinning Ms to half the number of busy Ps.
	// This is necessary to prevent excessive CPU consumption when
	// GOMAXPROCS>>1 but the program parallelism is low.
	procs := uint32(gomaxprocs)
	if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
		if !_g_.m.spinning {
			_g_.m.spinning = true
			atomic.Xadd(&sched.nmspinning, 1)
		}
		gp, inheritTime, tnow, w, newWork := stealWork(now) // call stealWork to steal a goroutine
		now = tnow
		if gp != nil {
			// Successfully stole.
			return gp, inheritTime
		}
		if newWork {
			// There may be new timer or GC work; restart to
			// discover.
			goto top
		}
		if w != 0 && (pollUntil == 0 || w < pollUntil) {
			// Earlier timer to wait for.
			pollUntil = w
		}
	}
	...
	// return P and block
	// stealing failed: unbind the M from its P, put the P on the idle list, then go to sleep
	lock(&sched.lock)
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	pidleput(_p_)
	unlock(&sched.lock)
	...
	_g_.m.spinning = false // the M is about to sleep, so it is no longer spinning
	if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
		throw("findrunnable: negative nmspinning")
	}
	...
	stopm() // sleep
	goto top
}
As the code shows, a worker thread tries repeatedly to find a runnable goroutine and goes to sleep only when it truly finds nothing. Note that an M is said to be spinning while it is trying to steal goroutines from other Ps' local queues. This also explains the check in wakep: if some M is already spinning, wakep returns at once, because that M will pick up the new work. Only when no M is spinning does startm wake a sleeping M, and only when no M is sleeping does it create a new one.
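The condition that guards the stealing branch caps the number of spinning Ms at half the number of busy Ps. A minimal sketch of that arithmetic follows; maySpin is a hypothetical helper and the arguments replace the atomic loads in findrunnable.

```go
package main

import "fmt"

// maySpin reproduces the guard from findrunnable: an M that is already
// spinning keeps going; otherwise it may start spinning only while
// 2*nmspinning is below the number of busy Ps (gomaxprocs - npidle).
func maySpin(alreadySpinning bool, nmspinning, gomaxprocs, npidle uint32) bool {
	return alreadySpinning || 2*nmspinning < gomaxprocs-npidle
}

func main() {
	// 8 Ps, 6 idle, so 2 are busy.
	fmt.Println(maySpin(false, 0, 8, 6)) // 0 < 2
	fmt.Println(maySpin(false, 1, 8, 6)) // 2 < 2 fails
	fmt.Println(maySpin(true, 1, 8, 6))  // already spinning
}
```

With two busy Ps, a first spinner is admitted, a second one is not, and an M that is already spinning is never turned away by this check.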
We will not analyze the stealing algorithm in stealWork here; interested readers can study it on their own. Next, let's look at how stopm puts a thread to sleep.
func stopm() {
	_g_ := getg()
	if _g_.m.locks != 0 {
		throw("stopm holding locks")
	}
	if _g_.m.p != 0 {
		throw("stopm holding p")
	}
	if _g_.m.spinning {
		throw("stopm spinning")
	}
	lock(&sched.lock)
	mput(_g_.m) // put the M on the sched.midle idle list
	unlock(&sched.lock)
	mPark()
	acquirep(_g_.m.nextp.ptr()) // after wakeup, bind the P that the waker (startm) stored in m.nextp
	_g_.m.nextp = 0
}

func mPark() {
	gp := getg()
	notesleep(&gp.m.park) // go to sleep
	noteclear(&gp.m.park)
}
As shown, stopm puts the m on the scheduler's idle thread list and then goes to sleep via notesleep. A note is the Go runtime's one-shot sleep and wakeup mechanism: one thread sleeps with notesleep, and another thread wakes it with notewakeup.
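The one-shot sleep and wakeup pattern can be imitated in ordinary Go with a channel. This is an analogy only and an assumption-laden sketch: the runtime's note is built on OS primitives, whereas the hypothetical note type below parks a goroutine, not a thread.

```go
package main

import "fmt"

// note is a hypothetical user-level analogue of the runtime's note.
type note struct {
	done chan struct{}
}

func newNote() *note { return &note{done: make(chan struct{})} }

// sleep blocks until wakeup has been called (or returns at once if it
// already was), in the spirit of notesleep.
func (n *note) sleep() { <-n.done }

// wakeup releases the sleeper, in the spirit of notewakeup. In this sketch a
// second wakeup without clear panics, because the channel is already closed.
func (n *note) wakeup() { close(n.done) }

// clear re-arms the note for the next use, in the spirit of noteclear.
func (n *note) clear() { n.done = make(chan struct{}) }

// parkAndResume mimics the mPark sequence: sleep, then clear after wakeup.
func parkAndResume() string {
	park := newNote()
	result := make(chan string)
	go func() {
		park.sleep()
		park.clear()
		result <- "resumed"
	}()
	park.wakeup() // what startm does with notewakeup(&nmp.park)
	return <-result
}

func main() {
	fmt.Println(parkAndResume())
}
```

As in mPark, the sleeper clears the note itself once it is awake, so the same note can be used for the next sleep.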
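Summary
That was a lot to take in, so let's run through a small example to tie the logic together (the creation and execution of the main thread was covered in detail in the previous post and is not repeated here). The main thread creates a goroutine, which triggers wakep. wakep may then wake an idle worker thread (if this is the first non-main goroutine, there is no idle worker thread yet), create a new worker thread, or do nothing.
A newly created worker thread also starts executing at mstart (not to be confused with startm). In schedule it tries to get a goroutine: if both the global and the local queues are empty, it tries to steal from other Ps, and if stealing fails it goes to sleep.
A woken worker thread resumes from the point where it went to sleep and starts searching for work again.
Once it obtains a goroutine it runs it, and then, for one reason or another, re-enters the scheduling loop.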

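3. Voluntary suspension
In Go, many situations cause a goroutine to block, that is, to suspend itself voluntarily. It is scheduled off and, once the condition it is waiting for holds, scheduled back on to continue running. Channel reads and writes are one example. We use a blocking channel receive to show how a voluntarily suspended goroutine is scheduled.
3.1 Suspending a goroutine
As with the map covered earlier, a channel receive comes in two forms:
v := <-ch
v, ok := <-ch
These correspond to the chanrecv1 and chanrecv2 functions below: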
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}
Both functions end up calling chanrecv:
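At the language level, the difference between the two receive forms shows up most clearly on a closed channel. The short example below is illustrative; recvForms is a hypothetical helper name.

```go
package main

import "fmt"

// recvForms receives from a channel using both forms. The channel holds one
// buffered value and is then closed, so the second receive finds it drained.
func recvForms() (v1, v2 int, ok bool) {
	ch := make(chan int, 1)
	ch <- 7
	close(ch)

	v1 = <-ch     // single-value form: the buffered value
	v2, ok = <-ch // two-value form: zero value, ok is false on a closed, empty channel
	return v1, v2, ok
}

func main() {
	fmt.Println(recvForms())
}
```

The second result of the two-value form is the received flag that chanrecv2 returns.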
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	...
	c.recvq.enqueue(mysg) // put this goroutine on the channel's receive queue
	atomic.Store8(&gp.parkingOnChan, 1)
	// suspend this goroutine
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
	...
}
chanrecv first checks whether the channel has data to read. If it does, it reads the data and returns. If not, it puts the current goroutine on the channel's receive queue and calls gopark to suspend it.
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	if reason != waitReasonSleep {
		checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
	}
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	mp.waitlock = lock
	mp.waitunlockf = unlockf
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	// can't do anything that might move the G between Ms here.
	mcall(park_m)
}
gopark in turn uses mcall (analyzed earlier: it saves the current goroutine's context, switches to the g0 stack, and calls the function passed as its argument) to run park_m:
// park continuation on g0.
func park_m(gp *g) {
	_g_ := getg()
	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}
	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()
	if fn := _g_.m.waitunlockf; fn != nil {
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
	schedule()
}
park_m first sets the current goroutine's status to _Gwaiting (because it is waiting for another goroutine to write to the channel), then calls dropg to detach the g from the m, and finally calls schedule to enter the scheduling loop.
At this point the goroutine has suspended itself.
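A parked receiver can be observed from user code: runtime.Stack with all set to true prints every goroutine together with its wait reason, and a goroutine blocked on a receive is listed as "chan receive". The sketch below polls the dump until the goroutine has parked; parkedOnRecv is a hypothetical helper and the 64 KiB buffer size is an arbitrary assumption.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
	"time"
)

// parkedOnRecv starts a goroutine that blocks on an empty channel and reports
// whether the goroutine dump shows it waiting in the "chan receive" state.
func parkedOnRecv() bool {
	ch := make(chan int)
	done := make(chan struct{})
	go func() {
		<-ch // no sender yet, so this goroutine parks
		close(done)
	}()
	// On the way out, send a value so the parked goroutine wakes and exits.
	defer func() { ch <- 1; <-done }()

	buf := make([]byte, 1<<16)
	for i := 0; i < 100; i++ {
		n := runtime.Stack(buf, true)
		// Each goroutine's section is separated by a blank line.
		for _, section := range strings.Split(string(buf[:n]), "\n\n") {
			if strings.Contains(section, "[chan receive") &&
				strings.Contains(section, "parkedOnRecv") {
				return true
			}
		}
		time.Sleep(10 * time.Millisecond) // give the goroutine time to park
	}
	return false
}

func main() {
	fmt.Println("receiver parked:", parkedOnRecv())
}
```

The deferred send also exercises the opposite direction: it hands a value to the waiting receiver, which then becomes runnable again and finishes.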
3.2 Waking a goroutine
Continuing the example, consider what happens when another goroutine sends data on this channel:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	...
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
	...
}

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	...
	goready(gp, skip+1)
}
The channel send path is similar to the receive path. When it finds a waiter in the receive queue, it calls send, which in turn calls goready to wake the goroutine:
func goready(gp *g, traceskip int) {
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}

func ready(gp *g, traceskip int, next bool) {
	if trace.enabled {
		traceGoUnpark(gp, traceskip)
	}
	status := readgstatus(gp)
	// Mark runnable.
	_g_ := getg()
	mp := acquirem() // disable preemption because it can be holding p in a local var
	if status&^_Gscan != _Gwaiting {
		dumpgstatus(gp)
		throw("bad g->status in ready")
	}
	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
	casgstatus(gp, _Gwaiting, _Grunnable)
	runqput(_g_.m.p.ptr(), gp, next)
	wakep()
	releasem(mp)
}
Like goroutine creation, ready calls wakep to check whether an idle P should be woken to run work. Before that, it places the woken goroutine in the slot for the next goroutine to run on the current P's local queue, so that it runs promptly.
With that, the suspended goroutine has been woken.
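The effect of passing next as true to runqput can be pictured with a toy queue that has a single "run next" slot in front of the ordinary queue. This is a sketch under assumptions: localQueue, put, and get are hypothetical, and moving a displaced goroutine to the tail of the queue reflects my reading of the runtime and is not shown in the listings above.

```go
package main

import "fmt"

// localQueue is a toy P-local run queue with a priority slot.
type localQueue struct {
	runnext string   // goroutine to run next, "" if empty
	runq    []string // ordinary FIFO queue
}

// put enqueues g. With next set, g takes the runnext slot and any goroutine
// it displaces goes to the tail of the ordinary queue (assumed behavior).
func (q *localQueue) put(g string, next bool) {
	if next {
		old := q.runnext
		q.runnext = g
		if old == "" {
			return
		}
		g = old
	}
	q.runq = append(q.runq, g)
}

// get returns the runnext goroutine if present, otherwise the queue head.
func (q *localQueue) get() (string, bool) {
	if q.runnext != "" {
		g := q.runnext
		q.runnext = ""
		return g, true
	}
	if len(q.runq) == 0 {
		return "", false
	}
	g := q.runq[0]
	q.runq = q.runq[1:]
	return g, true
}

func main() {
	q := &localQueue{}
	q.put("a", false)
	q.put("woken", true) // like ready(gp, traceskip, true)
	g, _ := q.get()
	fmt.Println(g)
}
```

Although "a" was queued first, the woken goroutine is returned first because it occupies the runnext slot.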
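4. Summary
This post analyzed the scheduling that happens when a goroutine is created and, using channel reads and writes as the example, the scheduling around voluntary suspension. Scheduling triggered by system calls and GC is more involved and is left for a later post.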