欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang網(wǎng)絡模型netpoll源碼解析(具體流程)

 更新時間:2024年11月27日 09:28:58   作者:MelonTe  
本文介紹了Golang的網(wǎng)絡模型netpoll的實現(xiàn)原理,本文將從為什么需要使用netpoll模型,以及netpoll的具體流程實現(xiàn)兩個主要角度來展開學習,感興趣的朋友跟隨小編一起看看吧

0、引言

在學習完了Socket編程的基礎知識、Linux系統(tǒng)提供的I/O多路復用的實現(xiàn)以及Golang的GMP調度模型之后,我們進而學習Golang的網(wǎng)絡模型——netpoll。本文將從為什么需要使用netpoll模型,以及netpoll的具體流程實現(xiàn)兩個主要角度來展開學習。當前使用的Go的版本為1.22.4,Linux系統(tǒng)。

1、為什么要使用netpoll模型?

首先,什么是多路復用?

多路,指的是存在著多個需要服務的對象;復用,指的是重復利用一個單元來為上述的多個目標提供服務。

我們知道,Linux系統(tǒng)為用戶提供了三個內核實現(xiàn)的IO多路復用技術的系統(tǒng)調用,用發(fā)展時間來排序分別為:select->poll->epoll。其中,epoll在當今使用的最為廣泛,對比與select調用,它有以下的優(yōu)勢:

  • fd數(shù)量靈活:可監(jiān)聽的fd數(shù)量上限靈活,使用方可以在調用epoll_create操作時自行指定。
  • 更少的內核拷貝次數(shù):在內核中,使用紅黑樹的結構來存儲需要監(jiān)聽的fd,相比與調用select每次需要將所有的fd拷貝進內核,監(jiān)聽到事件后再全部拷貝回用戶態(tài),epoll只需要將需要監(jiān)聽的fd添加到事件表后,即可多次監(jiān)聽。
  • 返回結果明確epoll運行將就緒事件添加到就緒事件列表中,當用戶調用epoll_wait操作時,內核只返回就緒事件,而select返回的是所有的事件,需要用戶再進行一次遍歷,找到就緒事件再處理。

需要注意的是,在不同的條件環(huán)境下,epoll的優(yōu)勢可能反而作用不明顯。epoll只適用在監(jiān)聽fd基數(shù)較大且活躍度不高的場景,如此epoll事件表的空間復用和epoll_wait操作的精準才能體現(xiàn)出其優(yōu)勢;而當處在fd基數(shù)較小且活躍度高的場景下,select反而更加簡單有效,構造epoll的紅黑樹結構的消耗會成為其累贅。

考慮到場景的多樣性,我們會選擇使用epoll去完成內核事件監(jiān)聽的操作,那么如何將golangepoll結合起來呢?

在 Go 語言的并發(fā)模型中,GMP 框架實現(xiàn)了一種高效的協(xié)程調度機制,它屏蔽了操作系統(tǒng)線程的細節(jié),用戶可以通過輕量級的 Goroutine 來實現(xiàn)細粒度的并發(fā)操作。然而,底層的 IO 多路復用機制(如 Linux 的 epoll)調度的單位仍然是線程(M)。為了將 IO 調度從線程層面提升到協(xié)程層面,充分發(fā)揮 Goroutine 的高并發(fā)優(yōu)勢,netpoll 應運而生。

接下來我們就來學習netpoll框架的實現(xiàn)。

2、netpoll實現(xiàn)原理

2.1、核心結構

1、pollDesc

為了將IO調度從線程提升到協(xié)程層面,netpoll框架有個重要的核心結構pollDesc,它有兩個,一個為表層,含有指針指向了里層的pollDesc。本文中講到的pollDesc都為里層pollDesc。

表層pollDesc定位在internel/poll/fd_poll_runtime.go文件中:

type pollDesc struct {
	runtimeCtx uintptr
}

使用一個runtimeCtx指針指向其底層實現(xiàn)實例。

里層的位于runtime/netpoll.go中。

//網(wǎng)絡poller描述符
type pollDesc struct {
    //next指針,指向在pollCache鏈表結構中,以下個pollDesc實例。
	link  *pollDesc      
    //指向fd
	fd    uintptr
    //讀事件狀態(tài)標識器,狀態(tài)有四種:
    //1、pdReady:表示讀操作已就緒,等待處理
    //2、pdWait:表示g將要被阻塞等待讀操作就緒,此時還未阻塞
    //3、g:讀操作的g已經(jīng)被阻塞,rg指向阻塞的g實例
    //4、pdNil:空
	rg atomic.Uintptr 
	wg atomic.Uintptr 
    //...
}

