如何解析golang中Context在HTTP服務中的角色
問題背景
在go語言的http服務中,我們常常會使用到Context來取消一個請求,或者取消數(shù)據(jù)的讀取。偶然的一次嘗試,讓我對Context有了一定的興趣。
接下來本文圍繞下面的例子,分析http如何利用Context來控制請求的取消和影響數(shù)據(jù)讀取。
例子
我們開啟一個http服務,發(fā)送大量數(shù)據(jù)給每個請求,代碼如下:
srv.go:http服務
package main
import (
"fmt"
"net/http"
)
func hello(w http.ResponseWriter, r *http.Request) {
for i := 0; i < 100*10000; i++ {
w.Write([]byte("hello world"))
}
}
func main() {
fmt.Println("listening 8888:")
http.HandleFunc("/hello", hello)
_ = http.ListenAndServe(":8888", nil)
}
client.go: 發(fā)送請求的客戶端
package main
import (
"context"
"fmt"
"io"
"log"
"net/http"
"time"
)
func main() {
client := http.Client{}
request, err := http.NewRequest(http.MethodPost, "http://127.0.0.1:8888/hello", nil)
ctx, cancelFunc := context.WithCancel(request.Context())
request = request.WithContext(ctx)
if err != nil {
return
}
response, err := client.Do(request)
if err != nil {
log.Fatal(err)
}
cache := make([]byte, 128)
timer := time.NewTimer(time.Millisecond)
go func() {
select {
case <-timer.C:
cancelFunc()
}
}()
for {
read, err := response.Body.Read(cache)
if err == nil {
fmt.Println(string(cache[:read]))
continue
}
if err == io.EOF {
fmt.Println(string(cache[:read]))
break
}
log.Fatal(err)
}
}
代碼很簡單,就不做注釋啦。分別啟動服務和client,我們將得到如下結果:

