欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

探究gRPC?客戶端調用服務端需要連接池嗎?

 更新時間:2023年08月22日 10:10:44   作者:阿兵云原  
這篇文章主要為大家介紹了gRPC?客戶端調用服務端需要連接池嗎的問題探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

發(fā)現(xiàn)的問題

在微服務開發(fā)中,gRPC 的應用絕對少不了,一般情況下,內部微服務交互,通常是使用 RPC 進行通信,如果是外部通信的話,會提供 https 接口文檔

對于 gRPC 的基本使用可以查看文章 gRPC介紹

對于 gRPC ,我們需要基本知道如下的一些知識點:

  • gRPC 的基本四種模式的應用場景

    • 請求響應模式
    • 客戶端數(shù)據流模式
    • 服務端數(shù)據流模式
    • 雙向流模式

Proto 文件的定義和使用

  • gRPC 攔截器的應用 , 基本的可以查看這篇 gRPC 攔截器

    • 實際上有客戶端攔截器 和 服務端攔截器,具體詳細的可以自行學習
  • gRPC 的設計原理細節(jié)
  • Go-Kit 的使用

當然今天并不是要聊 gRPC 的應用或者原理,而是想聊我們在開發(fā)過程中很容易遇到的問題:

  • 未復用 gRPC 客戶端連接,影響性能

最近審查各個服務代碼中,發(fā)現(xiàn)整個部門使用 gRPC 客戶端請求服務端接口的時候,都是會新建一個連接,然后調用服務端接口,使用完畢之后就 close 掉, 例如這樣

這會有什么問題呢?

正常簡單的使用不會有啥問題,但如果是面臨高并發(fā)的情況,性能問題很容易就會出現(xiàn),例如我們在做性能測試的時候,就會發(fā)現(xiàn),打一會性能測試,客戶端請求服務端的時候就會報錯:

rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused

實際去查看問題的時候,很明顯,這是 gRPC 的連接數(shù)被打滿了,很多連接都還未完全釋放

那這個時候,簡單思考一下,我們是沒有必要對于每一次客戶端請求服務端接口的時候,都新建立一次連接,并且調用完畢之后就馬上關閉連接

我們知道,gRPC 的通信本質上也是 TCP 的連接,那么一次連接就需要三次握手,和四次揮手,每一次建立連接和釋放連接的時候,都需要走這么一個過程,如果我們頻繁的建立和釋放連接,這對于資源和性能其實都是一個大大的浪費

我們還知道 gRPC 是一個高性能、開源和擁有統(tǒng)一規(guī)定的 RPC框架,面向對象的 http/2 通信協(xié)議,能夠能節(jié)省空間和 IO 密集度的開銷 ,但是我們并沒有很好的將他運用起來,gRPC 服務端的連接管理不用我們操心,但是我們對于 gRPC 客戶端的連續(xù)非常有必要關心,咱們要想辦法復用客戶端的連接

gRPC 連接池

復用連接,我們可以使用連接池的方式

對于這種復用資源,我們其實也接觸了不少,例如復用線程 worker 的線程池,go 中的協(xié)程池 ..

簡單來說,連接池 ,就是提前創(chuàng)建好一定數(shù)量的 tcp 連接句柄放在池子中,咱們需要和外部通信的時候,就去池子中取一個連接來用,用完了之后,咱們就放回去

連接池解決了什么問題

很明顯,連接池解決了上述咱們頻繁創(chuàng)建連接和釋放連接帶來的資源和性能上的損耗,咱們節(jié)省了這部分開銷后,自然就提高了咱們的性能

可是我們再次思考一下,如果這個連接池子就是只能存放固定的連接,那么我們業(yè)務擴張的時候,豈不是光等待池子里面有空閑連接就會耗費大量的時間呢?

或者是池子過大,咱們需要的連接數(shù)較少,那么開辟那么多連接豈不是一種浪費?

那么我們在設計或者是應用連接池的時候,就需要考慮如下幾個方面了:

  • 連接池是否支持擴縮容
  • 空閑的連接是否支持超時自行關閉,是否支持保活
  • 池子滿的時候,處理的策略是什么樣的

其實關于連接池的設計和庫網上都很多,我們可以找一個案例來看看如何來使用連接池,以及它是如何來進行上述幾個方面的編碼落地的

