Golang網(wǎng)絡模型netpoll源碼解析(具體流程)
0、引言
在學習完了Socket編程的基礎知識、Linux系統(tǒng)提供的I/O多路復用的實現(xiàn)以及Golang的GMP調度模型之后,我們進而學習Golang的網(wǎng)絡模型——netpoll。本文將從為什么需要使用netpoll模型,以及netpoll的具體流程實現(xiàn)兩個主要角度來展開學習。當前使用的Go的版本為1.22.4,Linux系統(tǒng)。
1、為什么要使用netpoll模型?
首先,什么是多路復用?
多路,指的是存在著多個需要服務的對象;復用,指的是重復利用一個單元來為上述的多個目標提供服務。
我們知道,Linux系統(tǒng)為用戶提供了三個內核實現(xiàn)的IO多路復用技術的系統(tǒng)調用,用發(fā)展時間來排序分別為:select->poll->epoll
。其中,epoll
在當今使用的最為廣泛,對比與select
調用,它有以下的優(yōu)勢:
fd
數(shù)量靈活:可監(jiān)聽的fd
數(shù)量上限靈活,使用方可以在調用epoll_create
操作時自行指定。- 更少的內核拷貝次數(shù):在內核中,使用紅黑樹的結構來存儲需要監(jiān)聽的
fd
,相比與調用select
每次需要將所有的fd
拷貝進內核,監(jiān)聽到事件后再全部拷貝回用戶態(tài),epoll
只需要將需要監(jiān)聽的fd
添加到事件表后,即可多次監(jiān)聽。 - 返回結果明確:
epoll
運行將就緒事件添加到就緒事件列表中,當用戶調用epoll_wait
操作時,內核只返回就緒事件,而select
返回的是所有的事件,需要用戶再進行一次遍歷,找到就緒事件再處理。
需要注意的是,在不同的條件環(huán)境下,epoll的優(yōu)勢可能反而作用不明顯。epoll只適用在監(jiān)聽fd基數(shù)較大且活躍度不高的場景,如此epoll事件表的空間復用和epoll_wait操作的精準才能體現(xiàn)出其優(yōu)勢;而當處在fd基數(shù)較小且活躍度高的場景下,select反而更加簡單有效,構造epoll的紅黑樹結構的消耗會成為其累贅。
考慮到場景的多樣性,我們會選擇使用epoll
去完成內核事件監(jiān)聽的操作,那么如何將golang
和epoll
結合起來呢?
在 Go 語言的并發(fā)模型中,GMP 框架實現(xiàn)了一種高效的協(xié)程調度機制,它屏蔽了操作系統(tǒng)線程的細節(jié),用戶可以通過輕量級的 Goroutine 來實現(xiàn)細粒度的并發(fā)操作。然而,底層的 IO 多路復用機制(如 Linux 的 epoll)調度的單位仍然是線程(M)。為了將 IO 調度從線程層面提升到協(xié)程層面,充分發(fā)揮 Goroutine 的高并發(fā)優(yōu)勢,netpoll 應運而生。
接下來我們就來學習netpoll
框架的實現(xiàn)。
2、netpoll實現(xiàn)原理
2.1、核心結構
1、pollDesc
為了將IO調度從線程提升到協(xié)程層面,netpoll
框架有個重要的核心結構pollDesc
,它有兩個,一個為表層,含有指針指向了里層的pollDesc
。本文中講到的pollDesc
都為里層pollDesc
。
表層pollDesc
定位在internel/poll/fd_poll_runtime.go
文件中:
type pollDesc struct { runtimeCtx uintptr }
使用一個runtimeCtx
指針指向其底層實現(xiàn)實例。
里層的位于runtime/netpoll.go
中。
//網(wǎng)絡poller描述符 type pollDesc struct { //next指針,指向在pollCache鏈表結構中,以下個pollDesc實例。 link *pollDesc //指向fd fd uintptr //讀事件狀態(tài)標識器,狀態(tài)有四種: //1、pdReady:表示讀操作已就緒,等待處理 //2、pdWait:表示g將要被阻塞等待讀操作就緒,此時還未阻塞 //3、g:讀操作的g已經(jīng)被阻塞,rg指向阻塞的g實例 //4、pdNil:空 rg atomic.Uintptr wg atomic.Uintptr //... }
pollDesc
的核心字段是讀/寫標識器rg/wg
,它用于標識fd的io事件狀態(tài),并且持有被阻塞的g實例。當后續(xù)需要喚醒這個g處理讀寫事件的時候,可以通過pollDesc
追溯得到g的實例進行操作。有了pollDesc
這個數(shù)據(jù)結構,Golang就能將對處理socket的調度單位從線程Thread
轉換成協(xié)程G
。
2、pollCache
pollCache
緩沖池采用了單向鏈表的方式存儲多個pollDesc
實例。
type pollCache struct { lock mutex first *pollDesc }
其包含了兩個核心方法,分別是alloc()
和free()
//從pollCache中分配得到一個pollDesc實例 func (c *pollCache) alloc() *pollDesc { lock(&c.lock) //如果鏈表為空,則進行初始化 if c.first == nil { //pdSize = 248 const pdSize = unsafe.Sizeof(pollDesc{}) //4096 / 248 = 16 n := pollBlockSize / pdSize if n == 0 { n = 1 } //分配指定大小的內存空間 mem := persistentalloc(n*pdSize, 0, &memstats.other_sys) //完成指定數(shù)量的pollDesc創(chuàng)建 for i := uintptr(0); i < n; i++ { pd := (*pollDesc)(add(mem, i*pdSize)) pd.link = c.first c.first = pd } } pd := c.first c.first = pd.link lockInit(&pd.lock, lockRankPollDesc) unlock(&c.lock) return pd }
//free用于將一個pollDesc放回pollCache func (c *pollCache) free(pd *pollDesc) { //... lock(&c.lock) pd.link = c.first c.first = pd unlock(&c.lock) }
2.2、netpoll框架宏觀流程
在宏觀的角度下,netpoll框架主要涉及了以下的幾個流程:
poll_init
:底層調用epoll_create
指令,在內核態(tài)中開辟epoll事件表。poll_open
:先構造一個pollDesc實例,然后通過epoll_ctl(ADD)
指令,向內核中添加要監(jiān)聽的socket,并將這一個fd綁定在pollDesc中。pollDesc含有狀態(tài)標識器rg/wg
,用于標識事件狀態(tài)以及存儲阻塞的g。poll_wait
:當g依賴的事件未就緒時,調用gopark
方法,將g置為阻塞態(tài)存放在pollDesc中。net_poll
:GMP調度器會輪詢netpoll流程,通常會用非阻塞的方式發(fā)起epoll_wait
指令,取出就緒的pollDesc,提前出其內部陷入阻塞態(tài)的g然后將其重新添加到GMP的調度隊列中。(以及在sysmon流程和gc流程都會觸發(fā)netpoll)
3、流程源碼實現(xiàn)
3.1、流程入口
我們參考以下的簡易TCP服務器實現(xiàn)框架,走進netpoll框架的具體源碼實現(xiàn)。
// 啟動 tcp server 代碼示例 func main() { //創(chuàng)建TCP端口監(jiān)聽器,涉及以下事件: //1:創(chuàng)建socket fd,調用bind和accept系統(tǒng)接口函數(shù) //2:調用epoll_create,創(chuàng)建eventpool //3:調用epoll_ctl(ADD),將socket fd注冊到epoll事件表 l, _ := net.Listen("tcp", ":8080") // eventloop reactor 模型 for { //等待TCP連接到達,涉及以下事件: //1:循環(huán)+非阻塞調用accept //2:若未就緒,則調用gopark進行阻塞 //3:等待netpoller輪詢喚醒 //4:獲取到conn fd后注冊到eventpool //5:返回conn conn, _ := l.Accept() // goroutine per conn go serve(conn) } } // 處理一筆到來的 tcp 連接 func serve(conn net.Conn) { //關閉conn,從eventpool中移除fd defer conn.Close() var buf []byte //讀取conn中的數(shù)據(jù),涉及以下事件: //1:循環(huán)+非阻塞調用recv(read) //2:若未就緒,通過gopark阻塞,等待netpoll輪詢喚醒 _, _ = conn.Read(buf) //向conn中寫入數(shù)據(jù),涉及以下事件: //1:循環(huán)+非阻塞調用writev (write) //2:若未就緒,通過gopark阻塞,等待netpoll輪詢喚醒 _, _ = conn.Write(buf) }
3.2、Socket創(chuàng)建
以net.Listen
方法為入口,進行創(chuàng)建socket fd
,調用的方法棧如下:
方法 | 文件 |
---|---|
net.Listen() | net/dial.go |
net.ListenConfig.Listen() | net/dial.go |
net.sysListener.listenTCP() | net/tcpsock_posix.go |
net.internetSocket() | net/ipsock_posix.go |
net.socket() | net/sock_posix.go |
核心的調用在net.socket()
方法內,源碼核心流程如下:
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) { //進行socket系統(tǒng)調用,創(chuàng)建一個socket s, err := sysSocket(family, sotype, proto) //綁定socket fd fd, err = newFD(s, family, sotype, net); //... //進行了以下事件: //1、通過syscall bind指令綁定socket的監(jiān)聽地址 //2、通過syscall listen指令發(fā)起對socket的監(jiān)聽 //3、完成epollEvent表的創(chuàng)建(全局執(zhí)行一次) //4、將socket fd注冊到epoll事件表中,監(jiān)聽讀寫就緒事件 err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn); }
首先先執(zhí)行了sysSocket
系統(tǒng)調用,創(chuàng)建一個socket
,它是一個整數(shù)值,用于標識操作系統(tǒng)中打開的文件或網(wǎng)絡套接字;接著調用newFD
方法包裝成netFD
對象,以便實現(xiàn)更高效的異步 IO 和 Goroutine 調度。
3.3、poll_init
緊接3.2中的net.socket
方法,在內部還調用了net.netFD.listenStream()
,poll_init
的調用棧如下:
方法 | 文件 |
---|---|
net.netFD.listenStream() | net/sock_posix.go |
net.netFD.init() | net/fd_unix.go |
poll.FD.init() | internal/poll/fd_unix.go |
poll.pollDesc.init() | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollServerInit() | runtime/netpoll.go |
runtime.netpollinit() | runtime/netpoll_epoll.go |
net.netFD.listenStream()
核心步驟如下:
func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error { //.... //通過Bind系統(tǒng)調用綁定監(jiān)聽地址 if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } //通過Listen系統(tǒng)調用對socket進行監(jiān)聽 if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) } //fd.init()進行了以下操作: //1、完成eventPool的創(chuàng)建 //2、將socket fd注冊到epoll事件表中 if err = fd.init(); err != nil { return err } //... return nil }
- 使用
Bind
系統(tǒng)調用綁定需要監(jiān)聽的地址 - 使用
Listen
系統(tǒng)調用監(jiān)聽socket - 調用
fd.init
完成eventpool
的創(chuàng)建以及fd的注冊
net.netFD.init()
方法在內部轉而調用poll.FD.init()
func (fd *netFD) init() error { return fd.pfd.Init(fd.net, true) } func (fd *FD) Init(net string, pollable bool) error { fd.SysFile.init() // We don't actually care about the various network types. if net == "file" { fd.isFile = true } if !pollable { fd.isBlocking = 1 return nil } err := fd.pd.init(fd) if err != nil { // If we could not initialize the runtime poller, // assume we are using blocking mode. fd.isBlocking = 1 } return err }
然后又轉入到poll.pollDesc.init()
的調用中。
func (pd *pollDesc) init(fd *FD) error { //通過sysOnce結構,完成epoll事件表的唯一一次創(chuàng)建 serverInit.Do(runtime_pollServerInit) //完成init后,進行poll_open ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) //... //綁定里層的pollDesc實例 pd.runtimeCtx = ctx return nil }
這里的poll.pollDesc
是表層pollDesc
,表層pd的init是poll_init
和poll_open
流程的入口:
- 執(zhí)行
serverInit.Do(runtime_pollServerInit)
,其中serverInit
是名為sysOnce
的特殊結構,它會保證執(zhí)行的方法在全局只會被執(zhí)行一次,然后執(zhí)行runtime_pollServerInit
,完成poll_init
操作 - 完成
poll_init
后,調用runtime_pollOpen(uintptr(fd.Sysfd))
將fd加入到eventpool
中,完成poll_open
操作 - 綁定里層的
pollDesc
實例
我們先來關注serverInit.Do(runtime_pollServerInit)
中,執(zhí)行的runtime_pollServerInit
方法,它定位在runtime/netpoll.go
下:
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit func poll_runtime_pollServerInit() { netpollGenericInit() } func netpollGenericInit() { if netpollInited.Load() == 0 { lockInit(&netpollInitLock, lockRankNetpollInit) lock(&netpollInitLock) if netpollInited.Load() == 0 { //進入netpollinit調用 netpollinit() netpollInited.Store(1) } unlock(&netpollInitLock) } }
func netpollinit() { var errno uintptr //進行epollcreate系統(tǒng)調用,創(chuàng)建epoll事件表 epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC) //... //創(chuàng)建pipe管道,接收信號,如程序終止: //r:信號接收端,會注冊對應的read事件到epoll事件表中 //w:信號發(fā)送端,有信號到達的時候,會往w發(fā)送信號,并對r產(chǎn)生讀就緒事件 r, w, errpipe := nonblockingPipe() //... //在epollEvent中注冊監(jiān)聽r的讀就緒事件 ev := syscall.EpollEvent{ Events: syscall.EPOLLIN, } *(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev) //... //使用全局變量緩存pipe的讀寫端 netpollBreakRd = uintptr(r) netpollBreakWr = uintptr(w) }
在netpollinit()
方法內部,進行了以下操作:
執(zhí)行
epoll_create
指令創(chuàng)建了epoll事件表,并返回epoll文件描述符epfd
。創(chuàng)建了兩個pipe管道,當向w端寫入信號的時候,r端會發(fā)生讀就緒事件。
注冊監(jiān)聽r的讀就緒事件。
緩存管道。
在這里,我們創(chuàng)建了兩個管道r
以及w
,并且在eventpool
中注冊了r的讀就緒事件的監(jiān)聽,當我們向w管道寫入數(shù)據(jù)的時候,r管道就會產(chǎn)生讀就緒事件,從而打破阻塞的epoll_wait操作,進而執(zhí)行其他的操作。
3.3、poll_open
方法 | 文件 |
---|---|
net.netFD.listenStream() | net/sock_posix.go |
net.netFD.init() | net/fd_unix.go |
poll.FD.init() | internal/poll/fd_unix.go |
poll.pollDesc.init() | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollOpen() | runtime/netpoll.go |
runtime.netpollopen | runtime/netpoll_epoll.go |
在poll.pollDesc.init()
方法中,完成了poll_init
流程后,就會進入到poll_open
流程,執(zhí)行runtime.poll_runtime_pollOpen()
。
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { //獲取一個pollDesc實例 pd := pollcache.alloc() lock(&pd.lock) wg := pd.wg.Load() if wg != pdNil && wg != pdReady { throw("runtime: blocked write on free polldesc") } rg := pd.rg.Load() if rg != pdNil && rg != pdReady { throw("runtime: blocked read on free polldesc") } //綁定socket fd到pollDesc中 pd.fd = fd //... //初始化讀寫狀態(tài)標識器為無狀態(tài) pd.rg.Store(pdNil) pd.wg.Store(pdNil) //... unlock(&pd.lock) //將fd添加進epoll事件表中 errno := netpollopen(fd, pd) //... //返回pollDesc實例 return pd, 0 }
func netpollopen(fd uintptr, pd *pollDesc) uintptr { var ev syscall.EpollEvent //通過epollctl操作,在EpollEvent中注冊針對fd的監(jiān)聽事件 //操作類型宏指令:EPOLL_CTL_ADD——添加fd并注冊監(jiān)聽事件 //事件類型:epollevent.events: //1、EPOLLIN:監(jiān)聽讀就緒事件 //2、EPOLLOUT:監(jiān)聽寫就緒事件 //3、EPOLLRDHUP:監(jiān)聽中斷事件 //4、EPOLLET:使用邊緣觸發(fā)模式 ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load()) *(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev) }
不僅在net.Listen()
流程中會觸發(fā)poll open
,在net.Listener.Accept
流程中也會,當我們獲取到了連接之后,也需要為這個連接封裝成一個pollDesc
實例,然后執(zhí)行poll_open
流程將其注冊到epoll事件表中。
func (fd *netFD) accept()(netfd *netFD, err error){ // 通過 syscall accept 接收到來的 conn fd d, rsa, errcall, err := fd.pfd.Accept() // ... // 封裝到來的 conn fd netfd, err = newFD(d, fd.family, fd.sotype, fd.net) // 將 conn fd 注冊到 epoll 事件表中 err = netfd.init() // ... return netfd,nil }
3.4、poll_close
當連接conn需要關閉的時候,最終會進入到poll_close
流程,執(zhí)行epoll_ctl(DELETE)
刪除對應的fd。
方法 | 文件 |
---|---|
net.conn.Close | net/net.go |
net.netFD.Close | net/fd_posix.go |
poll.FD.Close | internal/poll/fd_unix.go |
poll.FD.decref | internal/poll/fd_mutex.go |
poll.FD.destroy | internal/poll/fd_unix.go |
poll.pollDesc.close | internal/poll/fd_poll_runtime.go |
poll.runtime_pollClose | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollClose | runtime/netpoll.go |
runtime.netpollclose | runtime/netpoll_epoll.go |
syscall.EpollCtl | runtime/netpoll_epoll.go |
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose func poll_runtime_pollClose(pd *pollDesc) { if !pd.closing { throw("runtime: close polldesc w/o unblock") } wg := pd.wg.Load() if wg != pdNil && wg != pdReady { throw("runtime: blocked write on closing polldesc") } rg := pd.rg.Load() if rg != pdNil && rg != pdReady { throw("runtime: blocked read on closing polldesc") } netpollclose(pd.fd) pollcache.free(pd) }
func netpollclose(fd uintptr) uintptr { var ev syscall.EpollEvent return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev) }
3.5、poll_wait
poll_wait
流程最終會執(zhí)行gopark
將g陷入到用戶態(tài)阻塞。
方法 | 文件 |
---|---|
poll.pollDesc.wait | internal/poll/fd_poll_runtime.go |
poll.runtime_pollWait | internal/poll/fd_poll_runtime.go |
runtime.poll_runtime_pollWait | runtime/netpoll.go |
runtime.netpollblock | runtime/netpoll.go |
runtime.gopark | runtime/proc.go |
runtime.netpollblockcommit | runtime/netpoll.go |
在表層pollDesc
中,會通過其內部的里層pollDesc
指針,調用到runtime
下的netpollblock
方法。
/* 針對某個 pollDesc 實例,監(jiān)聽指定的mode 就緒事件 - 返回true——已就緒 返回false——因超時或者關閉導致中斷 - 其他情況下,會通過 gopark 操作將當前g 阻塞在該方法中 */ func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { //針對mode事件,獲取相應的狀態(tài) gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { //關心的io事件就緒,直接返回 if gpp.CompareAndSwap(pdReady, pdNil) { return true } //關心的io事件未就緒,則置為等待狀態(tài),G將要被阻塞 if gpp.CompareAndSwap(pdNil, pdWait) { break } //... } //... //將G置為阻塞態(tài) gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5) //當前g從阻塞態(tài)被喚醒,重置標識器 old := gpp.Swap(pdNil) if old > pdWait { throw("runtime: corrupted polldesc") } //判斷是否是因為所關心的事件觸發(fā)而喚醒 return old == pdReady }
在gopark方法中,會閉包調用netpollblockcommit
方法,其中會根據(jù)g關心的事件類型,將其實例存儲到pollDesc的rg或wg容器
中。
// 將 gpp 狀態(tài)標識器的值由 pdWait 修改為當前 g func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) if r { //增加等待輪詢器的例程計數(shù)。 //調度器使用它來決定是否阻塞 //如果沒有其他事情可做,則等待輪詢器。 netpollAdjustWaiters(1) } return r }
接著我們來關注何時會觸發(fā)poll_wait
流程。
首先是在listener.Accept
流程中,如果當前尚未有連接到達,則執(zhí)行poll wait
將當前g阻塞掛載在該socket fd對應pollDesc的rg
中。
// Accept wraps the accept network call. func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) { //... for { //以非阻塞模式發(fā)起一次accept,嘗試接收conn s, rsa, errcall, err := accept(fd.Sysfd) if err == nil { return s, rsa, "", err } switch err { //忽略中斷類錯誤 case syscall.EINTR: continue //尚未有到達的conn case syscall.EAGAIN: //進入poll_wait流程,監(jiān)聽fd的讀就緒事件,當有conn到達表現(xiàn)為fd可讀。 if fd.pd.pollable() { //假如讀操作未就緒,當前g會被阻塞在方法內部,直到因為超時或者就緒被netpoll ready喚醒。 if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } //... } }
// 指定 mode 為 r 標識等待的是讀就緒事件,然后走入更底層的 poll_wait 流程 func (pd *pollDesc) waitRead(isFile bool) error { return pd.wait('r', isFile) }
其次分別是在conn.Read
/conn.Write
流程中,假若conn fd下讀操作未就緒(無數(shù)據(jù)到達)/寫操作未就緒(緩沖區(qū)空間不足),則會執(zhí)行poll wait將g阻塞并掛載在對應的pollDesc中的rg/wg
中。
func (fd *FD) Read(p []byte) (int, error) { //... for { //非阻塞模式進行一次read調用 n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p) if err != nil { n = 0 //進入poll_wait流程,并標識關心讀就緒事件 if err == syscall.EAGAIN && fd.pd.pollable() { if err = fd.pd.waitRead(fd.isFile); err == nil { continue } } } err = fd.eofError(n, err) return n, err } }
func (fd *FD)Write(p []byte)(int,error){ // ... for{ // ... // 以非阻塞模式執(zhí)行一次syscall write操作 n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max]) if n >0{ nn += n } // 緩沖區(qū)內容都已寫完,直接退出 if nn ==len(p){ return nn, err } // 走入 poll_wait 流程,并標識關心的是該 fd 的寫就緒事件 if err == syscall.EAGAIN && fd.pd.pollable(){ // 倘若寫操作未就緒,當前g 會 park 阻塞在該方法內部,直到因超時或者事件就緒而被 netpoll ready 喚醒 if err = fd.pd.waitWrite(fd.isFile); err ==nil{ continue } } // ... }
3.6、net_poll
netpoll
流程至關重要,它會在底層調用系統(tǒng)的epoll_wait
操作,找到觸發(fā)事件的fd,然后再逆向找到綁定fd的pollDesc
實例,返回內部阻塞的g叫給上游處理喚醒。其調用棧如下:
方法 | 文件 |
---|---|
runtime.netpoll | runtime/netpoll_epoll.go |
runtime.netpollready | runtime/netpoll.go |
runtime.netpollunblock | runtime/netpoll.go |
netpoll
具體的源碼如下:
//netpoll用于輪詢檢查是否有就緒的io事件 //若發(fā)現(xiàn)了就緒的io事件,檢查是否有pollDesc中的g關心其事件 //若找到了關心其io事件就緒的g,添加到list返回給上游處理 func netpoll(delay int64) (gList, int32) { if epfd == -1 { return gList{}, 0 } var waitms int32 //根據(jù)傳入的delay參數(shù),決定調用epoll_wait的模式: //delay < 0:設為阻塞模式(在 gmp 調度流程中,如果某個 p 遲遲獲取不到可執(zhí)行的 g 時,會通過該模式,使得 thread 陷入阻塞態(tài),但該情況全局最多僅有一例) //delay = 0:設為非阻塞模式(通常情況下為此模式,包括 gmp 常規(guī)調度流程、gc 以及全局監(jiān)控線程 sysmon 都是以此模式觸發(fā)的 netpoll 流程) //delay > 0:設為超時模式(在 gmp 調度流程中,如果某個 p 遲遲獲取不到可執(zhí)行的 g 時,并且通過 timer 啟動了定時任務時,會令 thread 以超時模式執(zhí)行 epoll_wait 操作) if delay < 0 { waitms = -1 } else if delay == 0 { waitms = 0 } else if delay < 1e6 { waitms = 1 } else if delay < 1e15 { waitms = int32(delay / 1e6) } else { waitms = 1e9 } //最多接收128個io就緒事件 var events [128]syscall.EpollEvent retry: //以指定模式調用epoll_wait n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms) //... //存儲關心io事件就緒的G實例 var toRun gList delta := int32(0) //遍歷返回的就緒事件 for i := int32(0); i < n; i++ { ev := events[i] if ev.Events == 0 { continue } //pipe接收端的信號處理,檢查是否需要退出netpoll if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd { if ev.Events != syscall.EPOLLIN { println("runtime: netpoll: break fd ready for", ev.Events) throw("runtime: netpoll: break fd ready for something unexpected") } //... continue } var mode int32 //記錄io就緒事件的類型 if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 { mode += 'r' } if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 { mode += 'w' } // 根據(jù) epollevent.data 獲取到監(jiān)聽了該事件的 pollDesc 實例 if mode != 0 { tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data)) pd := (*pollDesc)(tp.pointer()) //... //檢查是否為G所關心的事件 delta += netpollready(&toRun, pd, mode) } } return toRun, delta }
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 { delta := int32(0) var rg, wg *g if mode == 'r' || mode == 'r'+'w' { //就緒事件包含讀就緒,嘗試喚醒pd內部的rg rg = netpollunblock(pd, 'r', true, &delta) } if mode == 'w' || mode == 'r'+'w' { //就緒事件包含讀就緒,嘗試喚醒pd內部的wg wg = netpollunblock(pd, 'w', true, &delta) } //存在G實例,則加入list中 if rg != nil { toRun.push(rg) } if wg != nil { toRun.push(wg) } return delta }
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g { //獲取存儲的g實例 gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := gpp.Load() //... new := pdNil if ioready { new = pdReady } //將gpp的值從g置換成pdReady if gpp.CompareAndSwap(old, new) { if old == pdWait { old = pdNil } else if old != pdNil { *delta -= 1 } //返回需要喚醒的g實例 return (*g)(unsafe.Pointer(old)) } } }
那么,我們也同樣需要關注在哪個環(huán)節(jié)進入了net_poll
流程。
首先,是在GMP調度器中的findRunnable
方法中被調用,用于找到可執(zhí)行的G實例。具體的實現(xiàn)在之前的GMP調度文章中有講解,這里只關心涉及到net_poll
方面的源碼。
findRunnable
方法定位在runtime/proc.go
中
func findRunnable()(gp *g, inheritTime, tryWakeP bool){ // .. /* 同時滿足下述三個條件,發(fā)起一次【非阻塞模式】的 netpoll 流程: - epoll事件表初始化過 - 有 g 在等待io 就緒事件 - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程 */ if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{ // 以非阻塞模式發(fā)起一輪 netpoll,如果有 g 需要喚醒,一一喚醒之,并返回首個 g 給上層進行調度 if list := netpoll(0);!list.empty(){// non-blocking // 獲取就緒 g 隊列中的首個 g gp := list.pop() // 將就緒 g 隊列中其余 g 一一置為就緒態(tài),并添加到全局隊列 injectglist(&list) // 把首個g 也置為就緒態(tài) casgstatus(gp,_Gwaiting,_Grunnable) // ... //返回 g 給當前 p進行調度 return gp,false,false } } // ... /* 同時滿足下述三個條件,發(fā)起一次【阻塞或超時模式】的 netpoll 流程: - epoll事件表初始化過 - 有 g 在等待io 就緒事件 - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程 */ if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{ // 默認為阻塞模式 delay :=int64(-1) // 存在定時時間,則設為超時模式 if pollUntil !=0{ delay = pollUntil - now // ... } // 以【阻塞或超時模式】發(fā)起一輪 netpoll list := netpoll(delay)// block until new work is available } // ... }
其次,是位于同文件下的sysmon
方法中,它會被一個全局監(jiān)控者G執(zhí)行,每隔10ms發(fā)一次非阻塞的net_poll流程。
// The main goroutine. func main(){ // ... // 新建一個 m,直接運行 sysmon 函數(shù) systemstack(func(){ newm(sysmon,nil,-1) }) // ... } // 全局唯一監(jiān)控線程的執(zhí)行函數(shù) func sysmon(){ // ... for{ // ... /* 同時滿足下述三個條件,發(fā)起一次【非阻塞模式】的 netpoll 流程: - epoll事件表初始化過 - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程 - 距離上一次發(fā)起 netpoll 流程的時間間隔已超過 10 ms */ lastpoll :=int64(atomic.Load64(&sched.lastpoll)) if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now { // 以非阻塞模式發(fā)起 netpoll list := netpoll(0)// non-blocking - returns list of goroutines // 獲取到的 g 置為就緒態(tài)并添加到全局隊列中 if!list.empty(){ // ... injectglist(&list) // ... } } // ... } }
最后,還會發(fā)生在GC流程中。
func pollWork() bool{ // ... // 若全局隊列或 p 的本地隊列非空,則提前返回 /* 同時滿足下述三個條件,發(fā)起一次【非阻塞模式】的 netpoll 流程: - epoll事件表初始化過 - 有 g 在等待io 就緒事件 - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程 */ if netpollinited()&& atomic.Load(&netpollWaiters)>0&& sched.lastpoll !=0{ // 所有取得 g 更新為就緒態(tài)并添加到全局隊列 if list := netpoll(0);!list.empty(){ injectglist(&list) return true } } // ... }
4、參考博文
感謝觀看,本篇博文參考了小徐先生的文章,非常推薦大家去觀看并且進入到源碼中學習,鏈接如下:
到此這篇關于Golang網(wǎng)絡模型netpoll源碼解析的文章就介紹到這了,更多相關Golang網(wǎng)絡模型netpoll內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
手把手教你如何在Goland中創(chuàng)建和運行項目
歡迎來到本指南!我們將手把手地教您在Goland中如何創(chuàng)建、配置并運行項目,通過簡單的步驟,您將迅速上手這款強大的集成開發(fā)環(huán)境(IDE),輕松實現(xiàn)您的編程夢想,讓我們一起開啟這段精彩的旅程吧!2024-02-02golang通過反射手動實現(xiàn)json序列化的方法
在 Go 語言中,JSON 序列化和反序列化通常通過標準庫 encoding/json 來實現(xiàn),本文給大家介紹golang 通過反射手動實現(xiàn)json序列化的方法,感興趣的朋友一起看看吧2024-12-12Golang語言JSON解碼函數(shù)Unmarshal的使用
本文主要介紹了Golang語言JSON解碼函數(shù)Unmarshal的使用,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-01-01