欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang并發(fā)編程之main goroutine的創(chuàng)建與調(diào)度詳解

 更新時間:2023年03月22日 16:04:54   作者:IguoChan  
這篇文章主要為大家詳細(xì)介紹了Golang并發(fā)編程中main goroutine的創(chuàng)建與調(diào)度,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下

0. 簡介

上一篇博客我們分析了調(diào)度器的初始化,這篇博客我們正式進入main函數(shù)及為其創(chuàng)建的goroutine的過程分析。

1. 創(chuàng)建main goroutine

接上文,在runtime/asm_amd64.s文件的runtime·rt0_go中,在執(zhí)行完runtime.schedinit函數(shù)進行調(diào)度器的初始化后,就開始創(chuàng)建main goroutine了。

// create a new goroutine to start program
MOVQ   $runtime·mainPC(SB), AX       // entry // mainPC是runtime.main
PUSHQ  AX                                     // 將runtime.main函數(shù)地址入棧,作為參數(shù)
CALL   runtime·newproc(SB)                    // 創(chuàng)建main goroutine,入?yún)⒕褪莚untime.main
POPQ   AX 

以上代碼創(chuàng)建了一個新的協(xié)程(在Go中,go func()之類的相當(dāng)于調(diào)用runtime.newproc),這個協(xié)程就是main goroutine,那我們就看看runtime·newproc函數(shù)做了什么。

// Create a new g running fn.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
func newproc(fn *funcval) {
   gp := getg()         // 獲取正在運行的g,初始化時是m0.g0
   pc := getcallerpc()  // 返回的是調(diào)用newproc函數(shù)時由call指令壓棧的函數(shù)的返回地址,即上面匯編語言的第5行`POPQ AX`這條指令的地址
   systemstack(func() { // systemstack函數(shù)的作用是切換到系統(tǒng)棧來執(zhí)行其參數(shù)函數(shù),也就是`g0`棧,這里當(dāng)然就是m0.g0,所以基本不需要做什么
      newg := newproc1(fn, gp, pc)

      _p_ := getg().m.p.ptr()
      runqput(_p_, newg, true)

      if mainStarted {
         wakep()
      }
   })
}

所以以上代碼的重點就是調(diào)用newproc1函數(shù)進行協(xié)程的創(chuàng)建。

// Create a new g in state _Grunnable, starting at fn. callerpc is the
// address of the go statement that created this. The caller is responsible
// for adding the new g to the scheduler.
func newproc1(fn *funcval, callergp *g, callerpc uintptr) *g {
   _g_ := getg() // _g_ = g0,即m0.g0

   if fn == nil {
      _g_.m.throwing = -1 // do not dump full stacks
      throw("go of nil func value")
   }
   acquirem() // disable preemption because it can be holding p in a local var

   _p_ := _g_.m.p.ptr()
   newg := gfget(_p_)  // 從本地的已經(jīng)廢棄的g列表中獲取一個g先,此時才剛初始化,所以肯定返回nil
   if newg == nil {
      newg = malg(_StackMin) // new一個g的結(jié)構(gòu)體對象,然后在堆上分配2k的棧大小,并設(shè)置stack和stackguard0/1
      casgstatus(newg, _Gidle, _Gdead)
      allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
   }
   if newg.stack.hi == 0 {
      throw("newproc1: newg missing stack")
   }

   if readgstatus(newg) != _Gdead {
      throw("newproc1: new g is not Gdead")
   }

   // 調(diào)整棧頂指針
   totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
   totalSize = alignUp(totalSize, sys.StackAlign)
   sp := newg.stack.hi - totalSize
   spArg := sp
   if usesLR {
      // caller's LR
      *(*uintptr)(unsafe.Pointer(sp)) = 0
      prepGoExitFrame(sp)
      spArg += sys.MinFrameSize
   }
   ...
}

上述代碼從堆上分配了一個g的結(jié)構(gòu)體,并且在堆上為其分配了一個2k大小的棧,并設(shè)置了好了newgstack等相關(guān)參數(shù)。此時,newg的狀態(tài)如圖所示:

接著我們繼續(xù)分析newproc1函數(shù):

memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp // 設(shè)置newg的棧頂
newg.stktopsp = sp
// newg.sched.pc表示當(dāng)newg運行起來時的運行起始位置,下面一段是類似于代碼注入,就好像每個go func() 
// 函數(shù)都是由goexit函數(shù)引起的一樣,以便后面當(dāng)newg結(jié)束后,
// 完成newg的回收(當(dāng)然這里main goroutine結(jié)束后進程就結(jié)束了,不會被回收)。
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn) // 調(diào)整sched成員和newg的棧
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
if isSystemGoroutine(newg, false) {
   atomic.Xadd(&sched.ngsys, +1)
} else {
   // Only user goroutines inherit pprof labels.
   if _g_.m.curg != nil {
      newg.labels = _g_.m.curg.labels
   }
}

以上代碼對newgsched成員進行初始化,其中newg.sched.sp表示其被調(diào)度起來后應(yīng)該使用的棧頂,newg.sched.pc表示其被調(diào)度起來從這個地址開始運行,但是這個值被設(shè)置成了goexit函數(shù)的下一條指令,所以我們看看,在gostartcallfn函數(shù)中,到底做了什么才能實現(xiàn)此功能:

// adjust Gobuf as if it executed a call to fn
// and then stopped before the first instruction in fn.
func gostartcallfn(gobuf *gobuf, fv *funcval) {
   var fn unsafe.Pointer
   if fv != nil {
      fn = unsafe.Pointer(fv.fn)
   } else {
      fn = unsafe.Pointer(abi.FuncPCABIInternal(nilfunc))
   }
   gostartcall(gobuf, fn, unsafe.Pointer(fv))
}

// sys_x86.go
// adjust Gobuf as if it executed a call to fn with context ctxt
// and then stopped before the first instruction in fn.
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
   sp := buf.sp
   sp -= goarch.PtrSize
   *(*uintptr)(unsafe.Pointer(sp)) = buf.pc // 插入goexit的第二條指令,返回時可以調(diào)用
   buf.sp = sp                              
   buf.pc = uintptr(fn)                     // 此時才是真正地設(shè)置pc 
   buf.ctxt = ctxt
}

以上操作的目的就是:

  • 調(diào)整newg的??臻g,把goexit函數(shù)的第二條指令的地址入棧,偽造成goexit函數(shù)調(diào)用了fn,從而使fn執(zhí)行完成后執(zhí)行ret指令時返回到goexit繼續(xù)執(zhí)行完成最后的清理工作;
  • 重新設(shè)置newg.buf.pc 為需要執(zhí)行的函數(shù)的地址,即fn,此場景為runtime.main函數(shù)的地址。

接下來會設(shè)置newg的狀態(tài)為runnable;最后別忘了newproc函數(shù)中還有幾行:

newg := newproc1(fn, gp, pc)

_p_ := getg().m.p.ptr()
runqput(_p_, newg, true)

if mainStarted {
   wakep()
}

在創(chuàng)建完newg后,將其放到此線程的g0(這里是m0.g0)所在的runq隊列,并且優(yōu)先插入到隊列的前端(runqput第三個參數(shù)為true),做完這些后,我們可以得出以下的關(guān)系:

2. 調(diào)度main goroutine

上一節(jié)我們分析了main goroutine的創(chuàng)建過程,這一節(jié)我們討論一下,調(diào)度器如何把main goroutine調(diào)度到CPU上去運行。讓我們繼續(xù)回到runtime/asm_amd64.s中,在完成runtime.newproc創(chuàng)建完main goroutine之后,正式執(zhí)行runtime·mstart來執(zhí)行,而runtime·mstart最終會調(diào)用go寫的runtime·mstart0函數(shù)。

// start this M
CALL   runtime·mstart(SB)

CALL   runtime·abort(SB)  // mstart should never return
RET

TEXT runtime·mstart(SB),NOSPLIT|TOPFRAME,$0
   CALL   runtime·mstart0(SB)
   RET // not reached

runtime·mstart0函數(shù)如下:

