Go簡單實現(xiàn)協(xié)程方法
為什么需要協(xié)程
協(xié)程的本質是將一段數(shù)據(jù)的運行狀態(tài)進行打包,可以在線程之間調度,所以協(xié)程就是在單線程的環(huán)境下實現(xiàn)的應用程序級別的并發(fā),就是把本來由操作系統(tǒng)控制的切換+保存狀態(tài)在應用程序里面實現(xiàn)了。
所以我們需要協(xié)程的目的其實就是它更加節(jié)省資源、可以在有限的資源內支持更高的并發(fā),體現(xiàn)在以下三個方面:
- 資源利用:程可以利用任何的線程去運行,不需要等待CPU的調度。
- 快速調度:協(xié)程可以快速地調度(避開了系統(tǒng)調用和切換),快速的切換。
- 超高并發(fā):有限的線程就可以并發(fā)很多的協(xié)程。
協(xié)程的本質
協(xié)程在go語言中使用runtime\runtime2.go下的g結構體來表示,這個結構體中包含了協(xié)程的很多信息,我們只挑選其中的重要字段來進行分析:
type g struct {
// 協(xié)程的棧幀,里面包含了兩個字段:lo和hi,分別是協(xié)程棧的高位指針和低位指針
stack stack
// gobuf結構體中儲存了很多與協(xié)程棧相關的指針,比如pc、sp
sched gobuf
// 用來標記協(xié)程當前的狀態(tài)
atomicstatus uint32
// 每個協(xié)程的唯一標識,不向應用層暴露。但是goid的地址會存在寄存器里面,可以通過ebpf工具無侵入地去獲取
goid int64
}

對線程的描述
我們知道,go語言中的協(xié)程是跑在線程上面的,那么go中肯定會有對線程的抽象描述,這個結構體也在runtime\runtime2.go中,我們只展示重要的部分:
type m struct {
// 每次啟動一個M都會第一個創(chuàng)建的gourtine,用于操作調度器,所以它不指向任何函數(shù),只負責調度
g0 *g // goroutine with scheduling stack
// 當前正在線程上運行的協(xié)程
curg *g // current running goroutine
// 線程id
id int64
// 記錄每種操作系統(tǒng)對于線程額外的描述信息
mOS
}
協(xié)程如何在線程中執(zhí)行
我們從最簡單的單線程調度模型來看,協(xié)程在線程中的執(zhí)行流程可以參考下圖:

