探究gRPC?客戶端調用服務端需要連接池嗎?
發(fā)現(xiàn)的問題
在微服務開發(fā)中,gRPC 的應用絕對少不了,一般情況下,內部微服務交互,通常是使用 RPC 進行通信,如果是外部通信的話,會提供 https 接口文檔
對于 gRPC 的基本使用可以查看文章 gRPC介紹
對于 gRPC ,我們需要基本知道如下的一些知識點:
gRPC 的基本四種模式的應用場景
- 請求響應模式
- 客戶端數(shù)據流模式
- 服務端數(shù)據流模式
- 雙向流模式
Proto 文件的定義和使用
gRPC 攔截器的應用 , 基本的可以查看這篇 gRPC 攔截器
- 實際上有客戶端攔截器 和 服務端攔截器,具體詳細的可以自行學習
- gRPC 的設計原理細節(jié)
- Go-Kit 的使用
當然今天并不是要聊 gRPC 的應用或者原理,而是想聊我們在開發(fā)過程中很容易遇到的問題:
- 未復用 gRPC 客戶端連接,影響性能
最近審查各個服務代碼中,發(fā)現(xiàn)整個部門使用 gRPC 客戶端請求服務端接口的時候,都是會新建一個連接,然后調用服務端接口,使用完畢之后就 close 掉, 例如這樣

這會有什么問題呢?
正常簡單的使用不會有啥問題,但如果是面臨高并發(fā)的情況,性能問題很容易就會出現(xiàn),例如我們在做性能測試的時候,就會發(fā)現(xiàn),打一會性能測試,客戶端請求服務端的時候就會報錯:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused
實際去查看問題的時候,很明顯,這是 gRPC 的連接數(shù)被打滿了,很多連接都還未完全釋放
那這個時候,簡單思考一下,我們是沒有必要對于每一次客戶端請求服務端接口的時候,都新建立一次連接,并且調用完畢之后就馬上關閉連接
我們知道,gRPC 的通信本質上也是 TCP 的連接,那么一次連接就需要三次握手,和四次揮手,每一次建立連接和釋放連接的時候,都需要走這么一個過程,如果我們頻繁的建立和釋放連接,這對于資源和性能其實都是一個大大的浪費
我們還知道 gRPC 是一個高性能、開源和擁有統(tǒng)一規(guī)定的 RPC框架,面向對象的 http/2 通信協(xié)議,能夠能節(jié)省空間和 IO 密集度的開銷 ,但是我們并沒有很好的將他運用起來,gRPC 服務端的連接管理不用我們操心,但是我們對于 gRPC 客戶端的連續(xù)非常有必要關心,咱們要想辦法復用客戶端的連接
gRPC 連接池
復用連接,我們可以使用連接池的方式
對于這種復用資源,我們其實也接觸了不少,例如復用線程 worker 的線程池,go 中的協(xié)程池 ..
簡單來說,連接池 ,就是提前創(chuàng)建好一定數(shù)量的 tcp 連接句柄放在池子中,咱們需要和外部通信的時候,就去池子中取一個連接來用,用完了之后,咱們就放回去
連接池解決了什么問題
很明顯,連接池解決了上述咱們頻繁創(chuàng)建連接和釋放連接帶來的資源和性能上的損耗,咱們節(jié)省了這部分開銷后,自然就提高了咱們的性能
可是我們再次思考一下,如果這個連接池子就是只能存放固定的連接,那么我們業(yè)務擴張的時候,豈不是光等待池子里面有空閑連接就會耗費大量的時間呢?
或者是池子過大,咱們需要的連接數(shù)較少,那么開辟那么多連接豈不是一種浪費?
那么我們在設計或者是應用連接池的時候,就需要考慮如下幾個方面了:
- 連接池是否支持擴縮容
- 空閑的連接是否支持超時自行關閉,是否支持保活
- 池子滿的時候,處理的策略是什么樣的
其實關于連接池的設計和庫網上都很多,我們可以找一個案例來看看如何來使用連接池,以及它是如何來進行上述幾個方面的編碼落地的
如何去使用連接池
先來看看客戶端如何使用連接池