如何去使用連接池

先來看看客戶端如何使用連接池

客戶端使用 pool

client/main.go

package main
import (
        "context"
        "flag"
        "fmt"
        "log"
        "time"
        "mypoolclient/pool"
        "mypoolclient/pb"
)
var addr = flag.String("addr", "127.0.0.1:8888", "the address to connect to")
func main() {
        flag.Parse()
        p, err := pool.New(*addr, pool.DefaultOptions)
        if err != nil {
                log.Fatalf("failed to new pool: %v", err)
        }
        defer p.Close()
        conn, err := p.Get()
        if err != nil {
                log.Fatalf("failed to get conn: %v", err)
        }
        defer conn.Close()
        client := pb.NewTestsvrClient(conn.Value())
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        res, err := client.Say(ctx, &pb.TestReq{Message: []byte("hi")})
        if err != nil {
                log.Fatalf("unexpected error from Say: %v", err)
        }
        fmt.Println("rpc response:", res)
}

此處的客戶端,我們很明顯可以看出來,以前咱們使用客戶端去調用服務端接口的時候,總會不自覺的 Dial 一下建立連接

咱們使用連接池的話,就可以直接從池子里面拿一個連接出來直接使用即可

服務端

server/client.go

package main
import (
        "context"
        "flag"
        "fmt"
        "log"
        "net"
        "google.golang.org/grpc"
        "mypoolserver/pb"
)
var port = flag.Int("port", 8888, "port number")
// server implements EchoServer.
type server struct{}
func (s *server) Say(context.Context, *pb.TestReq) (*pb.TestRsp, error) {
        fmt.Println("call  Say ... ")
        return &pb.TestRsp{Message: []byte("hello world")}, nil
}
func main() {
        flag.Parse()
        listen, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", *port))
        if err != nil {
                log.Fatalf("failed to listen: %v", err)
        }
        s := grpc.NewServer()
        pb.RegisterTestsvrServer(s, &server{})
        fmt.Println("start server ...")
        if err := s.Serve(listen); err != nil {
                log.Fatalf("failed to serve: %v", err)
        }
        fmt.Println("over server ...")
}

連接池的具體實現(xiàn)方式

連接池的具體實現(xiàn)方式,參考了 github

https://github.com/shimingyah/pool

具體的實現(xiàn),都放在上述目錄的 pool 下面了 , 也可以訪問地址

https://github.com/qingconglaixueit/mypoolapp

pool 包中包含了 3 個文件,作用如下

├── conn.go

-- 關于 grpc 連接的結構定義和方法實現(xiàn)

├── options.go

-- 攔截器的常量定義,以及 Dial 建立連接的簡單封裝, 這個文件可要可不要,看自己的需求

└── pool.go

-- 具體 pool 的接口定義和實現(xiàn)

直接來看 pool.go 中的接口定義

type Pool interface {
   Get() (Conn, error)
   Close() error
   Status() string
}
  • Get()

獲取一個新的連接 , 當關閉連接的時候,會將該連接放入到池子中

  • Close()

關閉連接池,自然連接池子中的連接也不再可用

關于 pool 結構的定義 ,conn 結構的定義建議,將上述 github 地址上的源碼下載下來進行閱讀,下面主要是分享關于

  • 連接池子的創(chuàng)建,擴縮容,釋放
  • 具體 TCP 連接的創(chuàng)建和釋放

創(chuàng)建連接池

func New(address string, option Options) (Pool, error) {
   if address == "" {
      return nil, errors.New("invalid address settings")
   }
   if option.Dial == nil {
      return nil, errors.New("invalid dial settings")
   }
   if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive {
      return nil, errors.New("invalid maximum settings")
   }
   if option.MaxConcurrentStreams <= 0 {
      return nil, errors.New("invalid maximun settings")
   }
   p := &pool{
      index:   0,
      current: int32(option.MaxIdle),
      ref:     0,
      opt:     option,
      conns:   make([]*conn, option.MaxActive),
      address: address,
      closed:  0,
   }
   for i := 0; i < p.opt.MaxIdle; i++ {
      c, err := p.opt.Dial(address)
      if err != nil {
         p.Close()
         return nil, fmt.Errorf("dial is not able to fill the pool: %s", err)
      }
      p.conns[i] = p.wrapConn(c, false)
   }
   log.Printf("new pool success: %v\n", p.Status())
   return p, nil
}