pollDesc的核心字段是讀/寫標識器rg/wg,它用于標識fd的io事件狀態(tài),并且持有被阻塞的g實例。當后續(xù)需要喚醒這個g處理讀寫事件的時候,可以通過pollDesc追溯得到g的實例進行操作。有了pollDesc這個數(shù)據(jù)結構,Golang就能將對處理socket的調度單位從線程Thread轉換成協(xié)程G。

2、pollCache

pollCache緩沖池采用了單向鏈表的方式存儲多個pollDesc實例。

type pollCache struct {
	lock  mutex
	first *pollDesc
}

其包含了兩個核心方法,分別是alloc()free()

//從pollCache中分配得到一個pollDesc實例
func (c *pollCache) alloc() *pollDesc {
	lock(&c.lock)
    //如果鏈表為空,則進行初始化
	if c.first == nil {
        //pdSize = 248
		const pdSize = unsafe.Sizeof(pollDesc{})
        //4096 / 248 = 16
		n := pollBlockSize / pdSize
		if n == 0 {
			n = 1
		}
        //分配指定大小的內存空間
		mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        //完成指定數(shù)量的pollDesc創(chuàng)建
		for i := uintptr(0); i < n; i++ {
			pd := (*pollDesc)(add(mem, i*pdSize))
			pd.link = c.first
			c.first = pd
		}
	}
	pd := c.first
	c.first = pd.link
	lockInit(&pd.lock, lockRankPollDesc)
	unlock(&c.lock)
	return pd
}
//free用于將一個pollDesc放回pollCache
func (c *pollCache) free(pd *pollDesc) {
	//...
	lock(&c.lock)
	pd.link = c.first
	c.first = pd
	unlock(&c.lock)
}

2.2、netpoll框架宏觀流程

在宏觀的角度下,netpoll框架主要涉及了以下的幾個流程:

  • poll_init:底層調用epoll_create指令,在內核態(tài)中開辟epoll事件表。
  • poll_open:先構造一個pollDesc實例,然后通過epoll_ctl(ADD)指令,向內核中添加要監(jiān)聽的socket,并將這一個fd綁定在pollDesc中。pollDesc含有狀態(tài)標識器rg/wg,用于標識事件狀態(tài)以及存儲阻塞的g。
  • poll_wait:當g依賴的事件未就緒時,調用gopark方法,將g置為阻塞態(tài)存放在pollDesc中。
  • net_poll:GMP調度器會輪詢netpoll流程,通常會用非阻塞的方式發(fā)起epoll_wait指令,取出就緒的pollDesc,提前出其內部陷入阻塞態(tài)的g然后將其重新添加到GMP的調度隊列中。(以及在sysmon流程和gc流程都會觸發(fā)netpoll)

3、流程源碼實現(xiàn)

3.1、流程入口

我們參考以下的簡易TCP服務器實現(xiàn)框架,走進netpoll框架的具體源碼實現(xiàn)。

// 啟動 tcp server 代碼示例
func main() {
    //創(chuàng)建TCP端口監(jiān)聽器,涉及以下事件:
    //1:創(chuàng)建socket fd,調用bind和accept系統(tǒng)接口函數(shù)
    //2:調用epoll_create,創(chuàng)建eventpool
    //3:調用epoll_ctl(ADD),將socket fd注冊到epoll事件表
	l, _ := net.Listen("tcp", ":8080")
	// eventloop reactor 模型
	for {
        //等待TCP連接到達,涉及以下事件:
        //1:循環(huán)+非阻塞調用accept
        //2:若未就緒,則調用gopark進行阻塞
        //3:等待netpoller輪詢喚醒
        //4:獲取到conn fd后注冊到eventpool
        //5:返回conn
		conn, _ := l.Accept()
		// goroutine per conn
		go serve(conn)
	}
}
// 處理一筆到來的 tcp 連接
func serve(conn net.Conn) {
    //關閉conn,從eventpool中移除fd
	defer conn.Close()
	var buf []byte
    //讀取conn中的數(shù)據(jù),涉及以下事件:
    //1:循環(huán)+非阻塞調用recv(read)
    //2:若未就緒,通過gopark阻塞,等待netpoll輪詢喚醒
	_, _ = conn.Read(buf)
    //向conn中寫入數(shù)據(jù),涉及以下事件:
    //1:循環(huán)+非阻塞調用writev (write)
    //2:若未就緒,通過gopark阻塞,等待netpoll輪詢喚醒
	_, _ = conn.Write(buf)
}

3.2、Socket創(chuàng)建

net.Listen方法為入口,進行創(chuàng)建socket fd,調用的方法棧如下:

方法文件
net.Listen()net/dial.go
net.ListenConfig.Listen()net/dial.go
net.sysListener.listenTCP()net/tcpsock_posix.go
net.internetSocket()net/ipsock_posix.go
net.socket()net/sock_posix.go

