Golang網(wǎng)絡(luò)模型netpoll源碼解析(具體流程)
0、引言
在學(xué)習(xí)完了Socket編程的基礎(chǔ)知識(shí)、Linux系統(tǒng)提供的I/O多路復(fù)用的實(shí)現(xiàn)以及Golang的GMP調(diào)度模型之后,我們進(jìn)而學(xué)習(xí)Golang的網(wǎng)絡(luò)模型——netpoll。本文將從為什么需要使用netpoll模型,以及netpoll的具體流程實(shí)現(xiàn)兩個(gè)主要角度來(lái)展開(kāi)學(xué)習(xí)。當(dāng)前使用的Go的版本為1.22.4,Linux系統(tǒng)。
1、為什么要使用netpoll模型?
首先,什么是多路復(fù)用?
多路,指的是存在著多個(gè)需要服務(wù)的對(duì)象;復(fù)用,指的是重復(fù)利用一個(gè)單元來(lái)為上述的多個(gè)目標(biāo)提供服務(wù)。
我們知道,Linux系統(tǒng)為用戶(hù)提供了三個(gè)內(nèi)核實(shí)現(xiàn)的IO多路復(fù)用技術(shù)的系統(tǒng)調(diào)用,用發(fā)展時(shí)間來(lái)排序分別為:select->poll->epoll。其中,epoll在當(dāng)今使用的最為廣泛,對(duì)比與select調(diào)用,它有以下的優(yōu)勢(shì):
fd數(shù)量靈活:可監(jiān)聽(tīng)的fd數(shù)量上限靈活,使用方可以在調(diào)用epoll_create操作時(shí)自行指定。- 更少的內(nèi)核拷貝次數(shù):在內(nèi)核中,使用紅黑樹(shù)的結(jié)構(gòu)來(lái)存儲(chǔ)需要監(jiān)聽(tīng)的
fd,相比與調(diào)用select每次需要將所有的fd拷貝進(jìn)內(nèi)核,監(jiān)聽(tīng)到事件后再全部拷貝回用戶(hù)態(tài),epoll只需要將需要監(jiān)聽(tīng)的fd添加到事件表后,即可多次監(jiān)聽(tīng)。 - 返回結(jié)果明確:
epoll運(yùn)行將就緒事件添加到就緒事件列表中,當(dāng)用戶(hù)調(diào)用epoll_wait操作時(shí),內(nèi)核只返回就緒事件,而select返回的是所有的事件,需要用戶(hù)再進(jìn)行一次遍歷,找到就緒事件再處理。
需要注意的是,在不同的條件環(huán)境下,epoll的優(yōu)勢(shì)可能反而作用不明顯。epoll只適用在監(jiān)聽(tīng)fd基數(shù)較大且活躍度不高的場(chǎng)景,如此epoll事件表的空間復(fù)用和epoll_wait操作的精準(zhǔn)才能體現(xiàn)出其優(yōu)勢(shì);而當(dāng)處在fd基數(shù)較小且活躍度高的場(chǎng)景下,select反而更加簡(jiǎn)單有效,構(gòu)造epoll的紅黑樹(shù)結(jié)構(gòu)的消耗會(huì)成為其累贅。
考慮到場(chǎng)景的多樣性,我們會(huì)選擇使用epoll去完成內(nèi)核事件監(jiān)聽(tīng)的操作,那么如何將golang和epoll結(jié)合起來(lái)呢?
在 Go 語(yǔ)言的并發(fā)模型中,GMP 框架實(shí)現(xiàn)了一種高效的協(xié)程調(diào)度機(jī)制,它屏蔽了操作系統(tǒng)線(xiàn)程的細(xì)節(jié),用戶(hù)可以通過(guò)輕量級(jí)的 Goroutine 來(lái)實(shí)現(xiàn)細(xì)粒度的并發(fā)操作。然而,底層的 IO 多路復(fù)用機(jī)制(如 Linux 的 epoll)調(diào)度的單位仍然是線(xiàn)程(M)。為了將 IO 調(diào)度從線(xiàn)程層面提升到協(xié)程層面,充分發(fā)揮 Goroutine 的高并發(fā)優(yōu)勢(shì),netpoll 應(yīng)運(yùn)而生。
接下來(lái)我們就來(lái)學(xué)習(xí)netpoll框架的實(shí)現(xiàn)。
2、netpoll實(shí)現(xiàn)原理
2.1、核心結(jié)構(gòu)
1、pollDesc
為了將IO調(diào)度從線(xiàn)程提升到協(xié)程層面,netpoll框架有個(gè)重要的核心結(jié)構(gòu)pollDesc,它有兩個(gè),一個(gè)為表層,含有指針指向了里層的pollDesc。本文中講到的pollDesc都為里層pollDesc。
表層pollDesc定位在internel/poll/fd_poll_runtime.go文件中:
type pollDesc struct {
runtimeCtx uintptr
}使用一個(gè)runtimeCtx指針指向其底層實(shí)現(xiàn)實(shí)例。
里層的位于runtime/netpoll.go中。
//網(wǎng)絡(luò)poller描述符
type pollDesc struct {
//next指針,指向在pollCache鏈表結(jié)構(gòu)中,以下個(gè)pollDesc實(shí)例。
link *pollDesc
//指向fd
fd uintptr
//讀事件狀態(tài)標(biāo)識(shí)器,狀態(tài)有四種:
//1、pdReady:表示讀操作已就緒,等待處理
//2、pdWait:表示g將要被阻塞等待讀操作就緒,此時(shí)還未阻塞
//3、g:讀操作的g已經(jīng)被阻塞,rg指向阻塞的g實(shí)例
//4、pdNil:空
rg atomic.Uintptr
wg atomic.Uintptr
//...
}pollDesc的核心字段是讀/寫(xiě)標(biāo)識(shí)器rg/wg,它用于標(biāo)識(shí)fd的io事件狀態(tài),并且持有被阻塞的g實(shí)例。當(dāng)后續(xù)需要喚醒這個(gè)g處理讀寫(xiě)事件的時(shí)候,可以通過(guò)pollDesc追溯得到g的實(shí)例進(jìn)行操作。有了pollDesc這個(gè)數(shù)據(jù)結(jié)構(gòu),Golang就能將對(duì)處理socket的調(diào)度單位從線(xiàn)程Thread轉(zhuǎn)換成協(xié)程G。
2、pollCache
pollCache緩沖池采用了單向鏈表的方式存儲(chǔ)多個(gè)pollDesc實(shí)例。
type pollCache struct {
lock mutex
first *pollDesc
}其包含了兩個(gè)核心方法,分別是alloc()和free()
//從pollCache中分配得到一個(gè)pollDesc實(shí)例
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
//如果鏈表為空,則進(jìn)行初始化
if c.first == nil {
//pdSize = 248
const pdSize = unsafe.Sizeof(pollDesc{})
//4096 / 248 = 16
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
//分配指定大小的內(nèi)存空間
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
//完成指定數(shù)量的pollDesc創(chuàng)建
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
lockInit(&pd.lock, lockRankPollDesc)
unlock(&c.lock)
return pd
}//free用于將一個(gè)pollDesc放回pollCache
func (c *pollCache) free(pd *pollDesc) {
//...
lock(&c.lock)
pd.link = c.first
c.first = pd
unlock(&c.lock)
}2.2、netpoll框架宏觀(guān)流程