關于 pool 的接口,可以看成是這樣的

對于創(chuàng)建連接池,除了校驗基本的參數(shù)以外,我們知道池子其實是一個 TCP 連接的切片,長度為 option.MaxActive 即最大的活躍連接數(shù)

p.conns[i] = p.wrapConn(c, false) 表示咱們初始化一個連接,并放到連接池中,且初始化的 once 參數(shù)置為 false,表示該連接默認保存在池子中,不被銷毀

換句話說,當我們需要真實銷毀連接池中的連接的時候,就將該鏈接的 once 參數(shù)置為 false 即可,實際上也無需我們使用這去做這一步

實際上 關于每一個連接的建立也是在 New 里面完成的,只要有 1 個連接未建立成功,那么咱們的連接池就算是建立失敗,咱們會調用 p.Close() 將之前建立好的連接全部釋放掉

// 關閉連接池
func (p *pool) Close() error {
   atomic.StoreInt32(&p.closed, 1)
   atomic.StoreUint32(&p.index, 0)
   atomic.StoreInt32(&p.current, 0)
   atomic.StoreInt32(&p.ref, 0)
   p.deleteFrom(0)
   log.Printf("close pool success: %v\n", p.Status())
   return nil
}
// 清除從 指定位置開始到 MaxActive 之間的連接
func (p *pool) deleteFrom(begin int) {
   for i := begin; i < p.opt.MaxActive; i++ {
      p.reset(i)
   }
}
// 清除具體的連接
func (p *pool) reset(index int) {
   conn := p.conns[index]
   if conn == nil {
      return
   }
   conn.reset()
   p.conns[index] = nil
}

這里我們可以看到,當需要從池子中清除具體的連接的時候,最終從連接池子中取出對應位置上的連接 ,conn := p.conns[index], conn.reset() ,實際上是給當前這個連接進行參數(shù)賦值

func (c *conn) reset() error {
   cc := c.cc
   c.cc = nil
   c.once = false
   if cc != nil {
      return cc.Close()
   }
   return nil
}
func (c *conn) Close() error {
   c.pool.decrRef()
   if c.once {
      return c.reset()
   }
   return nil
}

最終調用 Close() 將指定的連接清除掉,這些動作都是連接池自動給我們做了,無需我們使用者去擔心

我們使用連接池通過 pool.Get() 拿到具體的連接句柄 conn 之后,我們使用 conn.Close() 關閉連接,實際上也是會走到上述的 Close() 實現(xiàn)的位置,但是我們并未指定當然也沒有權限顯示的指定將 once 置位為 false ,因此對于調用者來說,是關閉了連接,對于連接池來說,實際上是將連接歸還到連接池中

關于連接池子的縮容和擴容是在 pool.Get() 中實現(xiàn)的

func (p *pool) Get() (Conn, error) {
   // the first selected from the created connections
   nextRef := p.incrRef()
   p.RLock()
   current := atomic.LoadInt32(&p.current)
   p.RUnlock()
   if current == 0 {
      return nil, ErrClosed
   }
   if nextRef <= current*int32(p.opt.MaxConcurrentStreams) {
      next := atomic.AddUint32(&p.index, 1) % uint32(current)
      return p.conns[next], nil
   }
   // the number connection of pool is reach to max active
   if current == int32(p.opt.MaxActive) {
      // the second if reuse is true, select from pool's connections
      if p.opt.Reuse {
         next := atomic.AddUint32(&p.index, 1) % uint32(current)
         return p.conns[next], nil
      }
      // the third create one-time connection
      c, err := p.opt.Dial(p.address)
      return p.wrapConn(c, true), err
   }
   // the fourth create new connections given back to pool
   p.Lock()
   current = atomic.LoadInt32(&p.current)
   if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) {
      // 2 times the incremental or the remain incremental
      increment := current
      if current+increment > int32(p.opt.MaxActive) {
         increment = int32(p.opt.MaxActive) - current
      }
      var i int32
      var err error
      for i = 0; i < increment; i++ {
         c, er := p.opt.Dial(p.address)
         if er != nil {
            err = er
            break
         }
         p.reset(int(current + i))
         p.conns[current+i] = p.wrapConn(c, false)
      }
      current += i
      log.Printf("grow pool: %d ---> %d, increment: %d, maxActive: %d\n",
         p.current, current, increment, p.opt.MaxActive)
      atomic.StoreInt32(&p.current, current)
      if err != nil {
         p.Unlock()
         return nil, err
      }
   }
   p.Unlock()
   next := atomic.AddUint32(&p.index, 1) % uint32(current)
   return p.conns[next], nil
}

