Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程方法
為什么需要協(xié)程
協(xié)程的本質(zhì)是將一段數(shù)據(jù)的運(yùn)行狀態(tài)進(jìn)行打包,可以在線程之間調(diào)度,所以協(xié)程就是在單線程的環(huán)境下實(shí)現(xiàn)的應(yīng)用程序級(jí)別的并發(fā),就是把本來由操作系統(tǒng)控制的切換+保存狀態(tài)在應(yīng)用程序里面實(shí)現(xiàn)了。
所以我們需要協(xié)程的目的其實(shí)就是它更加節(jié)省資源、可以在有限的資源內(nèi)支持更高的并發(fā),體現(xiàn)在以下三個(gè)方面:
- 資源利用:程可以利用任何的線程去運(yùn)行,不需要等待CPU的調(diào)度。
- 快速調(diào)度:協(xié)程可以快速地調(diào)度(避開了系統(tǒng)調(diào)用和切換),快速的切換。
- 超高并發(fā):有限的線程就可以并發(fā)很多的協(xié)程。
協(xié)程的本質(zhì)
協(xié)程在go語(yǔ)言中使用runtime\runtime2.go
下的g結(jié)構(gòu)體來表示,這個(gè)結(jié)構(gòu)體中包含了協(xié)程的很多信息,我們只挑選其中的重要字段來進(jìn)行分析:
type g struct { // 協(xié)程的棧幀,里面包含了兩個(gè)字段:lo和hi,分別是協(xié)程棧的高位指針和低位指針 stack stack // gobuf結(jié)構(gòu)體中儲(chǔ)存了很多與協(xié)程棧相關(guān)的指針,比如pc、sp sched gobuf // 用來標(biāo)記協(xié)程當(dāng)前的狀態(tài) atomicstatus uint32 // 每個(gè)協(xié)程的唯一標(biāo)識(shí),不向應(yīng)用層暴露。但是goid的地址會(huì)存在寄存器里面,可以通過ebpf工具無(wú)侵入地去獲取 goid int64 }
對(duì)線程的描述
我們知道,go語(yǔ)言中的協(xié)程是跑在線程上面的,那么go中肯定會(huì)有對(duì)線程的抽象描述,這個(gè)結(jié)構(gòu)體也在runtime\runtime2.go
中,我們只展示重要的部分:
type m struct { // 每次啟動(dòng)一個(gè)M都會(huì)第一個(gè)創(chuàng)建的gourtine,用于操作調(diào)度器,所以它不指向任何函數(shù),只負(fù)責(zé)調(diào)度 g0 *g // goroutine with scheduling stack // 當(dāng)前正在線程上運(yùn)行的協(xié)程 curg *g // current running goroutine // 線程id id int64 // 記錄每種操作系統(tǒng)對(duì)于線程額外的描述信息 mOS }
協(xié)程如何在線程中執(zhí)行
我們從最簡(jiǎn)單的單線程調(diào)度模型來看,協(xié)程在線程中的執(zhí)行流程可以參考下圖:
線程循環(huán)
在go中每個(gè)線程都是循環(huán)執(zhí)行一系列工作,又稱作單線程循環(huán)如下圖所示:左側(cè)為棧,右側(cè)為線程執(zhí)行的函數(shù)順序,其中的業(yè)務(wù)方法就是協(xié)程方法。
普通協(xié)程棧只能記錄業(yè)務(wù)方法的業(yè)務(wù)信息,且當(dāng)線程沒有獲得協(xié)程之前是沒有普通協(xié)程棧的。所以在內(nèi)存中開辟了一個(gè)g0棧,專門用于記錄函數(shù)調(diào)用跳轉(zhuǎn)的信息,因此g0棧其實(shí)就是調(diào)度中心的棧。
線程循環(huán)會(huì)按順序循環(huán)去執(zhí)行上圖右側(cè)的函數(shù):schedule->execute->gogo->業(yè)務(wù)方法->goexit。
schedule
schedule
函數(shù)的作用是為當(dāng)前的P獲取一個(gè)可以執(zhí)行的g,并執(zhí)行它。
- 首先會(huì)有1/61的概率檢查全局隊(duì)列,確保全局隊(duì)列中的G也會(huì)被調(diào)度。
- 然后有60/61的概率從本地隊(duì)列中獲取g。
- 如果從本地隊(duì)列中沒有獲取到可執(zhí)行的g,就會(huì)調(diào)用
findrunnable
函數(shù)去獲取。
findrunnable
函數(shù)的流程:
- 調(diào)用runqget函數(shù)來從P自己的runnable G隊(duì)列中得到一個(gè)可以執(zhí)行的G;
- 如果1失敗,調(diào)用globrunqget函數(shù)從全局runnableG隊(duì)列中得到一個(gè)可以執(zhí)行的G;
- 如果2失敗,調(diào)用netpoll(非阻塞)函數(shù)取一個(gè)異步回調(diào)的G;
- 如果3失敗,嘗試從其他P那里偷取一半數(shù)量的G過來;
- 如果4失敗,再次調(diào)用globrunqget函數(shù)從全局runnableG隊(duì)列中得到一個(gè)可以執(zhí)行的G;
- 如果5失敗,調(diào)用netpoll(阻塞)函數(shù)取一個(gè)異步回調(diào)的G;
- 如果6仍然沒有取到G,那么調(diào)用stopm函數(shù)停止這個(gè)M。
如果獲取到了可執(zhí)行的g,就調(diào)用execute
函數(shù)去執(zhí)行。
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() { ...... // 新建一個(gè)gp變量,gp就是即將要運(yùn)行的協(xié)程指針 var gp *g var inheritTime bool // 垃圾回收相關(guān)的工作 ...... // 調(diào)度過程中有1/61的概率檢查全局隊(duì)列,確保全局隊(duì)列中的G也會(huì)被調(diào)度。 // M綁定的P首先有1/61概率從全局隊(duì)列獲取G,60/61概率從本地隊(duì)列獲取G if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } // 從本地隊(duì)列中獲取g if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) // We can see gp != nil here even if the M is spinning, // if checkTimers added a local goroutine via goready. } // 如果從本地隊(duì)列獲取失敗,就會(huì)調(diào)用findrunnable函數(shù)去獲取g if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } ...... execute(gp, inheritTime) }
execute
execute函數(shù)會(huì)為schedule獲取到的可執(zhí)行協(xié)程初始化相關(guān)結(jié)構(gòu)體,然后以sched結(jié)構(gòu)體為參數(shù)調(diào)用gogo函數(shù):
func execute(gp *g, inheritTime bool) { _g_ := getg() // 初始化g結(jié)構(gòu)體 // Assign gp.m before entering _Grunning so running Gs have an // M. _g_.m.curg = gp gp.m = _g_.m casgstatus(gp, _Grunnable, _Grunning) gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard if !inheritTime { _g_.m.p.ptr().schedtick++ } ...... // 匯編實(shí)現(xiàn)的函數(shù),通過gobuf結(jié)構(gòu)體中的信息,跳轉(zhuǎn)到執(zhí)行業(yè)務(wù)的方法 gogo(&gp.sched)
gogo
gogo函數(shù)實(shí)際上是匯編實(shí)現(xiàn)的,每個(gè)操作系統(tǒng)實(shí)現(xiàn)的gogo方法是不同的,它會(huì)通過傳進(jìn)來的gobuf
結(jié)構(gòu)體,先向普通協(xié)程棧中壓入goexit
函數(shù),然后跳轉(zhuǎn)到執(zhí)行業(yè)務(wù)的方法,協(xié)程棧也會(huì)被切換成業(yè)務(wù)協(xié)程自己的棧。
業(yè)務(wù)方法
業(yè)務(wù)方法就是協(xié)程中需要執(zhí)行的相關(guān)函數(shù)。
goexit
goexit也是匯編實(shí)現(xiàn)的,當(dāng)執(zhí)行完協(xié)程棧中的業(yè)務(wù)方法之后,就會(huì)退到goexit方法中,它會(huì)將業(yè)務(wù)協(xié)程的棧切換成調(diào)度器的棧(也就是g0棧),然后重新調(diào)用schedule函數(shù),形成一個(gè)閉環(huán)。
GMP調(diào)度模型
上述的調(diào)度模型是單線程的,但是現(xiàn)代CPU往往是多核的,應(yīng)用采用的也是多線程,因此單線程調(diào)度模型有些浪費(fèi)資源。所以我們?cè)趯?shí)際使用中,其實(shí)是一種多線程循環(huán)。但是多個(gè)線程在獲取可執(zhí)行g(shù)的時(shí)候就會(huì)存在并發(fā)沖突的問題,所以就有了GMP調(diào)度模型。
GMP調(diào)度模型簡(jiǎn)單來說是這樣的:
G是指協(xié)程goroutine,M是指操作系統(tǒng)線程,P是指調(diào)度器。
首先,GMP調(diào)度模型中有一個(gè)全局隊(duì)列,用于存放等待運(yùn)行的G。然后每個(gè)P都有自己的本地隊(duì)列,存放的也是等待運(yùn)行的G,但是存的數(shù)量有限,不會(huì)超過256個(gè)。我們新建goroutine的時(shí)候,是優(yōu)先放到P的本地隊(duì)列中的,如果隊(duì)列滿了,會(huì)把本地隊(duì)列中一半的G都移到全局隊(duì)列中。
線程想運(yùn)行任務(wù)就得獲取P,從P的本地隊(duì)列獲取G,G執(zhí)行之后,M會(huì)從P獲取下一個(gè)G,不斷重復(fù)下去。P隊(duì)列為空時(shí),M會(huì)嘗試從全局隊(duì)列拿一批G放到P的本地隊(duì)列,如果獲取不到就會(huì)從其他P的本地隊(duì)列偷一半放到自己P的本地隊(duì)列。
當(dāng)M執(zhí)行某一個(gè)G時(shí)候如果發(fā)生了系統(tǒng)調(diào)用或者其余阻塞操作,M會(huì)阻塞,如果當(dāng)前有一些G在執(zhí)行,runtime會(huì)把這個(gè)線程M從P中摘除(detach),然后再創(chuàng)建一個(gè)新的操作系統(tǒng)的線程(如果有空閑的線程可用就復(fù)用空閑線程)來服務(wù)于這個(gè)P。當(dāng)M系統(tǒng)調(diào)用結(jié)束時(shí)候,這個(gè)G會(huì)嘗試獲取一個(gè)空閑的P執(zhí)行,并放入到這個(gè)P的本地隊(duì)列。如果獲取不到P,那么這個(gè)線程M變成休眠狀態(tài), 加入到空閑線程中,然后這個(gè)G會(huì)被放入全局隊(duì)列中。
P的底層結(jié)構(gòu)
我們發(fā)現(xiàn)GMP調(diào)度模型中有一個(gè)P,P就是調(diào)度器,我們來看一下P的底層數(shù)據(jù)結(jié)構(gòu),同樣在runtime\runtime2.go
文件中:
type p struct { id int32 status uint32 // one of pidle/prunning/... // 指向調(diào)度器服務(wù)的那個(gè)線程 m muintptr // back-link to associated m (nil if idle) // Queue of runnable goroutines. Accessed without lock. // 調(diào)度器的本地隊(duì)列,因?yàn)橹环?wù)于一個(gè)線程,所以可以無(wú)鎖的訪問,隊(duì)列本身實(shí)際上是一個(gè)大小為256的指針數(shù)組 runqhead uint32 runqtail uint32 runq [256]guintptr // 指向下一個(gè)可用g的指針 runnext guintptr }
協(xié)程并發(fā)
我們上面介紹的調(diào)度模型實(shí)際上是非搶占式的,非搶占式模型的特點(diǎn)就是只有當(dāng)協(xié)程主動(dòng)讓出后,M才會(huì)去運(yùn)行本地隊(duì)列后面的協(xié)程,那么這樣就很容易造成隊(duì)列尾部的協(xié)程餓死。
其實(shí)Go語(yǔ)言的協(xié)程是基于搶占式來實(shí)現(xiàn)的,也就是當(dāng)協(xié)程執(zhí)行一段時(shí)間后將當(dāng)前任務(wù)暫定,執(zhí)行后續(xù)協(xié)程任務(wù),防止時(shí)間敏感攜程執(zhí)行失敗。如下圖所示:
搶占式調(diào)度
當(dāng)目前線程中執(zhí)行的協(xié)程是一個(gè)超長(zhǎng)時(shí)間的任務(wù),此時(shí)先保存該協(xié)程的運(yùn)行狀態(tài)也就是保護(hù)現(xiàn)場(chǎng),若是后續(xù)還需繼續(xù)執(zhí)行就將其放入本地隊(duì)列中去,如果不需要執(zhí)行就將其處于休眠狀態(tài),然后直接跳轉(zhuǎn)到schedule函數(shù)中。
實(shí)現(xiàn):
- 主動(dòng)掛取:gopark方法,當(dāng)業(yè)務(wù)調(diào)用這個(gè)方法線程就會(huì)直接回到schedule函數(shù)并切換協(xié)程棧,當(dāng)前運(yùn)行的協(xié)程將會(huì)處于等待狀態(tài),等待狀態(tài)的協(xié)程是無(wú)法立即進(jìn)入任務(wù)隊(duì)列中的。程序員無(wú)法主動(dòng)調(diào)用gopark函數(shù),但是我們可以通過Sleep等具有g(shù)opark的函數(shù)來進(jìn)行主動(dòng)掛取,Sleep五秒之后系統(tǒng)將會(huì)把任務(wù)的等待狀態(tài)更改為運(yùn)行狀態(tài)放入隊(duì)列中。
- 系統(tǒng)調(diào)用完成時(shí):go程序在運(yùn)行狀態(tài)中進(jìn)行了系統(tǒng)調(diào)用,那么當(dāng)系統(tǒng)的底層調(diào)用完成后就會(huì)調(diào)用exitsyscall函數(shù),線程就會(huì)停止執(zhí)行當(dāng)前協(xié)程,將當(dāng)前協(xié)程放入隊(duì)列中去。
- 標(biāo)記搶占morestack():當(dāng)函數(shù)跳轉(zhuǎn)時(shí)都會(huì)調(diào)用這個(gè)方法,它的本意在于檢查當(dāng)前協(xié)程??臻g是否有足夠內(nèi)存,如果不夠就要擴(kuò)大該??臻g。當(dāng)系統(tǒng)監(jiān)控到協(xié)程運(yùn)行超過
10ms
,就將g.stackguard0置為0xfffffade(該值是一個(gè)搶占標(biāo)志),讓程序在只執(zhí)行morestack函數(shù)時(shí)順便判斷一下是否將g中的stackguard置為搶占,如果的確被標(biāo)記搶占,就回到schedule方法,并將當(dāng)前協(xié)程放回隊(duì)列中。
全局隊(duì)列的饑餓問題
上述操作讓本地隊(duì)列成了一個(gè)小循環(huán),但是如果目前系統(tǒng)中的線程的本地隊(duì)列中都擁有一個(gè)超大的協(xié)程任務(wù),那么所有的線程都將在一段時(shí)間內(nèi)處于忙碌狀態(tài),全局隊(duì)列中的任務(wù)將會(huì)長(zhǎng)期無(wú)法運(yùn)行,這個(gè)問題又稱為全局隊(duì)列饑餓問題,解決方式就是在本地隊(duì)列循環(huán)時(shí),以一定的概率從全局隊(duì)列中取出某個(gè)任務(wù),讓它也參與到本地循環(huán)當(dāng)中去。
其實(shí)在執(zhí)行schedule
函數(shù)尋找可運(yùn)行g(shù)的時(shí)候,首先會(huì)去執(zhí)行下面的代碼,即調(diào)度過程中有1/61
的概率去全局隊(duì)列中獲取可執(zhí)行的協(xié)程,防止全局隊(duì)列中的協(xié)程被餓死。
// 調(diào)度過程中有1/61的概率檢查全局隊(duì)列,確保全局隊(duì)列中的G也會(huì)被調(diào)度。 if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } }
到此這篇關(guān)于Go簡(jiǎn)單實(shí)現(xiàn)協(xié)程方法的文章就介紹到這了,更多相關(guān)Go協(xié)程內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Go語(yǔ)言如何熱重載和優(yōu)雅地關(guān)閉程序
我們有時(shí)會(huì)因不同的目的去關(guān)閉服務(wù),一種關(guān)閉服務(wù)是終止操作系統(tǒng),一種關(guān)閉服務(wù)是用來更新配置,本文就來和大家簡(jiǎn)單講講這兩種方法的實(shí)現(xiàn)吧2023-07-07談?wù)揋o 什么時(shí)候會(huì)觸發(fā) GC問題
Go 語(yǔ)言作為一門新語(yǔ)言,在早期經(jīng)常遭到唾棄的就是在垃圾回收(下稱:GC)機(jī)制中 STW(Stop-The-World)的時(shí)間過長(zhǎng)。下面文章就對(duì)此話題展開,感興趣的小伙伴可以參考下面文章的內(nèi)容2021-09-09Golang?WorkerPool線程池并發(fā)模式示例詳解
這篇文章主要為大家介紹了Golang?WorkerPool線程池并發(fā)模式示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08Golang實(shí)現(xiàn)短網(wǎng)址/短鏈服務(wù)的開發(fā)筆記分享
這篇文章主要為大家詳細(xì)介紹了如何使用Golang實(shí)現(xiàn)短網(wǎng)址/短鏈服務(wù),文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解一下2023-05-05Go 協(xié)程超時(shí)控制的實(shí)現(xiàn)
本文主要介紹了Go 協(xié)程超時(shí)控制的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08Go+Vue開發(fā)一個(gè)線上外賣應(yīng)用的流程(用戶名密碼和圖形驗(yàn)證碼)
這篇文章主要介紹了Go+Vue開發(fā)一個(gè)線上外賣應(yīng)用(用戶名密碼和圖形驗(yàn)證碼),本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-11-11Golang實(shí)現(xiàn)驗(yàn)證一個(gè)字符串是否為URL
在實(shí)際開發(fā)過程中,有時(shí)候會(huì)遇到?URL?的校驗(yàn)問題,Go?語(yǔ)言中有哪些方法去驗(yàn)證一個(gè)字符串是否滿足?URL?格式呢?本文就來和大家詳細(xì)講講2023-04-04一文詳解GO如何實(shí)現(xiàn)Redis的AOF持久化
這篇文章主要為大家詳細(xì)介紹了GO如何實(shí)現(xiàn)Redis的AOF持久化的,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,感興趣的小伙伴可以了解一下2023-03-03Go語(yǔ)言學(xué)習(xí)之?dāng)?shù)組的用法詳解
數(shù)組是相同數(shù)據(jù)類型的一組數(shù)據(jù)的集合,數(shù)組一旦定義長(zhǎng)度不能修改,數(shù)組可以通過下標(biāo)(或者叫索引)來訪問元素。本文將通過示例詳細(xì)講解Go語(yǔ)言中數(shù)組的使用,需要的可以參考一下2022-04-04