Golang中tinyrpc框架的源碼解讀詳解
tinyrpc是一個高性能的基于protocol buffer
的rpc框架。項目代碼非常少,很適合初學(xué)者進行g(shù)olang的學(xué)習(xí)。
tinyrpc功能
tinyrpc
基于TCP協(xié)議,支持各種壓縮格式,基于protocol buffer
的序列化協(xié)議。其rpc是基于golang原生的net/rpc
開發(fā)而成。
tinyrpc項目結(jié)構(gòu)
tinyrpc
基于net/rpc
開發(fā)而成,在此基礎(chǔ)上集成了額外的能力。項目結(jié)構(gòu)如圖:
功能目錄如下:
- codec 編碼模塊
- compressor 壓縮模塊
- header 請求/響應(yīng)頭模塊
- protoc-gen-tinyrpc 代碼生成插件
- serializer 序列化模塊
tinyrpc源碼解讀
客戶端和服務(wù)端構(gòu)建
客戶端是以net/rpc
的rpc.Client
為基礎(chǔ)構(gòu)建,在此基礎(chǔ)上定義了Option
以配置壓縮方式和序列化方式:
type Option func(o *options) type options struct { compressType compressor.CompressType serializer serializer.Serializer }
在創(chuàng)建客戶端的時候?qū)⑴渲煤玫膲嚎s算法和序列化方式作為創(chuàng)建客戶端的參數(shù):
func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client { options := options{ compressType: compressor.Raw, serializer: serializer.Proto, } for _, option := range opts { option(&options) } return &Client{rpc.NewClientWithCodec( codec.NewClientCodec(conn, options.compressType, options.serializer))} }
服務(wù)端是以net/rpc
的rpc.Server
為基礎(chǔ)構(gòu)建,在此基礎(chǔ)上擴展了Server
的定義:
type Server struct { *rpc.Server serializer.Serializer }
在創(chuàng)建客戶端和開啟服務(wù)時傳入序列化方式:
func NewServer(opts ...Option) *Server { options := options{ serializer: serializer.Proto, } for _, option := range opts { option(&options) } return &Server{&rpc.Server{}, options.serializer} } func (s *Server) Serve(lis net.Listener) { log.Printf("tinyrpc started on: %s", lis.Addr().String()) for { conn, err := lis.Accept() if err != nil { continue } go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer)) } }
壓縮算法compressor
壓縮算法的實現(xiàn)中首先是定義了壓縮的接口:
type Compressor interface { Zip([]byte) ([]byte, error) Unzip([]byte) ([]byte, error) }
壓縮的接口包含壓縮和解壓方法。
壓縮算法使用的是uint
類型,使用iota
來初始化,并且使用map來進行所有壓縮算法實現(xiàn)的管理:
type CompressType uint16 const ( Raw CompressType = iota Gzip Snappy Zlib ) // Compressors which supported by rpc var Compressors = map[CompressType]Compressor{ Raw: RawCompressor{}, Gzip: GzipCompressor{}, Snappy: SnappyCompressor{}, Zlib: ZlibCompressor{}, }
序列化 serializer
序列化部分代碼非常簡單,提供了一個接口:
type Serializer interface { Marshal(message interface{}) ([]byte, error) Unmarshal(data []byte, message interface{}) error }
目前只有ProtoSerializer
一個實現(xiàn),ProtoSerializer
內(nèi)部的實現(xiàn)是基于"google.golang.org/protobuf/proto"
來實現(xiàn)的,并沒有什么特殊的處理,因此就不花費筆墨詳述了。
請求/響應(yīng)頭 header
tinyrpc
定義了自己的請求頭和響應(yīng)頭:
// RequestHeader request header structure looks like: // +--------------+----------------+----------+------------+----------+ // | CompressType | Method | ID | RequestLen | Checksum | // +--------------+----------------+----------+------------+----------+ // | uint16 | uvarint+string | uvarint | uvarint | uint32 | // +--------------+----------------+----------+------------+----------+ type RequestHeader struct { sync.RWMutex CompressType compressor.CompressType Method string ID uint64 RequestLen uint32 Checksum uint32 }
請求頭由壓縮類型,方法,id,請求長度和校驗碼組成。
// ResponseHeader request header structure looks like: // +--------------+---------+----------------+-------------+----------+ // | CompressType | ID | Error | ResponseLen | Checksum | // +--------------+---------+----------------+-------------+----------+ // | uint16 | uvarint | uvarint+string | uvarint | uint32 | // +--------------+---------+----------------+-------------+----------+ type ResponseHeader struct { sync.RWMutex CompressType compressor.CompressType ID uint64 Error string ResponseLen uint32 Checksum uint32 }
響應(yīng)頭由壓縮類型,id,錯誤信息,返回長度和校驗碼組成。
為了實現(xiàn)頭的重用,tinyrpc
為頭構(gòu)建了緩存池:
var ( RequestPool sync.Pool ResponsePool sync.Pool ) func init() { RequestPool = sync.Pool{New: func() interface{} { return &RequestHeader{} }} ResponsePool = sync.Pool{New: func() interface{} { return &ResponseHeader{} }} }
在使用時get出來,生命周期結(jié)束后放回池子,并且在put之前需要進行重置:
h := header.RequestPool.Get().(*header.RequestHeader) defer func() { h.ResetHeader() header.RequestPool.Put(h) }()
// ResetHeader reset request header func (r *RequestHeader) ResetHeader() { r.Lock() defer r.Unlock() r.ID = 0 r.Checksum = 0 r.Method = "" r.CompressType = 0 r.RequestLen = 0 } // ResetHeader reset response header func (r *ResponseHeader) ResetHeader() { r.Lock() defer r.Unlock() r.Error = "" r.ID = 0 r.CompressType = 0 r.Checksum = 0 r.ResponseLen = 0 }
搞清楚了頭的結(jié)構(gòu)以及對象池的復(fù)用邏輯,那么具體的頭的編碼與解碼就是很簡單的拆裝工作,就不在此一行一行解析了,大家有興趣可以自行去閱讀。
編碼 codec
由于tinyrpc
是基于net/rpc
開發(fā),那么其codec
模塊自然也是依賴于net/rpc
的ClientCodec
和ServerCodec
接口來實現(xiàn)的。
客戶端實現(xiàn)
客戶端是基于ClientCodec
實現(xiàn)的能力:
type ClientCodec interface { WriteRequest(*Request, any) error ReadResponseHeader(*Response) error ReadResponseBody(any) error Close() error }
client
定義了一個clientCodec
類型,并且實現(xiàn)了ClientCodec
的接口方法:
type clientCodec struct { r io.Reader w io.Writer c io.Closer compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib) serializer serializer.Serializer response header.ResponseHeader // rpc response header mutex sync.Mutex // protect pending map pending map[uint64]string }
WriteRequest
實現(xiàn):
// WriteRequest Write the rpc request header and body to the io stream func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error { c.mutex.Lock() c.pending[r.Seq] = r.ServiceMethod c.mutex.Unlock() if _, ok := compressor.Compressors[c.compressor]; !ok { return NotFoundCompressorError } reqBody, err := c.serializer.Marshal(param) if err != nil { return err } compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody) if err != nil { return err } h := header.RequestPool.Get().(*header.RequestHeader) defer func() { h.ResetHeader() header.RequestPool.Put(h) }() h.ID = r.Seq h.Method = r.ServiceMethod h.RequestLen = uint32(len(compressedReqBody)) h.CompressType = compressor.CompressType(c.compressor) h.Checksum = crc32.ChecksumIEEE(compressedReqBody) if err := sendFrame(c.w, h.Marshal()); err != nil { return err } if err := write(c.w, compressedReqBody); err != nil { return err } c.w.(*bufio.Writer).Flush() return nil }
可以看到代碼的實現(xiàn)還是比較清晰的,主要分為幾個步驟:
- 將數(shù)據(jù)進行序列化構(gòu)成請求體
- 選擇相應(yīng)的壓縮算法進行壓縮
- 從Pool中獲取請求頭實例將數(shù)據(jù)全部填入其中構(gòu)成最后的請求頭
- 分別通過io操作發(fā)送處理過的請求頭和請求體
ReadResponseHeader
實現(xiàn):
// ReadResponseHeader read the rpc response header from the io stream func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error { c.response.ResetHeader() data, err := recvFrame(c.r) if err != nil { return err } err = c.response.Unmarshal(data) if err != nil { return err } c.mutex.Lock() r.Seq = c.response.ID r.Error = c.response.Error r.ServiceMethod = c.pending[r.Seq] delete(c.pending, r.Seq) c.mutex.Unlock() return nil }
此方法作用是讀取返回的響應(yīng)頭,并解析成具體的結(jié)構(gòu)體
ReadResponseBody
實現(xiàn):
func (c *clientCodec) ReadResponseBody(param interface{}) error { if param == nil { if c.response.ResponseLen != 0 { if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil { return err } } return nil } respBody := make([]byte, c.response.ResponseLen) err := read(c.r, respBody) if err != nil { return err } if c.response.Checksum != 0 { if crc32.ChecksumIEEE(respBody) != c.response.Checksum { return UnexpectedChecksumError } } if c.response.GetCompressType() != c.compressor { return CompressorTypeMismatchError } resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody) if err != nil { return err } return c.serializer.Unmarshal(resp, param) }
此方法是用于讀取返回的響應(yīng)結(jié)構(gòu)體,流程如下:
- 讀取流獲取響應(yīng)體
- 根據(jù)響應(yīng)頭中的校驗碼來比對響應(yīng)體是否完整
- 根據(jù)壓縮算法來解壓具體的結(jié)構(gòu)體
- 進行反序列化
服務(wù)端實現(xiàn)
服務(wù)端是基于ServerCodec
實現(xiàn)的能力:
type ServerCodec interface { ReadRequestHeader(*Request) error ReadRequestBody(any) error WriteResponse(*Response, any) error // Close can be called multiple times and must be idempotent. Close() error }
和客戶端類似,server
定義了一個serverCodec
類型,并且實現(xiàn)了ServerCodec
的接口方法:
type serverCodec struct { r io.Reader w io.Writer c io.Closer request header.RequestHeader serializer serializer.Serializer mutex sync.Mutex // protects seq, pending seq uint64 pending map[uint64]*reqCtx }
ReadRequestHeader
實現(xiàn):
// ReadRequestHeader read the rpc request header from the io stream func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error { s.request.ResetHeader() data, err := recvFrame(s.r) if err != nil { return err } err = s.request.Unmarshal(data) if err != nil { return err } s.mutex.Lock() s.seq++ s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()} r.ServiceMethod = s.request.Method r.Seq = s.seq s.mutex.Unlock() return nil }
此方法用于讀取請求頭并解析成結(jié)構(gòu)體
ReadRequestBody
實現(xiàn):
// ReadRequestBody read the rpc request body from the io stream func (s *serverCodec) ReadRequestBody(param interface{}) error { if param == nil { if s.request.RequestLen != 0 { if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil { return err } } return nil } reqBody := make([]byte, s.request.RequestLen) err := read(s.r, reqBody) if err != nil { return err } if s.request.Checksum != 0 { if crc32.ChecksumIEEE(reqBody) != s.request.Checksum { return UnexpectedChecksumError } } if _, ok := compressor. Compressors[s.request.GetCompressType()]; !ok { return NotFoundCompressorError } req, err := compressor. Compressors[s.request.GetCompressType()].Unzip(reqBody) if err != nil { return err } return s.serializer.Unmarshal(req, param) }
此方法用于讀取請求體,流程和讀取響應(yīng)體差不多,大致如下:
- 讀取流并解析成請求體
- 根據(jù)請求頭中的校驗碼進行校驗
- 根據(jù)壓縮算法進行解壓
- 反序列化
WriteResponse
實現(xiàn):
// WriteResponse Write the rpc response header and body to the io stream func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error { s.mutex.Lock() reqCtx, ok := s.pending[r.Seq] if !ok { s.mutex.Unlock() return InvalidSequenceError } delete(s.pending, r.Seq) s.mutex.Unlock() if r.Error != "" { param = nil } if _, ok := compressor. Compressors[reqCtx.compareType]; !ok { return NotFoundCompressorError } var respBody []byte var err error if param != nil { respBody, err = s.serializer.Marshal(param) if err != nil { return err } } compressedRespBody, err := compressor. Compressors[reqCtx.compareType].Zip(respBody) if err != nil { return err } h := header.ResponsePool.Get().(*header.ResponseHeader) defer func() { h.ResetHeader() header.ResponsePool.Put(h) }() h.ID = reqCtx.requestID h.Error = r.Error h.ResponseLen = uint32(len(compressedRespBody)) h.Checksum = crc32.ChecksumIEEE(compressedRespBody) h.CompressType = reqCtx.compareType if err = sendFrame(s.w, h.Marshal()); err != nil { return err } if err = write(s.w, compressedRespBody); err != nil { return err } s.w.(*bufio.Writer).Flush() return nil }
此方法用于寫入響應(yīng)體,大致與寫入請求體差不多,流程如下:
- 將響應(yīng)體序列化
- 使用壓縮算法將響應(yīng)體進行壓縮
- 使用Pool管理響應(yīng)頭
- 分別發(fā)送返回頭和返回體
總結(jié)
tinyrpc
是基于golang
原生的net/rpc
包實現(xiàn),在此基礎(chǔ)上實現(xiàn)了壓縮和序列化等能力擴展。整體來看tinyrpc
的代碼非常簡單,比較適合剛接觸golang
的程序員來進行閱讀學(xué)習(xí),學(xué)習(xí)一些golang
的基礎(chǔ)的開發(fā)技巧和一些語言特性。
以上就是Golang中tinyrpc框架的源碼解讀詳解的詳細內(nèi)容,更多關(guān)于Golang tinyrpc框架的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang拾遺之實現(xiàn)一個不可復(fù)制類型詳解
在這篇文章中我們將實現(xiàn)一個無法被復(fù)制的類型,順便加深對引用類型、值傳遞以及指針的理解。文中的示例代碼講解詳細,感興趣的可以了解一下2023-02-02Golang算法問題之?dāng)?shù)組按指定規(guī)則排序的方法分析
這篇文章主要介紹了Golang算法問題之?dāng)?shù)組按指定規(guī)則排序的方法,結(jié)合實例形式分析了Go語言數(shù)組排序相關(guān)算法原理與操作技巧,需要的朋友可以參考下2017-02-02golang實現(xiàn)簡單的tcp數(shù)據(jù)傳輸
這篇文章主要為大家介紹了golang實現(xiàn)簡單的tcp數(shù)據(jù)傳輸,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12詳解Golang函數(shù)式選項(Functional?Options)模式
什么是函數(shù)式選項模式,為什么要這么寫,這個編程模式解決了什么問題呢?其實就是為了解決動態(tài)靈活的配置不同的參數(shù)的問題。下面通過本文給大家介紹Golang函數(shù)式選項(Functional?Options)模式的問題,感興趣的朋友一起看看吧2021-12-12golang?recover函數(shù)使用中的一些坑解析
這篇文章主要為大家介紹了golang?recover函數(shù)使用中的一些坑解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02Go語言基礎(chǔ)之網(wǎng)絡(luò)編程全面教程示例
這篇文章主要為大家介紹了Go語言基礎(chǔ)之網(wǎng)絡(luò)編程全面教程示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12使用systemd部署和守護golang應(yīng)用程序的操作方法
systemd是一個流行的守護進程管理器,可以輕松管理服務(wù)的啟動、停止、重啟等操作,讓我們的應(yīng)用程序始終保持在線,本文介紹了如何使用systemd部署和守護golang應(yīng)用程序,感興趣的朋友一起看看吧2023-10-10