在宏觀(guān)的角度下,netpoll框架主要涉及了以下的幾個(gè)流程:
poll_init:底層調(diào)用epoll_create指令,在內(nèi)核態(tài)中開(kāi)辟epoll事件表。poll_open:先構(gòu)造一個(gè)pollDesc實(shí)例,然后通過(guò)epoll_ctl(ADD)指令,向內(nèi)核中添加要監(jiān)聽(tīng)的socket,并將這一個(gè)fd綁定在pollDesc中。pollDesc含有狀態(tài)標(biāo)識(shí)器rg/wg,用于標(biāo)識(shí)事件狀態(tài)以及存儲(chǔ)阻塞的g。poll_wait:當(dāng)g依賴(lài)的事件未就緒時(shí),調(diào)用gopark方法,將g置為阻塞態(tài)存放在pollDesc中。net_poll:GMP調(diào)度器會(huì)輪詢(xún)netpoll流程,通常會(huì)用非阻塞的方式發(fā)起epoll_wait指令,取出就緒的pollDesc,提前出其內(nèi)部陷入阻塞態(tài)的g然后將其重新添加到GMP的調(diào)度隊(duì)列中。(以及在sysmon流程和gc流程都會(huì)觸發(fā)netpoll)
3、流程源碼實(shí)現(xiàn)
3.1、流程入口
我們參考以下的簡(jiǎn)易TCP服務(wù)器實(shí)現(xiàn)框架,走進(jìn)netpoll框架的具體源碼實(shí)現(xiàn)。
// 啟動(dòng) tcp server 代碼示例
func main() {
//創(chuàng)建TCP端口監(jiān)聽(tīng)器,涉及以下事件:
//1:創(chuàng)建socket fd,調(diào)用bind和accept系統(tǒng)接口函數(shù)
//2:調(diào)用epoll_create,創(chuàng)建eventpool
//3:調(diào)用epoll_ctl(ADD),將socket fd注冊(cè)到epoll事件表
l, _ := net.Listen("tcp", ":8080")
// eventloop reactor 模型
for {
//等待TCP連接到達(dá),涉及以下事件:
//1:循環(huán)+非阻塞調(diào)用accept
//2:若未就緒,則調(diào)用gopark進(jìn)行阻塞
//3:等待netpoller輪詢(xún)喚醒
//4:獲取到conn fd后注冊(cè)到eventpool
//5:返回conn
conn, _ := l.Accept()
// goroutine per conn
go serve(conn)
}
}
// 處理一筆到來(lái)的 tcp 連接
func serve(conn net.Conn) {
//關(guān)閉conn,從eventpool中移除fd
defer conn.Close()
var buf []byte
//讀取conn中的數(shù)據(jù),涉及以下事件:
//1:循環(huán)+非阻塞調(diào)用recv(read)
//2:若未就緒,通過(guò)gopark阻塞,等待netpoll輪詢(xún)喚醒
_, _ = conn.Read(buf)
//向conn中寫(xiě)入數(shù)據(jù),涉及以下事件:
//1:循環(huán)+非阻塞調(diào)用writev (write)
//2:若未就緒,通過(guò)gopark阻塞,等待netpoll輪詢(xún)喚醒
_, _ = conn.Write(buf)
}3.2、Socket創(chuàng)建
以net.Listen方法為入口,進(jìn)行創(chuàng)建socket fd,調(diào)用的方法棧如下:
| 方法 | 文件 |
|---|---|
| net.Listen() | net/dial.go |
| net.ListenConfig.Listen() | net/dial.go |
| net.sysListener.listenTCP() | net/tcpsock_posix.go |
| net.internetSocket() | net/ipsock_posix.go |
| net.socket() | net/sock_posix.go |
核心的調(diào)用在net.socket()方法內(nèi),源碼核心流程如下:
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
//進(jìn)行socket系統(tǒng)調(diào)用,創(chuàng)建一個(gè)socket
s, err := sysSocket(family, sotype, proto)
//綁定socket fd
fd, err = newFD(s, family, sotype, net);
//...
//進(jìn)行了以下事件:
//1、通過(guò)syscall bind指令綁定socket的監(jiān)聽(tīng)地址
//2、通過(guò)syscall listen指令發(fā)起對(duì)socket的監(jiān)聽(tīng)
//3、完成epollEvent表的創(chuàng)建(全局執(zhí)行一次)
//4、將socket fd注冊(cè)到epoll事件表中,監(jiān)聽(tīng)讀寫(xiě)就緒事件
err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn);
}首先先執(zhí)行了sysSocket系統(tǒng)調(diào)用,創(chuàng)建一個(gè)socket,它是一個(gè)整數(shù)值,用于標(biāo)識(shí)操作系統(tǒng)中打開(kāi)的文件或網(wǎng)絡(luò)套接字;接著調(diào)用newFD方法包裝成netFD對(duì)象,以便實(shí)現(xiàn)更高效的異步 IO 和 Goroutine 調(diào)度。
3.3、poll_init
緊接3.2中的net.socket方法,在內(nèi)部還調(diào)用了net.netFD.listenStream(),poll_init的調(diào)用棧如下:
| 方法 | 文件 |
|---|---|
| net.netFD.listenStream() | net/sock_posix.go |
| net.netFD.init() | net/fd_unix.go |
| poll.FD.init() | internal/poll/fd_unix.go |
| poll.pollDesc.init() | internal/poll/fd_poll_runtime.go |
| runtime.poll_runtime_pollServerInit() | runtime/netpoll.go |
| runtime.netpollinit() | runtime/netpoll_epoll.go |
net.netFD.listenStream()核心步驟如下:
func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
//....
//通過(guò)Bind系統(tǒng)調(diào)用綁定監(jiān)聽(tīng)地址
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
//通過(guò)Listen系統(tǒng)調(diào)用對(duì)socket進(jìn)行監(jiān)聽(tīng)
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
//fd.init()進(jìn)行了以下操作:
//1、完成eventPool的創(chuàng)建
//2、將socket fd注冊(cè)到epoll事件表中
if err = fd.init(); err != nil {
return err
}
//...
return nil
}- 使用
Bind系統(tǒng)調(diào)用綁定需要監(jiān)聽(tīng)的地址 - 使用
Listen系統(tǒng)調(diào)用監(jiān)聽(tīng)socket - 調(diào)用
fd.init完成eventpool的創(chuàng)建以及fd的注冊(cè)
net.netFD.init()方法在內(nèi)部轉(zhuǎn)而調(diào)用poll.FD.init()
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
func (fd *FD) Init(net string, pollable bool) error {
fd.SysFile.init()
// We don't actually care about the various network types.
if net == "file" {
fd.isFile = true
}
if !pollable {
fd.isBlocking = 1
return nil
}
err := fd.pd.init(fd)
if err != nil {
// If we could not initialize the runtime poller,
// assume we are using blocking mode.
fd.isBlocking = 1
}
return err
}然后又轉(zhuǎn)入到poll.pollDesc.init()的調(diào)用中。
func (pd *pollDesc) init(fd *FD) error {
//通過(guò)sysOnce結(jié)構(gòu),完成epoll事件表的唯一一次創(chuàng)建
serverInit.Do(runtime_pollServerInit)
//完成init后,進(jìn)行poll_open
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
//...
//綁定里層的pollDesc實(shí)例
pd.runtimeCtx = ctx
return nil
}這里的poll.pollDesc是表層pollDesc,表層pd的init是poll_init和poll_open流程的入口:
- 執(zhí)行
serverInit.Do(runtime_pollServerInit),其中serverInit是名為sysOnce的特殊結(jié)構(gòu),它會(huì)保證執(zhí)行的方法在全局只會(huì)被執(zhí)行一次,然后執(zhí)行runtime_pollServerInit,完成poll_init操作 - 完成
poll_init后,調(diào)用runtime_pollOpen(uintptr(fd.Sysfd))將fd加入到eventpool中,完成poll_open操作 - 綁定里層的
pollDesc實(shí)例
我們先來(lái)關(guān)注serverInit.Do(runtime_pollServerInit)中,執(zhí)行的runtime_pollServerInit方法,它定位在runtime/netpoll.go下:
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
if netpollInited.Load() == 0 {
lockInit(&netpollInitLock, lockRankNetpollInit)
lock(&netpollInitLock)
if netpollInited.Load() == 0 {
//進(jìn)入netpollinit調(diào)用
netpollinit()
netpollInited.Store(1)
}
unlock(&netpollInitLock)
}
}func netpollinit() {
var errno uintptr
//進(jìn)行epollcreate系統(tǒng)調(diào)用,創(chuàng)建epoll事件表
epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
//...
//創(chuàng)建pipe管道,接收信號(hào),如程序終止:
//r:信號(hào)接收端,會(huì)注冊(cè)對(duì)應(yīng)的read事件到epoll事件表中
//w:信號(hào)發(fā)送端,有信號(hào)到達(dá)的時(shí)候,會(huì)往w發(fā)送信號(hào),并對(duì)r產(chǎn)生讀就緒事件
r, w, errpipe := nonblockingPipe()
//...
//在epollEvent中注冊(cè)監(jiān)聽(tīng)r的讀就緒事件
ev := syscall.EpollEvent{
Events: syscall.EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
//...
//使用全局變量緩存pipe的讀寫(xiě)端
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}在netpollinit()方法內(nèi)部,進(jìn)行了以下操作:
執(zhí)行
epoll_create指令創(chuàng)建了epoll事件表,并返回epoll文件描述符epfd。創(chuàng)建了兩個(gè)pipe管道,當(dāng)向w端寫(xiě)入信號(hào)的時(shí)候,r端會(huì)發(fā)生讀就緒事件。
注冊(cè)監(jiān)聽(tīng)r的讀就緒事件。
緩存管道。
在這里,我們創(chuàng)建了兩個(gè)管道r以及w,并且在eventpool中注冊(cè)了r的讀就緒事件的監(jiān)聽(tīng),當(dāng)我們向w管道寫(xiě)入數(shù)據(jù)的時(shí)候,r管道就會(huì)產(chǎn)生讀就緒事件,從而打破阻塞的epoll_wait操作,進(jìn)而執(zhí)行其他的操作。
3.3、poll_open
| 方法 | 文件 |
|---|---|
| net.netFD.listenStream() | net/sock_posix.go |
| net.netFD.init() | net/fd_unix.go |
| poll.FD.init() | internal/poll/fd_unix.go |
| poll.pollDesc.init() | internal/poll/fd_poll_runtime.go |
| runtime.poll_runtime_pollOpen() | runtime/netpoll.go |
| runtime.netpollopen | runtime/netpoll_epoll.go |
在poll.pollDesc.init()方法中,完成了poll_init流程后,就會(huì)進(jìn)入到poll_open流程,執(zhí)行runtime.poll_runtime_pollOpen()。
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
//獲取一個(gè)pollDesc實(shí)例
pd := pollcache.alloc()
lock(&pd.lock)
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on free polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on free polldesc")
}
//綁定socket fd到pollDesc中
pd.fd = fd
//...
//初始化讀寫(xiě)狀態(tài)標(biāo)識(shí)器為無(wú)狀態(tài)
pd.rg.Store(pdNil)
pd.wg.Store(pdNil)
//...
unlock(&pd.lock)
//將fd添加進(jìn)epoll事件表中
errno := netpollopen(fd, pd)
//...
//返回pollDesc實(shí)例
return pd, 0
}func netpollopen(fd uintptr, pd *pollDesc) uintptr {
var ev syscall.EpollEvent
//通過(guò)epollctl操作,在EpollEvent中注冊(cè)針對(duì)fd的監(jiān)聽(tīng)事件
//操作類(lèi)型宏指令:EPOLL_CTL_ADD——添加fd并注冊(cè)監(jiān)聽(tīng)事件
//事件類(lèi)型:epollevent.events:
//1、EPOLLIN:監(jiān)聽(tīng)讀就緒事件
//2、EPOLLOUT:監(jiān)聽(tīng)寫(xiě)就緒事件
//3、EPOLLRDHUP:監(jiān)聽(tīng)中斷事件
//4、EPOLLET:使用邊緣觸發(fā)模式
ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}不僅在net.Listen()流程中會(huì)觸發(fā)poll open,在net.Listener.Accept流程中也會(huì),當(dāng)我們獲取到了連接之后,也需要為這個(gè)連接封裝成一個(gè)pollDesc實(shí)例,然后執(zhí)行poll_open流程將其注冊(cè)到epoll事件表中。
func (fd *netFD) accept()(netfd *netFD, err error){
// 通過(guò) syscall accept 接收到來(lái)的 conn fd
d, rsa, errcall, err := fd.pfd.Accept()
// ...
// 封裝到來(lái)的 conn fd
netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
// 將 conn fd 注冊(cè)到 epoll 事件表中
err = netfd.init()
// ...
return netfd,nil
}3.4、poll_close
當(dāng)連接conn需要關(guān)閉的時(shí)候,最終會(huì)進(jìn)入到poll_close流程,執(zhí)行epoll_ctl(DELETE)刪除對(duì)應(yīng)的fd。
| 方法 | 文件 |
|---|---|
| net.conn.Close | net/net.go |
| net.netFD.Close | net/fd_posix.go |
| poll.FD.Close | internal/poll/fd_unix.go |
| poll.FD.decref | internal/poll/fd_mutex.go |
| poll.FD.destroy | internal/poll/fd_unix.go |
| poll.pollDesc.close | internal/poll/fd_poll_runtime.go |
| poll.runtime_pollClose | internal/poll/fd_poll_runtime.go |
| runtime.poll_runtime_pollClose | runtime/netpoll.go |
| runtime.netpollclose | runtime/netpoll_epoll.go |
| syscall.EpollCtl | runtime/netpoll_epoll.go |
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
if !pd.closing {
throw("runtime: close polldesc w/o unblock")
}
wg := pd.wg.Load()
if wg != pdNil && wg != pdReady {
throw("runtime: blocked write on closing polldesc")
}
rg := pd.rg.Load()
if rg != pdNil && rg != pdReady {
throw("runtime: blocked read on closing polldesc")
}
netpollclose(pd.fd)
pollcache.free(pd)
}func netpollclose(fd uintptr) uintptr {
var ev syscall.EpollEvent
return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)
}3.5、poll_wait
poll_wait流程最終會(huì)執(zhí)行gopark將g陷入到用戶(hù)態(tài)阻塞。
| 方法 | 文件 |
|---|---|
| poll.pollDesc.wait | internal/poll/fd_poll_runtime.go |
| poll.runtime_pollWait | internal/poll/fd_poll_runtime.go |
| runtime.poll_runtime_pollWait | runtime/netpoll.go |
| runtime.netpollblock | runtime/netpoll.go |
| runtime.gopark | runtime/proc.go |
| runtime.netpollblockcommit | runtime/netpoll.go |
在表層pollDesc中,會(huì)通過(guò)其內(nèi)部的里層pollDesc指針,調(diào)用到runtime下的netpollblock方法。
/*
針對(duì)某個(gè) pollDesc 實(shí)例,監(jiān)聽(tīng)指定的mode 就緒事件
- 返回true——已就緒 返回false——因超時(shí)或者關(guān)閉導(dǎo)致中斷
- 其他情況下,會(huì)通過(guò) gopark 操作將當(dāng)前g 阻塞在該方法中
*/
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
//針對(duì)mode事件,獲取相應(yīng)的狀態(tài)
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
//關(guān)心的io事件就緒,直接返回
if gpp.CompareAndSwap(pdReady, pdNil) {
return true
}
//關(guān)心的io事件未就緒,則置為等待狀態(tài),G將要被阻塞
if gpp.CompareAndSwap(pdNil, pdWait) {
break
}
//...
}
//...
//將G置為阻塞態(tài)
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
//當(dāng)前g從阻塞態(tài)被喚醒,重置標(biāo)識(shí)器
old := gpp.Swap(pdNil)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
//判斷是否是因?yàn)樗P(guān)心的事件觸發(fā)而喚醒
return old == pdReady
}在gopark方法中,會(huì)閉包調(diào)用netpollblockcommit方法,其中會(huì)根據(jù)g關(guān)心的事件類(lèi)型,將其實(shí)例存儲(chǔ)到pollDesc的rg或wg容器中。
// 將 gpp 狀態(tài)標(biāo)識(shí)器的值由 pdWait 修改為當(dāng)前 g
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
//增加等待輪詢(xún)器的例程計(jì)數(shù)。
//調(diào)度器使用它來(lái)決定是否阻塞
//如果沒(méi)有其他事情可做,則等待輪詢(xún)器。
netpollAdjustWaiters(1)
}
return r
}接著我們來(lái)關(guān)注何時(shí)會(huì)觸發(fā)poll_wait流程。
首先是在listener.Accept流程中,如果當(dāng)前尚未有連接到達(dá),則執(zhí)行poll wait將當(dāng)前g阻塞掛載在該socket fd對(duì)應(yīng)pollDesc的rg中。
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
//...
for {
//以非阻塞模式發(fā)起一次accept,嘗試接收conn
s, rsa, errcall, err := accept(fd.Sysfd)
if err == nil {
return s, rsa, "", err
}
switch err {
//忽略中斷類(lèi)錯(cuò)誤
case syscall.EINTR:
continue
//尚未有到達(dá)的conn
case syscall.EAGAIN:
//進(jìn)入poll_wait流程,監(jiān)聽(tīng)fd的讀就緒事件,當(dāng)有conn到達(dá)表現(xiàn)為fd可讀。
if fd.pd.pollable() {
//假如讀操作未就緒,當(dāng)前g會(huì)被阻塞在方法內(nèi)部,直到因?yàn)槌瑫r(shí)或者就緒被netpoll ready喚醒。
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
//...
}
}// 指定 mode 為 r 標(biāo)識(shí)等待的是讀就緒事件,然后走入更底層的 poll_wait 流程
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}其次分別是在conn.Read/conn.Write流程中,假若conn fd下讀操作未就緒(無(wú)數(shù)據(jù)到達(dá))/寫(xiě)操作未就緒(緩沖區(qū)空間不足),則會(huì)執(zhí)行poll wait將g阻塞并掛載在對(duì)應(yīng)的pollDesc中的rg/wg中。
func (fd *FD) Read(p []byte) (int, error) {
//...
for {
//非阻塞模式進(jìn)行一次read調(diào)用
n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
if err != nil {
n = 0
//進(jìn)入poll_wait流程,并標(biāo)識(shí)關(guān)心讀就緒事件
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
}
err = fd.eofError(n, err)
return n, err
}
}func (fd *FD)Write(p []byte)(int,error){
// ...
for{
// ...
// 以非阻塞模式執(zhí)行一次syscall write操作
n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
if n >0{
nn += n
}
// 緩沖區(qū)內(nèi)容都已寫(xiě)完,直接退出
if nn ==len(p){
return nn, err
}
// 走入 poll_wait 流程,并標(biāo)識(shí)關(guān)心的是該 fd 的寫(xiě)就緒事件
if err == syscall.EAGAIN && fd.pd.pollable(){
// 倘若寫(xiě)操作未就緒,當(dāng)前g 會(huì) park 阻塞在該方法內(nèi)部,直到因超時(shí)或者事件就緒而被 netpoll ready 喚醒
if err = fd.pd.waitWrite(fd.isFile); err ==nil{
continue
}
}
// ...
}3.6、net_poll
netpoll流程至關(guān)重要,它會(huì)在底層調(diào)用系統(tǒng)的epoll_wait操作,找到觸發(fā)事件的fd,然后再逆向找到綁定fd的pollDesc實(shí)例,返回內(nèi)部阻塞的g叫給上游處理喚醒。其調(diào)用棧如下:
| 方法 | 文件 |
|---|---|
| runtime.netpoll | runtime/netpoll_epoll.go |
| runtime.netpollready | runtime/netpoll.go |
| runtime.netpollunblock | runtime/netpoll.go |
netpoll具體的源碼如下:
//netpoll用于輪詢(xún)檢查是否有就緒的io事件
//若發(fā)現(xiàn)了就緒的io事件,檢查是否有pollDesc中的g關(guān)心其事件
//若找到了關(guān)心其io事件就緒的g,添加到list返回給上游處理
func netpoll(delay int64) (gList, int32) {
if epfd == -1 {
return gList{}, 0
}
var waitms int32
//根據(jù)傳入的delay參數(shù),決定調(diào)用epoll_wait的模式:
//delay < 0:設(shè)為阻塞模式(在 gmp 調(diào)度流程中,如果某個(gè) p 遲遲獲取不到可執(zhí)行的 g 時(shí),會(huì)通過(guò)該模式,使得 thread 陷入阻塞態(tài),但該情況全局最多僅有一例)
//delay = 0:設(shè)為非阻塞模式(通常情況下為此模式,包括 gmp 常規(guī)調(diào)度流程、gc 以及全局監(jiān)控線(xiàn)程 sysmon 都是以此模式觸發(fā)的 netpoll 流程)
//delay > 0:設(shè)為超時(shí)模式(在 gmp 調(diào)度流程中,如果某個(gè) p 遲遲獲取不到可執(zhí)行的 g 時(shí),并且通過(guò) timer 啟動(dòng)了定時(shí)任務(wù)時(shí),會(huì)令 thread 以超時(shí)模式執(zhí)行 epoll_wait 操作)
if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
waitms = 1e9
}
//最多接收128個(gè)io就緒事件
var events [128]syscall.EpollEvent
retry:
//以指定模式調(diào)用epoll_wait
n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
//...
//存儲(chǔ)關(guān)心io事件就緒的G實(shí)例
var toRun gList
delta := int32(0)
//遍歷返回的就緒事件
for i := int32(0); i < n; i++ {
ev := events[i]
if ev.Events == 0 {
continue
}
//pipe接收端的信號(hào)處理,檢查是否需要退出netpoll
if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
if ev.Events != syscall.EPOLLIN {
println("runtime: netpoll: break fd ready for", ev.Events)
throw("runtime: netpoll: break fd ready for something unexpected")
}
//...
continue
}
var mode int32
//記錄io就緒事件的類(lèi)型
if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'r'
}
if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
mode += 'w'
}
// 根據(jù) epollevent.data 獲取到監(jiān)聽(tīng)了該事件的 pollDesc 實(shí)例
if mode != 0 {
tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
pd := (*pollDesc)(tp.pointer())
//...
//檢查是否為G所關(guān)心的事件
delta += netpollready(&toRun, pd, mode)
}
}
return toRun, delta
}func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
delta := int32(0)
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
//就緒事件包含讀就緒,嘗試喚醒pd內(nèi)部的rg
rg = netpollunblock(pd, 'r', true, &delta)
}
if mode == 'w' || mode == 'r'+'w' {
//就緒事件包含讀就緒,嘗試喚醒pd內(nèi)部的wg
wg = netpollunblock(pd, 'w', true, &delta)
}
//存在G實(shí)例,則加入list中
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
return delta
}func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
//獲取存儲(chǔ)的g實(shí)例
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := gpp.Load()
//...
new := pdNil
if ioready {
new = pdReady
}
//將gpp的值從g置換成pdReady
if gpp.CompareAndSwap(old, new) {
if old == pdWait {
old = pdNil
} else if old != pdNil {
*delta -= 1
}
//返回需要喚醒的g實(shí)例
return (*g)(unsafe.Pointer(old))
}
}
}那么,我們也同樣需要關(guān)注在哪個(gè)環(huán)節(jié)進(jìn)入了net_poll流程。
首先,是在GMP調(diào)度器中的findRunnable方法中被調(diào)用,用于找到可執(zhí)行的G實(shí)例。具體的實(shí)現(xiàn)在之前的GMP調(diào)度文章中有講解,這里只關(guān)心涉及到net_poll方面的源碼。
findRunnable方法定位在runtime/proc.go中
func findRunnable()(gp *g, inheritTime, tryWakeP bool){
// ..
/*
同時(shí)滿(mǎn)足下述三個(gè)條件,發(fā)起一次【非阻塞模式】的 netpoll 流程:
- epoll事件表初始化過(guò)
- 有 g 在等待io 就緒事件
- 沒(méi)有空閑 p 在以【阻塞或超時(shí)】模式發(fā)起 netpoll 流程
*/
if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{
// 以非阻塞模式發(fā)起一輪 netpoll,如果有 g 需要喚醒,一一喚醒之,并返回首個(gè) g 給上層進(jìn)行調(diào)度
if list := netpoll(0);!list.empty(){// non-blocking
// 獲取就緒 g 隊(duì)列中的首個(gè) g
gp := list.pop()
// 將就緒 g 隊(duì)列中其余 g 一一置為就緒態(tài),并添加到全局隊(duì)列
injectglist(&list)
// 把首個(gè)g 也置為就緒態(tài)
casgstatus(gp,_Gwaiting,_Grunnable)
// ...
//返回 g 給當(dāng)前 p進(jìn)行調(diào)度
return gp,false,false
}
}
// ...
/*
同時(shí)滿(mǎn)足下述三個(gè)條件,發(fā)起一次【阻塞或超時(shí)模式】的 netpoll 流程:
- epoll事件表初始化過(guò)
- 有 g 在等待io 就緒事件
- 沒(méi)有空閑 p 在以【阻塞或超時(shí)】模式發(fā)起 netpoll 流程
*/
if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{
// 默認(rèn)為阻塞模式
delay :=int64(-1)
// 存在定時(shí)時(shí)間,則設(shè)為超時(shí)模式
if pollUntil !=0{
delay = pollUntil - now
// ...
}
// 以【阻塞或超時(shí)模式】發(fā)起一輪 netpoll
list := netpoll(delay)// block until new work is available
}
// ...
}其次,是位于同文件下的sysmon方法中,它會(huì)被一個(gè)全局監(jiān)控者G執(zhí)行,每隔10ms發(fā)一次非阻塞的net_poll流程。
// The main goroutine.
func main(){
// ...
// 新建一個(gè) m,直接運(yùn)行 sysmon 函數(shù)
systemstack(func(){
newm(sysmon,nil,-1)
})
// ...
}
// 全局唯一監(jiān)控線(xiàn)程的執(zhí)行函數(shù)
func sysmon(){
// ...
for{
// ...
/*
同時(shí)滿(mǎn)足下述三個(gè)條件,發(fā)起一次【非阻塞模式】的 netpoll 流程:
- epoll事件表初始化過(guò)
- 沒(méi)有空閑 p 在以【阻塞或超時(shí)】模式發(fā)起 netpoll 流程
- 距離上一次發(fā)起 netpoll 流程的時(shí)間間隔已超過(guò) 10 ms
*/
lastpoll :=int64(atomic.Load64(&sched.lastpoll))
if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now {
// 以非阻塞模式發(fā)起 netpoll
list := netpoll(0)// non-blocking - returns list of goroutines
// 獲取到的 g 置為就緒態(tài)并添加到全局隊(duì)列中
if!list.empty(){
// ...
injectglist(&list)
// ...
}
}
// ...
}
}最后,還會(huì)發(fā)生在GC流程中。
func pollWork() bool{
// ...
// 若全局隊(duì)列或 p 的本地隊(duì)列非空,則提前返回
/*
同時(shí)滿(mǎn)足下述三個(gè)條件,發(fā)起一次【非阻塞模式】的 netpoll 流程:
- epoll事件表初始化過(guò)
- 有 g 在等待io 就緒事件
- 沒(méi)有空閑 p 在以【阻塞或超時(shí)】模式發(fā)起 netpoll 流程
*/
if netpollinited()&& atomic.Load(&netpollWaiters)>0&& sched.lastpoll !=0{
// 所有取得 g 更新為就緒態(tài)并添加到全局隊(duì)列
if list := netpoll(0);!list.empty(){
injectglist(&list)
return true
}
}
// ...
}4、參考博文
感謝觀(guān)看,本篇博文參考了小徐先生的文章,非常推薦大家去觀(guān)看并且進(jìn)入到源碼中學(xué)習(xí),鏈接如下:
到此這篇關(guān)于Golang網(wǎng)絡(luò)模型netpoll源碼解析的文章就介紹到這了,更多相關(guān)Golang網(wǎng)絡(luò)模型netpoll內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
手把手教你如何在Goland中創(chuàng)建和運(yùn)行項(xiàng)目
歡迎來(lái)到本指南!我們將手把手地教您在Goland中如何創(chuàng)建、配置并運(yùn)行項(xiàng)目,通過(guò)簡(jiǎn)單的步驟,您將迅速上手這款強(qiáng)大的集成開(kāi)發(fā)環(huán)境(IDE),輕松實(shí)現(xiàn)您的編程夢(mèng)想,讓我們一起開(kāi)啟這段精彩的旅程吧!2024-02-02
golang通過(guò)反射手動(dòng)實(shí)現(xiàn)json序列化的方法
在 Go 語(yǔ)言中,JSON 序列化和反序列化通常通過(guò)標(biāo)準(zhǔn)庫(kù) encoding/json 來(lái)實(shí)現(xiàn),本文給大家介紹golang 通過(guò)反射手動(dòng)實(shí)現(xiàn)json序列化的方法,感興趣的朋友一起看看吧2024-12-12
Golang語(yǔ)言JSON解碼函數(shù)Unmarshal的使用
本文主要介紹了Golang語(yǔ)言JSON解碼函數(shù)Unmarshal的使用,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01
golang實(shí)踐-第三方包為私有庫(kù)的配置方案
這篇文章主要介紹了golang實(shí)踐-第三方包為私有庫(kù)的配置方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-05-05
Go 實(shí)現(xiàn)基于Token 的登錄流程深度分析
Token 認(rèn)證機(jī)制的核心思想是,服務(wù)端在用戶(hù)登錄時(shí)生成一個(gè) Token,客戶(hù)端在后續(xù)的請(qǐng)求中攜帶這個(gè) Token,服務(wù)端通過(guò)驗(yàn)證 Token 的有效性來(lái)確認(rèn)用戶(hù)的身份,本文將帶你深入探索基于 Token 的登錄流程,這是一種更為靈活且適用于現(xiàn)代應(yīng)用架構(gòu)的認(rèn)證方式2024-03-03
Go語(yǔ)言實(shí)現(xiàn)棧與隊(duì)列基本操作學(xué)家
go語(yǔ)言中,并沒(méi)有棧與隊(duì)列相關(guān)的數(shù)據(jù)結(jié)構(gòu),但是我們可以借助切片來(lái)實(shí)現(xiàn)棧與隊(duì)列的操作;接下來(lái)我們一起實(shí)現(xiàn)棧與隊(duì)列基本操作,感興趣的可以了解一下2022-11-11
Go如何在HTTP請(qǐng)求中操作cookie教程詳解
這篇文章主要為大家介紹了Go如何在HTTP請(qǐng)求中操作cookie教程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01

