Golang實(shí)現(xiàn)自己的Redis(pipeline客戶端)實(shí)例探索
引言
用11篇文章實(shí)現(xiàn)一個(gè)可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過(guò)文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺(jué),歡迎持續(xù)關(guān)注學(xué)習(xí)。
- [x] easyredis之TCP服務(wù)
- [x] easyredis之網(wǎng)絡(luò)請(qǐng)求序列化協(xié)議(RESP)
- [x] easyredis之內(nèi)存數(shù)據(jù)庫(kù)
- [x] easyredis之過(guò)期時(shí)間 (時(shí)間輪實(shí)現(xiàn))
- [x] easyredis之持久化 (AOF實(shí)現(xiàn))
- [x] easyredis之發(fā)布訂閱功能
- [x] easyredis之有序集合(跳表實(shí)現(xiàn))
- [x] easyredis之 pipeline 客戶端實(shí)現(xiàn)
- [ ] easyredis之事務(wù)(原子性/回滾)
- [ ] easyredis之連接池
- [ ] easyredis之分布式集群存儲(chǔ)
EasyRedis之pipeline客戶端
網(wǎng)絡(luò)編程的一個(gè)基礎(chǔ)知識(shí):用同一個(gè)sokcet連接發(fā)送多個(gè)數(shù)據(jù)包的時(shí)候,我們一般的做法是,發(fā)送并立刻接收結(jié)果,在沒(méi)有接收到,是不會(huì)繼續(xù)發(fā)送數(shù)據(jù)包。這種方法簡(jiǎn)單,但是效率太低。時(shí)間都浪費(fèi)在等待上了...
socket的【發(fā)送緩沖區(qū)和接收緩沖區(qū)】是分離的,也就是發(fā)送不用等待接收,接收也不用等待發(fā)送。
所以我們可以把我們要發(fā)送的多個(gè)數(shù)據(jù)包【數(shù)據(jù)包1/數(shù)據(jù)包2...數(shù)據(jù)包N】復(fù)用同一個(gè)連接,通過(guò)發(fā)送緩沖區(qū)按順序都發(fā)送給服務(wù)端。服務(wù)端處理請(qǐng)求的順序,也是按照【數(shù)據(jù)包1/數(shù)據(jù)包2...數(shù)據(jù)包N】這個(gè)順序處理的。當(dāng)處理完以后,處理結(jié)果將按照【數(shù)據(jù)包結(jié)果1/數(shù)據(jù)包結(jié)果2...數(shù)據(jù)包結(jié)果N】順序發(fā)送給客戶端的接收緩沖區(qū)??蛻舳酥恍枰獜慕邮站彌_區(qū)中讀取數(shù)據(jù),并保存到請(qǐng)求數(shù)據(jù)包上,即可。這樣我們就可以將發(fā)送和接收分離開來(lái)。一個(gè)協(xié)程只負(fù)責(zé)發(fā)送,一個(gè)協(xié)程只負(fù)責(zé)接收,互相不用等待。關(guān)鍵在于保證發(fā)送和接收的順序是相同的設(shè)計(jì)邏輯圖如下:
代碼路徑redis/client/client.go
整個(gè)代碼也就是200多行,結(jié)合上圖非常容易理解
創(chuàng)建客戶端
type RedisClent struct { // socket連接 conn net.Conn addr string // 客戶端當(dāng)前狀態(tài) connStatus atomic.Int32 // heartbeat ticker time.Ticker // buffer cache waitSend chan *request waitResult chan *request // 有請(qǐng)求正在處理中... working sync.WaitGroup } // 創(chuàng)建redis客戶端socket func NewRedisClient(addr string) (*RedisClent, error) { conn, err := net.Dial("tcp", addr) if err != nil { returnnil, err } rc := RedisClent{} rc.conn = conn rc.waitSend = make(chan *request, maxChanSize) rc.waitResult = make(chan *request, maxChanSize) rc.addr = addr return &rc, nil } // 啟動(dòng) func (rc *RedisClent) Start() error { rc.ticker = *time.NewTicker(heartBeatInterval) // 將waitSend緩沖區(qū)進(jìn)行發(fā)送 go rc.execSend() // 獲取服務(wù)端結(jié)果 go rc.execReceive() // 定時(shí)發(fā)送心跳 go rc.execHeardBeat() rc.connStatus.Store(connRunning) // 啟動(dòng)狀態(tài) returnnil }
發(fā)送Redis命令
將command [][]byte
保存到緩沖區(qū) rc.waitSend
中
// 將redis命令保存到 waitSend 中 func (rc *RedisClent) Send(command [][]byte) (protocol.Reply, error) { // 已關(guān)閉 if rc.connStatus.Load() == connClosed { returnnil, errors.New("client closed") } req := &request{ command: command, wait: wait.Wait{}, } // 單個(gè)請(qǐng)求 req.wait.Add(1) // 所有請(qǐng)求 rc.working.Add(1) defer rc.working.Done() // 將數(shù)據(jù)保存到緩沖中 rc.waitSend <- req // 等待處理結(jié)束 if req.wait.WaitWithTimeOut(maxWait) { returnnil, errors.New("time out") } // 出錯(cuò) if req.err != nil { err := req.err returnnil, err } // 正常 return req.reply, nil }
發(fā)送Redis命令到服務(wù)端
// 將waitSend緩沖區(qū)進(jìn)行發(fā)送 func (rc *RedisClent) execSend() { for req := range rc.waitSend { rc.sendReq(req) } } func (rc *RedisClent) sendReq(req *request) { // 無(wú)效請(qǐng)求 if req == nil || len(req.command) == 0 { return } var err error // 網(wǎng)絡(luò)請(qǐng)求(重試3次) for i := 0; i < 3; i++ { _, err = rc.conn.Write(req.Bytes()) // 發(fā)送成功 or 發(fā)送錯(cuò)誤(除了超時(shí)錯(cuò)誤和deadline錯(cuò)誤)跳出 if err == nil || (!strings.Contains(err.Error(), "timeout") && // only retry timeout !strings.Contains(err.Error(), "deadline exceeded")) { break } } if err == nil { // 發(fā)送成功,異步等待結(jié)果 rc.waitResult <- req } else { // 發(fā)送失敗,請(qǐng)求直接失敗 req.err = err req.wait.Done() } }
從服務(wù)端讀取數(shù)據(jù)
func (rc *RedisClent) execReceive() { ch := parser.ParseStream(rc.conn) for payload := range ch { if payload.Err != nil { if rc.connStatus.Load() == connClosed { // 連接已關(guān)閉 return } // 否則,重新連接(可能因?yàn)榫W(wǎng)絡(luò)抖動(dòng)臨時(shí)斷開了) rc.reconnect() return } // 說(shuō)明一切正常 rc.handleResult(payload.Reply) } } func (rc *RedisClent) handleResult(reply protocol.Reply) { // 從rc.waitResult 獲取一個(gè)等待中的請(qǐng)求,將結(jié)果保存進(jìn)去 req := <-rc.waitResult if req == nil { return } req.reply = reply req.wait.Done() // 通知已經(jīng)獲取到結(jié)果 }
斷線重連
因?yàn)榫W(wǎng)絡(luò)抖動(dòng)可能存在連接斷開的情況,所以需要有重連的功能
func (rc *RedisClent) reconnect() { logger.Info("redis client reconnect...") rc.conn.Close() var conn net.Conn // 重連(重試3次) for i := 0; i < 3; i++ { var err error conn, err = net.Dial("tcp", rc.addr) if err != nil { logger.Error("reconnect error: " + err.Error()) time.Sleep(time.Second) continue } else { break } } // 服務(wù)端連不上,說(shuō)明服務(wù)可能掛了(or 網(wǎng)絡(luò)問(wèn)題 and so on...) if conn == nil { rc.Stop() return } // 這里關(guān)閉沒(méi)問(wèn)題,因?yàn)閞c.conn.Close已經(jīng)關(guān)閉,函數(shù)Send中保存的請(qǐng)求因?yàn)榘l(fā)送不成功,不會(huì)寫入到waitResult close(rc.waitResult) // 清理 waitResult(因?yàn)檫B接重置,新連接上只能處理新請(qǐng)求,老的請(qǐng)求的數(shù)據(jù)結(jié)果在老連接上,老連接已經(jīng)關(guān)了,新連接上肯定是沒(méi)有結(jié)果的) for req := range rc.waitResult { req.err = errors.New("connect reset") req.wait.Done() } // 新連接(新氣象) rc.waitResult = make(chan *request, maxWait) rc.conn = conn // 重新啟動(dòng)接收協(xié)程 go rc.execReceive() }
項(xiàng)目代碼地址: https://github.com/gofish2020/easyredis
以上就是Golang實(shí)現(xiàn)自己的Redis(pipeline客戶端)實(shí)例探索的詳細(xì)內(nèi)容,更多關(guān)于Golang Redis pipeline客戶端的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- Golang實(shí)現(xiàn)自己的Redis數(shù)據(jù)庫(kù)內(nèi)存實(shí)例探究
- Golang實(shí)現(xiàn)Redis過(guò)期時(shí)間實(shí)例探究
- 探索Golang實(shí)現(xiàn)Redis持久化AOF實(shí)例
- 探索Golang?Redis實(shí)現(xiàn)發(fā)布訂閱功能實(shí)例
- Golang實(shí)現(xiàn)自己的Redis(有序集合跳表)實(shí)例探究
- Golang實(shí)現(xiàn)自己的Redis(TCP篇)實(shí)例探究
- Golang實(shí)現(xiàn)Redis事務(wù)深入探究
- Golang實(shí)現(xiàn)Redis網(wǎng)絡(luò)協(xié)議實(shí)例探究
相關(guān)文章
Golang實(shí)現(xiàn)Trie(前綴樹)的示例
本文主要介紹了Golang實(shí)現(xiàn)Trie(前綴樹)的示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01淺析Golang開發(fā)中g(shù)oroutine的正確使用姿勢(shì)
很多初級(jí)的Gopher在學(xué)習(xí)了goroutine之后,在項(xiàng)目中其實(shí)使用率不高,所以這篇文章小編主要來(lái)帶大家深入了解一下goroutine的常見(jiàn)使用方法,希望對(duì)大家有所幫助2024-03-03golang實(shí)現(xiàn)各種情況的get請(qǐng)求操作
這篇文章主要介紹了golang實(shí)現(xiàn)各種情況的get請(qǐng)求操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-12-12GO語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的目錄復(fù)制功能
這篇文章主要介紹了GO語(yǔ)言實(shí)現(xiàn)簡(jiǎn)單的目錄復(fù)制功能,通過(guò)新建及復(fù)制內(nèi)容等操作最終實(shí)現(xiàn)復(fù)制目錄的功能效果,具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2014-12-12Go語(yǔ)言對(duì)字符串進(jìn)行SHA1哈希運(yùn)算的方法
這篇文章主要介紹了Go語(yǔ)言對(duì)字符串進(jìn)行SHA1哈希運(yùn)算的方法,實(shí)例分析了Go語(yǔ)言針對(duì)字符串操作的技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03Go語(yǔ)言:打造優(yōu)雅數(shù)據(jù)庫(kù)單元測(cè)試的實(shí)戰(zhàn)指南
Go語(yǔ)言數(shù)據(jù)庫(kù)單元測(cè)試入門:聚焦高效、可靠的數(shù)據(jù)庫(kù)代碼驗(yàn)證!想要確保您的Go應(yīng)用數(shù)據(jù)層堅(jiān)如磐石嗎?本指南將手把手教您如何利用Go進(jìn)行數(shù)據(jù)庫(kù)單元測(cè)試,輕松揪出隱藏的bug,打造無(wú)懈可擊的數(shù)據(jù)處理邏輯,一起來(lái)探索吧!2024-01-01Go http client 連接池不復(fù)用的問(wèn)題
這篇文章主要介紹了Go http client 連接池不復(fù)用的問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01goland遠(yuǎn)程調(diào)試k8s上容器的實(shí)現(xiàn)
本文主要介紹了goland遠(yuǎn)程調(diào)試k8s上容器的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02