golang協(xié)程設(shè)計及調(diào)度原理
一、協(xié)程設(shè)計-GMP模型
線程是操作系統(tǒng)調(diào)度到CPU中執(zhí)行的基本單位,多線程總是交替式地搶占CPU的時間片,線程在上下文的切換過程中需要經(jīng)過操作系統(tǒng)用戶態(tài)與內(nèi)核態(tài)的切換。golang的協(xié)程(G
)依然運行在工作線程(M
)之上,但是借助語言的調(diào)度器,協(xié)程只需要在用戶態(tài)即可完成切換,工作線程是感受不到協(xié)程存在的。golang在設(shè)計上通過邏輯處理器(P
)建立起了工作線程與協(xié)程之間的聯(lián)系。最簡單的GMP關(guān)系模型為(圖是靜態(tài)的,在程序運行的過程中,GMP三者之間的綁定關(guān)系都是不固定的):
1.工作線程M
工作線程是最終運行協(xié)程的實體。操作系統(tǒng)中的線程與在運行時代表線程的m結(jié)構(gòu)體進行了綁定:
// go/src/runtime/runtime2.go type m struct { g0 *g // goroutine with scheduling stack tls [tlsSlots]uintptr // thread-local storage (for x86 extern register) curg *g // current running goroutine p puintptr // attached p for executing go code (nil if not executing go code) nextp puintptr oldp puintptr // the p that was attached before executing a syscall park note ... }
為了執(zhí)行g(shù)o代碼,每一個工作線程m都與一個邏輯處理器p進行綁定,同時記錄了線程當前正在運行的用戶協(xié)程curg
。
每一個工作線程中都有一個特殊的協(xié)程g0
,稱為調(diào)度協(xié)程,其主要作用是執(zhí)行協(xié)程調(diào)度。而普通的協(xié)程g
無差別地用于執(zhí)行用戶代碼。當用戶協(xié)程g
主動讓渡、退出或者是被搶占時,m內(nèi)部就需要重新執(zhí)行協(xié)程調(diào)度,這時需要從用戶協(xié)程g切換到調(diào)度協(xié)程g0,g0調(diào)度一個普通協(xié)程g來執(zhí)行用戶代碼,便從g0又切換回普通協(xié)程g。每個工作線程內(nèi)部都在完成g->g0->g
這樣的調(diào)度循環(huán)。
操作系統(tǒng)的線程與m結(jié)構(gòu)體是通過線程本地存儲(thread-local storage
)進行綁定的。普通的全局變量對進程中的所有線程可見,而線程本地存儲(tls
)中的變量只對當前線程可見。系統(tǒng)線程通過m.tls
即可在任意時刻獲取到當前線程上的正在運行的協(xié)程g、邏輯處理器p、特殊協(xié)程g0、線程結(jié)構(gòu)體m等信息。
2.邏輯處理器p
系統(tǒng)線程m想要運行用戶協(xié)程g,必須先綁定邏輯處理器p。在代碼中可以通過runtime.GOMAXPROCS()
具體指定程序運行需要使用多少個邏輯處理器p。通常指定多少個邏輯處理器p最多就可以同時使用到多少個CPU核心數(shù)。
邏輯處理器p通過結(jié)構(gòu)體p進行定義:
type p struct { id int32 status uint32 // one of pidle/prunning/... schedtick uint32 // incremented on every scheduler call syscalltick uint32 // incremented on every system call m muintptr // back-link to associated m (nil if idle) // Queue of runnable goroutines. Accessed without lock. runqhead uint32 runqtail uint32 runq [256]guintptr runnext guintptr ... }
在p中,通過字段m維護了與工作線程m的綁定關(guān)系。每一個邏輯處理器p都具有唯一的id
,以及當前的狀態(tài)status
。如果p的狀態(tài)為正在運行中,則必然綁定到了一個工作線程m
上,當邏輯處理完成后,解綁工作線程(m==nil
),p的狀態(tài)便是空閑的。需要注意的是,m與p的數(shù)量沒有絕對關(guān)系,當m阻塞時,p就會切換到一個空閑的m,當不存在空閑的m時,便會創(chuàng)建一個m。所以即使p的數(shù)量是1,也有可能會創(chuàng)建很多個m出來。
程序中往往有成千上萬的協(xié)程存在,不可能同時被執(zhí)行。協(xié)程需要進行調(diào)度執(zhí)行,而那些等待被調(diào)度執(zhí)行的協(xié)程存儲在運行隊列中。go語言調(diào)度器將運行隊列分為全局運行隊列與局部運行隊列。邏輯處理器p中維護了局部運行隊列runq
。局部運行隊列是每個p特有的長度為256的數(shù)組。該數(shù)組模擬了一個循環(huán)隊列,p.runqhead
為隊頭,p.runqtail
為隊尾,協(xié)程g都從隊尾入隊,從隊頭獲取。而全局運行隊列維護在schedt.runq
中(見后文)。
p中還有一個特殊的runnext
字段,用于標識下一個要執(zhí)行的協(xié)程g,如果p.runnext
不為空,則會直接執(zhí)行runnext
指向的協(xié)程,而不會再去p.runq
數(shù)組中尋找。
3.協(xié)程g
協(xié)程通常分為特殊的調(diào)度協(xié)程g0以及執(zhí)行用戶代碼的普通協(xié)程g。
無論g0還是g,都通過結(jié)構(gòu)體g進行定義:
// go/src/runtime/runtime2.go type g struct { stack stack // offset known to runtime/cgo m *m // current m; offset known to arm liblink sched gobuf ... } // Stack describes a Go execution stack. type stack struct { lo uintptr hi uintptr } type gobuf struct { sp uintptr pc uintptr g guintptr ctxt unsafe.Pointer ret uintptr lr uintptr bp uintptr // for framepointer-enabled architectures }
協(xié)程g中包含了協(xié)程的執(zhí)行棧空間(stack
),執(zhí)行當前協(xié)程的工作線程m
以及執(zhí)行現(xiàn)場sched
。協(xié)程g執(zhí)行上下文切換時需要保存當前的執(zhí)行現(xiàn)場,以便在切回協(xié)程g時能夠繼續(xù)正常執(zhí)行。協(xié)程g中的執(zhí)行現(xiàn)場由結(jié)構(gòu)體gobuf
定義,其保存了CPU中幾個重要的寄存器值,以及執(zhí)行現(xiàn)場信息屬于哪個協(xié)程g。
4.全局調(diào)度信息schedt
golang協(xié)程設(shè)計中,除了工作線程m、邏輯處理器p、協(xié)程g以外,還存在一個存儲全局調(diào)度信息的結(jié)構(gòu)體schedt
:
// go/src/runtime/runtime2.go type schedt struct { lock mutex midle muintptr // idle m's waiting for work nmidle int32 // number of idle m's waiting for work nmidlelocked int32 // number of locked m's waiting for work mnext int64 // number of m's that have been created and next M ID maxmcount int32 // maximum number of m's allowed (or die) nmsys int32 // number of system m's not counted for deadlock nmfreed int64 // cumulative number of freed m's ngsys uint32 // number of system goroutines; updated atomically pidle puintptr // idle p's npidle uint32 nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go. // Global runnable queue. runq gQueue runqsize int32 // Global cache of dead G's. gFree struct { lock mutex stack gList // Gs with stacks noStack gList // Gs without stacks n int32 } // freem is the list of m's waiting to be freed when their // m.exited is set. Linked through m.freelink. freem *m ... }
schedt
中維護了空閑的工作線程midle
、空閑工作線程的數(shù)量nmidle
、等待被釋放的線程列表freem
、系統(tǒng)協(xié)程g的數(shù)量ngsys
、空閑邏輯處理器pidle
、空閑邏輯處理器的數(shù)量npidle
、以及全局運行隊列runq
及全局運行隊列的大小runqsize
、處于新建或者被銷毀狀態(tài)的協(xié)程g列表gFree
等信息。
schedt
中的信息是全局共享的,例如全局運行隊列runq
被所有p共享,所以schedt
中也持有一個鎖lock
以保證原子性訪問。
5.GMP詳細示圖
通過上述說明,我們可以進一步細化GMP模型示圖為:
二、協(xié)程調(diào)度
已經(jīng)知道,每個工作線程m中都有一個調(diào)度協(xié)程g0
,專門執(zhí)行協(xié)程的調(diào)度循環(huán)(g->g0->g->g0-g
)。在調(diào)度循環(huán)中,協(xié)程g具體是如何被調(diào)度的呢?go語言調(diào)度器實現(xiàn)了自己的調(diào)度策略。
1.調(diào)度策略
工作線程m需要通過協(xié)程調(diào)度獲得具體可運行的某一協(xié)程g。
獲取協(xié)程g的一般策略主要包含三大步:
- 1. 查找p本地的局部運行隊列
- 2. 查找schedt中的全局運行隊列
- 3. 竊取其他p中的局部運行隊列
在運行時通過findRunnable()
函數(shù)獲取可運行的協(xié)程g:
// go/src/runtime/proc.go // Finds a runnable goroutine to execute. func findRunnable() (gp *g, inheritTime, tryWakeP bool) { ... // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. if _p_.schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_p_, 1) unlock(&sched.lock) if gp != nil { return gp, false, false } } ... // local runq if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime, false } // global runq if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false, false } } ... // Spinning Ms: steal work from other Ps. // // Limit the number of spinning Ms to half the number of busy Ps. // This is necessary to prevent excessive CPU consumption when // GOMAXPROCS>>1 but the program parallelism is low. procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } gp, inheritTime, tnow, w, newWork := stealWork(now) now = tnow if gp != nil { // Successfully stole. return gp, inheritTime, false } ... } }
獲取本地運行隊列
在查找可運行的協(xié)程g時,首先通過函數(shù)runqget()
從p本地的運行隊列中獲取:
首先嘗試從runnext
中獲取下一個執(zhí)行的g。當runnext
不為空時則返回對應(yīng)的協(xié)程g,如果為空則繼續(xù)從局部運行隊列runq
中查找。 當循環(huán)隊列的隊頭runqhead
和隊尾runqtail
相同時,說明循環(huán)隊列中沒有任何可運行的協(xié)程,否則從隊列頭部獲取一個協(xié)程返回。 由于可能存在其他邏輯處理器p來竊取協(xié)程,從而造成當前p與其他p同時訪問局部隊列的情況,因此在此處需要加鎖訪問,訪問結(jié)束后釋放鎖。
// go/src/runtime/proc.go func runqget(_p_ *p) (gp *g, inheritTime bool) { // If there's a runnext, it's the next G to run. next := _p_.runnext // If the runnext is non-0 and the CAS fails, it could only have been stolen by another P, // because other Ps can race to set runnext to 0, but only the current P can set it to non-0. // Hence, there's no need to retry this CAS if it falls. if next != 0 && _p_.runnext.cas(next, 0) { return next.ptr(), true } for { h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := _p_.runqtail if t == h { return nil, false } gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } }
協(xié)程調(diào)度時由于總是優(yōu)先查找局部運行隊列中的協(xié)程g,如果只是循環(huán)往復的地執(zhí)行局部隊列中的g,那么全局隊列中的g可能一個都不會被調(diào)度到。
因此,為了保證調(diào)度的公平性,p中每執(zhí)行61次調(diào)度,就會優(yōu)先從全局隊列中獲取一個g到當前p中執(zhí)行:
// go/src/runtime/proc.go func findRunnable() (gp *g, inheritTime, tryWakeP bool) { ... if _p_.schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_p_, 1) unlock(&sched.lock) if gp != nil { return gp, false, false } } ... }
獲取全局運行隊列
當p每執(zhí)行61次調(diào)度,或者p本地運行隊列不存在可運行的協(xié)程時,需要從全局運行隊列中獲取一批協(xié)程分配給本地運行隊列。由于每個p共享了全局運行隊列,因此為了保證公平,需要將全局運行隊列中的g按照p的數(shù)量進行平分,平分后數(shù)量也不能超過局部運行隊列容量的一半(即128=256/2)。最后通過循環(huán)調(diào)用runqput
將全局隊列中的g放入到p的局部運行隊列中。
// go/src/runtime/proc.go // Try get a batch of G's from the global runnable queue. // sched.lock must be held. func globrunqget(_p_ *p, max int32) *g { assertLockHeld(&sched.lock) if sched.runqsize == 0 { return nil } n := sched.runqsize/gomaxprocs + 1 if n > sched.runqsize { n = sched.runqsize } if max > 0 && n > max { n = max } if n > int32(len(_p_.runq))/2 { n = int32(len(_p_.runq)) / 2 } sched.runqsize -= n gp := sched.runq.pop() n-- for ; n > 0; n-- { gp1 := sched.runq.pop() runqput(_p_, gp1, false) } return gp }
協(xié)程竊取
當p在局部運行隊列、全局運行隊列中都找不到可運行的協(xié)程時,就需要從其他p的本地運行隊列中竊取一批可用的協(xié)程。所有的p都存儲在全局的allp []*p
變量中, 調(diào)度器隨機在其中選擇一個p來進行協(xié)程竊取工作。竊取工作總共會執(zhí)行不超過4次,當竊取成功時即返回。
// go/src/runtime/proc.go // stealWork attempts to steal a runnable goroutine or timer from any P. func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) { pp := getg().m.p.ptr() ranTimer := false const stealTries = 4 for i := 0; i < stealTries; i++ { stealTimersOrRunNextG := i == stealTries-1 for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { // GC work may be available. return nil, false, now, pollUntil, true } p2 := allp[enum.position()] if pp == p2 { continue } ... // Don't bother to attempt to steal if p2 is idle. if !idlepMask.read(enum.position()) { if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil { return gp, false, now, pollUntil, ranTimer } } } } ... }
協(xié)程竊取的主要執(zhí)行邏輯通過runqsteal
以及runqgrab
函數(shù)實現(xiàn),竊取的核心邏輯是:將要竊取的p本地運行隊列中g(shù)個數(shù)的一半放入到自己的運行隊列中。
// Steal half of elements from local runnable queue of p2 // and put onto local runnable queue of p. // Returns one of the stolen elements (or nil if failed). func runqsteal(_p_, p2 *p, stealRunNextG bool) *g { t := _p_.runqtail n := runqgrab(p2, &_p_.runq, t, stealRunNextG) if n == 0 { return nil } n-- gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr() if n == 0 { return gp } h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers if t-h+n >= uint32(len(_p_.runq)) { throw("runqsteal: runq overflow") } atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumption return gp } // Grabs a batch of goroutines from _p_'s runnable queue into batch. func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 { for { h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer n := t - h n = n - n/2 ... for i := uint32(0); i < n; i++ { g := _p_.runq[(h+i)%uint32(len(_p_.runq))] batch[(batchHead+i)%uint32(len(batch))] = g } if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consume return n } } }
2.調(diào)度時機
調(diào)度策略讓我們知道了協(xié)程是如何調(diào)度的,下面繼續(xù)說明什么時候會發(fā)生協(xié)程調(diào)度。
主動調(diào)度
協(xié)程可以選擇主動讓渡自己的執(zhí)行權(quán),這主要通過在代碼中主動執(zhí)行runtime.Gosched()
函數(shù)實現(xiàn)。
- 主動調(diào)度會從當前協(xié)程g切換到g0并更新協(xié)程狀態(tài)由運行中
_Grunning
變?yōu)榭蛇\行_Grunnable
; - 然后通過
dropg()
取消g與m的綁定關(guān)系; - 接著通過
globrunqput()
將g放入到全局運行隊列中; - 最后調(diào)用
schedule()
函數(shù)開啟新一輪的調(diào)度循環(huán)。
// go/src/runtime/proc.go // Gosched yields the processor, allowing other goroutines to run. It does not // suspend the current goroutine, so execution resumes automatically. func Gosched() { checkTimeouts() mcall(gosched_m) // } // Gosched continuation on g0. func gosched_m(gp *g) { ... goschedImpl(gp) // } func goschedImpl(gp *g) { ... casgstatus(gp, _Grunning, _Grunnable) dropg() // lock(&sched.lock) globrunqput(gp) unlock(&sched.lock) schedule() } // dropg removes the association between m and the current goroutine m->curg (gp for short). func dropg() { _g_ := getg() setMNoWB(&_g_.m.curg.m, nil) setGNoWB(&_g_.m.curg, nil) }
被動調(diào)度
當協(xié)程休眠、通道堵塞、網(wǎng)絡(luò)堵塞、垃圾回收導致暫停時,協(xié)程會被動讓渡出執(zhí)行的權(quán)利給其他可運行的協(xié)程繼續(xù)執(zhí)行。調(diào)度器通過gopark()
函數(shù)執(zhí)行被動調(diào)度邏輯。gopark()
函數(shù)最終調(diào)用park_m()
函數(shù)來完成調(diào)度邏輯。
- 首先會從當前協(xié)程g切換到g0并更新協(xié)程狀態(tài)由運行中
_Grunning
變?yōu)榈却?code>_Gwaiting; - 然后通過
dropg()
取消g與m的綁定關(guān)系; - 接著執(zhí)行
waitunlockf
函數(shù),如果該函數(shù)返回false
,則協(xié)程g立即恢復執(zhí)行,否則等待喚醒; - 最后調(diào)用
schedule()
函數(shù)開啟新一輪的調(diào)度循環(huán)。
// go/src/runtime/proc.go // Puts the current goroutine into a waiting state and calls unlockf on the // system stack. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { ... mcall(park_m) } // park continuation on g0. func park_m(gp *g) { ... casgstatus(gp, _Grunning, _Gwaiting) dropg() if fn := _g_.m.waitunlockf; fn != nil { ok := fn(gp, _g_.m.waitlock) _g_.m.waitunlockf = nil _g_.m.waitlock = nil if !ok { ... casgstatus(gp, _Gwaiting, _Grunnable) execute(gp, true) // Schedule it back, never returns. } } schedule() }
與主動調(diào)度不同的是,被動調(diào)度的協(xié)程g不會放入到全局隊列中進行調(diào)度。而是一直處于等待中_Gwaiting
狀態(tài)等待被喚醒。當?shù)却械膮f(xié)程被喚醒時,協(xié)程的狀態(tài)由_Gwaiting
變?yōu)榭蛇\行_Grunnable
狀態(tài),然后被添加到當前p的局部運行隊列中。喚醒邏輯通過函數(shù)goready()
調(diào)用ready()
實現(xiàn):
// go/src/runtime/proc.go func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip, true) }) } // Mark gp ready to run. func ready(gp *g, traceskip int, next bool) { ... // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) wakep() ... }
搶占調(diào)度
go應(yīng)用程序在啟動時會開啟一個特殊的線程來執(zhí)行系統(tǒng)監(jiān)控任務(wù),系統(tǒng)監(jiān)控運行在一個獨立的工作線程m上,該線程不用綁定邏輯處理器p。系統(tǒng)監(jiān)控每隔10ms會檢測是否有準備就緒的網(wǎng)絡(luò)協(xié)程,并放置到全局隊列中。
為了保證每個協(xié)程都有執(zhí)行的機會,系統(tǒng)監(jiān)控服務(wù)會對執(zhí)行時間過長(大于10ms
)的協(xié)程、或者處于系統(tǒng)調(diào)用(大于20微秒
)的協(xié)程進行搶占。搶占的核心邏輯通過retake()
函數(shù)實現(xiàn):
// go/src/runtime/proc.go // forcePreemptNS is the time slice given to a G before it is // preempted. const forcePreemptNS = 10 * 1000 * 1000 // 10ms func retake(now int64) uint32 { n := 0 lock(&allpLock) for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { continue } pd := &_p_.sysmontick s := _p_.status sysretake := false if s == _Prunning || s == _Psyscall { // Preempt G if it's running for too long. t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { preemptone(_p_) // In case of syscall, preemptone() doesn't // work, because there is no M wired to P. sysretake = true } } if s == _Psyscall { // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). t := int64(_p_.syscalltick) if !sysretake && int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) pd.syscallwhen = now continue } if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } ... } unlock(&allpLock) return uint32(n) }
到此這篇關(guān)于golang協(xié)程設(shè)計及調(diào)度原理的文章就介紹到這了,更多相關(guān)go協(xié)程設(shè)計內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!