核心的調用在net.socket()方法內,源碼核心流程如下:

func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) (fd *netFD, err error) {
    //進行socket系統(tǒng)調用,創(chuàng)建一個socket
	s, err := sysSocket(family, sotype, proto)
    //綁定socket fd
    fd, err = newFD(s, family, sotype, net);
    //...
    //進行了以下事件:
    //1、通過syscall bind指令綁定socket的監(jiān)聽地址
    //2、通過syscall listen指令發(fā)起對socket的監(jiān)聽
    //3、完成epollEvent表的創(chuàng)建(全局執(zhí)行一次)
    //4、將socket fd注冊到epoll事件表中,監(jiān)聽讀寫就緒事件
    err := fd.listenStream(ctx, laddr, listenerBacklog(), ctrlCtxFn);
}

首先先執(zhí)行了sysSocket系統(tǒng)調用,創(chuàng)建一個socket,它是一個整數(shù)值,用于標識操作系統(tǒng)中打開的文件或網(wǎng)絡套接字;接著調用newFD方法包裝成netFD對象,以便實現(xiàn)更高效的異步 IO 和 Goroutine 調度。

3.3、poll_init

緊接3.2中的net.socket方法,在內部還調用了net.netFD.listenStream(),poll_init的調用棧如下:

方法文件
net.netFD.listenStream()net/sock_posix.go
net.netFD.init()net/fd_unix.go
poll.FD.init()internal/poll/fd_unix.go
poll.pollDesc.init()internal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollServerInit()runtime/netpoll.go
runtime.netpollinit()runtime/netpoll_epoll.go

net.netFD.listenStream()核心步驟如下:

func (fd *netFD) listenStream(ctx context.Context, laddr sockaddr, backlog int, ctrlCtxFn func(context.Context, string, string, syscall.RawConn) error) error {
	//....
    //通過Bind系統(tǒng)調用綁定監(jiān)聽地址
	if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
		return os.NewSyscallError("bind", err)
	}
    //通過Listen系統(tǒng)調用對socket進行監(jiān)聽
	if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
		return os.NewSyscallError("listen", err)
	}
    //fd.init()進行了以下操作:
    //1、完成eventPool的創(chuàng)建
    //2、將socket fd注冊到epoll事件表中
	if err = fd.init(); err != nil {
		return err
	}
	//...
	return nil
}
  • 使用Bind系統(tǒng)調用綁定需要監(jiān)聽的地址
  • 使用Listen系統(tǒng)調用監(jiān)聽socket
  • 調用fd.init完成eventpool的創(chuàng)建以及fd的注冊

net.netFD.init()方法在內部轉而調用poll.FD.init()

func (fd *netFD) init() error {
	return fd.pfd.Init(fd.net, true)
}
func (fd *FD) Init(net string, pollable bool) error {
	fd.SysFile.init()
	// We don't actually care about the various network types.
	if net == "file" {
		fd.isFile = true
	}
	if !pollable {
		fd.isBlocking = 1
		return nil
	}
	err := fd.pd.init(fd)
	if err != nil {
		// If we could not initialize the runtime poller,
		// assume we are using blocking mode.
		fd.isBlocking = 1
	}
	return err
}

然后又轉入到poll.pollDesc.init()的調用中。

func (pd *pollDesc) init(fd *FD) error {
    //通過sysOnce結構,完成epoll事件表的唯一一次創(chuàng)建
	serverInit.Do(runtime_pollServerInit)
    //完成init后,進行poll_open
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
	//...
    //綁定里層的pollDesc實例
    pd.runtimeCtx = ctx
	return nil
}

這里的poll.pollDesc表層pollDesc,表層pd的init是poll_initpoll_open流程的入口:

  • 執(zhí)行serverInit.Do(runtime_pollServerInit),其中serverInit是名為sysOnce的特殊結構,它會保證執(zhí)行的方法在全局只會被執(zhí)行一次,然后執(zhí)行runtime_pollServerInit,完成poll_init操作
  • 完成poll_init后,調用runtime_pollOpen(uintptr(fd.Sysfd))將fd加入到eventpool中,完成poll_open操作
  • 綁定里層的pollDesc實例

我們先來關注serverInit.Do(runtime_pollServerInit)中,執(zhí)行的runtime_pollServerInit方法,它定位在runtime/netpoll.go下:

