欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang中tinyrpc框架的源碼解讀詳解

 更新時間:2023年01月14日 16:58:20   作者:騎牛上青山  
tinyrpc是一個高性能的基于protocol?buffer的rpc框架。項目代碼非常少,很適合初學(xué)者進行g(shù)olang的學(xué)習(xí)。本文將從源碼的角度帶大家了解tinyrpc框架的使用,需要的可以參考一下

tinyrpc是一個高性能的基于protocol buffer的rpc框架。項目代碼非常少,很適合初學(xué)者進行g(shù)olang的學(xué)習(xí)。

tinyrpc功能

tinyrpc基于TCP協(xié)議,支持各種壓縮格式,基于protocol buffer的序列化協(xié)議。其rpc是基于golang原生的net/rpc開發(fā)而成。

tinyrpc項目結(jié)構(gòu)

tinyrpc基于net/rpc開發(fā)而成,在此基礎(chǔ)上集成了額外的能力。項目結(jié)構(gòu)如圖:

功能目錄如下:

  • codec 編碼模塊
  • compressor 壓縮模塊
  • header 請求/響應(yīng)頭模塊
  • protoc-gen-tinyrpc 代碼生成插件
  • serializer 序列化模塊

tinyrpc源碼解讀

客戶端和服務(wù)端構(gòu)建

客戶端是以net/rpcrpc.Client為基礎(chǔ)構(gòu)建,在此基礎(chǔ)上定義了Option以配置壓縮方式和序列化方式:

type Option func(o *options)

type options struct {
	compressType compressor.CompressType
	serializer   serializer.Serializer
}

在創(chuàng)建客戶端的時候?qū)⑴渲煤玫膲嚎s算法和序列化方式作為創(chuàng)建客戶端的參數(shù):

func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
	options := options{
		compressType: compressor.Raw,
		serializer:   serializer.Proto,
	}
	for _, option := range opts {
		option(&options)
	}
	return &Client{rpc.NewClientWithCodec(
		codec.NewClientCodec(conn, options.compressType, options.serializer))}
}

服務(wù)端是以net/rpcrpc.Server為基礎(chǔ)構(gòu)建,在此基礎(chǔ)上擴展了Server的定義:

type Server struct {
	*rpc.Server
	serializer.Serializer
}

在創(chuàng)建客戶端和開啟服務(wù)時傳入序列化方式:

func NewServer(opts ...Option) *Server {
	options := options{
		serializer: serializer.Proto,
	}
	for _, option := range opts {
		option(&options)
	}

	return &Server{&rpc.Server{}, options.serializer}
}

func (s *Server) Serve(lis net.Listener) {
	log.Printf("tinyrpc started on: %s", lis.Addr().String())
	for {
		conn, err := lis.Accept()
		if err != nil {
			continue
		}
		go s.Server.ServeCodec(codec.NewServerCodec(conn, s.Serializer))
	}
}

壓縮算法compressor

壓縮算法的實現(xiàn)中首先是定義了壓縮的接口:

type Compressor interface {
	Zip([]byte) ([]byte, error)
	Unzip([]byte) ([]byte, error)
}

壓縮的接口包含壓縮和解壓方法。

壓縮算法使用的是uint類型,使用iota來初始化,并且使用map來進行所有壓縮算法實現(xiàn)的管理:

type CompressType uint16

const (
	Raw CompressType = iota
	Gzip
	Snappy
	Zlib
)

// Compressors which supported by rpc
var Compressors = map[CompressType]Compressor{
	Raw:    RawCompressor{},
	Gzip:   GzipCompressor{},
	Snappy: SnappyCompressor{},
	Zlib:   ZlibCompressor{},
}

序列化 serializer

序列化部分代碼非常簡單,提供了一個接口:

type Serializer interface {
	Marshal(message interface{}) ([]byte, error)
	Unmarshal(data []byte, message interface{}) error
}

