探究gRPC?客戶端調(diào)用服務(wù)端需要連接池嗎?
發(fā)現(xiàn)的問題
在微服務(wù)開發(fā)中,gRPC 的應(yīng)用絕對(duì)少不了,一般情況下,內(nèi)部微服務(wù)交互,通常是使用 RPC 進(jìn)行通信,如果是外部通信的話,會(huì)提供 https 接口文檔
對(duì)于 gRPC 的基本使用可以查看文章 gRPC介紹
對(duì)于 gRPC ,我們需要基本知道如下的一些知識(shí)點(diǎn):
gRPC 的基本四種模式的應(yīng)用場(chǎng)景
- 請(qǐng)求響應(yīng)模式
- 客戶端數(shù)據(jù)流模式
- 服務(wù)端數(shù)據(jù)流模式
- 雙向流模式
Proto 文件的定義和使用
gRPC 攔截器的應(yīng)用 , 基本的可以查看這篇 gRPC 攔截器
- 實(shí)際上有客戶端攔截器 和 服務(wù)端攔截器,具體詳細(xì)的可以自行學(xué)習(xí)
- gRPC 的設(shè)計(jì)原理細(xì)節(jié)
- Go-Kit 的使用
當(dāng)然今天并不是要聊 gRPC 的應(yīng)用或者原理,而是想聊我們?cè)陂_發(fā)過程中很容易遇到的問題:
- 未復(fù)用 gRPC 客戶端連接,影響性能
最近審查各個(gè)服務(wù)代碼中,發(fā)現(xiàn)整個(gè)部門使用 gRPC 客戶端請(qǐng)求服務(wù)端接口的時(shí)候,都是會(huì)新建一個(gè)連接,然后調(diào)用服務(wù)端接口,使用完畢之后就 close 掉, 例如這樣
這會(huì)有什么問題呢?
正常簡(jiǎn)單的使用不會(huì)有啥問題,但如果是面臨高并發(fā)的情況,性能問題很容易就會(huì)出現(xiàn),例如我們?cè)谧鲂阅軠y(cè)試的時(shí)候,就會(huì)發(fā)現(xiàn),打一會(huì)性能測(cè)試,客戶端請(qǐng)求服務(wù)端的時(shí)候就會(huì)報(bào)錯(cuò):
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused
實(shí)際去查看問題的時(shí)候,很明顯,這是 gRPC 的連接數(shù)被打滿了,很多連接都還未完全釋放
那這個(gè)時(shí)候,簡(jiǎn)單思考一下,我們是沒有必要對(duì)于每一次客戶端請(qǐng)求服務(wù)端接口的時(shí)候,都新建立一次連接,并且調(diào)用完畢之后就馬上關(guān)閉連接
我們知道,gRPC 的通信本質(zhì)上也是 TCP 的連接,那么一次連接就需要三次握手,和四次揮手,每一次建立連接和釋放連接的時(shí)候,都需要走這么一個(gè)過程,如果我們頻繁的建立和釋放連接,這對(duì)于資源和性能其實(shí)都是一個(gè)大大的浪費(fèi)
我們還知道 gRPC 是一個(gè)高性能、開源和擁有統(tǒng)一規(guī)定的 RPC框架,面向?qū)ο蟮?nbsp;http/2 通信協(xié)議,能夠能節(jié)省空間和 IO 密集度的開銷 ,但是我們并沒有很好的將他運(yùn)用起來(lái),gRPC 服務(wù)端的連接管理不用我們操心,但是我們對(duì)于 gRPC 客戶端的連續(xù)非常有必要關(guān)心,咱們要想辦法復(fù)用客戶端的連接
gRPC 連接池
復(fù)用連接,我們可以使用連接池的方式
對(duì)于這種復(fù)用資源,我們其實(shí)也接觸了不少,例如復(fù)用線程 worker 的線程池,go 中的協(xié)程池 ..
簡(jiǎn)單來(lái)說(shuō),連接池 ,就是提前創(chuàng)建好一定數(shù)量的 tcp 連接句柄放在池子中,咱們需要和外部通信的時(shí)候,就去池子中取一個(gè)連接來(lái)用,用完了之后,咱們就放回去
連接池解決了什么問題
很明顯,連接池解決了上述咱們頻繁創(chuàng)建連接和釋放連接帶來(lái)的資源和性能上的損耗,咱們節(jié)省了這部分開銷后,自然就提高了咱們的性能
可是我們?cè)俅嗡伎家幌拢绻@個(gè)連接池子就是只能存放固定的連接,那么我們業(yè)務(wù)擴(kuò)張的時(shí)候,豈不是光等待池子里面有空閑連接就會(huì)耗費(fèi)大量的時(shí)間呢?
或者是池子過大,咱們需要的連接數(shù)較少,那么開辟那么多連接豈不是一種浪費(fèi)?
那么我們?cè)谠O(shè)計(jì)或者是應(yīng)用連接池的時(shí)候,就需要考慮如下幾個(gè)方面了:
- 連接池是否支持?jǐn)U縮容
- 空閑的連接是否支持超時(shí)自行關(guān)閉,是否支持?;?/strong>
- 池子滿的時(shí)候,處理的策略是什么樣的
其實(shí)關(guān)于連接池的設(shè)計(jì)和庫(kù)網(wǎng)上都很多,我們可以找一個(gè)案例來(lái)看看如何來(lái)使用連接池,以及它是如何來(lái)進(jìn)行上述幾個(gè)方面的編碼落地的
如何去使用連接池
先來(lái)看看客戶端如何使用連接池
客戶端使用 pool
client/main.go
package main import ( "context" "flag" "fmt" "log" "time" "mypoolclient/pool" "mypoolclient/pb" ) var addr = flag.String("addr", "127.0.0.1:8888", "the address to connect to") func main() { flag.Parse() p, err := pool.New(*addr, pool.DefaultOptions) if err != nil { log.Fatalf("failed to new pool: %v", err) } defer p.Close() conn, err := p.Get() if err != nil { log.Fatalf("failed to get conn: %v", err) } defer conn.Close() client := pb.NewTestsvrClient(conn.Value()) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() res, err := client.Say(ctx, &pb.TestReq{Message: []byte("hi")}) if err != nil { log.Fatalf("unexpected error from Say: %v", err) } fmt.Println("rpc response:", res) }
此處的客戶端,我們很明顯可以看出來(lái),以前咱們使用客戶端去調(diào)用服務(wù)端接口的時(shí)候,總會(huì)不自覺的 Dial 一下建立連接
咱們使用連接池的話,就可以直接從池子里面拿一個(gè)連接出來(lái)直接使用即可
服務(wù)端
server/client.go
package main import ( "context" "flag" "fmt" "log" "net" "google.golang.org/grpc" "mypoolserver/pb" ) var port = flag.Int("port", 8888, "port number") // server implements EchoServer. type server struct{} func (s *server) Say(context.Context, *pb.TestReq) (*pb.TestRsp, error) { fmt.Println("call Say ... ") return &pb.TestRsp{Message: []byte("hello world")}, nil } func main() { flag.Parse() listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", *port)) if err != nil { log.Fatalf("failed to listen: %v", err) } s := grpc.NewServer() pb.RegisterTestsvrServer(s, &server{}) fmt.Println("start server ...") if err := s.Serve(listen); err != nil { log.Fatalf("failed to serve: %v", err) } fmt.Println("over server ...") }
連接池的具體實(shí)現(xiàn)方式
連接池的具體實(shí)現(xiàn)方式,參考了 github
https://github.com/shimingyah/pool
具體的實(shí)現(xiàn),都放在上述目錄的 pool 下面了 , 也可以訪問地址
https://github.com/qingconglaixueit/mypoolapp
pool 包中包含了 3 個(gè)文件,作用如下
├── conn.go
-- 關(guān)于 grpc 連接的結(jié)構(gòu)定義和方法實(shí)現(xiàn)
├── options.go
-- 攔截器的常量定義,以及 Dial 建立連接的簡(jiǎn)單封裝, 這個(gè)文件可要可不要,看自己的需求
└── pool.go
-- 具體 pool 的接口定義和實(shí)現(xiàn)
直接來(lái)看 pool.go 中的接口定義
type Pool interface { Get() (Conn, error) Close() error Status() string }
- Get()
獲取一個(gè)新的連接 , 當(dāng)關(guān)閉連接的時(shí)候,會(huì)將該連接放入到池子中
- Close()
關(guān)閉連接池,自然連接池子中的連接也不再可用
關(guān)于 pool 結(jié)構(gòu)的定義 ,conn 結(jié)構(gòu)的定義建議,將上述 github 地址上的源碼下載下來(lái)進(jìn)行閱讀,下面主要是分享關(guān)于
- 連接池子的創(chuàng)建,擴(kuò)縮容,釋放
- 具體 TCP 連接的創(chuàng)建和釋放
創(chuàng)建連接池
func New(address string, option Options) (Pool, error) { if address == "" { return nil, errors.New("invalid address settings") } if option.Dial == nil { return nil, errors.New("invalid dial settings") } if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive { return nil, errors.New("invalid maximum settings") } if option.MaxConcurrentStreams <= 0 { return nil, errors.New("invalid maximun settings") } p := &pool{ index: 0, current: int32(option.MaxIdle), ref: 0, opt: option, conns: make([]*conn, option.MaxActive), address: address, closed: 0, } for i := 0; i < p.opt.MaxIdle; i++ { c, err := p.opt.Dial(address) if err != nil { p.Close() return nil, fmt.Errorf("dial is not able to fill the pool: %s", err) } p.conns[i] = p.wrapConn(c, false) } log.Printf("new pool success: %v\n", p.Status()) return p, nil }
關(guān)于 pool 的接口,可以看成是這樣的
對(duì)于創(chuàng)建連接池,除了校驗(yàn)基本的參數(shù)以外,我們知道池子其實(shí)是一個(gè) TCP 連接的切片,長(zhǎng)度為 option.MaxActive 即最大的活躍連接數(shù)
p.conns[i] = p.wrapConn(c, false)
表示咱們初始化一個(gè)連接,并放到連接池中,且初始化的 once 參數(shù)置為 false,表示該連接默認(rèn)保存在池子中,不被銷毀
換句話說(shuō),當(dāng)我們需要真實(shí)銷毀連接池中的連接的時(shí)候,就將該鏈接的 once 參數(shù)置為 false 即可,實(shí)際上也無(wú)需我們使用這去做這一步
實(shí)際上 關(guān)于每一個(gè)連接的建立也是在 New 里面完成的,只要有 1 個(gè)連接未建立成功,那么咱們的連接池就算是建立失敗,咱們會(huì)調(diào)用 p.Close() 將之前建立好的連接全部釋放掉
// 關(guān)閉連接池 func (p *pool) Close() error { atomic.StoreInt32(&p.closed, 1) atomic.StoreUint32(&p.index, 0) atomic.StoreInt32(&p.current, 0) atomic.StoreInt32(&p.ref, 0) p.deleteFrom(0) log.Printf("close pool success: %v\n", p.Status()) return nil } // 清除從 指定位置開始到 MaxActive 之間的連接 func (p *pool) deleteFrom(begin int) { for i := begin; i < p.opt.MaxActive; i++ { p.reset(i) } } // 清除具體的連接 func (p *pool) reset(index int) { conn := p.conns[index] if conn == nil { return } conn.reset() p.conns[index] = nil }
這里我們可以看到,當(dāng)需要從池子中清除具體的連接的時(shí)候,最終從連接池子中取出對(duì)應(yīng)位置上的連接 ,conn := p.conns[index], conn.reset()
,實(shí)際上是給當(dāng)前這個(gè)連接進(jìn)行參數(shù)賦值
func (c *conn) reset() error { cc := c.cc c.cc = nil c.once = false if cc != nil { return cc.Close() } return nil } func (c *conn) Close() error { c.pool.decrRef() if c.once { return c.reset() } return nil }
最終調(diào)用 Close() 將指定的連接清除掉,這些動(dòng)作都是連接池自動(dòng)給我們做了,無(wú)需我們使用者去擔(dān)心
我們使用連接池通過 pool.Get() 拿到具體的連接句柄 conn 之后,我們使用 conn.Close() 關(guān)閉連接,實(shí)際上也是會(huì)走到上述的 Close() 實(shí)現(xiàn)的位置,但是我們并未指定當(dāng)然也沒有權(quán)限顯示的指定將 once 置位為 false ,因此對(duì)于調(diào)用者來(lái)說(shuō),是關(guān)閉了連接,對(duì)于連接池來(lái)說(shuō),實(shí)際上是將連接歸還到連接池中
關(guān)于連接池子的縮容和擴(kuò)容是在 pool.Get() 中實(shí)現(xiàn)的
func (p *pool) Get() (Conn, error) { // the first selected from the created connections nextRef := p.incrRef() p.RLock() current := atomic.LoadInt32(&p.current) p.RUnlock() if current == 0 { return nil, ErrClosed } if nextRef <= current*int32(p.opt.MaxConcurrentStreams) { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // the number connection of pool is reach to max active if current == int32(p.opt.MaxActive) { // the second if reuse is true, select from pool's connections if p.opt.Reuse { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // the third create one-time connection c, err := p.opt.Dial(p.address) return p.wrapConn(c, true), err } // the fourth create new connections given back to pool p.Lock() current = atomic.LoadInt32(&p.current) if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) { // 2 times the incremental or the remain incremental increment := current if current+increment > int32(p.opt.MaxActive) { increment = int32(p.opt.MaxActive) - current } var i int32 var err error for i = 0; i < increment; i++ { c, er := p.opt.Dial(p.address) if er != nil { err = er break } p.reset(int(current + i)) p.conns[current+i] = p.wrapConn(c, false) } current += i log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n", p.current, current, increment, p.opt.MaxActive) atomic.StoreInt32(&p.current, current) if err != nil { p.Unlock() return nil, err } } p.Unlock() next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil }
從 Get 的實(shí)現(xiàn)中,我們可以知道 Get 的邏輯如下
- 先增加連接的引用計(jì)數(shù),如果在設(shè)定 current*int32(p.opt.MaxConcurrentStreams) 范圍內(nèi),那么直接取連接進(jìn)行使用即可
- 若當(dāng)前的連接數(shù)達(dá)到了最大活躍的連接數(shù),那么就看我們新建池子的時(shí)候傳遞的 option 中的 reuse 參數(shù)是否是 true,若是復(fù)用,則隨機(jī)取出連接池中的任意連接提供使用,如果不復(fù)用,則新建一個(gè)連接
- 其余的情況,就需要我們進(jìn)行 2 倍或者 1 倍的數(shù)量對(duì)連接池進(jìn)行擴(kuò)容了
實(shí)際上,上述的庫(kù)中,并沒有提供咱們縮容的算法,如果真的有這方面的需求的話
也可以在 Get 的實(shí)現(xiàn)上進(jìn)行縮容,具體的縮容策略可以根據(jù)實(shí)際情況來(lái)定,例如當(dāng)引用計(jì)數(shù) nextRef 只有當(dāng)前活躍連接數(shù)的 20% 的時(shí)候(這只是一個(gè)例子),就可以考慮縮容了
以上就是探究gRPC 客戶端調(diào)用服務(wù)端需要連接池嗎?的詳細(xì)內(nèi)容,更多關(guān)于gRPC客戶端調(diào)用服務(wù)端連接池的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
golang interface{}類型轉(zhuǎn)換的實(shí)現(xiàn)示例
在Go語(yǔ)言中,類型轉(zhuǎn)換可以通過斷言、顯式、隱式和強(qiáng)制四種方式實(shí)現(xiàn),針對(duì)interface{}類型轉(zhuǎn)換為float32或float64,需要使用type斷言或reflect包處理,感興趣的可以了解一下2024-10-10golang goquery selector選擇器使用示例大全
這篇文章主要為大家介紹了golang goquery selector選擇器使用示例大全,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09go同步原語(yǔ)Phaser和Barrier區(qū)別
這篇文章主要為大家介紹了通過java講解go同步原語(yǔ)Phaser和Barrier區(qū)別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12Go結(jié)合MQTT實(shí)現(xiàn)通信的示例代碼
本文主要介紹了Go結(jié)合MQTT實(shí)現(xiàn)通信的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05Go語(yǔ)言LeetCode題解1046最后一塊石頭的重量
這篇文章主要為大家介紹了Go語(yǔ)言LeetCode題解1046最后一塊石頭的重量,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12