淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn)
為什么需要連接池
如果不用連接池,而是每次請(qǐng)求都創(chuàng)建一個(gè)連接是比較昂貴的,因此需要完成3次tcp握手
同時(shí)在高并發(fā)場(chǎng)景下,由于沒有連接池的最大連接數(shù)限制,可以創(chuàng)建無數(shù)個(gè)連接,耗盡文件描述符
連接池就是為了復(fù)用這些創(chuàng)建好的連接
連接池設(shè)計(jì)
基本上連接池都會(huì)設(shè)計(jì)以下幾個(gè)參數(shù):
初始連接數(shù):在初始化連接池時(shí)就會(huì)預(yù)先創(chuàng)建好的連接數(shù)量,如果設(shè)置得:
- 過大:可能造成浪費(fèi)
- 過?。赫?qǐng)求到來時(shí)需要新建連接
最大空閑連接數(shù)maxIdle:池中最大緩存的連接個(gè)數(shù),如果設(shè)置得:
- 過大:造成浪費(fèi),自己不用還把持著連接。因?yàn)閿?shù)據(jù)庫(kù)整體的連接數(shù)是有限的,當(dāng)前進(jìn)程占用多了,其他進(jìn)程能獲取的就少了
- 過?。簾o法應(yīng)對(duì)突發(fā)流量
最大連接數(shù)maxCap:
- 如果已經(jīng)用了maxCap個(gè)連接,要申請(qǐng)第maxCap+1個(gè)連接時(shí),一般會(huì)阻塞在那里,直到超時(shí)或者別人歸還一個(gè)連接
最大空閑時(shí)間idleTimeout:當(dāng)發(fā)現(xiàn)某連接空閑超過這個(gè)時(shí)間時(shí),會(huì)將其關(guān)閉,重新去獲取連接
避免連接長(zhǎng)時(shí)間沒用,自動(dòng)失效的問題
連接池對(duì)外提供兩個(gè)方法,Get
:獲取一個(gè)連接,Put
:歸還一個(gè)連接
大部分連接池的實(shí)現(xiàn)大同小異,基本流程如下:
Get
需要注意:
- 當(dāng)有空閑連接時(shí),需要進(jìn)一步判斷連接是否有過期(超過最大空閑時(shí)間idleTimeout)
- 這些連接有可能很久沒用過了,在數(shù)據(jù)庫(kù)層面已經(jīng)過期。如果貿(mào)然使用可能出現(xiàn)錯(cuò)誤,因此最好檢查下是否超時(shí)
- 當(dāng)陷入阻塞時(shí),最好設(shè)置超時(shí)時(shí)間,避免一直沒等到有人歸還連接而一直阻塞
Put
歸還連接時(shí):
- 先看有沒有阻塞的獲取連接的請(qǐng)求,如果有轉(zhuǎn)交連接,并喚醒阻塞請(qǐng)求
- 否則看能否放回去空閑隊(duì)列,如果不能直接關(guān)閉請(qǐng)求
總結(jié)
根據(jù)上面總結(jié)的流程,連接池還需要維護(hù)另外兩個(gè)結(jié)構(gòu):
- 空閑隊(duì)列
- 阻塞請(qǐng)求的隊(duì)列
開源實(shí)現(xiàn)
接下來看幾個(gè)開源連接池的實(shí)現(xiàn),都大體符合上面介紹的流程
silenceper/pool
代碼地址:https://github.com/silenceper/pool
數(shù)據(jù)結(jié)構(gòu):
// channelPool 存放連接信息 type channelPool struct { mu sync.RWMutex // 空閑連接 conns chan *idleConn // 產(chǎn)生新連接的方法 factory func() (interface{}, error) // 關(guān)閉連接的方法 close func(interface{}) error ping func(interface{}) error // 最大空閑時(shí)間,最大阻塞等待時(shí)間(實(shí)際沒用到) idleTimeout, waitTimeOut time.Duration // 最大連接數(shù) maxActive int openingConns int // 阻塞的請(qǐng)求 connReqs []chan connReq }
可以看出,silenceper/pool
:
- 用channel實(shí)現(xiàn)了空閑連接隊(duì)列
conns
- 為每個(gè)阻塞的請(qǐng)求創(chuàng)建一個(gè)channel,加入
connReqs
中。這樣請(qǐng)求會(huì)阻塞在自己的channel上
Get:
func (c *channelPool) Get() (interface{}, error) { conns := c.getConns() if conns == nil { return nil, ErrClosed } for { select { // 如果有空閑連接 case wrapConn := <-conns: if wrapConn == nil { return nil, ErrClosed } //判斷是否超時(shí),超時(shí)則丟棄 if timeout := c.idleTimeout; timeout > 0 { if wrapConn.t.Add(timeout).Before(time.Now()) { //丟棄并關(guān)閉該連接 c.Close(wrapConn.conn) continue } } //判斷是否失效,失效則丟棄,如果用戶沒有設(shè)定 ping 方法,就不檢查 if c.ping != nil { if err := c.Ping(wrapConn.conn); err != nil { c.Close(wrapConn.conn) continue } } return wrapConn.conn, nil // 沒有空閑連接 default: c.mu.Lock() log.Debugf("openConn %v %v", c.openingConns, c.maxActive) if c.openingConns >= c.maxActive { // 連接數(shù)已經(jīng)達(dá)到上線,不能再創(chuàng)建連接 req := make(chan connReq, 1) c.connReqs = append(c.connReqs, req) c.mu.Unlock() // 將自己阻塞在channel上 ret, ok := <-req if !ok { return nil, ErrMaxActiveConnReached } // 再檢查一次是否超時(shí) if timeout := c.idleTimeout; timeout > 0 { if ret.idleConn.t.Add(timeout).Before(time.Now()) { //丟棄并關(guān)閉該連接 c.Close(ret.idleConn.conn) continue } } return ret.idleConn.conn, nil } // 沒有超過最大連接數(shù),創(chuàng)建一個(gè)新的連接 if c.factory == nil { c.mu.Unlock() return nil, ErrClosed } conn, err := c.factory() if err != nil { c.mu.Unlock() return nil, err } c.openingConns++ c.mu.Unlock() return conn, nil } } }
這段代碼基本符合上面介紹的Get流程,應(yīng)該很好理解
需要注意:
- 當(dāng)收到別人歸還的連接狗,這里再檢查了一次是否超時(shí)。但我認(rèn)為這次檢查是沒必要的,因?yàn)閯e人剛用完,一般不可能超時(shí)
- 雖然在pool的數(shù)據(jù)結(jié)構(gòu)定義中有
waitTimeOut
字段,但實(shí)際沒有使用,即阻塞獲取可能無限期阻塞,這是一個(gè)優(yōu)化點(diǎn)
Put:
// Put 將連接放回pool中 func (c *channelPool) Put(conn interface{}) error { if conn == nil { return errors.New("connection is nil. rejecting") } c.mu.Lock() if c.conns == nil { c.mu.Unlock() return c.Close(conn) } // 如果有請(qǐng)求在阻塞獲取連接 if l := len(c.connReqs); l > 0 { req := c.connReqs[0] copy(c.connReqs, c.connReqs[1:]) c.connReqs = c.connReqs[:l-1] // 將連接轉(zhuǎn)交 req <- connReq{ idleConn: &idleConn{conn: conn, t: time.Now()}, } c.mu.Unlock() return nil } else { // 否則嘗試是否能放回空閑連接隊(duì)列 select { case c.conns <- &idleConn{conn: conn, t: time.Now()}: c.mu.Unlock() return nil default: c.mu.Unlock() //連接池已滿,直接關(guān)閉該連接 return c.Close(conn) } } }
值得注意的是:
put方法喚醒阻塞請(qǐng)求時(shí),從隊(duì)頭開始喚醒,這樣先阻塞的請(qǐng)求先被喚醒,保證了公平性
sql.DB
Go在官方庫(kù)sql中就實(shí)現(xiàn)了連接池,這樣的好處在于:
- 對(duì)于開發(fā):就不用像java一樣,需要自己找第三方的連接池實(shí)現(xiàn)
- 對(duì)于driver的實(shí)現(xiàn):只用關(guān)心怎么和數(shù)據(jù)庫(kù)交互,不用考慮連接池的問題
sql.DB
中和連接池相關(guān)的字段如下:
type DB struct { /** ... */ // 空閑連接隊(duì)列 freeConn []*driverConn // 阻塞請(qǐng)求的隊(duì)列 connRequests map[uint64]chan connRequest // 已經(jīng)打開的連接 numOpen int // number of opened and pending open connections // 最大空閑連接 maxIdle int // zero means defaultMaxIdleConns; negative means 0 // 最大連接數(shù) maxOpen int // <= 0 means unlimited // ... }
繼續(xù)看獲取連接:
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { // 檢測(cè)連接池是否被關(guān)閉 db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } select { default: // 檢測(cè)ctx是否超時(shí) case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil }
接下來檢測(cè)是否有空閑連接:
numFree := len(db.freeConn) // 如果有空閑連接 if strategy == cachedOrNewConn && numFree > 0 { // 從隊(duì)頭取一個(gè) conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Reset the session if required. if err := conn.resetSession(ctx); err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil }
以上代碼是1.14版本,但是到了1.18以后,獲取空閑連接的方式發(fā)生了變化:
last := len(db.freeConn) - 1 if strategy == cachedOrNewConn && last >= 0 { // 從最后一個(gè)位置獲取連接 conn := db.freeConn[last] db.freeConn = db.freeConn[:last] conn.inUse = true if conn.expired(lifetime) { db.maxLifetimeClosed++ db.mu.Unlock() conn.Close() return nil, driver.ErrBadConn }
可以看出,1.14版本從隊(duì)首獲取,1.18改成從隊(duì)尾獲取連接
為啥從隊(duì)尾拿連接?
因?yàn)殛?duì)尾的連接是才放進(jìn)去的,該連接過期的概率比隊(duì)首連接小
繼續(xù)看:
// 如果已經(jīng)達(dá)到最大連接數(shù) if db.maxOpen > 0 && db.numOpen >= db.maxOpen { req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // 阻塞當(dāng)前請(qǐng)求,要么ctx超時(shí),要么別人歸還了連接 select { case <-ctx.Done(): db.mu.Lock() // 把自己從阻塞隊(duì)列中刪除 delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) select { default: case ret, ok := <-req: if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } return nil, ctx.Err() case ret, ok := <-req: // 別人歸還連接 atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } if strategy == cachedOrNewConn && ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } return ret.conn, ret.err } }
這里需要注意,在ctx超時(shí)分支中:
- 首先把自己從阻塞隊(duì)列中刪除
- 再檢查一下req中是否有連接,如果有,將連接放回連接池
奇怪的是為啥把自己刪除后,req
還可能收到連接呢?
因?yàn)?code>put連接時(shí),會(huì)先拿出一個(gè)阻塞連接的req,如果這里刪除req在put拿出req:
- 之前:那沒問題,put不可能再放該req發(fā)送連接
- 之后:那有可能put往該req發(fā)送了連接,因此需要再檢查下req中是否有連接,如果有歸還
也解釋了為啥阻塞隊(duì)列要用map
:
- 用于快速找到自己的req,并刪除
最后看看put:
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { if db.closed { return false } if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 有阻塞的請(qǐng)求,轉(zhuǎn)移連接 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // Remove from pending requests. if err == nil { dc.inUse = true } req <- connRequest{ conn: dc, err: err, } return true // 判斷能否放回空閑隊(duì)列 } else if err == nil && !db.closed { if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } return false }
到此這篇關(guān)于淺談Go連接池的設(shè)計(jì)與實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)Go連接池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang中int類型和字符串類型相互轉(zhuǎn)換的實(shí)現(xiàn)方法
在日常開發(fā)中,經(jīng)常需要將數(shù)字轉(zhuǎn)換為字符串或者將字符串轉(zhuǎn)換為數(shù)字,在 Golang 中,有一些很簡(jiǎn)便的方法可以實(shí)現(xiàn)這個(gè)功能,接下來就詳細(xì)講解一下如何實(shí)現(xiàn) int 類型和字符串類型之間的互相轉(zhuǎn)換,需要的朋友可以參考下2023-09-09Golang 負(fù)載均衡算法實(shí)現(xiàn)示例
在Go語(yǔ)言中,負(fù)載均衡算法通常由代理、反向代理或者應(yīng)用層負(fù)載均衡器來實(shí)現(xiàn),在這些實(shí)現(xiàn)中,有一些經(jīng)典的負(fù)載均衡算法,跟隨本文來一一探究2024-01-01Go語(yǔ)言設(shè)置JSON的默認(rèn)值操作
這篇文章主要介紹了Go語(yǔ)言設(shè)置JSON的默認(rèn)值操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-12-12Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka)
消息隊(duì)列是一種異步的服務(wù)間通信方式,適用于無服務(wù)器和微服務(wù)架構(gòu),本文主要介紹了Go操作各大消息隊(duì)列教程(RabbitMQ、Kafka),需要的朋友可以了解一下2024-02-02Go語(yǔ)言使用goroutine及通道實(shí)現(xiàn)并發(fā)詳解
這篇文章主要為大家介紹了Go語(yǔ)言使用goroutine及通道實(shí)現(xiàn)并發(fā)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08golang 實(shí)現(xiàn)struct、json、map互相轉(zhuǎn)化
這篇文章主要介紹了golang 實(shí)現(xiàn)struct、json、map互相轉(zhuǎn)化,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-12-12Go語(yǔ)言計(jì)算兩個(gè)經(jīng)度和緯度之間距離的方法
這篇文章主要介紹了Go語(yǔ)言計(jì)算兩個(gè)經(jīng)度和緯度之間距離的方法,涉及Go語(yǔ)言相關(guān)數(shù)學(xué)函數(shù)的使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-02-02