func mstart0() {
   _g_ := getg() // _g_ = &g0

   osStack := _g_.stack.lo == 0
   if osStack { // g0的stack.lo已經(jīng)初始化,所以不會走以下邏輯
      // Initialize stack bounds from system stack.
      // Cgo may have left stack size in stack.hi.
      // minit may update the stack bounds.
      //
      // Note: these bounds may not be very accurate.
      // We set hi to &size, but there are things above
      // it. The 1024 is supposed to compensate this,
      // but is somewhat arbitrary.
      size := _g_.stack.hi
      if size == 0 {
         size = 8192 * sys.StackGuardMultiplier
      }
      _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
      _g_.stack.lo = _g_.stack.hi - size + 1024
   }
   // Initialize stack guard so that we can start calling regular
   // Go code.
   _g_.stackguard0 = _g_.stack.lo + _StackGuard
   // This is the g0, so we can also call go:systemstack
   // functions, which check stackguard1.
   _g_.stackguard1 = _g_.stackguard0
   mstart1()

   // Exit this thread.
   if mStackIsSystemAllocated() {
      // Windows, Solaris, illumos, Darwin, AIX and Plan 9 always system-allocate
      // the stack, but put it in _g_.stack before mstart,
      // so the logic above hasn't set osStack yet.
      osStack = true
   }
   mexit(osStack)
}

以上代碼設(shè)置了一些棧信息之后,調(diào)用runtime.mstart1函數(shù):

func mstart1() {
   _g_ := getg() // _g_ = &g0

   if _g_ != _g_.m.g0 { // _g_ = &g0
      throw("bad runtime·mstart")
   }

   // Set up m.g0.sched as a label returning to just
   // after the mstart1 call in mstart0 above, for use by goexit0 and mcall.
   // We're never coming back to mstart1 after we call schedule,
   // so other calls can reuse the current frame.
   // And goexit0 does a gogo that needs to return from mstart1
   // and let mstart0 exit the thread.
   _g_.sched.g = guintptr(unsafe.Pointer(_g_))
   _g_.sched.pc = getcallerpc() // getcallerpc()獲取mstart1執(zhí)行完的返回地址
   _g_.sched.sp = getcallersp() // getcallersp()獲取調(diào)用mstart1時的棧頂?shù)刂?

   asminit()
   minit() // 信號相關(guān)初始化

   // Install signal handlers; after minit so that minit can
   // prepare the thread to be able to handle the signals.
   if _g_.m == &m0 {
      mstartm0()
   }

   if fn := _g_.m.mstartfn; fn != nil {
      fn()
   }

   if _g_.m != &m0 {
      acquirep(_g_.m.nextp.ptr())
      _g_.m.nextp = 0
   }
   schedule()
}

可以看到mstart1函數(shù)保存額調(diào)度相關(guān)的信息,特別是保存了正在運行的g0的下一條指令和棧頂?shù)刂罚?這些調(diào)度信息對于goroutine而言是很重要的。

接下來就是golang調(diào)度系統(tǒng)的核心函數(shù)runtime.schedule了:

func schedule() {
   _g_ := getg() // _g_ 是每個工作線程的m的m0,在初始化的場景就是m0.g0

   ...

   var gp *g
   var inheritTime bool

   ...
   
   if gp == nil {
      // 為了保證調(diào)度的公平性,每進行61次調(diào)度就需要優(yōu)先從全局隊列中獲取goroutine
      // Check the global runnable queue once in a while to ensure fairness.
      // Otherwise two goroutines can completely occupy the local runqueue
      // by constantly respawning each other.
      if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
         lock(&sched.lock)
         gp = globrunqget(_g_.m.p.ptr(), 1)
         unlock(&sched.lock)
      }
   }
   if gp == nil { // 從p本地的隊列中獲取goroutine
      gp, inheritTime = runqget(_g_.m.p.ptr())
      // We can see gp != nil here even if the M is spinning,
      // if checkTimers added a local goroutine via goready.
   }
   if gp == nil { // 如果以上兩者都沒有,那么就需要從其他p哪里竊取goroutine
      gp, inheritTime = findrunnable() // blocks until work is available
   }

   ...

   execute(gp, inheritTime)
}

以上我們節(jié)選了一些和調(diào)度相關(guān)的代碼,意圖簡化我們的理解,調(diào)度中獲取goroutine的規(guī)則是:

  • 每調(diào)度61次就需要從全局隊列中獲取goroutine;
  • 其次優(yōu)先從本P所在隊列中獲取goroutine;
  • 如果還沒有獲取到,則從其他P的運行隊列中竊取goroutine;

最后調(diào)用runtime.excute函數(shù)運行代碼:

func execute(gp *g, inheritTime bool) {
   _g_ := getg()

   // Assign gp.m before entering _Grunning so running Gs have an
   // M.
   _g_.m.curg = gp
   gp.m = _g_.m
   casgstatus(gp, _Grunnable, _Grunning) // 設(shè)置gp的狀態(tài)
   gp.waitsince = 0
   gp.preempt = false
   gp.stackguard0 = gp.stack.lo + _StackGuard
   
   ...

   gogo(&gp.sched)
}

在完成gp運行前的準(zhǔn)備工作后,excute函數(shù)調(diào)用gogo函數(shù)完成從g0gp的轉(zhuǎn)換:

  • 讓出CPU的執(zhí)行權(quán);
  • 棧的切換;

gogo函數(shù)是用匯編語言編寫的精悍的一段代碼,這里就不詳細(xì)分析了,其主要做了兩件事:

  • gp.sched的成員恢復(fù)到CPU的寄存器完成狀態(tài)以及棧的切換;
  • 跳轉(zhuǎn)到gp.sched.pc所指的指令地址(runtime.main)處執(zhí)行。
func main() {
   g := getg() // _g_ = main_goroutine

   // Racectx of m0->g0 is used only as the parent of the main goroutine.
   // It must not be used for anything else.
   g.m.g0.racectx = 0

   // golang棧的最大值
   // Max stack size is 1 GB on 64-bit, 250 MB on 32-bit.
   // Using decimal instead of binary GB and MB because
   // they look nicer in the stack overflow failure message.
   if goarch.PtrSize == 8 {
      maxstacksize = 1000000000
   } else {
      maxstacksize = 250000000
   }

   // An upper limit for max stack size. Used to avoid random crashes
   // after calling SetMaxStack and trying to allocate a stack that is too big,
   // since stackalloc works with 32-bit sizes.
   maxstackceiling = 2 * maxstacksize

   // Allow newproc to start new Ms.
   mainStarted = true

   // 需要切換到g0棧去執(zhí)行newm
   // 創(chuàng)建監(jiān)控線程,該線程獨立于調(diào)度器,無需與P關(guān)聯(lián)
   if GOARCH != "wasm" { // no threads on wasm yet, so no sysmon
      systemstack(func() {
         newm(sysmon, nil, -1)
      })
   }

   // Lock the main goroutine onto this, the main OS thread,
   // during initialization. Most programs won't care, but a few
   // do require certain calls to be made by the main thread.
   // Those can arrange for main.main to run in the main thread
   // by calling runtime.LockOSThread during initialization
   // to preserve the lock.
   lockOSThread()

   if g.m != &m0 {
      throw("runtime.main not on m0")
   }

   // Record when the world started.
   // Must be before doInit for tracing init.
   runtimeInitTime = nanotime()
   if runtimeInitTime == 0 {
      throw("nanotime returning zero")
   }

   if debug.inittrace != 0 {
      inittrace.id = getg().goid
      inittrace.active = true
   }

   // runtime包的init
   doInit(&runtime_inittask) // Must be before defer.

   // Defer unlock so that runtime.Goexit during init does the unlock too.
   needUnlock := true
   defer func() {
      if needUnlock {
         unlockOSThread()
      }
   }()

   gcenable()

   main_init_done = make(chan bool)
   if iscgo {
      if _cgo_thread_start == nil {
         throw("_cgo_thread_start missing")
      }
      if GOOS != "windows" {
         if _cgo_setenv == nil {
            throw("_cgo_setenv missing")
         }
         if _cgo_unsetenv == nil {
            throw("_cgo_unsetenv missing")
         }
      }
      if _cgo_notify_runtime_init_done == nil {
         throw("_cgo_notify_runtime_init_done missing")
      }
      // Start the template thread in case we enter Go from
      // a C-created thread and need to create a new thread.
      startTemplateThread()
      cgocall(_cgo_notify_runtime_init_done, nil)
   }

   doInit(&main_inittask) // main包的init,會遞歸調(diào)用import的包的初始化函數(shù)

   // Disable init tracing after main init done to avoid overhead
   // of collecting statistics in malloc and newproc
   inittrace.active = false

   close(main_init_done)

   needUnlock = false
   unlockOSThread()

   if isarchive || islibrary {
      // A program compiled with -buildmode=c-archive or c-shared
      // has a main, but it is not executed.
      return
   }
   fn := main_main // make an indirect call, as the linker doesn't know the address of the main package when laying down the runtime
   fn() // 執(zhí)行main函數(shù)
   if raceenabled {
      racefini()
   }

   // Make racy client program work: if panicking on
   // another goroutine at the same time as main returns,
   // let the other goroutine finish printing the panic trace.
   // Once it does, it will exit. See issues 3934 and 20018.
   if atomic.Load(&runningPanicDefers) != 0 {
      // Running deferred functions should not take long.
      for c := 0; c < 1000; c++ {
         if atomic.Load(&runningPanicDefers) == 0 {
            break
         }
         Gosched()
      }
   }
   if atomic.Load(&panicking) != 0 {
      gopark(nil, nil, waitReasonPanicWait, traceEvGoStop, 1)
   }

   exit(0)
   for {
      var x *int32
      *x = 0
   }
}