目前只有ProtoSerializer一個實現(xiàn),ProtoSerializer內(nèi)部的實現(xiàn)是基于"google.golang.org/protobuf/proto"來實現(xiàn)的,并沒有什么特殊的處理,因此就不花費筆墨詳述了。

請求/響應(yīng)頭 header

tinyrpc定義了自己的請求頭和響應(yīng)頭:

// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType |      Method    |    ID    | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// |    uint16    | uvarint+string |  uvarint |   uvarint  |  uint32  |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
	sync.RWMutex
	CompressType compressor.CompressType
	Method       string
	ID           uint64
	RequestLen   uint32
	Checksum     uint32
}

請求頭由壓縮類型,方法,id,請求長度和校驗碼組成。

// ResponseHeader request header structure looks like:
// +--------------+---------+----------------+-------------+----------+
// | CompressType |    ID   |      Error     | ResponseLen | Checksum |
// +--------------+---------+----------------+-------------+----------+
// |    uint16    | uvarint | uvarint+string |    uvarint  |  uint32  |
// +--------------+---------+----------------+-------------+----------+
type ResponseHeader struct {
	sync.RWMutex
	CompressType compressor.CompressType
	ID           uint64
	Error        string
	ResponseLen  uint32
	Checksum     uint32
}

響應(yīng)頭由壓縮類型,id,錯誤信息,返回長度和校驗碼組成。

為了實現(xiàn)頭的重用,tinyrpc為頭構(gòu)建了緩存池:

var (
	RequestPool  sync.Pool
	ResponsePool sync.Pool
)

func init() {
	RequestPool = sync.Pool{New: func() interface{} {
		return &RequestHeader{}
	}}
	ResponsePool = sync.Pool{New: func() interface{} {
		return &ResponseHeader{}
	}}
}

在使用時get出來,生命周期結(jié)束后放回池子,并且在put之前需要進行重置:

    h := header.RequestPool.Get().(*header.RequestHeader)
	defer func() {
		h.ResetHeader()
		header.RequestPool.Put(h)
	}()
// ResetHeader reset request header
func (r *RequestHeader) ResetHeader() {
	r.Lock()
	defer r.Unlock()
	r.ID = 0
	r.Checksum = 0
	r.Method = ""
	r.CompressType = 0
	r.RequestLen = 0
}

// ResetHeader reset response header
func (r *ResponseHeader) ResetHeader() {
	r.Lock()
	defer r.Unlock()
	r.Error = ""
	r.ID = 0
	r.CompressType = 0
	r.Checksum = 0
	r.ResponseLen = 0
}

搞清楚了頭的結(jié)構(gòu)以及對象池的復(fù)用邏輯,那么具體的頭的編碼與解碼就是很簡單的拆裝工作,就不在此一行一行解析了,大家有興趣可以自行去閱讀。

編碼 codec

由于tinyrpc是基于net/rpc開發(fā),那么其codec模塊自然也是依賴于net/rpcClientCodecServerCodec接口來實現(xiàn)的。

客戶端實現(xiàn)

客戶端是基于ClientCodec實現(xiàn)的能力:

type ClientCodec interface {
	WriteRequest(*Request, any) error
	ReadResponseHeader(*Response) error
	ReadResponseBody(any) error

	Close() error
}

client定義了一個clientCodec類型,并且實現(xiàn)了ClientCodec的接口方法:

type clientCodec struct {
	r io.Reader
	w io.Writer
	c io.Closer

	compressor compressor.CompressType // rpc compress type(raw,gzip,snappy,zlib)
	serializer serializer.Serializer
	response   header.ResponseHeader // rpc response header
	mutex      sync.Mutex            // protect pending map
	pending    map[uint64]string
}

WriteRequest實現(xiàn):