線程循環(huán)
在go中每個線程都是循環(huán)執(zhí)行一系列工作,又稱作單線程循環(huán)如下圖所示:左側為棧,右側為線程執(zhí)行的函數(shù)順序,其中的業(yè)務方法就是協(xié)程方法。
普通協(xié)程棧只能記錄業(yè)務方法的業(yè)務信息,且當線程沒有獲得協(xié)程之前是沒有普通協(xié)程棧的。所以在內存中開辟了一個g0棧,專門用于記錄函數(shù)調用跳轉的信息,因此g0棧其實就是調度中心的棧。
線程循環(huán)會按順序循環(huán)去執(zhí)行上圖右側的函數(shù):schedule->execute->gogo->業(yè)務方法->goexit。
schedule
schedule函數(shù)的作用是為當前的P獲取一個可以執(zhí)行的g,并執(zhí)行它。
- 首先會有1/61的概率檢查全局隊列,確保全局隊列中的G也會被調度。
- 然后有60/61的概率從本地隊列中獲取g。
- 如果從本地隊列中沒有獲取到可執(zhí)行的g,就會調用
findrunnable函數(shù)去獲取。
findrunnable函數(shù)的流程:
- 調用runqget函數(shù)來從P自己的runnable G隊列中得到一個可以執(zhí)行的G;
- 如果1失敗,調用globrunqget函數(shù)從全局runnableG隊列中得到一個可以執(zhí)行的G;
- 如果2失敗,調用netpoll(非阻塞)函數(shù)取一個異步回調的G;
- 如果3失敗,嘗試從其他P那里偷取一半數(shù)量的G過來;
- 如果4失敗,再次調用globrunqget函數(shù)從全局runnableG隊列中得到一個可以執(zhí)行的G;
- 如果5失敗,調用netpoll(阻塞)函數(shù)取一個異步回調的G;
- 如果6仍然沒有取到G,那么調用stopm函數(shù)停止這個M。
如果獲取到了可執(zhí)行的g,就調用execute函數(shù)去執(zhí)行。
// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
......
// 新建一個gp變量,gp就是即將要運行的協(xié)程指針
var gp *g
var inheritTime bool
// 垃圾回收相關的工作
......
// 調度過程中有1/61的概率檢查全局隊列,確保全局隊列中的G也會被調度。
// M綁定的P首先有1/61概率從全局隊列獲取G,60/61概率從本地隊列獲取G
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 從本地隊列中獲取g
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
// We can see gp != nil here even if the M is spinning,
// if checkTimers added a local goroutine via goready.
}
// 如果從本地隊列獲取失敗,就會調用findrunnable函數(shù)去獲取g
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
......
execute(gp, inheritTime)
}
execute
execute函數(shù)會為schedule獲取到的可執(zhí)行協(xié)程初始化相關結構體,然后以sched結構體為參數(shù)調用gogo函數(shù):
func execute(gp *g, inheritTime bool) {
_g_ := getg()
// 初始化g結構體
// Assign gp.m before entering _Grunning so running Gs have an
// M.
_g_.m.curg = gp
gp.m = _g_.m
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + _StackGuard
if !inheritTime {
_g_.m.p.ptr().schedtick++
}
......
// 匯編實現(xiàn)的函數(shù),通過gobuf結構體中的信息,跳轉到執(zhí)行業(yè)務的方法
gogo(&gp.sched)
gogo
gogo函數(shù)實際上是匯編實現(xiàn)的,每個操作系統(tǒng)實現(xiàn)的gogo方法是不同的,它會通過傳進來的gobuf結構體,先向普通協(xié)程棧中壓入goexit函數(shù),然后跳轉到執(zhí)行業(yè)務的方法,協(xié)程棧也會被切換成業(yè)務協(xié)程自己的棧。
業(yè)務方法
業(yè)務方法就是協(xié)程中需要執(zhí)行的相關函數(shù)。
goexit
goexit也是匯編實現(xiàn)的,當執(zhí)行完協(xié)程棧中的業(yè)務方法之后,就會退到goexit方法中,它會將業(yè)務協(xié)程的棧切換成調度器的棧(也就是g0棧),然后重新調用schedule函數(shù),形成一個閉環(huán)。
GMP調度模型
上述的調度模型是單線程的,但是現(xiàn)代CPU往往是多核的,應用采用的也是多線程,因此單線程調度模型有些浪費資源。所以我們在實際使用中,其實是一種多線程循環(huán)。但是多個線程在獲取可執(zhí)行g的時候就會存在并發(fā)沖突的問題,所以就有了GMP調度模型。
GMP調度模型簡單來說是這樣的:
G是指協(xié)程goroutine,M是指操作系統(tǒng)線程,P是指調度器。
首先,GMP調度模型中有一個全局隊列,用于存放等待運行的G。然后每個P都有自己的本地隊列,存放的也是等待運行的G,但是存的數(shù)量有限,不會超過256個。我們新建goroutine的時候,是優(yōu)先放到P的本地隊列中的,如果隊列滿了,會把本地隊列中一半的G都移到全局隊列中。
線程想運行任務就得獲取P,從P的本地隊列獲取G,G執(zhí)行之后,M會從P獲取下一個G,不斷重復下去。P隊列為空時,M會嘗試從全局隊列拿一批G放到P的本地隊列,如果獲取不到就會從其他P的本地隊列偷一半放到自己P的本地隊列。
當M執(zhí)行某一個G時候如果發(fā)生了系統(tǒng)調用或者其余阻塞操作,M會阻塞,如果當前有一些G在執(zhí)行,runtime會把這個線程M從P中摘除(detach),然后再創(chuàng)建一個新的操作系統(tǒng)的線程(如果有空閑的線程可用就復用空閑線程)來服務于這個P。當M系統(tǒng)調用結束時候,這個G會嘗試獲取一個空閑的P執(zhí)行,并放入到這個P的本地隊列。如果獲取不到P,那么這個線程M變成休眠狀態(tài), 加入到空閑線程中,然后這個G會被放入全局隊列中。
P的底層結構
我們發(fā)現(xiàn)GMP調度模型中有一個P,P就是調度器,我們來看一下P的底層數(shù)據(jù)結構,同樣在runtime\runtime2.go文件中:
type p struct {
id int32
status uint32 // one of pidle/prunning/...
// 指向調度器服務的那個線程
m muintptr // back-link to associated m (nil if idle)
// Queue of runnable goroutines. Accessed without lock.
// 調度器的本地隊列,因為只服務于一個線程,所以可以無鎖的訪問,隊列本身實際上是一個大小為256的指針數(shù)組
runqhead uint32
runqtail uint32
runq [256]guintptr
// 指向下一個可用g的指針
runnext guintptr
}協(xié)程并發(fā)
我們上面介紹的調度模型實際上是非搶占式的,非搶占式模型的特點就是只有當協(xié)程主動讓出后,M才會去運行本地隊列后面的協(xié)程,那么這樣就很容易造成隊列尾部的協(xié)程餓死。
其實Go語言的協(xié)程是基于搶占式來實現(xiàn)的,也就是當協(xié)程執(zhí)行一段時間后將當前任務暫定,執(zhí)行后續(xù)協(xié)程任務,防止時間敏感攜程執(zhí)行失敗。如下圖所示:

搶占式調度
當目前線程中執(zhí)行的協(xié)程是一個超長時間的任務,此時先保存該協(xié)程的運行狀態(tài)也就是保護現(xiàn)場,若是后續(xù)還需繼續(xù)執(zhí)行就將其放入本地隊列中去,如果不需要執(zhí)行就將其處于休眠狀態(tài),然后直接跳轉到schedule函數(shù)中。
實現(xiàn):
- 主動掛?。篻opark方法,當業(yè)務調用這個方法線程就會直接回到schedule函數(shù)并切換協(xié)程棧,當前運行的協(xié)程將會處于等待狀態(tài),等待狀態(tài)的協(xié)程是無法立即進入任務隊列中的。程序員無法主動調用gopark函數(shù),但是我們可以通過Sleep等具有gopark的函數(shù)來進行主動掛取,Sleep五秒之后系統(tǒng)將會把任務的等待狀態(tài)更改為運行狀態(tài)放入隊列中。
- 系統(tǒng)調用完成時:go程序在運行狀態(tài)中進行了系統(tǒng)調用,那么當系統(tǒng)的底層調用完成后就會調用exitsyscall函數(shù),線程就會停止執(zhí)行當前協(xié)程,將當前協(xié)程放入隊列中去。
- 標記搶占morestack():當函數(shù)跳轉時都會調用這個方法,它的本意在于檢查當前協(xié)程??臻g是否有足夠內存,如果不夠就要擴大該??臻g。當系統(tǒng)監(jiān)控到協(xié)程運行超過
10ms,就將g.stackguard0置為0xfffffade(該值是一個搶占標志),讓程序在只執(zhí)行morestack函數(shù)時順便判斷一下是否將g中的stackguard置為搶占,如果的確被標記搶占,就回到schedule方法,并將當前協(xié)程放回隊列中。
全局隊列的饑餓問題
上述操作讓本地隊列成了一個小循環(huán),但是如果目前系統(tǒng)中的線程的本地隊列中都擁有一個超大的協(xié)程任務,那么所有的線程都將在一段時間內處于忙碌狀態(tài),全局隊列中的任務將會長期無法運行,這個問題又稱為全局隊列饑餓問題,解決方式就是在本地隊列循環(huán)時,以一定的概率從全局隊列中取出某個任務,讓它也參與到本地循環(huán)當中去。
其實在執(zhí)行schedule函數(shù)尋找可運行g的時候,首先會去執(zhí)行下面的代碼,即調度過程中有1/61的概率去全局隊列中獲取可執(zhí)行的協(xié)程,防止全局隊列中的協(xié)程被餓死。
// 調度過程中有1/61的概率檢查全局隊列,確保全局隊列中的G也會被調度。
if gp == nil {
// Check the global runnable queue once in a while to ensure fairness.
// Otherwise two goroutines can completely occupy the local runqueue
// by constantly respawning each other.
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
到此這篇關于Go簡單實現(xiàn)協(xié)程方法的文章就介紹到這了,更多相關Go協(xié)程內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Golang?WorkerPool線程池并發(fā)模式示例詳解
這篇文章主要為大家介紹了Golang?WorkerPool線程池并發(fā)模式示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-08-08
Golang實現(xiàn)短網址/短鏈服務的開發(fā)筆記分享
這篇文章主要為大家詳細介紹了如何使用Golang實現(xiàn)短網址/短鏈服務,文中的示例代碼講解詳細,具有一定的學習價值,感興趣的小伙伴可以了解一下2023-05-05
Go+Vue開發(fā)一個線上外賣應用的流程(用戶名密碼和圖形驗證碼)
這篇文章主要介紹了Go+Vue開發(fā)一個線上外賣應用(用戶名密碼和圖形驗證碼),本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11