//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
	netpollGenericInit()
}
func netpollGenericInit() {
	if netpollInited.Load() == 0 {
		lockInit(&netpollInitLock, lockRankNetpollInit)
		lock(&netpollInitLock)
		if netpollInited.Load() == 0 {
            //進入netpollinit調用
			netpollinit()
			netpollInited.Store(1)
		}
		unlock(&netpollInitLock)
	}
}
func netpollinit() {
	var errno uintptr
    //進行epollcreate系統(tǒng)調用,創(chuàng)建epoll事件表
	epfd, errno = syscall.EpollCreate1(syscall.EPOLL_CLOEXEC)
	//...
    //創(chuàng)建pipe管道,接收信號,如程序終止:
    //r:信號接收端,會注冊對應的read事件到epoll事件表中
    //w:信號發(fā)送端,有信號到達的時候,會往w發(fā)送信號,并對r產(chǎn)生讀就緒事件
	r, w, errpipe := nonblockingPipe()
	//...
    //在epollEvent中注冊監(jiān)聽r的讀就緒事件
	ev := syscall.EpollEvent{
		Events: syscall.EPOLLIN,
	}
	*(**uintptr)(unsafe.Pointer(&ev.Data)) = &netpollBreakRd
	errno = syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, r, &ev)
	//...
    //使用全局變量緩存pipe的讀寫端
	netpollBreakRd = uintptr(r)
	netpollBreakWr = uintptr(w)
}

netpollinit()方法內部,進行了以下操作:

  • 執(zhí)行epoll_create指令創(chuàng)建了epoll事件表,并返回epoll文件描述符epfd

  • 創(chuàng)建了兩個pipe管道,當向w端寫入信號的時候,r端會發(fā)生讀就緒事件。

  • 注冊監(jiān)聽r的讀就緒事件。

  • 緩存管道。

在這里,我們創(chuàng)建了兩個管道r以及w,并且在eventpool中注冊了r的讀就緒事件的監(jiān)聽,當我們向w管道寫入數(shù)據(jù)的時候,r管道就會產(chǎn)生讀就緒事件,從而打破阻塞的epoll_wait操作,進而執(zhí)行其他的操作。

3.3、poll_open

方法文件
net.netFD.listenStream()net/sock_posix.go
net.netFD.init()net/fd_unix.go
poll.FD.init()internal/poll/fd_unix.go
poll.pollDesc.init()internal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollOpen()runtime/netpoll.go
runtime.netpollopenruntime/netpoll_epoll.go

poll.pollDesc.init()方法中,完成了poll_init流程后,就會進入到poll_open流程,執(zhí)行runtime.poll_runtime_pollOpen()。

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    //獲取一個pollDesc實例
	pd := pollcache.alloc()
	lock(&pd.lock)
	wg := pd.wg.Load()
	if wg != pdNil && wg != pdReady {
		throw("runtime: blocked write on free polldesc")
	}
	rg := pd.rg.Load()
	if rg != pdNil && rg != pdReady {
		throw("runtime: blocked read on free polldesc")
	}
    //綁定socket fd到pollDesc中
	pd.fd = fd
	//...
    //初始化讀寫狀態(tài)標識器為無狀態(tài)
	pd.rg.Store(pdNil)
	pd.wg.Store(pdNil)
	//...
	unlock(&pd.lock)
    //將fd添加進epoll事件表中
	errno := netpollopen(fd, pd)
	//...
    //返回pollDesc實例
	return pd, 0
}
func netpollopen(fd uintptr, pd *pollDesc) uintptr {
	var ev syscall.EpollEvent
    //通過epollctl操作,在EpollEvent中注冊針對fd的監(jiān)聽事件
    //操作類型宏指令:EPOLL_CTL_ADD——添加fd并注冊監(jiān)聽事件
    //事件類型:epollevent.events:
    //1、EPOLLIN:監(jiān)聽讀就緒事件
    //2、EPOLLOUT:監(jiān)聽寫就緒事件
    //3、EPOLLRDHUP:監(jiān)聽中斷事件
    //4、EPOLLET:使用邊緣觸發(fā)模式
	ev.Events = syscall.EPOLLIN | syscall.EPOLLOUT | syscall.EPOLLRDHUP | syscall.EPOLLET
	tp := taggedPointerPack(unsafe.Pointer(pd), pd.fdseq.Load())
	*(*taggedPointer)(unsafe.Pointer(&ev.Data)) = tp
	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, int32(fd), &ev)
}

不僅在net.Listen()流程中會觸發(fā)poll open,在net.Listener.Accept流程中也會,當我們獲取到了連接之后,也需要為這個連接封裝成一個pollDesc實例,然后執(zhí)行poll_open流程將其注冊到epoll事件表中。

func (fd *netFD) accept()(netfd *netFD, err error){
    // 通過 syscall accept 接收到來的 conn fd
    d, rsa, errcall, err := fd.pfd.Accept()
    // ...
    // 封裝到來的 conn fd
    netfd, err = newFD(d, fd.family, fd.sotype, fd.net)
    // 將 conn fd 注冊到 epoll 事件表中
    err = netfd.init()
    // ...
    return netfd,nil
}

