源碼分析Go語言使用cgo導致線程增長的原因
TDengine Go 連接器 https://github.com/taosdata/driver-go 使用 cgo 調用 taos.so 中的 API,使用過程中發(fā)現線程數不斷增長,本文從一個 cgo 調用開始解析 Go 源碼,分析造成線程增長的原因。
轉換 cgo 代碼
對 driver-go/wrapper/taosc.go 進行轉換
go tool cgo taosc.go
執(zhí)行后生成 _obj
文件夾
go 代碼分析
以 taosc.cgo1.go
中 TaosResetCurrentDB
為例來分析。
// TaosResetCurrentDB void taos_reset_current_db(TAOS *taos); func TaosResetCurrentDB(taosConnect unsafe.Pointer) { func() { _cgo0 := /*line :161:26*/taosConnect; _cgoCheckPointer(_cgo0, nil); _Cfunc_taos_reset_current_db(_cgo0); }() } //go:linkname _cgoCheckPointer runtime.cgoCheckPointer func _cgoCheckPointer(interface{}, interface{}) //go:cgo_unsafe_args func _Cfunc_taos_reset_current_db(p0 unsafe.Pointer) (r1 _Ctype_void) { _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0))) if _Cgo_always_false { _Cgo_use(p0) } return } //go:linkname _cgo_runtime_cgocall runtime.cgocall func _cgo_runtime_cgocall(unsafe.Pointer, uintptr) int32 //go:cgo_import_static _cgo_453a0cad50ef_Cfunc_taos_reset_current_db //go:linkname __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db _cgo_453a0cad50ef_Cfunc_taos_reset_current_db var __cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db byte var _cgo_453a0cad50ef_Cfunc_taos_reset_current_db = unsafe.Pointer(&__cgofn__cgo_453a0cad50ef_Cfunc_taos_reset_current_db)
TaosResetCurrentDB
首先調用 _cgoCheckPointer
檢查傳入參數是否為 nil
。
//go:linkname _cgoCheckPointer runtime.cgoCheckPointer
表示 cgoCheckPointer
方法實現是 runtime.cgoCheckPointer
,如果傳入參數是 nil
程序將會 panic
。
接著調用 _Cfunc_taos_reset_current_db
。
Cfunc_taos_reset_current_db
方法中 _Cgo_always_false
在運行時會是 false,所以只分析第一句 _cgo_runtime_cgocall(_cgo_453a0cad50ef_Cfunc_taos_reset_current_db, uintptr(unsafe.Pointer(&p0)))
。
_cgo_runtime_cgocall
實現是runtime.cgocall
這個會重點分析。_cgo_453a0cad50ef_Cfunc_taos_reset_current_db
由上方最后代碼塊可以看出是taos_reset_current_db
方法指針。uintptr(unsafe.Pointer(&p0))
表示 p0 的指針地址。- 由上面可以看出這句意思是調用
runtime.cgocall
,參數為方法指針和參數的指針地址。
分析 runtime.cgocall
基于 golang 1.20.4
分析該方法
func cgocall(fn, arg unsafe.Pointer) int32 { if !iscgo && GOOS != "solaris" && GOOS != "illumos" && GOOS != "windows" { throw("cgocall unavailable") } if fn == nil { throw("cgocall nil") } if raceenabled { racereleasemerge(unsafe.Pointer(&racecgosync)) } mp := getg().m // 獲取當前 goroutine 的 M mp.ncgocall++ // 總 cgo 計數 +1 mp.ncgo++ // 當前 cgo 計數 +1 mp.cgoCallers[0] = 0 // 重置追蹤 entersyscall() // 進入系統(tǒng)調用,保存上下文, 標記當前 goroutine 獨占 m, 跳過垃圾回收 osPreemptExtEnter(mp) // 標記異步搶占, 使異步搶占邏輯失效 mp.incgo = true // 修改狀態(tài) errno := asmcgocall(fn, arg) // 真正進行方法調用的地方 mp.incgo = false // 修改狀態(tài) mp.ncgo-- // 當前 cgo 調用-1 osPreemptExtExit(mp) // 恢復異步搶占 exitsyscall() // 退出系統(tǒng)調用,恢復調度器控制 if raceenabled { raceacquire(unsafe.Pointer(&racecgosync)) } // 避免 GC 過早回收 KeepAlive(fn) KeepAlive(arg) KeepAlive(mp) return errno }
其中兩個主要的方法 entersyscall
和 asmcgocall
,接下來對這兩個方法進行著重分析。
分析 entersyscall
func entersyscall() { reentersyscall(getcallerpc(), getcallersp()) }
entersyscall
直接調用的 reentersyscall
,關注下 reentersyscall
注釋中的一段:
// If the syscall does not block, that is it, we do not emit any other events. // If the syscall blocks (that is, P is retaken), retaker emits traceGoSysBlock;
如果 syscall
調用沒有阻塞則不會觸發(fā)任何事件,如果被阻塞 retaker
會觸發(fā) traceGoSysBlock
,那需要了解一下多長時間被認為是阻塞,先跟到 retaker
方法。
func retake(now int64) uint32 { n := 0 lock(&allpLock) for i := 0; i < len(allp); i++ { pp := allp[i] if pp == nil { continue } pd := &pp.sysmontick s := pp.status sysretake := false if s == _Prunning || s == _Psyscall { t := int64(pp.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now } else if pd.schedwhen+forcePreemptNS <= now { preemptone(pp) sysretake = true } } // 從系統(tǒng)調用中搶占P if s == _Psyscall { // 如果已經超過了一個系統(tǒng)監(jiān)控的 tick(20us),則從系統(tǒng)調用中搶占 P t := int64(pp.syscalltick) if !sysretake && int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) pd.syscallwhen = now continue } if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now { continue } unlock(&allpLock) incidlelocked(-1) if atomic.Cas(&pp.status, s, _Pidle) { if trace.enabled { traceGoSysBlock(pp) traceProcStop(pp) } n++ pp.syscalltick++ handoffp(pp) } incidlelocked(1) lock(&allpLock) } } unlock(&allpLock) return uint32(n) }
從上面可以看到系統(tǒng)調用阻塞 20 多微秒會被搶占 P,cgo 被迫 handoffp
,接下來分析 handoffp
方法
func handoffp(pp *p) { // ... // 沒有任務且沒有自旋和空閑的 M 則需要啟動一個新的 M if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) { sched.needspinning.Store(0) startm(pp, true) return } // ... }
handoffp
方法會調用 startm
來啟動一個新的 M,跟到 startm
方法。
func startm(pp *p, spinning bool) { // ... nmp := mget() if nmp == nil { // 沒有M可用,調用newm id := mReserveID() unlock(&sched.lock) var fn func() if spinning { fn = mspinning } newm(fn, pp, id) releasem(mp) return } // ... }
此時如果沒有 M startm
會調用 newm
創(chuàng)建一個新的 M,接下來分析 newm
方法。
func newm(fn func(), pp *p, id int64) { acquirem() mp := allocm(pp, fn, id) mp.nextp.set(pp) mp.sigmask = initSigmask if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" { lock(&newmHandoff.lock) if newmHandoff.haveTemplateThread == 0 { throw("on a locked thread with no template thread") } mp.schedlink = newmHandoff.newm newmHandoff.newm.set(mp) if newmHandoff.waiting { newmHandoff.waiting = false notewakeup(&newmHandoff.wake) } unlock(&newmHandoff.lock) releasem(getg().m) return } newm1(mp) releasem(getg().m) } func newm1(mp *m) { if iscgo { var ts cgothreadstart if _cgo_thread_start == nil { throw("_cgo_thread_start missing") } ts.g.set(mp.g0) ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0])) ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart)) if msanenabled { msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts)) } if asanenabled { asanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts)) } execLock.rlock() // 創(chuàng)建新線程 asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts)) execLock.runlock() return } execLock.rlock() newosproc(mp) execLock.runlock() }
從 newm
看出如果線程都在阻塞中則調用 newm1
,newm1
調用 _cgo_thread_start
創(chuàng)建新線程。
由以上分析得出當高并發(fā)調用 cgo 且執(zhí)行時間超過 20 微秒時會創(chuàng)建新線程。
分析 asmcgocall
只分析 amd64
asm_amd64.s
TEXT ·asmcgocall(SB),NOSPLIT,$0-20 MOVQ fn+0(FP), AX MOVQ arg+8(FP), BX MOVQ SP, DX // 考慮是否需要切換到 m.g0 棧 // 也用來調用創(chuàng)建新的 OS 線程,這些線程已經在 m.g0 棧中了 get_tls(CX) MOVQ g(CX), DI CMPQ DI, $0 JEQ nosave MOVQ g_m(DI), R8 MOVQ m_gsignal(R8), SI CMPQ DI, SI JEQ nosave MOVQ m_g0(R8), SI CMPQ DI, SI JEQ nosave // 切換到系統(tǒng)棧 CALL gosave_systemstack_switch<>(SB) MOVQ SI, g(CX) MOVQ (g_sched+gobuf_sp)(SI), SP // 于調度棧中(pthread 新創(chuàng)建的棧) // 確保有足夠的空間給四個 stack-based fast-call 寄存器 // 為使得 windows amd64 調用服務 SUBQ $64, SP ANDQ $~15, SP // 為 gcc ABI 對齊 MOVQ DI, 48(SP) // 保存 g MOVQ (g_stack+stack_hi)(DI), DI SUBQ DX, DI MOVQ DI, 40(SP) // 保存棧深 (不能僅保存 SP,因為??赡茉诨卣{時被復制) MOVQ BX, DI // DI = AMD64 ABI 第一個參數 MOVQ BX, CX // CX = Win64 第一個參數 CALL AX // 調用 fn // 恢復寄存器、 g、棧指針 get_tls(CX) MOVQ 48(SP), DI MOVQ (g_stack+stack_hi)(DI), SI SUBQ 40(SP), SI MOVQ DI, g(CX) MOVQ SI, SP MOVL AX, ret+16(FP) RET nosave: // 在系統(tǒng)棧上運行,可能沒有 g // 沒有 g 的情況發(fā)生在線程創(chuàng)建中或線程結束中(比如 Solaris 平臺上的 needm/dropm) // 這段代碼和上面類似,但沒有保存和恢復 g,且沒有考慮棧的移動問題(因為我們在系統(tǒng)棧上,而非 goroutine 棧) // 如果已經在系統(tǒng)棧上,則上面的代碼可被直接使用,在 Solaris 上會進入下面這段代碼。 // 使用這段代碼來為所有 "已經在系統(tǒng)棧" 的調用進行服務,從而保持正確性。 SUBQ $64, SP ANDQ $~15, SP // ABI 對齊 MOVQ $0, 48(SP) // 上面的代碼保存了 g, 確保 debug 時可用 MOVQ DX, 40(SP) // 保存原始的棧指針 MOVQ BX, DI // DI = AMD64 ABI 第一個參數 MOVQ BX, CX // CX = Win64 第一個參數 CALL AX MOVQ 40(SP), SI // 恢復原來的棧指針 MOVQ SI, SP MOVL AX, ret+16(FP) RET
這段就是將當前棧移到系統(tǒng)棧去執(zhí)行,因為 C 需要無窮大的棧,在 Go 的棧上執(zhí)行 C 函數會導致棧溢出。
產生問題
cgo 調用會將當前棧移到系統(tǒng)棧,并且當 cgo 高并發(fā)調用且阻塞超過 20 微秒時會新建線程。而 Go 并不會銷毀線程,由此造成線程增長。
解決方案
限制 Go 程序最大線程數,默認為 cpu 核數。
runtime.GOMAXPROCS(runtime.NumCPU())
使用 channel 限制 cgo 最大并發(fā)數為 cpu 核數
package thread import "runtime" var c chan struct{} func Lock() { c <- struct{}{} } func Unlock() { <-c } func init() { c = make(chan struct{}, runtime.NumCPU()) }
針對超過 20 微秒的 cgo 調用進行限制:
thread.Lock() wrapper.TaosFreeResult(result) thread.Unlock()
以上就是源碼分析Go語言使用cgo導致線程增長的原因的詳細內容,更多關于Go語言cgo線程增長的資料請關注腳本之家其它相關文章!
相關文章
Go語言使用Request,Response處理web頁面請求
這篇文章主要介紹了Go語言使用Request,Response處理web頁面請求,需要的朋友可以參考下2022-04-04