golang將多路復(fù)異步io轉(zhuǎn)成阻塞io的方法詳解
前言
本文主要給大家介紹了關(guān)于golang 如何將多路復(fù)異步io轉(zhuǎn)變成阻塞io的相關(guān)內(nèi)容,分享出來供大家參考學(xué)習(xí),下面話不多說了,來一起看看詳細(xì)的介紹:
package main import ( "net" ) func handleConnection(c net.Conn) { //讀寫數(shù)據(jù) buffer := make([]byte, 1024) c.Read(buffer) c.Write([]byte("Hello from server")) } func main() { l, err := net.Listen("tcp", "host:port") if err != nil { return } defer l.Close() for { c, err := l.Accept() if err!= nil { return } go handleConnection(c) } }
對于我們都會寫上面的代碼,很簡單,的確golang的網(wǎng)絡(luò)部分對于我們隱藏了太多東西,我們不用像c++一樣去調(diào)用底層的socket函數(shù),也不用去使用epoll等復(fù)雜的io多路復(fù)用相關(guān)的邏輯,但是上面的代碼真的就像我們看起來的那樣在調(diào)用accept和read時阻塞嗎?
// Multiple goroutines may invoke methods on a Conn simultaneously. //官方注釋:多個goroutines可能同時調(diào)用方法在一個連接上,我的理解就是所謂的驚群效應(yīng)吧 //換句話說就是你多個goroutines監(jiān)聽同一個連接同一個事件,所有的goroutines都會觸發(fā), //這只是我的猜測,有待驗(yàn)證。 type Conn interface { Read(b []byte) (n int, err error) Write(b []byte) (n int, err error) Close() error LocalAddr() Addr RemoteAddr() Addr SetDeadline(t time.Time) error SetReadDeadline(t time.Time) error SetWriteDeadline(t time.Time) error } type conn struct { fd *netFD }
這里面又一個Conn接口,下面conn實(shí)現(xiàn)了這個接口,里面只有一個成員netFD.
// Network file descriptor. type netFD struct { // locking/lifetime of sysfd + serialize access to Read and Write methods fdmu fdMutex // immutable until Close sysfd int family int sotype int isConnected bool net string laddr Addr raddr Addr // wait server pd pollDesc } func (fd *netFD) accept() (netfd *netFD, err error) { //................ for { s, rsa, err = accept(fd.sysfd) if err != nil { nerr, ok := err.(*os.SyscallError) if !ok { return nil, err } switch nerr.Err { /* 如果錯誤是EAGAIN說明Socket的緩沖區(qū)為空,未讀取到任何數(shù)據(jù) 則調(diào)用fd.pd.WaitRead,*/ case syscall.EAGAIN: if err = fd.pd.waitRead(); err == nil { continue } case syscall.ECONNABORTED: continue } return nil, err } break } //......... //代碼過長不再列出,感興趣看go的源碼,runtime 下的fd_unix.go return netfd, nil }
上面代碼段是accept部分,這里我們注意當(dāng)accept有錯誤發(fā)生的時候,會檢查這個錯誤是否是syscall.EAGAIN
,如果是,則調(diào)用WaitRead將當(dāng)前讀這個fd的goroutine在此等待,直到這個fd上的讀事件再次發(fā)生為止。當(dāng)這個socket上有新數(shù)據(jù)到來的時候,WaitRead調(diào)用返回,繼續(xù)for循環(huán)的執(zhí)行,這樣以來就讓調(diào)用netFD的Read的地方變成了同步“阻塞”。有興趣的可以看netFD的讀和寫方法,都有同樣的實(shí)現(xiàn)。
到這里所有的疑問都集中到了pollDesc上,它到底是什么呢?
const ( pdReady uintptr = 1 pdWait uintptr = 2 ) // Network poller descriptor. type pollDesc struct { link *pollDesc // in pollcache, protected by pollcache.lock lock mutex // protects the following fields fd uintptr closing bool seq uintptr // protects from stale timers and ready notifications rg uintptr // pdReady, pdWait, G waiting for read or nil rt timer // read deadline timer (set if rt.f != nil) rd int64 // read deadline wg uintptr // pdReady, pdWait, G waiting for write or nil wt timer // write deadline timer wd int64 // write deadline user uint32 // user settable cookie } type pollCache struct { lock mutex first *pollDesc }
pollDesc網(wǎng)絡(luò)輪詢器是Golang中針對每個socket文件描述符建立的輪詢機(jī)制。 此處的輪詢并不是一般意義上的輪詢,而是Golang的runtime在調(diào)度goroutine或者GC完成之后或者指定時間之內(nèi),調(diào)用epoll_wait獲取所有產(chǎn)生IO事件的socket文件描述符。當(dāng)然在runtime輪詢之前,需要將socket文件描述符和當(dāng)前goroutine的相關(guān)信息加入epoll維護(hù)的數(shù)據(jù)結(jié)構(gòu)中,并掛起當(dāng)前goroutine,當(dāng)IO就緒后,通過epoll返回的文件描述符和其中附帶的goroutine的信息,重新恢復(fù)當(dāng)前goroutine的執(zhí)行。這里我們可以看到pollDesc中有兩個變量wg和rg,其實(shí)我們可以把它們看作信號量,這兩個變量有幾種不同的狀態(tài):
- pdReady:io就緒
- pdWait:當(dāng)前的goroutine正在準(zhǔn)備掛起在信號量上,但是還沒有掛起。
- G pointer:當(dāng)我們把它改為指向當(dāng)前goroutine的指針時,當(dāng)前goroutine掛起
繼續(xù)接著上面的WaitRead調(diào)用說起,go在這里到底做了什么讓當(dāng)前的goroutine掛起了呢。
func net_runtime_pollWait(pd *pollDesc, mode int) int { err := netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // As for now only Solaris uses level-triggered IO. if GOOS == "solaris" { netpollarm(pd, mode) } for !netpollblock(pd, int32(mode), false) { err = netpollcheckerr(pd, int32(mode)) if err != 0 { return err } // Can happen if timeout has fired and unblocked us, // but before we had a chance to run, timeout has been reset. // Pretend it has not happened and retry. } return 0 } // returns true if IO is ready, or false if timedout or closed // waitio - wait only for completed IO, ignore errors func netpollblock(pd *pollDesc, mode int32, waitio bool) bool { //根據(jù)讀寫模式獲取相應(yīng)的pollDesc中的讀寫信號量 gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } for { old := *gpp //已經(jīng)準(zhǔn)備好直接返回true if old == pdReady { *gpp = 0 return true } if old != 0 { throw("netpollblock: double wait") } //設(shè)置gpp pdWait if atomic.Casuintptr(gpp, 0, pdWait) { break } } if waitio || netpollcheckerr(pd, mode) == 0 { gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5) } old := atomic.Xchguintptr(gpp, 0) if old > pdWait { throw("netpollblock: corrupted state") } return old == pdReady }
當(dāng)調(diào)用WaitRead時經(jīng)過一段匯編最重調(diào)用了上面的net_runtime_pollWait函數(shù),該函數(shù)循環(huán)調(diào)用了netpollblock函數(shù),返回true表示io已準(zhǔn)備好,返回false表示錯誤或者超時,在netpollblock中調(diào)用了gopark函數(shù),gopark函數(shù)調(diào)用了mcall的函數(shù),該函數(shù)用匯編來實(shí)現(xiàn),具體功能就是把當(dāng)前的goroutine掛起,然后去執(zhí)行其他可執(zhí)行的goroutine。到這里整個goroutine掛起的過程已經(jīng)結(jié)束,那當(dāng)goroutine可讀的時候是如何通知該goroutine呢,這就是epoll的功勞了。
func netpoll(block bool) *g { if epfd == -1 { return nil } waitms := int32(-1) if !block { waitms = 0 } var events [128]epollevent retry: //每次最多監(jiān)聽128個事件 n := epollwait(epfd, &events[0], int32(len(events)), waitms) if n < 0 { if n != -_EINTR { println("runtime: epollwait on fd", epfd, "failed with", -n) throw("epollwait failed") } goto retry } var gp guintptr for i := int32(0); i < n; i++ { ev := &events[i] if ev.events == 0 { continue } var mode int32 //讀事件 if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'r' } //寫事件 if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 { mode += 'w' } if mode != 0 { //把epoll中的data轉(zhuǎn)換成pollDesc pd := *(**pollDesc)(unsafe.Pointer(&ev.data)) netpollready(&gp, pd, mode) } } if block && gp == 0 { goto retry } return gp.ptr() }
這里就是熟悉的代碼了,epoll的使用,看起來親民多了。pd:=*(**pollDesc)(unsafe.Pointer(&ev.data))
這是最關(guān)鍵的一句,我們在這里拿到當(dāng)前可讀時間的pollDesc,上面我們已經(jīng)說了,當(dāng)pollDesc的讀寫信號量保存為G pointer時當(dāng)前goroutine就會掛起。而在這里我們調(diào)用了netpollready函數(shù),函數(shù)中把相應(yīng)的讀寫信號量G指針擦出,置為pdReady,G-pointer狀態(tài)被抹去,當(dāng)前goroutine的G指針就放到可運(yùn)行隊列中,這樣goroutine就被喚醒了。
可以看到雖然我們在寫tcp server看似一個阻塞的網(wǎng)絡(luò)模型,在其底層實(shí)際上是基于異步多路復(fù)用的機(jī)制來實(shí)現(xiàn)的,只是把它封裝成了跟阻塞io相似的開發(fā)模式,這樣是使得我們不用去關(guān)注異步io,多路復(fù)用等這些復(fù)雜的概念以及混亂的回調(diào)函數(shù)。
總結(jié)
以上就是這篇文章的全部內(nèi)容了,希望本文的內(nèi)容對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。
相關(guān)文章
Golang中make與new使用區(qū)別小結(jié)
Go語言中new和make是內(nèi)建的兩個函數(shù),主要用來創(chuàng)建分配類型內(nèi)存,本文主要給大家介紹了Go語言中函數(shù)new與make的使用和區(qū)別,具有一定的參考價值,感興趣的可以了解一下2024-01-01go?gin?正確讀取http?response?body內(nèi)容并多次使用詳解
這篇文章主要為大家介紹了go?gin?正確讀取http?response?body內(nèi)容并多次使用解決思路,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01go?sync包中的互斥鎖Mutex和等待組WaitGroup使用詳解
這篇文章主要為大家介紹了go?sync包中的互斥鎖Mutex和等待組WaitGroup使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08go中利用reflect實(shí)現(xiàn)json序列化的示例代碼
和Java語言一樣,Go也實(shí)現(xiàn)運(yùn)行時反射,這為我們提供一種可以在運(yùn)行時操作任意類型對象的能力,本文給大家介紹了在go中如何利用reflect實(shí)現(xiàn)json序列化,需要的朋友可以參考下2024-03-03