客戶端使用 pool
client/main.go
package main
import (
"context"
"flag"
"fmt"
"log"
"time"
"mypoolclient/pool"
"mypoolclient/pb"
)
var addr = flag.String("addr", "127.0.0.1:8888", "the address to connect to")
func main() {
flag.Parse()
p, err := pool.New(*addr, pool.DefaultOptions)
if err != nil {
log.Fatalf("failed to new pool: %v", err)
}
defer p.Close()
conn, err := p.Get()
if err != nil {
log.Fatalf("failed to get conn: %v", err)
}
defer conn.Close()
client := pb.NewTestsvrClient(conn.Value())
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
res, err := client.Say(ctx, &pb.TestReq{Message: []byte("hi")})
if err != nil {
log.Fatalf("unexpected error from Say: %v", err)
}
fmt.Println("rpc response:", res)
}此處的客戶端,我們很明顯可以看出來,以前咱們使用客戶端去調用服務端接口的時候,總會不自覺的 Dial 一下建立連接
咱們使用連接池的話,就可以直接從池子里面拿一個連接出來直接使用即可
服務端
server/client.go
package main
import (
"context"
"flag"
"fmt"
"log"
"net"
"google.golang.org/grpc"
"mypoolserver/pb"
)
var port = flag.Int("port", 8888, "port number")
// server implements EchoServer.
type server struct{}
func (s *server) Say(context.Context, *pb.TestReq) (*pb.TestRsp, error) {
fmt.Println("call Say ... ")
return &pb.TestRsp{Message: []byte("hello world")}, nil
}
func main() {
flag.Parse()
listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterTestsvrServer(s, &server{})
fmt.Println("start server ...")
if err := s.Serve(listen); err != nil {
log.Fatalf("failed to serve: %v", err)
}
fmt.Println("over server ...")
}連接池的具體實現(xiàn)方式
連接池的具體實現(xiàn)方式,參考了 github
https://github.com/shimingyah/pool
具體的實現(xiàn),都放在上述目錄的 pool 下面了 , 也可以訪問地址
https://github.com/qingconglaixueit/mypoolapp
pool 包中包含了 3 個文件,作用如下
├── conn.go
-- 關于 grpc 連接的結構定義和方法實現(xiàn)
├── options.go
-- 攔截器的常量定義,以及 Dial 建立連接的簡單封裝, 這個文件可要可不要,看自己的需求
└── pool.go
-- 具體 pool 的接口定義和實現(xiàn)
直接來看 pool.go 中的接口定義
type Pool interface {
Get() (Conn, error)
Close() error
Status() string
}- Get()
獲取一個新的連接 , 當關閉連接的時候,會將該連接放入到池子中
- Close()
關閉連接池,自然連接池子中的連接也不再可用
關于 pool 結構的定義 ,conn 結構的定義建議,將上述 github 地址上的源碼下載下來進行閱讀,下面主要是分享關于
- 連接池子的創(chuàng)建,擴縮容,釋放
- 具體 TCP 連接的創(chuàng)建和釋放
創(chuàng)建連接池
func New(address string, option Options) (Pool, error) {
if address == "" {
return nil, errors.New("invalid address settings")
}
if option.Dial == nil {
return nil, errors.New("invalid dial settings")
}
if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
return nil, errors.New("invalid maximum settings")
}
if option.MaxConcurrentStreams <= 0 {
return nil, errors.New("invalid maximun settings")
}
p := &pool{
index: 0,
current: int32(option.MaxIdle),
ref: 0,
opt: option,
conns: make([]*conn, option.MaxActive),
address: address,
closed: 0,
}
for i := 0; i < p.opt.MaxIdle; i++ {
c, err := p.opt.Dial(address)
if err != nil {
p.Close()
return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
}
p.conns[i] = p.wrapConn(c, false)
}
log.Printf("new pool success: %v\n", p.Status())
return p, nil
}關于 pool 的接口,可以看成是這樣的

