如何解析golang中Context在HTTP服務(wù)中的角色
問題背景
在go語言的http服務(wù)中,我們常常會(huì)使用到Context來取消一個(gè)請求,或者取消數(shù)據(jù)的讀取。偶然的一次嘗試,讓我對Context有了一定的興趣。
接下來本文圍繞下面的例子,分析http如何利用Context來控制請求的取消和影響數(shù)據(jù)讀取。
例子
我們開啟一個(gè)http服務(wù),發(fā)送大量數(shù)據(jù)給每個(gè)請求,代碼如下:
srv.go:http服務(wù)
package main import ( "fmt" "net/http" ) func hello(w http.ResponseWriter, r *http.Request) { for i := 0; i < 100*10000; i++ { w.Write([]byte("hello world")) } } func main() { fmt.Println("listening 8888:") http.HandleFunc("/hello", hello) _ = http.ListenAndServe(":8888", nil) }
client.go: 發(fā)送請求的客戶端
package main import ( "context" "fmt" "io" "log" "net/http" "time" ) func main() { client := http.Client{} request, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:8888/hello", nil) ctx, cancelFunc := context.WithCancel(request.Context()) request = request.WithContext(ctx) if err != nil { return } response, err := client.Do(request) if err != nil { log.Fatal(err) } cache := make([]byte, 128) timer := time.NewTimer(time.Millisecond) go func() { select { case <-timer.C: cancelFunc() } }() for { read, err := response.Body.Read(cache) if err == nil { fmt.Println(string(cache[:read])) continue } if err == io.EOF { fmt.Println(string(cache[:read])) break } log.Fatal(err) } }
代碼很簡單,就不做注釋啦。分別啟動(dòng)服務(wù)和client,我們將得到如下結(jié)果:
我們看到這句話Process finished with the exit code 1,程序非正常退出,那么首先是追蹤這個(gè)錯(cuò)誤,下面我們追蹤這個(gè)錯(cuò)誤。
錯(cuò)誤追蹤
首先清楚這個(gè)“context canceled” 是客戶端打印出來的:
log.Fatal(err) // 這個(gè)錯(cuò)誤來源于讀取Response中的數(shù)據(jù)時(shí)得到錯(cuò)誤,而且這個(gè)錯(cuò)誤非io.EOF錯(cuò)誤
斷點(diǎn)入口:
read, err := response.Body.Read(cache)
我們會(huì)進(jìn)入transport.go文件中:
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { // 這里表明我們讀取的body是bodyEOFSignal類型 es.mu.Lock() closed, rerr := es.closed, es.rerr es.mu.Unlock() if closed { return 0, errReadOnClosedResBody } if rerr != nil { return 0, rerr } n, err = es.body.Read(p)// 我們在這里讀到了錯(cuò)誤,這里是什么錯(cuò)誤,在后面將會(huì)介紹 if err != nil { es.mu.Lock() defer es.mu.Unlock() if es.rerr == nil { es.rerr = err } err = es.condfn(err) // 通過這個(gè)方法對錯(cuò)誤進(jìn)行判別,得到上層傳下來的錯(cuò)誤信息 } return }
然后我們繼續(xù)進(jìn)入到bodyEOFSignal的condfn(error)函數(shù)中:
func (es *bodyEOFSignal) condfn(err error) error { if es.fn == nil { return err //1 } err = es.fn(err) // 如果fn不為空,這里會(huì)繼續(xù)到bodyEOFSignal去得到上層的錯(cuò)誤信息;fn為空,顯然錯(cuò)誤和上層就沒有關(guān)系,就在上面1處就返回了。除此,因?yàn)閏lient從這個(gè)body讀的數(shù)據(jù),這里的錯(cuò)誤是通過fn從上層獲取。 es.fn = nil return err }
那我們繼續(xù)到es.fn(err)中一探究竟:
body := &bodyEOFSignal{ body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error {// 就到了這里,這一段代碼源自transport.go中的封裝內(nèi)部類persistConn的方法readLoop,顧名思義:循環(huán)讀取 // 這里會(huì)簡單的皮判斷錯(cuò)誤是不是io.EOF,然后作進(jìn)一步處理 isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil {// 繼續(xù)調(diào)試我們就到了這里,顯然不是io.EOF錯(cuò)誤 return cerr // 返回的是pc.canceled() } } return err }, }
繼續(xù)到pc.canceled()中:
func (pc *persistConn) canceled() error { pc.mu.Lock() defer pc.mu.Unlock() return pc.canceledErr // 返回的這個(gè)錯(cuò)誤,那么下一步便需要知道這個(gè)canceledErr是什么?如何被賦值? }
1. 是什么?
canceledErr error // set non-nil if conn is canceled //是一種錯(cuò)誤,且如果非空,則連接被取消,那么這個(gè)錯(cuò)誤是一個(gè)連接狀態(tài)的標(biāo)志或者連接斷開的原因
2. 如何被賦值?
根據(jù)canceledErr,我們找被賦值的函數(shù)如下:
func (pc *persistConn) cancelRequest(err error) { pc.mu.Lock() defer pc.mu.Unlock() pc.canceledErr = err // 在這里被賦值 pc.closeLocked(errRequestCanceled) }
錯(cuò)誤追蹤先到這里。接下來我們換一個(gè)角度,我們從Context的角度來看。
Context
這里就不講context了,有興趣的伙伴去官網(wǎng)獲取吧?。。』氐娇蛻舳舜a,給request傳入了一個(gè)WithCancel context,看看這個(gè)函數(shù)做了什么:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { if parent == nil { panic("cannot create context from nil parent") } c := newCancelCtx(parent) // 包裝父類Context propagateCancel(parent, &c) return &c, func() { c.cancel(true, Canceled) // 返回一個(gè)取消函數(shù) } }
進(jìn)入到c.cancel(),會(huì)發(fā)現(xiàn)Canceled作為一個(gè)錯(cuò)誤類型,定義如下:
// Canceled is the error returned by Context.Err when the context is canceled. var Canceled = errors.New("context canceled")// 這個(gè)不是客戶端打印的嗎?是不是很激動(dòng),找到了錯(cuò)誤信息的祖宗 ... //而cancel函數(shù)定義如下: // cancel closes c.done, cancels each of c's children, and, if // removeFromParent is true, removes c from its parent's children. func (c *cancelCtx) cancel(removeFromParent bool, err error) { ... c.err = err //這里做了一個(gè)賦值,即把這個(gè)錯(cuò)誤傳給cancelCtx了,它是Context的一個(gè)內(nèi)部類 ... // 做一些子context的通知以及錯(cuò)誤的傳遞,說取消了,不用干了 }
context先到這里,在context里找到了錯(cuò)誤信息的來源,接下來看看錯(cuò)誤是如何傳給前面我們談到的canceledErr。
似乎還有一個(gè)入口沒有看,就是http.client.Do的方法:
我們打斷點(diǎn)進(jìn)入到RoundTrip方法的調(diào)用入口,看看下面是如何感知context被取消:
resp, err = rt.RoundTrip(req) //這個(gè)在send()方法內(nèi)部調(diào)用 ... // send issues an HTTP request. // Caller should close resp.Body when done reading from it. func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { ... resp, err = rt.RoundTrip(req) ... }
然后跟著RoundTrip(…), 進(jìn)入到:
func (t *Transport) roundTrip(req *Request) (*Response, error) { ... var resp *Response if pconn.alt != nil { // HTTP/2 path. t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) // 繼續(xù)可到這里,我們看看這個(gè)pconn,剛好就是前面提到的persistConn,它里面包含了canceledErr,似乎我們離真相更近了 } }
進(jìn)入到persistConn的實(shí)現(xiàn)方法roundTrip(),我們看看這個(gè)for循環(huán):
var respHeaderTimer <-chan time.Time cancelChan := req.Request.Cancel ctxDoneChan := req.Context().Done() //這個(gè)request是setRequestCancel(req *Request, rt RoundTripper, deadline time.Time)中重新定義的request,里實(shí)現(xiàn)了超時(shí)取消的機(jī)制,這里的監(jiān)聽便是超時(shí)的監(jiān)聽,并不是我們?nèi)∠谋O(jiān)聽 pcClosed := pc.closech canceled := false for { testHookWaitResLoop() select { // select開啟對channel的輪詢 case err := <-writeErrCh: if debugRoundTrip { req.logf("writeErrCh resv: %T/%#v", err, err) } if err != nil { pc.close(fmt.Errorf("write error: %v", err)) return nil, pc.mapRoundTripError(req, startBytesWritten, err) } if d := pc.t.ResponseHeaderTimeout; d > 0 { if debugRoundTrip { req.logf("starting timer for %v", d) } timer := time.NewTimer(d) defer timer.Stop() // prevent leaks respHeaderTimer = timer.C } case <-pcClosed: pcClosed = nil if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) { if debugRoundTrip { req.logf("closech recv: %T %#v", pc.closed, pc.closed) } return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed) } case <-respHeaderTimer: if debugRoundTrip { req.logf("timeout waiting for response headers.") } pc.close(errTimeout) return nil, errTimeout case re := <-resc: if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } return re.res, nil case <-cancelChan: canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled) cancelChan = nil case <-ctxDoneChan: canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err()) cancelChan = nil ctxDoneChan = nil } }
因而這里的監(jiān)聽不是在客戶端取消的context的監(jiān)聽,根據(jù)客戶端的輸出顯示,表明請求已經(jīng)發(fā)送到服務(wù)端,請求并未超時(shí),response也返回了,那么這里的函數(shù)監(jiān)聽是與我們讀取數(shù)據(jù)沒有聯(lián)系。
小編最開始也以為是在這里監(jiān)聽返回,然而這里打斷點(diǎn),怎么進(jìn)不來。
在前面提到,連接是類型為persistConn,其次是讀取數(shù)據(jù)過程中,context的取消會(huì)產(chǎn)生影響,那么表明錯(cuò)誤發(fā)生在tcp連接中的讀取數(shù)據(jù)。
接下來,根據(jù)連接建立過程,看看http做了什么?其次是真正的數(shù)據(jù)讀取來自哪里?
pconn, err := t.getConn(treq, cm) ... func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { req := treq.Request trace := treq.trace ctx := req.Context() //這里去了request的context w := &wantConn{ cm: cm, key: cm.key(), ctx: ctx, //傳給w ready: make(chan struct{}, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } ... select{ case <-w.ready: if w.err != nil { // If the request has been canceled, that's probably // what caused w.err; if so, prefer to return the // cancellation error (see golang.org/issue/16049). //如果建立連接前,請求被取消,這里會(huì)監(jiān)聽到取消的err select { case <-req.Cancel: return nil, errRequestCanceledConn case <-req.Context().Done(): return nil, req.Context().Err() case err := <-cancelc: if err == errRequestCanceled { err = errRequestCanceledConn } return nil, err default: // return below } } return w.pc, w.err//這里返回的是persistConn ...
通過這個(gè)w建立連接,進(jìn)入到dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error)。 在這里面開啟了一個(gè)協(xié)程pconn.readLoop(),讀取連接里面的數(shù)據(jù)。
(t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { ... go pconn.readLoop() }
因?yàn)殄e(cuò)誤與數(shù)據(jù)讀取有直接聯(lián)系,至少錯(cuò)誤發(fā)生readloop中的某一個(gè)地方:
for alive { ... var resp *Response if err == nil { resp, err = pc.readResponse(rc, trace) // 得到response } else { err = transportReadFromServerError{err} closeErr = err } ... waitForBodyRead := make(chan bool, 2) body := &bodyEOFSignal{ //對上面讀取的resp.Body進(jìn)行封裝,這里封裝主要是傳遞請求取消的錯(cuò)誤 body: resp.Body, earlyCloseFn: func() error { waitForBodyRead <- false <-eofc // will be closed by deferred call at the end of the function return nil }, fn: func(err error) error {// isEOF := err == io.EOF waitForBodyRead <- isEOF if isEOF { <-eofc // see comment above eofc declaration } else if err != nil { if cerr := pc.canceled(); cerr != nil { return cerr } } return err }, } resp.Body = body ... // Before looping back to the top of this function and peeking on // the bufio.Reader, wait for the caller goroutine to finish // reading the response body. (or for cancellation or death) // 這里有開啟監(jiān)聽,顯然是監(jiān)聽讀的過程中發(fā)生的取消和超時(shí)等 select { case bodyEOF := <-waitForBodyRead: replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool alive = alive && bodyEOF && !pc.sawEOF && pc.wroteRequest() && replaced && tryPutIdleConn(trace) if bodyEOF { eofc <- struct{}{} } case <-rc.req.Cancel: alive = false pc.t.CancelRequest(rc.req) case <-rc.req.Context().Done(): //這里便監(jiān)聽了客戶頓context的取消 alive = false //結(jié)束循環(huán) pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())//傳遞err case <-pc.closech: alive = false } testHookReadLoopBeforeNextRead() }
熟悉context的便知道,當(dāng)我們調(diào)用context的cancel方法時(shí),在前面的context的cancel()方法中有如下代碼:
d, _ := c.done.Load().(chan struct{}) // 拿到Done方法的返回值channel if d == nil { c.done.Store(closedchan) } else { close(d)// 關(guān)閉channel,而關(guān)閉時(shí)會(huì)向channel寫入值 }
再回到:
ccase <-rc.req.Context().Done():// 當(dāng)contex取消,便進(jìn)入這個(gè)代碼塊 alive = false pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
進(jìn)入到cancelRequest(…)的rc.req.Context().Err()
func (c *cancelCtx) Err() error { c.mu.Lock() err := c.err//這里似曾相識(shí),前面我們說到context調(diào)用取消函數(shù)時(shí),會(huì)給c.err賦值為cancelErr c.mu.Unlock() return err }
因而傳入cancelRequest的err便是cancelErr,我們進(jìn)入cancelRequest:
func (t *Transport) cancelRequest(key cancelKey, err error) bool { // This function must not return until the cancel func has completed. // See: https://golang.org/issue/34658 t.reqMu.Lock() defer t.reqMu.Unlock() cancel := t.reqCanceler[key]// 這里的key正是我們傳入的請求的cancelkey,拿到reqCanceler中的func(error) delete(t.reqCanceler, key) if cancel != nil { cancel(err) // 進(jìn)入cancel } return cancel != nil }
進(jìn)入cancel(err):
func (pc *persistConn) cancelRequest(err error) {//這個(gè)函數(shù)不正是我們前面追蹤錯(cuò)誤所看見的,這也表明我們追蹤是正確的 pc.mu.Lock() defer pc.mu.Unlock() pc.canceledErr = err pc.closeLocked(errRequestCanceled) }
到這里我們的err就傳給了body bodyEOFSignal,整個(gè)錯(cuò)誤傳遞流程便走通了。
還剩最后一個(gè)問題,bodyEOFSignal的read函數(shù)中n, err = es.body.Read§ 所遇到的錯(cuò)誤是什么?
n, err = es.body.Read(p)// 調(diào)試發(fā)現(xiàn)是網(wǎng)絡(luò)連接關(guān)閉錯(cuò)誤,這里表明我們執(zhí)行完err的傳遞根本原因在于連接被關(guān)閉 if err != nil { es.mu.Lock() defer es.mu.Unlock() if es.rerr == nil { es.rerr = err } err = es.condfn(err) } return
那么關(guān)閉連接又是在哪里呢?
我們回到cancelRequest函數(shù):
pc.closeLocked(errRequestCanceled) //這里便關(guān)閉了連接
這樣err整個(gè)傳遞邏輯和原因便都走同通了!
總結(jié)
經(jīng)過上面的分析,將整個(gè)Context取消過程總結(jié)如下:
1.當(dāng)創(chuàng)建一個(gè)帶有取消的Context,會(huì)把Context的內(nèi)部類中的err變量賦值為CancelErr;
2.客戶端的調(diào)用cancelFunc,會(huì)向context的Done所綁定的channel寫入值;
3.當(dāng)channel寫入值后,transport.go中的readLoop方法會(huì)監(jiān)聽這個(gè)channel的寫入,從而把context取消的err傳給persistConn,并關(guān)閉連接;
4.關(guān)閉連接后,數(shù)據(jù)讀取便會(huì)遇到連接關(guān)閉的網(wǎng)絡(luò)錯(cuò)誤錯(cuò)誤,當(dāng)遇到這個(gè)錯(cuò)誤,在bodySignal中進(jìn)行錯(cuò)誤處理,這里并不感知連接的關(guān)閉,只利用fn分別錯(cuò)誤類型,當(dāng)錯(cuò)誤為io.EOF,直接將這個(gè)錯(cuò)誤置為nil,若不是,便通過bodySignal獲取到連接中的錯(cuò)誤,再返回這個(gè)錯(cuò)誤;
5.最后通過body.read()方法將錯(cuò)誤打印出來。
6.這里復(fù)雜在于,每個(gè)角色只做自己的工作,遇到錯(cuò)誤不是直接返回,而是等待其他角色來讀取錯(cuò)誤;具體表現(xiàn)為:context負(fù)責(zé)生成錯(cuò)誤消息、傳遞取消指令給persistConn;persistConn基于bodySignal建立讀取數(shù)據(jù)和連接的關(guān)聯(lián),響應(yīng)Context的取消并關(guān)閉連接,拿到context的錯(cuò)誤信息;client讀取數(shù)據(jù)和錯(cuò)誤;bodySignal:分析錯(cuò)誤,并傳遞數(shù)據(jù)和persistConn的錯(cuò)誤消息給client。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
基于Golang實(shí)現(xiàn)內(nèi)存數(shù)據(jù)庫的示例詳解
這篇文章主要為大家詳細(xì)介紹了如何基于Golang實(shí)現(xiàn)內(nèi)存數(shù)據(jù)庫,文中的示例代碼講解詳細(xì),具有一定的借鑒價(jià)值,需要的小伙伴可以參考一下2023-03-03Golang服務(wù)的請求調(diào)度的實(shí)現(xiàn)
Golang服務(wù)請求調(diào)度是一種使用Go語言實(shí)現(xiàn)的服務(wù)請求管理方法,本文主要介紹了Golang服務(wù)的請求調(diào)度的實(shí)現(xiàn),具有一定的參考價(jià)值,感興趣的可以了解一下2023-08-08一文詳解go中如何實(shí)現(xiàn)定時(shí)任務(wù)
定時(shí)任務(wù)是指按照預(yù)定的時(shí)間間隔或特定時(shí)間點(diǎn)自動(dòng)執(zhí)行的計(jì)劃任務(wù)或操作,這篇文章主要為大家詳細(xì)介紹了go中是如何實(shí)現(xiàn)定時(shí)任務(wù)的,感興趣的可以了解下2023-11-11