從 Get 的實現(xiàn)中,我們可以知道 Get 的邏輯如下

  • 先增加連接的引用計數(shù),如果在設定 current*int32(p.opt.MaxConcurrentStreams) 范圍內,那么直接取連接進行使用即可
  • 若當前的連接數(shù)達到了最大活躍的連接數(shù),那么就看我們新建池子的時候傳遞的 option 中的 reuse 參數(shù)是否是 true,若是復用,則隨機取出連接池中的任意連接提供使用,如果不復用,則新建一個連接
  • 其余的情況,就需要我們進行 2 倍或者 1 倍的數(shù)量對連接池進行擴容了

實際上,上述的庫中,并沒有提供咱們縮容的算法,如果真的有這方面的需求的話

也可以在 Get 的實現(xiàn)上進行縮容,具體的縮容策略可以根據實際情況來定,例如當引用計數(shù) nextRef 只有當前活躍連接數(shù)的 20% 的時候(這只是一個例子),就可以考慮縮容了

以上就是探究gRPC 客戶端調用服務端需要連接池嗎?的詳細內容,更多關于gRPC客戶端調用服務端連接池的資料請關注腳本之家其它相關文章!

相關文章

  • 深入刨析Golang-map底層原理

    深入刨析Golang-map底層原理

    這篇文章主要介紹了深入刨析Golang-map底層原理,Go 語言的 map 的使用非常簡易, 但其內部實現(xiàn)相對比較復雜,文中有相關的代碼示例,,需要的朋友可以參考下
    2023-05-05
  • golang interface{}類型轉換的實現(xiàn)示例

    golang interface{}類型轉換的實現(xiàn)示例

    在Go語言中,類型轉換可以通過斷言、顯式、隱式和強制四種方式實現(xiàn),針對interface{}類型轉換為float32或float64,需要使用type斷言或reflect包處理,感興趣的可以了解一下
    2024-10-10
  • 深入了解Golang中Slice切片的使用

    深入了解Golang中Slice切片的使用

    本文主要為大家詳細介紹了Golang中Slice切片的使用,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2023-02-02
  • golang goquery selector選擇器使用示例大全

    golang goquery selector選擇器使用示例大全

    這篇文章主要為大家介紹了golang goquery selector選擇器使用示例大全,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-09-09
  • go同步原語Phaser和Barrier區(qū)別

    go同步原語Phaser和Barrier區(qū)別

    這篇文章主要為大家介紹了通過java講解go同步原語Phaser和Barrier區(qū)別,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-12-12
  • golang切片初始化的使用示例

    golang切片初始化的使用示例

    Go語言中,切片可通過直接初始化、數(shù)組初始化、切片表達式等多種方法進行初始化,這些方式提供了靈活的數(shù)據管理方式,適合不同的編程需求和場景,感興趣的可以了解一下
    2024-10-10
  • go語言文件正則表達式搜索功能示例

    go語言文件正則表達式搜索功能示例

    這篇文章主要介紹了go語言文件正則表達式搜索功能,涉及Go語言文件目錄的遍歷及正則操作相關技巧,需要的朋友可以參考下
    2017-01-01
  • Go結合MQTT實現(xiàn)通信的示例代碼

    Go結合MQTT實現(xiàn)通信的示例代碼

    本文主要介紹了Go結合MQTT實現(xiàn)通信的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2023-05-05
  • Go語言LeetCode題解1046最后一塊石頭的重量

    Go語言LeetCode題解1046最后一塊石頭的重量

    這篇文章主要為大家介紹了Go語言LeetCode題解1046最后一塊石頭的重量,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-12-12
  • GO中Json解析的幾種方式

    GO中Json解析的幾種方式

    本文主要介紹了GO中Json解析的幾種方式,詳細的介紹了幾種方法,?文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2024-01-01

最新評論