3.4、poll_close

當連接conn需要關閉的時候,最終會進入到poll_close流程,執(zhí)行epoll_ctl(DELETE)刪除對應的fd。

方法文件
net.conn.Closenet/net.go
net.netFD.Closenet/fd_posix.go
poll.FD.Closeinternal/poll/fd_unix.go
poll.FD.decrefinternal/poll/fd_mutex.go
poll.FD.destroyinternal/poll/fd_unix.go
poll.pollDesc.closeinternal/poll/fd_poll_runtime.go
poll.runtime_pollCloseinternal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollCloseruntime/netpoll.go
runtime.netpollcloseruntime/netpoll_epoll.go
syscall.EpollCtlruntime/netpoll_epoll.go
//go:linkname poll_runtime_pollClose internal/poll.runtime_pollClose
func poll_runtime_pollClose(pd *pollDesc) {
	if !pd.closing {
		throw("runtime: close polldesc w/o unblock")
	}
	wg := pd.wg.Load()
	if wg != pdNil && wg != pdReady {
		throw("runtime: blocked write on closing polldesc")
	}
	rg := pd.rg.Load()
	if rg != pdNil && rg != pdReady {
		throw("runtime: blocked read on closing polldesc")
	}
	netpollclose(pd.fd)
	pollcache.free(pd)
}
func netpollclose(fd uintptr) uintptr {
	var ev syscall.EpollEvent
	return syscall.EpollCtl(epfd, syscall.EPOLL_CTL_DEL, int32(fd), &ev)
}

3.5、poll_wait

poll_wait流程最終會執(zhí)行gopark將g陷入到用戶態(tài)阻塞。

方法文件
poll.pollDesc.waitinternal/poll/fd_poll_runtime.go
poll.runtime_pollWaitinternal/poll/fd_poll_runtime.go
runtime.poll_runtime_pollWaitruntime/netpoll.go
runtime.netpollblockruntime/netpoll.go
runtime.goparkruntime/proc.go
runtime.netpollblockcommitruntime/netpoll.go

在表層pollDesc中,會通過其內部的里層pollDesc指針,調用到runtime下的netpollblock方法。

/*
    針對某個 pollDesc 實例,監(jiān)聽指定的mode 就緒事件
        - 返回true——已就緒  返回false——因超時或者關閉導致中斷
        - 其他情況下,會通過 gopark 操作將當前g 阻塞在該方法中
*/
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    //針對mode事件,獲取相應的狀態(tài)
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
	for {
		//關心的io事件就緒,直接返回
		if gpp.CompareAndSwap(pdReady, pdNil) {
			return true
		}
        //關心的io事件未就緒,則置為等待狀態(tài),G將要被阻塞
		if gpp.CompareAndSwap(pdNil, pdWait) {
			break
		}
		//...
	}
	//...
    //將G置為阻塞態(tài)
		gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceBlockNet, 5)
    //當前g從阻塞態(tài)被喚醒,重置標識器
    old := gpp.Swap(pdNil)
	if old > pdWait {
		throw("runtime: corrupted polldesc")
	}
    //判斷是否是因為所關心的事件觸發(fā)而喚醒
	return old == pdReady
}

在gopark方法中,會閉包調用netpollblockcommit方法,其中會根據(jù)g關心的事件類型,將其實例存儲到pollDesc的rg或wg容器中。

// 將 gpp 狀態(tài)標識器的值由 pdWait 修改為當前 g 
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
	r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
	if r {
		//增加等待輪詢器的例程計數(shù)。
		//調度器使用它來決定是否阻塞
		//如果沒有其他事情可做,則等待輪詢器。
		netpollAdjustWaiters(1)
	}
	return r
}

接著我們來關注何時會觸發(fā)poll_wait流程。