// WriteRequest Write the rpc request header and body to the io stream
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
	c.mutex.Lock()
	c.pending[r.Seq] = r.ServiceMethod
	c.mutex.Unlock()

	if _, ok := compressor.Compressors[c.compressor]; !ok {
		return NotFoundCompressorError
	}
	reqBody, err := c.serializer.Marshal(param)
	if err != nil {
		return err
	}
	compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
	if err != nil {
		return err
	}
	h := header.RequestPool.Get().(*header.RequestHeader)
	defer func() {
		h.ResetHeader()
		header.RequestPool.Put(h)
	}()
	h.ID = r.Seq
	h.Method = r.ServiceMethod
	h.RequestLen = uint32(len(compressedReqBody))
	h.CompressType = compressor.CompressType(c.compressor)
	h.Checksum = crc32.ChecksumIEEE(compressedReqBody)

	if err := sendFrame(c.w, h.Marshal()); err != nil {
		return err
	}
	if err := write(c.w, compressedReqBody); err != nil {
		return err
	}

	c.w.(*bufio.Writer).Flush()
	return nil
}

可以看到代碼的實現(xiàn)還是比較清晰的,主要分為幾個步驟:

  • 將數(shù)據(jù)進行序列化構(gòu)成請求體
  • 選擇相應(yīng)的壓縮算法進行壓縮
  • 從Pool中獲取請求頭實例將數(shù)據(jù)全部填入其中構(gòu)成最后的請求頭
  • 分別通過io操作發(fā)送處理過的請求頭和請求體

ReadResponseHeader實現(xiàn):

// ReadResponseHeader read the rpc response header from the io stream
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
	c.response.ResetHeader()
	data, err := recvFrame(c.r)
	if err != nil {
		return err
	}
	err = c.response.Unmarshal(data)
	if err != nil {
		return err
	}
	c.mutex.Lock()
	r.Seq = c.response.ID
	r.Error = c.response.Error
	r.ServiceMethod = c.pending[r.Seq]
	delete(c.pending, r.Seq)
	c.mutex.Unlock()
	return nil
}

此方法作用是讀取返回的響應(yīng)頭,并解析成具體的結(jié)構(gòu)體

ReadResponseBody實現(xiàn):

func (c *clientCodec) ReadResponseBody(param interface{}) error {
	if param == nil {
		if c.response.ResponseLen != 0 {
			if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
				return err
			}
		}
		return nil
	}

	respBody := make([]byte, c.response.ResponseLen)
	err := read(c.r, respBody)
	if err != nil {
		return err
	}

	if c.response.Checksum != 0 {
		if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
			return UnexpectedChecksumError
		}
	}

	if c.response.GetCompressType() != c.compressor {
		return CompressorTypeMismatchError
	}

	resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
	if err != nil {
		return err
	}

	return c.serializer.Unmarshal(resp, param)
}

此方法是用于讀取返回的響應(yīng)結(jié)構(gòu)體,流程如下:

  • 讀取流獲取響應(yīng)體
  • 根據(jù)響應(yīng)頭中的校驗碼來比對響應(yīng)體是否完整
  • 根據(jù)壓縮算法來解壓具體的結(jié)構(gòu)體
  • 進行反序列化

服務(wù)端實現(xiàn)

服務(wù)端是基于ServerCodec實現(xiàn)的能力:

type ServerCodec interface {
	ReadRequestHeader(*Request) error
	ReadRequestBody(any) error
	WriteResponse(*Response, any) error

	// Close can be called multiple times and must be idempotent.
	Close() error
}

和客戶端類似,server定義了一個serverCodec類型,并且實現(xiàn)了ServerCodec的接口方法:

type serverCodec struct {
	r io.Reader
	w io.Writer
	c io.Closer

	request    header.RequestHeader
	serializer serializer.Serializer
	mutex      sync.Mutex // protects seq, pending
	seq        uint64
	pending    map[uint64]*reqCtx
}

ReadRequestHeader實現(xiàn):