對于創(chuàng)建連接池,除了校驗基本的參數(shù)以外,我們知道池子其實是一個 TCP 連接的切片,長度為 option.MaxActive 即最大的活躍連接數(shù)
p.conns[i] = p.wrapConn(c, false) 表示咱們初始化一個連接,并放到連接池中,且初始化的 once 參數(shù)置為 false,表示該連接默認保存在池子中,不被銷毀
換句話說,當我們需要真實銷毀連接池中的連接的時候,就將該鏈接的 once 參數(shù)置為 false 即可,實際上也無需我們使用這去做這一步
實際上 關于每一個連接的建立也是在 New 里面完成的,只要有 1 個連接未建立成功,那么咱們的連接池就算是建立失敗,咱們會調用 p.Close() 將之前建立好的連接全部釋放掉
// 關閉連接池
func (p *pool) Close() error {
atomic.StoreInt32(&p.closed, 1)
atomic.StoreUint32(&p.index, 0)
atomic.StoreInt32(&p.current, 0)
atomic.StoreInt32(&p.ref, 0)
p.deleteFrom(0)
log.Printf("close pool success: %v\n", p.Status())
return nil
}
// 清除從 指定位置開始到 MaxActive 之間的連接
func (p *pool) deleteFrom(begin int) {
for i := begin; i < p.opt.MaxActive; i++ {
p.reset(i)
}
}
// 清除具體的連接
func (p *pool) reset(index int) {
conn := p.conns[index]
if conn == nil {
return
}
conn.reset()
p.conns[index] = nil
}這里我們可以看到,當需要從池子中清除具體的連接的時候,最終從連接池子中取出對應位置上的連接 ,conn := p.conns[index], conn.reset() ,實際上是給當前這個連接進行參數(shù)賦值
func (c *conn) reset() error {
cc := c.cc
c.cc = nil
c.once = false
if cc != nil {
return cc.Close()
}
return nil
}
func (c *conn) Close() error {
c.pool.decrRef()
if c.once {
return c.reset()
}
return nil
}最終調用 Close() 將指定的連接清除掉,這些動作都是連接池自動給我們做了,無需我們使用者去擔心
我們使用連接池通過 pool.Get() 拿到具體的連接句柄 conn 之后,我們使用 conn.Close() 關閉連接,實際上也是會走到上述的 Close() 實現(xiàn)的位置,但是我們并未指定當然也沒有權限顯示的指定將 once 置位為 false ,因此對于調用者來說,是關閉了連接,對于連接池來說,實際上是將連接歸還到連接池中
關于連接池子的縮容和擴容是在 pool.Get() 中實現(xiàn)的
func (p *pool) Get() (Conn, error) {
// the first selected from the created connections
nextRef := p.incrRef()
p.RLock()
current := atomic.LoadInt32(&p.current)
p.RUnlock()
if current == 0 {
return nil, ErrClosed
}
if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
// the number connection of pool is reach to max active
if current == int32(p.opt.MaxActive) {
// the second if reuse is true, select from pool's connections
if p.opt.Reuse {
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}
// the third create one-time connection
c, err := p.opt.Dial(p.address)
return p.wrapConn(c, true), err
}
// the fourth create new connections given back to pool
p.Lock()
current = atomic.LoadInt32(&p.current)
if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
// 2 times the incremental or the remain incremental
increment := current
if current+increment > int32(p.opt.MaxActive) {
increment = int32(p.opt.MaxActive) - current
}
var i int32
var err error
for i = 0; i < increment; i++ {
c, er := p.opt.Dial(p.address)
if er != nil {
err = er
break
}
p.reset(int(current + i))
p.conns[current+i] = p.wrapConn(c, false)
}
current += i
log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
p.current, current, increment, p.opt.MaxActive)
atomic.StoreInt32(&p.current, current)
if err != nil {
p.Unlock()
return nil, err
}
}
p.Unlock()
next := atomic.AddUint32(&p.index, 1) % uint32(current)
return p.conns[next], nil
}從 Get 的實現(xiàn)中,我們可以知道 Get 的邏輯如下
- 先增加連接的引用計數(shù),如果在設定 current*int32(p.opt.MaxConcurrentStreams) 范圍內,那么直接取連接進行使用即可
- 若當前的連接數(shù)達到了最大活躍的連接數(shù),那么就看我們新建池子的時候傳遞的 option 中的 reuse 參數(shù)是否是 true,若是復用,則隨機取出連接池中的任意連接提供使用,如果不復用,則新建一個連接
- 其余的情況,就需要我們進行 2 倍或者 1 倍的數(shù)量對連接池進行擴容了
實際上,上述的庫中,并沒有提供咱們縮容的算法,如果真的有這方面的需求的話
也可以在 Get 的實現(xiàn)上進行縮容,具體的縮容策略可以根據實際情況來定,例如當引用計數(shù) nextRef 只有當前活躍連接數(shù)的 20% 的時候(這只是一個例子),就可以考慮縮容了
以上就是探究gRPC 客戶端調用服務端需要連接池嗎?的詳細內容,更多關于gRPC客戶端調用服務端連接池的資料請關注腳本之家其它相關文章!
相關文章
golang interface{}類型轉換的實現(xiàn)示例
在Go語言中,類型轉換可以通過斷言、顯式、隱式和強制四種方式實現(xiàn),針對interface{}類型轉換為float32或float64,需要使用type斷言或reflect包處理,感興趣的可以了解一下2024-10-10
golang goquery selector選擇器使用示例大全
這篇文章主要為大家介紹了golang goquery selector選擇器使用示例大全,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-09-09