runtime.main函數(shù)的主要工作是:

  • 啟動一個sysmon系統(tǒng)監(jiān)控線程,該線程負(fù)責(zé)程序的gc、搶占調(diào)度等;
  • 執(zhí)行runtime包和所有包的初始化;
  • 執(zhí)行main.main函數(shù);
  • 最后調(diào)用exit系統(tǒng)調(diào)用退出進程,之前提到的注入goexit程序?qū)?code>main goroutine不起作用,是為了其他線程的回收而做的。

以上就是Golang并發(fā)編程之main goroutine的創(chuàng)建與調(diào)度詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang main goroutine的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • golang 接口嵌套實現(xiàn)復(fù)用的操作

    golang 接口嵌套實現(xiàn)復(fù)用的操作

    這篇文章主要介紹了golang 接口嵌套實現(xiàn)復(fù)用的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04
  • 詳解Go語言中io/ioutil工具的使用

    詳解Go語言中io/ioutil工具的使用

    這篇文章主要為大家詳細(xì)介紹了Go語言中io/ioutil工具的使用,從而簡化文件操作。文中是示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下
    2022-05-05
  • Golang實現(xiàn)拓?fù)渑判?DFS算法版)

    Golang實現(xiàn)拓?fù)渑判?DFS算法版)

    這篇文章主要介紹了Golang實現(xiàn)拓?fù)渑判?DFS算法版),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2019-11-11
  • jenkins配置golang?代碼工程自動發(fā)布的實現(xiàn)方法

    jenkins配置golang?代碼工程自動發(fā)布的實現(xiàn)方法

    這篇文章主要介紹了jenkins配置golang?代碼工程自動發(fā)布,jks是個很好的工具,使用方法也很多,我只用了它簡單的功能,對jenkins配置golang相關(guān)知識感興趣的朋友一起看看吧
    2022-07-07
  • go語言實現(xiàn)接口查詢

    go語言實現(xiàn)接口查詢

    這篇文章主要介紹了go語言實現(xiàn)接口查詢,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • golang定時器Timer的用法和實現(xiàn)原理解析

    golang定時器Timer的用法和實現(xiàn)原理解析

    這篇文章主要介紹了golang定時器Ticker,本文主要來看一下Timer的用法和實現(xiàn)原理,需要的朋友可以參考以下內(nèi)容
    2023-04-04
  • go語言beego框架web開發(fā)語法筆記示例

    go語言beego框架web開發(fā)語法筆記示例

    這篇文章主要為大家介紹了go語言beego框架web開發(fā)語法筆記示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步早日升職加薪
    2022-04-04
  • Go map定義的方式及修改技巧

    Go map定義的方式及修改技巧

    這篇文章主要給大家介紹了關(guān)于Go map定義的方式及修改技巧,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-02-02
  • Go語言遍歷map實現(xiàn)(訪問map中的每一個鍵值對)

    Go語言遍歷map實現(xiàn)(訪問map中的每一個鍵值對)

    這篇文章主要介紹了Go語言遍歷map實現(xiàn)(訪問map中的每一個鍵值對),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • GoLang中的互斥鎖Mutex和讀寫鎖RWMutex使用教程

    GoLang中的互斥鎖Mutex和讀寫鎖RWMutex使用教程

    RWMutex是一個讀/寫互斥鎖,在某一時刻只能由任意數(shù)量的reader持有或者一個writer持有。也就是說,要么放行任意數(shù)量的reader,多個reader可以并行讀;要么放行一個writer,多個writer需要串行寫
    2023-01-01

最新評論