首先是在listener.Accept流程中,如果當前尚未有連接到達,則執(zhí)行poll wait將當前g阻塞掛載在該socket fd對應pollDesc的rg中。

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
	//...
	for {
        //以非阻塞模式發(fā)起一次accept,嘗試接收conn
		s, rsa, errcall, err := accept(fd.Sysfd)
		if err == nil {
			return s, rsa, "", err
		}
		switch err {
            //忽略中斷類錯誤
		case syscall.EINTR:
			continue
            //尚未有到達的conn
		case syscall.EAGAIN:
            //進入poll_wait流程,監(jiān)聽fd的讀就緒事件,當有conn到達表現(xiàn)為fd可讀。
			if fd.pd.pollable() {
                //假如讀操作未就緒,當前g會被阻塞在方法內部,直到因為超時或者就緒被netpoll ready喚醒。
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		//...
	}
}
// 指定 mode 為 r 標識等待的是讀就緒事件,然后走入更底層的 poll_wait 流程
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

其次分別是在conn.Read/conn.Write流程中,假若conn fd下讀操作未就緒(無數(shù)據(jù)到達)/寫操作未就緒(緩沖區(qū)空間不足),則會執(zhí)行poll wait將g阻塞并掛載在對應的pollDesc中的rg/wg中。

func (fd *FD) Read(p []byte) (int, error) {
	//...
	for {
        //非阻塞模式進行一次read調用
		n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)
		if err != nil {
			n = 0
            //進入poll_wait流程,并標識關心讀就緒事件
			if err == syscall.EAGAIN && fd.pd.pollable() {
				if err = fd.pd.waitRead(fd.isFile); err == nil {
					continue
				}
			}
		}
		err = fd.eofError(n, err)
		return n, err
	}
}
func (fd *FD)Write(p []byte)(int,error){
    // ... 
    for{
    // ...
    // 以非阻塞模式執(zhí)行一次syscall write操作
        n, err := ignoringEINTRIO(syscall.Write, fd.Sysfd, p[nn:max])
        if n >0{
            nn += n
        }
        // 緩沖區(qū)內容都已寫完,直接退出
        if nn ==len(p){
            return nn, err
        }
    // 走入 poll_wait 流程,并標識關心的是該 fd 的寫就緒事件
    if err == syscall.EAGAIN && fd.pd.pollable(){
        // 倘若寫操作未就緒,當前g 會 park 阻塞在該方法內部,直到因超時或者事件就緒而被 netpoll ready 喚醒
        if err = fd.pd.waitWrite(fd.isFile); err ==nil{
            continue
        }
    }
    // ...  
}

3.6、net_poll

netpoll流程至關重要,它會在底層調用系統(tǒng)的epoll_wait操作,找到觸發(fā)事件的fd,然后再逆向找到綁定fd的pollDesc實例,返回內部阻塞的g叫給上游處理喚醒。其調用棧如下:

方法文件
runtime.netpollruntime/netpoll_epoll.go
runtime.netpollreadyruntime/netpoll.go
runtime.netpollunblockruntime/netpoll.go

netpoll具體的源碼如下:

//netpoll用于輪詢檢查是否有就緒的io事件
//若發(fā)現(xiàn)了就緒的io事件,檢查是否有pollDesc中的g關心其事件
//若找到了關心其io事件就緒的g,添加到list返回給上游處理
func netpoll(delay int64) (gList, int32) {
	if epfd == -1 {
		return gList{}, 0
	}
	var waitms int32
    //根據(jù)傳入的delay參數(shù),決定調用epoll_wait的模式:
    //delay < 0:設為阻塞模式(在 gmp 調度流程中,如果某個 p 遲遲獲取不到可執(zhí)行的 g 時,會通過該模式,使得 thread 陷入阻塞態(tài),但該情況全局最多僅有一例)
    //delay = 0:設為非阻塞模式(通常情況下為此模式,包括 gmp 常規(guī)調度流程、gc 以及全局監(jiān)控線程 sysmon 都是以此模式觸發(fā)的 netpoll 流程)
    //delay > 0:設為超時模式(在 gmp 調度流程中,如果某個 p 遲遲獲取不到可執(zhí)行的 g 時,并且通過 timer 啟動了定時任務時,會令 thread 以超時模式執(zhí)行 epoll_wait 操作)
	if delay < 0 {
		waitms = -1
	} else if delay == 0 {
		waitms = 0
	} else if delay < 1e6 {
		waitms = 1
	} else if delay < 1e15 {
		waitms = int32(delay / 1e6)
	} else {
		waitms = 1e9
	}
    //最多接收128個io就緒事件
	var events [128]syscall.EpollEvent
retry:
    //以指定模式調用epoll_wait
	n, errno := syscall.EpollWait(epfd, events[:], int32(len(events)), waitms)
	//...
    //存儲關心io事件就緒的G實例
	var toRun gList
	delta := int32(0)
    //遍歷返回的就緒事件
	for i := int32(0); i < n; i++ {
		ev := events[i]
		if ev.Events == 0 {
			continue
		}
		//pipe接收端的信號處理,檢查是否需要退出netpoll
		if *(**uintptr)(unsafe.Pointer(&ev.Data)) == &netpollBreakRd {
			if ev.Events != syscall.EPOLLIN {
				println("runtime: netpoll: break fd ready for", ev.Events)
				throw("runtime: netpoll: break fd ready for something unexpected")
			}
		//...
			continue
		}
		var mode int32
        //記錄io就緒事件的類型
		if ev.Events&(syscall.EPOLLIN|syscall.EPOLLRDHUP|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'r'
		}
		if ev.Events&(syscall.EPOLLOUT|syscall.EPOLLHUP|syscall.EPOLLERR) != 0 {
			mode += 'w'
		}
        // 根據(jù) epollevent.data 獲取到監(jiān)聽了該事件的 pollDesc 實例
		if mode != 0 {
			tp := *(*taggedPointer)(unsafe.Pointer(&ev.Data))
			pd := (*pollDesc)(tp.pointer())
			//...
            //檢查是否為G所關心的事件
				delta += netpollready(&toRun, pd, mode)
		}
	}
	return toRun, delta
}
func netpollready(toRun *gList, pd *pollDesc, mode int32) int32 {
	delta := int32(0)
	var rg, wg *g
	if mode == 'r' || mode == 'r'+'w' {
        //就緒事件包含讀就緒,嘗試喚醒pd內部的rg
		rg = netpollunblock(pd, 'r', true, &delta)
	}
	if mode == 'w' || mode == 'r'+'w' {
        //就緒事件包含讀就緒,嘗試喚醒pd內部的wg
		wg = netpollunblock(pd, 'w', true, &delta)
	}
    //存在G實例,則加入list中
	if rg != nil {
		toRun.push(rg)
	}
	if wg != nil {
		toRun.push(wg)
	}
	return delta
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool, delta *int32) *g {
    //獲取存儲的g實例
	gpp := &pd.rg
	if mode == 'w' {
		gpp = &pd.wg
	}
	for {
		old := gpp.Load()
		//...
		new := pdNil
		if ioready {
			new = pdReady
		}
        //將gpp的值從g置換成pdReady
		if gpp.CompareAndSwap(old, new) {
			if old == pdWait {
				old = pdNil
			} else if old != pdNil {
				*delta -= 1
			}
            //返回需要喚醒的g實例
			return (*g)(unsafe.Pointer(old))
		}
	}
}

那么,我們也同樣需要關注在哪個環(huán)節(jié)進入了net_poll流程。

首先,是在GMP調度器中的findRunnable方法中被調用,用于找到可執(zhí)行的G實例。具體的實現(xiàn)在之前的GMP調度文章中有講解,這里只關心涉及到net_poll方面的源碼。

findRunnable方法定位在runtime/proc.go

func findRunnable()(gp *g, inheritTime, tryWakeP bool){
    // ..
    /*
        同時滿足下述三個條件,發(fā)起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 有 g 在等待io 就緒事件
            - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程
    */
    if netpollinited()&& atomic.Load(&netpollWaiters)>0&& atomic.Load64(&sched.lastpoll)!=0{
        // 以非阻塞模式發(fā)起一輪 netpoll,如果有 g 需要喚醒,一一喚醒之,并返回首個 g 給上層進行調度
        if list := netpoll(0);!list.empty(){// non-blocking
            // 獲取就緒 g 隊列中的首個 g
            gp := list.pop()
            // 將就緒 g 隊列中其余 g 一一置為就緒態(tài),并添加到全局隊列
            injectglist(&list)
            // 把首個g 也置為就緒態(tài)
            casgstatus(gp,_Gwaiting,_Grunnable)
            // ...   
            //返回 g 給當前 p進行調度
            return gp,false,false
        }
    }
    // ...
    /*
        同時滿足下述三個條件,發(fā)起一次【阻塞或超時模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 有 g 在等待io 就緒事件
            - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程
    */
    if netpollinited()&&(atomic.Load(&netpollWaiters)>0|| pollUntil !=0)&& atomic.Xchg64(&sched.lastpoll,0)!=0{
    // 默認為阻塞模式  
        delay :=int64(-1)
        // 存在定時時間,則設為超時模式
        if pollUntil !=0{
            delay = pollUntil - now
        // ...   
        }
        // 以【阻塞或超時模式】發(fā)起一輪 netpoll
        list := netpoll(delay)// block until new work is available 
    }
    // ...    
}

其次,是位于同文件下的sysmon方法中,它會被一個全局監(jiān)控者G執(zhí)行,每隔10ms發(fā)一次非阻塞的net_poll流程。

// The main goroutine.
func main(){
// ...
// 新建一個 m,直接運行 sysmon 函數(shù)
    systemstack(func(){
        newm(sysmon,nil,-1)
    })
    // ...
}
// 全局唯一監(jiān)控線程的執(zhí)行函數(shù)
func sysmon(){
// ...
for{
// ...
/*
        同時滿足下述三個條件,發(fā)起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程
            - 距離上一次發(fā)起 netpoll 流程的時間間隔已超過 10 ms
    */
        lastpoll :=int64(atomic.Load64(&sched.lastpoll))
        if netpollinited()&& lastpoll !=0&& lastpoll+10*1000*1000< now {
            // 以非阻塞模式發(fā)起 netpoll
            list := netpoll(0)// non-blocking - returns list of goroutines
            // 獲取到的  g 置為就緒態(tài)并添加到全局隊列中
            if!list.empty(){
                // ...
                injectglist(&list)
                // ...
            }
        }
    // ...  
    }
}

最后,還會發(fā)生在GC流程中。

func pollWork() bool{
    // ...
    // 若全局隊列或 p 的本地隊列非空,則提前返回
    /*
        同時滿足下述三個條件,發(fā)起一次【非阻塞模式】的 netpoll 流程:
            - epoll事件表初始化過
            - 有 g 在等待io 就緒事件
            - 沒有空閑 p 在以【阻塞或超時】模式發(fā)起 netpoll 流程
    */
    if netpollinited()&& atomic.Load(&netpollWaiters)>0&& sched.lastpoll !=0{
    // 所有取得 g 更新為就緒態(tài)并添加到全局隊列
        if list := netpoll(0);!list.empty(){
            injectglist(&list)
            return true
        }
    }
    // ...
}

4、參考博文

感謝觀看,本篇博文參考了小徐先生的文章,非常推薦大家去觀看并且進入到源碼中學習,鏈接如下:

萬字解析 golang netpoll 底層原理

到此這篇關于Golang網(wǎng)絡模型netpoll源碼解析的文章就介紹到這了,更多相關Golang網(wǎng)絡模型netpoll內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:

相關文章

  • 手把手教你如何在Goland中創(chuàng)建和運行項目

    手把手教你如何在Goland中創(chuàng)建和運行項目

    歡迎來到本指南!我們將手把手地教您在Goland中如何創(chuàng)建、配置并運行項目,通過簡單的步驟,您將迅速上手這款強大的集成開發(fā)環(huán)境(IDE),輕松實現(xiàn)您的編程夢想,讓我們一起開啟這段精彩的旅程吧!
    2024-02-02
  • golang通過反射手動實現(xiàn)json序列化的方法

    golang通過反射手動實現(xiàn)json序列化的方法

    在 Go 語言中,JSON 序列化和反序列化通常通過標準庫 encoding/json 來實現(xiàn),本文給大家介紹golang  通過反射手動實現(xiàn)json序列化的方法,感興趣的朋友一起看看吧
    2024-12-12
  • Golang語言JSON解碼函數(shù)Unmarshal的使用

    Golang語言JSON解碼函數(shù)Unmarshal的使用

    本文主要介紹了Golang語言JSON解碼函數(shù)Unmarshal的使用,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • GOLang單元測試用法詳解

    GOLang單元測試用法詳解

    Go語言中自帶有一個輕量級的測試框架testing和自帶的go test命令來實現(xiàn)單元測試和性能測試。本文將通過示例詳細聊聊Go語言單元測試的原理與使用,需要的可以參考一下
    2022-12-12
  • golang實踐-第三方包為私有庫的配置方案

    golang實踐-第三方包為私有庫的配置方案

    這篇文章主要介紹了golang實踐-第三方包為私有庫的配置方案,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05
  • Go 實現(xiàn)基于Token 的登錄流程深度分析

    Go 實現(xiàn)基于Token 的登錄流程深度分析

    Token 認證機制的核心思想是,服務端在用戶登錄時生成一個 Token,客戶端在后續(xù)的請求中攜帶這個 Token,服務端通過驗證 Token 的有效性來確認用戶的身份,本文將帶你深入探索基于 Token 的登錄流程,這是一種更為靈活且適用于現(xiàn)代應用架構的認證方式
    2024-03-03
  • go語言中[]*int和*[]int的具體使用

    go語言中[]*int和*[]int的具體使用

    本文主要介紹了go語言中[]*int和*[]int的具體使用,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-04-04
  • Go語言實現(xiàn)棧與隊列基本操作學家

    Go語言實現(xiàn)棧與隊列基本操作學家

    go語言中,并沒有棧與隊列相關的數(shù)據(jù)結構,但是我們可以借助切片來實現(xiàn)棧與隊列的操作;接下來我們一起實現(xiàn)棧與隊列基本操作,感興趣的可以了解一下
    2022-11-11
  • Go語言中字符串的查找方法小結

    Go語言中字符串的查找方法小結

    這篇文章主要介紹了Go語言中字符串的查找方法小結,示例的main函數(shù)都是導入strings包然后使用其中的方法,需要的朋友可以參考下
    2015-10-10
  • Go如何在HTTP請求中操作cookie教程詳解

    Go如何在HTTP請求中操作cookie教程詳解

    這篇文章主要為大家介紹了Go如何在HTTP請求中操作cookie教程詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2024-01-01

最新評論