我們看到這句話Process finished with the exit code 1,程序非正常退出,那么首先是追蹤這個錯誤,下面我們追蹤這個錯誤。
錯誤追蹤
首先清楚這個“context canceled” 是客戶端打印出來的:
log.Fatal(err) // 這個錯誤來源于讀取Response中的數(shù)據(jù)時得到錯誤,而且這個錯誤非io.EOF錯誤
斷點入口:
read, err := response.Body.Read(cache)
我們會進入transport.go文件中:
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) { // 這里表明我們讀取的body是bodyEOFSignal類型
es.mu.Lock()
closed, rerr := es.closed, es.rerr
es.mu.Unlock()
if closed {
return 0, errReadOnClosedResBody
}
if rerr != nil {
return 0, rerr
}
n, err = es.body.Read(p)// 我們在這里讀到了錯誤,這里是什么錯誤,在后面將會介紹
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err) // 通過這個方法對錯誤進行判別,得到上層傳下來的錯誤信息
}
return
}
然后我們繼續(xù)進入到bodyEOFSignal的condfn(error)函數(shù)中:
func (es *bodyEOFSignal) condfn(err error) error {
if es.fn == nil {
return err //1
}
err = es.fn(err) // 如果fn不為空,這里會繼續(xù)到bodyEOFSignal去得到上層的錯誤信息;fn為空,顯然錯誤和上層就沒有關系,就在上面1處就返回了。除此,因為client從這個body讀的數(shù)據(jù),這里的錯誤是通過fn從上層獲取。
es.fn = nil
return err
}
那我們繼續(xù)到es.fn(err)中一探究竟:
body := &bodyEOFSignal{
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {// 就到了這里,這一段代碼源自transport.go中的封裝內(nèi)部類persistConn的方法readLoop,顧名思義:循環(huán)讀取
// 這里會簡單的皮判斷錯誤是不是io.EOF,然后作進一步處理
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {// 繼續(xù)調試我們就到了這里,顯然不是io.EOF錯誤
return cerr // 返回的是pc.canceled()
}
}
return err
},
}
繼續(xù)到pc.canceled()中:
func (pc *persistConn) canceled() error {
pc.mu.Lock()
defer pc.mu.Unlock()
return pc.canceledErr // 返回的這個錯誤,那么下一步便需要知道這個canceledErr是什么?如何被賦值?
}
1. 是什么?
canceledErr error // set non-nil if conn is canceled //是一種錯誤,且如果非空,則連接被取消,那么這個錯誤是一個連接狀態(tài)的標志或者連接斷開的原因
2. 如何被賦值?
根據(jù)canceledErr,我們找被賦值的函數(shù)如下:
func (pc *persistConn) cancelRequest(err error) {
pc.mu.Lock()
defer pc.mu.Unlock()
pc.canceledErr = err // 在這里被賦值
pc.closeLocked(errRequestCanceled)
}
錯誤追蹤先到這里。接下來我們換一個角度,我們從Context的角度來看。
Context
這里就不講context了,有興趣的伙伴去官網(wǎng)獲取吧?。。』氐娇蛻舳舜a,給request傳入了一個WithCancel context,看看這個函數(shù)做了什么:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent) // 包裝父類Context
propagateCancel(parent, &c)
return &c, func() {
c.cancel(true, Canceled) // 返回一個取消函數(shù)
}
}
進入到c.cancel(),會發(fā)現(xiàn)Canceled作為一個錯誤類型,定義如下:
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")// 這個不是客戶端打印的嗎?是不是很激動,找到了錯誤信息的祖宗
...
//而cancel函數(shù)定義如下:
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
...
c.err = err //這里做了一個賦值,即把這個錯誤傳給cancelCtx了,它是Context的一個內(nèi)部類
...
// 做一些子context的通知以及錯誤的傳遞,說取消了,不用干了
}
context先到這里,在context里找到了錯誤信息的來源,接下來看看錯誤是如何傳給前面我們談到的canceledErr。
似乎還有一個入口沒有看,就是http.client.Do的方法:
我們打斷點進入到RoundTrip方法的調用入口,看看下面是如何感知context被取消:
resp, err = rt.RoundTrip(req) //這個在send()方法內(nèi)部調用
...
// send issues an HTTP request.
// Caller should close resp.Body when done reading from it.
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
...
resp, err = rt.RoundTrip(req)
...
}
然后跟著RoundTrip(…), 進入到:
func (t *Transport) roundTrip(req *Request) (*Response, error) {
...
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq) // 繼續(xù)可到這里,我們看看這個pconn,剛好就是前面提到的persistConn,它里面包含了canceledErr,似乎我們離真相更近了
}
}
進入到persistConn的實現(xiàn)方法roundTrip(),我們看看這個for循環(huán):
var respHeaderTimer <-chan time.Time
cancelChan := req.Request.Cancel
ctxDoneChan := req.Context().Done() //這個request是setRequestCancel(req *Request, rt RoundTripper, deadline time.Time)中重新定義的request,里實現(xiàn)了超時取消的機制,這里的監(jiān)聽便是超時的監(jiān)聽,并不是我們?nèi)∠谋O(jiān)聽
pcClosed := pc.closech
canceled := false
for {
testHookWaitResLoop()
select { // select開啟對channel的輪詢
case err := <-writeErrCh:
if debugRoundTrip {
req.logf("writeErrCh resv: %T/%#v", err, err)
}
if err != nil {
pc.close(fmt.Errorf("write error: %v", err))
return nil, pc.mapRoundTripError(req, startBytesWritten, err)
}
if d := pc.t.ResponseHeaderTimeout; d > 0 {
if debugRoundTrip {
req.logf("starting timer for %v", d)
}
timer := time.NewTimer(d)
defer timer.Stop() // prevent leaks
respHeaderTimer = timer.C
}
case <-pcClosed:
pcClosed = nil
if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
if debugRoundTrip {
req.logf("closech recv: %T %#v", pc.closed, pc.closed)
}
return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
}
case <-respHeaderTimer:
if debugRoundTrip {
req.logf("timeout waiting for response headers.")
}
pc.close(errTimeout)
return nil, errTimeout
case re := <-resc:
if (re.res == nil) == (re.err == nil) {
panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
}
if debugRoundTrip {
req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
}
if re.err != nil {
return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
}
return re.res, nil
case <-cancelChan:
canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
cancelChan = nil
case <-ctxDoneChan:
canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
cancelChan = nil
ctxDoneChan = nil
}
}
因而這里的監(jiān)聽不是在客戶端取消的context的監(jiān)聽,根據(jù)客戶端的輸出顯示,表明請求已經(jīng)發(fā)送到服務端,請求并未超時,response也返回了,那么這里的函數(shù)監(jiān)聽是與我們讀取數(shù)據(jù)沒有聯(lián)系。
小編最開始也以為是在這里監(jiān)聽返回,然而這里打斷點,怎么進不來。
在前面提到,連接是類型為persistConn,其次是讀取數(shù)據(jù)過程中,context的取消會產(chǎn)生影響,那么表明錯誤發(fā)生在tcp連接中的讀取數(shù)據(jù)。
接下來,根據(jù)連接建立過程,看看http做了什么?其次是真正的數(shù)據(jù)讀取來自哪里?
pconn, err := t.getConn(treq, cm)
...
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
req := treq.Request
trace := treq.trace
ctx := req.Context() //這里去了request的context
w := &wantConn{
cm: cm,
key: cm.key(),
ctx: ctx, //傳給w
ready: make(chan struct{}, 1),
beforeDial: testHookPrePendingDial,
afterDial: testHookPostPendingDial,
}
...
select{
case <-w.ready:
if w.err != nil {
// If the request has been canceled, that's probably
// what caused w.err; if so, prefer to return the
// cancellation error (see golang.org/issue/16049).
//如果建立連接前,請求被取消,這里會監(jiān)聽到取消的err
select {
case <-req.Cancel:
return nil, errRequestCanceledConn
case <-req.Context().Done():
return nil, req.Context().Err()
case err := <-cancelc:
if err == errRequestCanceled {
err = errRequestCanceledConn
}
return nil, err
default:
// return below
}
}
return w.pc, w.err//這里返回的是persistConn
...
通過這個w建立連接,進入到dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error)。 在這里面開啟了一個協(xié)程pconn.readLoop(),讀取連接里面的數(shù)據(jù)。
(t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
...
go pconn.readLoop()
}
因為錯誤與數(shù)據(jù)讀取有直接聯(lián)系,至少錯誤發(fā)生readloop中的某一個地方:
for alive {
...
var resp *Response
if err == nil {
resp, err = pc.readResponse(rc, trace) // 得到response
} else {
err = transportReadFromServerError{err}
closeErr = err
}
...
waitForBodyRead := make(chan bool, 2)
body := &bodyEOFSignal{ //對上面讀取的resp.Body進行封裝,這里封裝主要是傳遞請求取消的錯誤
body: resp.Body,
earlyCloseFn: func() error {
waitForBodyRead <- false
<-eofc // will be closed by deferred call at the end of the function
return nil
},
fn: func(err error) error {//
isEOF := err == io.EOF
waitForBodyRead <- isEOF
if isEOF {
<-eofc // see comment above eofc declaration
} else if err != nil {
if cerr := pc.canceled(); cerr != nil {
return cerr
}
}
return err
},
}
resp.Body = body
...
// Before looping back to the top of this function and peeking on
// the bufio.Reader, wait for the caller goroutine to finish
// reading the response body. (or for cancellation or death)
// 這里有開啟監(jiān)聽,顯然是監(jiān)聽讀的過程中發(fā)生的取消和超時等
select {
case bodyEOF := <-waitForBodyRead:
replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
alive = alive &&
bodyEOF &&
!pc.sawEOF &&
pc.wroteRequest() &&
replaced && tryPutIdleConn(trace)
if bodyEOF {
eofc <- struct{}{}
}
case <-rc.req.Cancel:
alive = false
pc.t.CancelRequest(rc.req)
case <-rc.req.Context().Done(): //這里便監(jiān)聽了客戶頓context的取消
alive = false //結束循環(huán)
pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())//傳遞err
case <-pc.closech:
alive = false
}
testHookReadLoopBeforeNextRead()
}
熟悉context的便知道,當我們調用context的cancel方法時,在前面的context的cancel()方法中有如下代碼:
d, _ := c.done.Load().(chan struct{}) // 拿到Done方法的返回值channel
if d == nil {
c.done.Store(closedchan)
} else {
close(d)// 關閉channel,而關閉時會向channel寫入值
}
再回到:
ccase <-rc.req.Context().Done():// 當contex取消,便進入這個代碼塊 alive = false pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
進入到cancelRequest(…)的rc.req.Context().Err()
func (c *cancelCtx) Err() error {
c.mu.Lock()
err := c.err//這里似曾相識,前面我們說到context調用取消函數(shù)時,會給c.err賦值為cancelErr
c.mu.Unlock()
return err
}
因而傳入cancelRequest的err便是cancelErr,我們進入cancelRequest:
func (t *Transport) cancelRequest(key cancelKey, err error) bool {
// This function must not return until the cancel func has completed.
// See: https://golang.org/issue/34658
t.reqMu.Lock()
defer t.reqMu.Unlock()
cancel := t.reqCanceler[key]// 這里的key正是我們傳入的請求的cancelkey,拿到reqCanceler中的func(error)
delete(t.reqCanceler, key)
if cancel != nil {
cancel(err) // 進入cancel
}
return cancel != nil
}
進入cancel(err):
func (pc *persistConn) cancelRequest(err error) {//這個函數(shù)不正是我們前面追蹤錯誤所看見的,這也表明我們追蹤是正確的
pc.mu.Lock()
defer pc.mu.Unlock()
pc.canceledErr = err
pc.closeLocked(errRequestCanceled)
}
到這里我們的err就傳給了body bodyEOFSignal,整個錯誤傳遞流程便走通了。
還剩最后一個問題,bodyEOFSignal的read函數(shù)中n, err = es.body.Read§ 所遇到的錯誤是什么?
n, err = es.body.Read(p)// 調試發(fā)現(xiàn)是網(wǎng)絡連接關閉錯誤,這里表明我們執(zhí)行完err的傳遞根本原因在于連接被關閉
if err != nil {
es.mu.Lock()
defer es.mu.Unlock()
if es.rerr == nil {
es.rerr = err
}
err = es.condfn(err)
}
return
那么關閉連接又是在哪里呢?
我們回到cancelRequest函數(shù):
pc.closeLocked(errRequestCanceled) //這里便關閉了連接
這樣err整個傳遞邏輯和原因便都走同通了!
總結
經(jīng)過上面的分析,將整個Context取消過程總結如下:
1.當創(chuàng)建一個帶有取消的Context,會把Context的內(nèi)部類中的err變量賦值為CancelErr;
2.客戶端的調用cancelFunc,會向context的Done所綁定的channel寫入值;
3.當channel寫入值后,transport.go中的readLoop方法會監(jiān)聽這個channel的寫入,從而把context取消的err傳給persistConn,并關閉連接;
4.關閉連接后,數(shù)據(jù)讀取便會遇到連接關閉的網(wǎng)絡錯誤錯誤,當遇到這個錯誤,在bodySignal中進行錯誤處理,這里并不感知連接的關閉,只利用fn分別錯誤類型,當錯誤為io.EOF,直接將這個錯誤置為nil,若不是,便通過bodySignal獲取到連接中的錯誤,再返回這個錯誤;
5.最后通過body.read()方法將錯誤打印出來。
6.這里復雜在于,每個角色只做自己的工作,遇到錯誤不是直接返回,而是等待其他角色來讀取錯誤;具體表現(xiàn)為:context負責生成錯誤消息、傳遞取消指令給persistConn;persistConn基于bodySignal建立讀取數(shù)據(jù)和連接的關聯(lián),響應Context的取消并關閉連接,拿到context的錯誤信息;client讀取數(shù)據(jù)和錯誤;bodySignal:分析錯誤,并傳遞數(shù)據(jù)和persistConn的錯誤消息給client。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
基于Golang實現(xiàn)內(nèi)存數(shù)據(jù)庫的示例詳解
這篇文章主要為大家詳細介紹了如何基于Golang實現(xiàn)內(nèi)存數(shù)據(jù)庫,文中的示例代碼講解詳細,具有一定的借鑒價值,需要的小伙伴可以參考一下2023-03-03