// ReadRequestHeader read the rpc request header from the io stream
func (s *serverCodec) ReadRequestHeader(r *rpc.Request) error {
	s.request.ResetHeader()
	data, err := recvFrame(s.r)
	if err != nil {
		return err
	}
	err = s.request.Unmarshal(data)
	if err != nil {
		return err
	}
	s.mutex.Lock()
	s.seq++
	s.pending[s.seq] = &reqCtx{s.request.ID, s.request.GetCompressType()}
	r.ServiceMethod = s.request.Method
	r.Seq = s.seq
	s.mutex.Unlock()
	return nil
}

此方法用于讀取請求頭并解析成結(jié)構(gòu)體

ReadRequestBody實現(xiàn):

// ReadRequestBody read the rpc request body from the io stream
func (s *serverCodec) ReadRequestBody(param interface{}) error {
	if param == nil {
		if s.request.RequestLen != 0 {
			if err := read(s.r, make([]byte, s.request.RequestLen)); err != nil {
				return err
			}
		}
		return nil
	}

	reqBody := make([]byte, s.request.RequestLen)

	err := read(s.r, reqBody)
	if err != nil {
		return err
	}

	if s.request.Checksum != 0 {
		if crc32.ChecksumIEEE(reqBody) != s.request.Checksum {
			return UnexpectedChecksumError
		}
	}

	if _, ok := compressor.
		Compressors[s.request.GetCompressType()]; !ok {
		return NotFoundCompressorError
	}

	req, err := compressor.
		Compressors[s.request.GetCompressType()].Unzip(reqBody)
	if err != nil {
		return err
	}

	return s.serializer.Unmarshal(req, param)
}

此方法用于讀取請求體,流程和讀取響應(yīng)體差不多,大致如下:

  • 讀取流并解析成請求體
  • 根據(jù)請求頭中的校驗碼進行校驗
  • 根據(jù)壓縮算法進行解壓
  • 反序列化

WriteResponse實現(xiàn):

// WriteResponse Write the rpc response header and body to the io stream
func (s *serverCodec) WriteResponse(r *rpc.Response, param interface{}) error {
	s.mutex.Lock()
	reqCtx, ok := s.pending[r.Seq]
	if !ok {
		s.mutex.Unlock()
		return InvalidSequenceError
	}
	delete(s.pending, r.Seq)
	s.mutex.Unlock()

	if r.Error != "" {
		param = nil
	}
	if _, ok := compressor.
		Compressors[reqCtx.compareType]; !ok {
		return NotFoundCompressorError
	}

	var respBody []byte
	var err error
	if param != nil {
		respBody, err = s.serializer.Marshal(param)
		if err != nil {
			return err
		}
	}

	compressedRespBody, err := compressor.
		Compressors[reqCtx.compareType].Zip(respBody)
	if err != nil {
		return err
	}
	h := header.ResponsePool.Get().(*header.ResponseHeader)
	defer func() {
		h.ResetHeader()
		header.ResponsePool.Put(h)
	}()
	h.ID = reqCtx.requestID
	h.Error = r.Error
	h.ResponseLen = uint32(len(compressedRespBody))
	h.Checksum = crc32.ChecksumIEEE(compressedRespBody)
	h.CompressType = reqCtx.compareType

	if err = sendFrame(s.w, h.Marshal()); err != nil {
		return err
	}

	if err = write(s.w, compressedRespBody); err != nil {
		return err
	}
	s.w.(*bufio.Writer).Flush()
	return nil
}

此方法用于寫入響應(yīng)體,大致與寫入請求體差不多,流程如下:

  • 將響應(yīng)體序列化
  • 使用壓縮算法將響應(yīng)體進行壓縮
  • 使用Pool管理響應(yīng)頭
  • 分別發(fā)送返回頭和返回體

總結(jié)

tinyrpc是基于golang原生的net/rpc包實現(xiàn),在此基礎(chǔ)上實現(xiàn)了壓縮和序列化等能力擴展。整體來看tinyrpc的代碼非常簡單,比較適合剛接觸golang的程序員來進行閱讀學(xué)習(xí),學(xué)習(xí)一些golang的基礎(chǔ)的開發(fā)技巧和一些語言特性。

