談?wù)刧olang的netpoll原理解析
今天談?wù)刧olang源碼netpoll部分實(shí)現(xiàn)的細(xì)節(jié)和協(xié)程阻塞調(diào)度原理
epoll原理
epoll是linux環(huán)境下i/o多路復(fù)用的模型,結(jié)合下圖簡(jiǎn)單說(shuō)明epoll工作原理
上圖說(shuō)明了epoll生成描epoll表的基本流程,生成socket用來(lái)綁定和監(jiān)聽(tīng)新的連接,將該socket放入epoll內(nèi)核表,然后調(diào)用wait等待就緒事件。
當(dāng)epoll wait返回就緒事件時(shí),判斷是否是新的連接,如果是新的連接則將描述符加入epoll表,監(jiān)聽(tīng)讀寫事件。如果不是新的連接,說(shuō)明已建立的連接上有讀或?qū)懢途w事件,這樣我們根據(jù)EPOLLOUT或者EPOLLIN進(jìn)行寫或者讀操作,上圖是echo server的基本原理,實(shí)際生產(chǎn)中監(jiān)聽(tīng)EPOLLIN還是EPOLLOUT根據(jù)實(shí)際情況而定。以上是單線程下epoll工作原理。
golang 網(wǎng)絡(luò)層如何封裝的epoll
golang 網(wǎng)絡(luò)層封裝epoll核心文件在系統(tǒng)文件src/runtime/netpoll.go, 這個(gè)文件中調(diào)用了不同平臺(tái)封裝的多路復(fù)用api,linux環(huán)境下epoll封裝的文件在src/runtime/netpoll_epoll.go中,windows環(huán)境下多路復(fù)用模型實(shí)現(xiàn)在src/runtime/netpoll_windows.go。golang的思想意在將epoll操作放在runtime包里,而runtime是負(fù)責(zé)協(xié)程調(diào)度的功能模塊,程序啟動(dòng)后runtime運(yùn)行時(shí)是在單獨(dú)的線程里,個(gè)人認(rèn)為是MPG模型中M模型,epoll模型管理放在這個(gè)單獨(dú)M中調(diào)度,M其實(shí)是運(yùn)行在內(nèi)核態(tài)的,在這個(gè)內(nèi)核態(tài)線程不斷輪詢檢測(cè)就緒事件,將讀寫就緒事件拋出,從而觸發(fā)用戶態(tài)協(xié)程讀寫調(diào)度。而我們常用的read,write,accept等操作其實(shí)是在用戶態(tài)操作的,也就是MPG模型中的G,舉個(gè)例子當(dāng)read阻塞時(shí),將該協(xié)程掛起,當(dāng)epoll讀就緒事件觸發(fā)后查找阻塞的協(xié)程列表,將該協(xié)程激活,用戶態(tài)G激活后繼續(xù)讀,這樣在用戶態(tài)操作是阻塞的,在內(nèi)核態(tài)其實(shí)一直是輪詢的,這就是golang將epoll和協(xié)程調(diào)度結(jié)合的原理。
golang 如何實(shí)現(xiàn)協(xié)程和描述符綁定
golang 在internal/poll/fd_windows.go和internal/poll/fd_unix.go中實(shí)現(xiàn)了基本的描述符結(jié)構(gòu)
type netFD struct { pfd poll.FD // immutable until Close family int sotype int isConnected bool // handshake completed or use of association with peer net string laddr Addr raddr Addr }
netFD中pfd結(jié)構(gòu)如下
type FD struct { // Lock sysfd and serialize access to Read and Write methods. fdmu fdMutex // System file descriptor. Immutable until Close. Sysfd syscall.Handle // Read operation. rop operation // Write operation. wop operation // I/O poller. pd pollDesc // Used to implement pread/pwrite. l sync.Mutex // For console I/O. lastbits []byte // first few bytes of the last incomplete rune in last write readuint16 []uint16 // buffer to hold uint16s obtained with ReadConsole readbyte []byte // buffer to hold decoding of readuint16 from utf16 to utf8 readbyteOffset int // readbyte[readOffset:] is yet to be consumed with file.Read // Semaphore signaled when file is closed. csema uint32 skipSyncNotif bool // Whether this is a streaming descriptor, as opposed to a // packet-based descriptor like a UDP socket. IsStream bool // Whether a zero byte read indicates EOF. This is false for a // message based socket connection. ZeroReadIsEOF bool // Whether this is a file rather than a network socket. isFile bool // The kind of this file. kind fileKind }
FD是用戶態(tài)基本的描述符結(jié)構(gòu),內(nèi)部幾個(gè)變量通過(guò)注釋可以讀懂,挑幾個(gè)難理解的
fdmu 控制讀寫互斥訪問(wèn)的鎖,因?yàn)榭赡軒讉€(gè)協(xié)程并發(fā)讀寫
Sysfd 系統(tǒng)返回的描述符,不會(huì)更改除非系統(tǒng)關(guān)閉回收
rop 為讀操作,這個(gè)其實(shí)是根據(jù)不同系統(tǒng)網(wǎng)絡(luò)模型封裝的統(tǒng)一類型,比如epoll,iocp等都封裝為統(tǒng)一的operation,根據(jù)不同的系統(tǒng)調(diào)用不同的模型
wop 為寫操作封裝的類型
pd 這個(gè)是最重要的結(jié)構(gòu),內(nèi)部封裝了協(xié)程等基本信息,這個(gè)變量會(huì)和內(nèi)核epoll線程通信,從而實(shí)現(xiàn)epoll通知和控制用戶態(tài)協(xié)程的效果。
下面我們著重看看pollDesc結(jié)構(gòu)
type pollDesc struct { runtimeCtx uintptr }
pollDesc內(nèi)部存儲(chǔ)了一個(gè)unintptr的變量,uintptr為四字節(jié)大小的變量,可以存儲(chǔ)指針。runtimeCtx顧名思義,為運(yùn)行時(shí)上下文,其初始化代碼如下
func (pd *pollDesc) init(fd *FD) error { serverInit.Do(runtime_pollServerInit) ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) if errno != 0 { if ctx != 0 { runtime_pollUnblock(ctx) runtime_pollClose(ctx) } return errnoErr(syscall.Errno(errno)) } pd.runtimeCtx = ctx return nil }
runtime_pollOpen實(shí)際link的是runtime包下的poll_runtime_pollOpen函數(shù),具體實(shí)現(xiàn)在runtime/netpoll.go
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { pd := pollcache.alloc() lock(&pd.lock) if pd.wg != 0 && pd.wg != pdReady { throw("runtime: blocked write on free polldesc") } if pd.rg != 0 && pd.rg != pdReady { throw("runtime: blocked read on free polldesc") } pd.fd = fd pd.closing = false pd.everr = false pd.rseq++ pd.rg = 0 pd.rd = 0 pd.wseq++ pd.wg = 0 pd.wd = 0 unlock(&pd.lock) var errno int32 errno = netpollopen(fd, pd) return pd, int(errno) }
可以看出通過(guò)pollcache.alloc返回*pollDesc類型的變量pd,并且用pd初始化了netpollopen,這里我們稍作停留,談?wù)刾ollcache
func (c *pollCache) alloc() *pollDesc { lock(&c.lock) if c.first == nil { const pdSize = unsafe.Sizeof(pollDesc{}) n := pollBlockSize / pdSize if n == 0 { n = 1 } // Must be in non-GC memory because can be referenced // only from epoll/kqueue internals. mem := persistentalloc(n*pdSize, 0, &memstats.other_sys) for i := uintptr(0); i < n; i++ { pd := (*pollDesc)(add(mem, i*pdSize)) pd.link = c.first c.first = pd } } pd := c.first c.first = pd.link unlock(&c.lock) return pd }
alloc函數(shù)做了這樣的操作,如果鏈表頭為空則初始化pdSize個(gè)pollDesc節(jié)點(diǎn),并pop出頭部,如果不為空則直接pop出頭部節(jié)點(diǎn),每個(gè)節(jié)點(diǎn)的類型就是*pollDesc類型,具體實(shí)現(xiàn)在runtime/netpoll.go中
type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock // The lock protects pollOpen, pollSetDeadline, pollUnblock and deadlineimpl operations. // This fully covers seq, rt and wt variables. fd is constant throughout the PollDesc lifetime. // pollReset, pollWait, pollWaitCanceled and runtime·netpollready (IO readiness notification) // proceed w/o taking the lock. So closing, everr, rg, rd, wg and wd are manipulated // in a lock-free way by all operations. // NOTE(dvyukov): the following code uses uintptr to store *g (rg/wg), // that will blow up when GC starts moving objects. lock mutex // protects the following fields fd uintptr closing bool everr bool // marks event scanning error happened user uint32 // user settable cookie rseq uintptr // protects from stale read timers rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wseq uintptr // protects from stale write timers wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline }
其中rt和wt分別是讀寫定時(shí)器,用來(lái)防止讀寫超時(shí)。
fd為描述符指針,lock負(fù)責(zé)保護(hù)pollDesc內(nèi)部成員變量讀寫防止多線程操作導(dǎo)致并發(fā)問(wèn)題。
除此之外最重要的是rg和wg兩個(gè)變量,rg保存了用戶態(tài)操作pollDesc的讀協(xié)程地址,wg保存了用戶態(tài)操作pollDesc寫協(xié)程地址。
舉個(gè)例子,當(dāng)我們?cè)谠谟脩魬B(tài)協(xié)程調(diào)用read阻塞時(shí)rg就被設(shè)置為該讀協(xié)程,當(dāng)內(nèi)核態(tài)epoll_wait檢測(cè)read就緒后就會(huì)通過(guò)rg找到這個(gè)協(xié)程讓后恢復(fù)運(yùn)行。
rg,wg默認(rèn)是0,rg為pdReady表示讀就緒,可以將協(xié)程恢復(fù),為pdWait表示讀阻塞,協(xié)程將要被掛起。wg也是如此。
所以golang其實(shí)是通過(guò)pollDesc實(shí)現(xiàn)用戶態(tài)和內(nèi)核態(tài)信息的共享的。
回到之前poll_runtime_pollOpen函數(shù),我們就理解了其內(nèi)部生成*pollDesc,并且傳入netpollopen函數(shù),netpollopen對(duì)應(yīng)實(shí)現(xiàn)了epoll的init和wait,從而達(dá)到了用戶態(tài)信息和內(nèi)核態(tài)的關(guān)聯(lián)。
netpollopen函數(shù)不同模型的實(shí)現(xiàn)不相同,epoll的實(shí)現(xiàn)在runtime/netpoll_epoll.go中
func netpollopen(fd uintptr, pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) }
從而實(shí)現(xiàn)了epoll將fd添加至內(nèi)核epoll表里,同樣pd作為event的data傳入內(nèi)核表,從而實(shí)現(xiàn)內(nèi)核態(tài)和用戶態(tài)協(xié)程的關(guān)聯(lián)。
runtime/netpoll_epoll.go實(shí)現(xiàn)了epoll模型的基本操作,詳見(jiàn)源碼。
golang如何將一個(gè)描述符加入epoll表中
傳統(tǒng)的流程為:
生成socket–> bind socket–> listen–> accept
在golang中生成socket,bind,以及l(fā)isten統(tǒng)一封裝好了
Listen–> lc.Listen –> sl.listenTCP –> internetSocket
internetSocket –> socket –> newFD && listenStream
在newFD中完成了描述符創(chuàng)建,在listenStream完成了bind和listen。newFD只初始化了基本的結(jié)構(gòu),未完成pollDesc類型變量pd的初始化。
我們跟隨源碼查看listen的綁定流程
unc (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) { addrs, err := DefaultResolver.resolveAddrList(ctx, "listen", network, address, nil) if err != nil { return nil, &OpError{Op: "listen", Net: network, Source: nil, Addr: nil, Err: err} } sl := &sysListener{ ListenConfig: *lc, network: network, address: address, } var l Listener la := addrs.first(isIPv4) switch la := la.(type) { case *TCPAddr: l, err = sl.listenTCP(ctx, la) case *UnixAddr: l, err = sl.listenUnix(ctx, la) default: return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: &AddrError{Err: "unexpected address type", Addr: address}} } if err != nil { return nil, &OpError{Op: "listen", Net: sl.network, Source: nil, Addr: la, Err: err} // l is non-nil interface containing nil pointer } return l, nil }
可以看出Listen函數(shù)返回的類型為L(zhǎng)istener接口類型,其內(nèi)部根據(jù)la類型調(diào)用不同的listen函數(shù),這里查看listenTCP
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) { fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control) if err != nil { return nil, err } return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil }
internetSocket內(nèi)部調(diào)用socket生成描述符返回
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) { s, err := sysSocket(family, sotype, proto) if err != nil { return nil, err } if err = setDefaultSockopts(s, family, sotype, ipv6only); err != nil { poll.CloseFunc(s) return nil, err } if fd, err = newFD(s, family, sotype, net); err != nil { poll.CloseFunc(s) return nil, err } if laddr != nil && raddr == nil { switch sotype { case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET: if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil case syscall.SOCK_DGRAM: if err := fd.listenDatagram(laddr, ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil } } if err := fd.dial(ctx, laddr, raddr, ctrlFn); err != nil { fd.Close() return nil, err } return fd, nil }
socket函數(shù)做了這樣幾件事
1 調(diào)用sysSocket生成描述符
2 調(diào)用newFD封裝描述符,構(gòu)造netFD類型變量
3 調(diào)用netFD的listenDatagram方法,實(shí)現(xiàn)bind和listen
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error { var err error if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil { return err } var lsa syscall.Sockaddr if lsa, err = laddr.sockaddr(fd.family); err != nil { return err } if ctrlFn != nil { c, err := newRawConn(fd) if err != nil { return err } if err := ctrlFn(fd.ctrlNetwork(), laddr.String(), c); err != nil { return err } } if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil { return os.NewSyscallError("bind", err) } if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil { return os.NewSyscallError("listen", err) } if err = fd.init(); err != nil { return err } lsa, _ = syscall.Getsockname(fd.pfd.Sysfd) fd.setAddr(fd.addrFunc()(lsa), nil) return nil }
listenStream除了bind和listen操作之外,還執(zhí)行了netFD的init操作,這個(gè)init操作就是將netFD和epoll關(guān)聯(lián),將描述符和協(xié)程信息寫入epoll表
func (fd *netFD) init() error { errcall, err := fd.pfd.Init(fd.net, true) if errcall != "" { err = wrapSyscallError(errcall, err) } return err }
前文講過(guò)fd.pfd為FD類型,是和epoll通信的核心結(jié)構(gòu),F(xiàn)D的Init方法內(nèi)完成了pollDesc類型成員變量pd和epoll的關(guān)聯(lián)。
其內(nèi)部調(diào)用了fd.pd.init(fd),pd就是fd的pollDesc類型成員變量,其init函數(shù)上面已經(jīng)解釋過(guò)了調(diào)用了runtime_pollOpen,runtime_pollOpen是link到
runtime/netpoll.go中poll_runtime_pollOpen函數(shù),這個(gè)函數(shù)將用戶態(tài)協(xié)程的pollDesc信息寫入到epoll所在的單獨(dú)線程,從而實(shí)現(xiàn)用戶態(tài)和內(nèi)核態(tài)的關(guān)聯(lián)。
總結(jié)下bind和listen后續(xù)的消息流程就是:
listenStream –> bind&listen&init –> pollDesc.Init –> runtime_pollOpen
–> poll_runtime_pollOpen –> epollctl(EPOLL_CTL_ADD)
到此為止golang網(wǎng)絡(luò)描述符從生成到綁定和監(jiān)聽(tīng),以及寫入epoll表的流程分析完畢,下一篇分析accept流程以及用戶態(tài)協(xié)程如何掛起,epoll就緒后如何喚醒協(xié)程。
到此這篇關(guān)于談?wù)刧olang的netpoll原理解析的文章就介紹到這了,更多相關(guān)golang的netpoll原理內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
beego獲取ajax數(shù)據(jù)的實(shí)例
下面小編就為大家分享一篇beego獲取ajax數(shù)據(jù)的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2017-12-12golang?使用chromedp獲取頁(yè)面請(qǐng)求日志network
這篇文章主要為大家介紹了golang?使用chromedp獲取頁(yè)面請(qǐng)求日志network方法實(shí)例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-11-11Golang中的自定義類型之間的轉(zhuǎn)換的實(shí)現(xiàn)(type conversion)
這篇文章主要介紹了Golang中的自定義類型之間的轉(zhuǎn)換的實(shí)現(xiàn)(type conversion),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02基于Golang實(shí)現(xiàn)YOLO目標(biāo)檢測(cè)算法
目標(biāo)檢測(cè)是計(jì)算機(jī)視覺(jué)領(lǐng)域的重要任務(wù),它不僅可以識(shí)別圖像中的物體,還可以標(biāo)記出物體的位置和邊界框,YOLO是一種先進(jìn)的目標(biāo)檢測(cè)算法,以其高精度和實(shí)時(shí)性而聞名,本文將介紹如何使用Golang實(shí)現(xiàn)YOLO目標(biāo)檢測(cè)算法,文中有相關(guān)的代碼示例供大家參考,需要的朋友可以參考下2023-11-11go sync Once實(shí)現(xiàn)原理示例解析
這篇文章主要為大家介紹了go sync Once實(shí)現(xiàn)原理示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01Go語(yǔ)言實(shí)戰(zhàn)之詳細(xì)掌握正則表達(dá)式的應(yīng)用與技巧
正則表達(dá)式是一種從左到右與主題字符串匹配的模式,正則表達(dá)式用于替換字符串中的文本,驗(yàn)證表單,基于模式匹配從字符串中提取子字符串等等,這篇文章主要給大家介紹了關(guān)于Go語(yǔ)言實(shí)戰(zhàn)之詳細(xì)掌握正則表達(dá)式的應(yīng)用與技巧,需要的朋友可以參考下2023-12-12Go語(yǔ)言學(xué)習(xí)技巧之如何合理使用Pool
這篇文章主要給大家介紹了關(guān)于Go語(yǔ)言學(xué)習(xí)技巧之如何合理使用Pool的相關(guān)資料,Pool用于存儲(chǔ)那些被分配了但是沒(méi)有被使用,而未來(lái)可能會(huì)使用的值,以減小垃圾回收的壓力。文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下。2017-12-12