以上就是Golang中tinyrpc框架的源碼解讀詳解的詳細內(nèi)容,更多關(guān)于Golang tinyrpc框架的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Golang拾遺之實現(xiàn)一個不可復(fù)制類型詳解

    Golang拾遺之實現(xiàn)一個不可復(fù)制類型詳解

    在這篇文章中我們將實現(xiàn)一個無法被復(fù)制的類型,順便加深對引用類型、值傳遞以及指針的理解。文中的示例代碼講解詳細,感興趣的可以了解一下
    2023-02-02
  • Golang算法問題之?dāng)?shù)組按指定規(guī)則排序的方法分析

    Golang算法問題之?dāng)?shù)組按指定規(guī)則排序的方法分析

    這篇文章主要介紹了Golang算法問題之?dāng)?shù)組按指定規(guī)則排序的方法,結(jié)合實例形式分析了Go語言數(shù)組排序相關(guān)算法原理與操作技巧,需要的朋友可以參考下
    2017-02-02
  • 詳解Go如何優(yōu)雅的對時間進行格式化

    詳解Go如何優(yōu)雅的對時間進行格式化

    這篇文章主要為大家詳細介紹了Go語言中是如何優(yōu)雅的對時間進行格式化的,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起了解一下
    2023-06-06
  • golang實現(xiàn)簡單的tcp數(shù)據(jù)傳輸

    golang實現(xiàn)簡單的tcp數(shù)據(jù)傳輸

    這篇文章主要為大家介紹了golang實現(xiàn)簡單的tcp數(shù)據(jù)傳輸,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-12-12
  • 詳解Golang函數(shù)式選項(Functional?Options)模式

    詳解Golang函數(shù)式選項(Functional?Options)模式

    什么是函數(shù)式選項模式,為什么要這么寫,這個編程模式解決了什么問題呢?其實就是為了解決動態(tài)靈活的配置不同的參數(shù)的問題。下面通過本文給大家介紹Golang函數(shù)式選項(Functional?Options)模式的問題,感興趣的朋友一起看看吧
    2021-12-12
  • Golang單元測試與斷言編寫流程詳解

    Golang單元測試與斷言編寫流程詳解

    這篇文章主要介紹了Golang單元測試與斷言編寫流程,單元測試也是一個很重要的事情。單元測試是指在開發(fā)中,對一個函數(shù)或模塊的測試。其強調(diào)的是對單元進行測試
    2022-12-12
  • golang?recover函數(shù)使用中的一些坑解析

    golang?recover函數(shù)使用中的一些坑解析

    這篇文章主要為大家介紹了golang?recover函數(shù)使用中的一些坑解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-02-02
  • Go語言基礎(chǔ)之網(wǎng)絡(luò)編程全面教程示例

    Go語言基礎(chǔ)之網(wǎng)絡(luò)編程全面教程示例

    這篇文章主要為大家介紹了Go語言基礎(chǔ)之網(wǎng)絡(luò)編程全面教程示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-12-12
  • 使用systemd部署和守護golang應(yīng)用程序的操作方法

    使用systemd部署和守護golang應(yīng)用程序的操作方法

    systemd是一個流行的守護進程管理器,可以輕松管理服務(wù)的啟動、停止、重啟等操作,讓我們的應(yīng)用程序始終保持在線,本文介紹了如何使用systemd部署和守護golang應(yīng)用程序,感興趣的朋友一起看看吧
    2023-10-10
  • Go方法簡單性和高效性的充分體現(xiàn)詳解

    Go方法簡單性和高效性的充分體現(xiàn)詳解

    本文深入探討了Go語言中方法的各個方面,包括基礎(chǔ)概念、定義與聲明、特性、實戰(zhàn)應(yīng)用以及性能考量,文章充滿技術(shù)深度,通過實例和代碼演示,力圖幫助讀者全面理解Go方法的設(shè)計哲學(xué)和最佳實踐
    2023-10-10

最新評論