欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang實現(xiàn)自己的Redis(pipeline客戶端)實例探索

 更新時間:2024年01月24日 09:34:17   作者:紹納?nullbody筆記  
這篇文章主要為大家介紹了Golang實現(xiàn)自己的Redis(pipeline客戶端)實例探索,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪

引言

用11篇文章實現(xiàn)一個可用的Redis服務(wù),姑且叫EasyRedis吧,希望通過文章將Redis掰開撕碎了呈現(xiàn)給大家,而不是僅僅停留在八股文的層面,并且有非常爽的感覺,歡迎持續(xù)關(guān)注學(xué)習(xí)。

  • [x] easyredis之TCP服務(wù)
  • [x] easyredis之網(wǎng)絡(luò)請求序列化協(xié)議(RESP)
  • [x] easyredis之內(nèi)存數(shù)據(jù)庫
  • [x] easyredis之過期時間 (時間輪實現(xiàn))
  • [x] easyredis之持久化 (AOF實現(xiàn))
  • [x] easyredis之發(fā)布訂閱功能
  • [x] easyredis之有序集合(跳表實現(xiàn))
  • [x] easyredis之 pipeline 客戶端實現(xiàn)
  • [ ] easyredis之事務(wù)(原子性/回滾)
  • [ ] easyredis之連接池
  • [ ] easyredis之分布式集群存儲

EasyRedis之pipeline客戶端

網(wǎng)絡(luò)編程的一個基礎(chǔ)知識:用同一個sokcet連接發(fā)送多個數(shù)據(jù)包的時候,我們一般的做法是,發(fā)送并立刻接收結(jié)果,在沒有接收到,是不會繼續(xù)發(fā)送數(shù)據(jù)包。這種方法簡單,但是效率太低。時間都浪費在等待上了...

socket的【發(fā)送緩沖區(qū)和接收緩沖區(qū)】是分離的,也就是發(fā)送不用等待接收,接收也不用等待發(fā)送。

所以我們可以把我們要發(fā)送的多個數(shù)據(jù)包【數(shù)據(jù)包1/數(shù)據(jù)包2...數(shù)據(jù)包N】復(fù)用同一個連接,通過發(fā)送緩沖區(qū)按順序都發(fā)送給服務(wù)端。服務(wù)端處理請求的順序,也是按照【數(shù)據(jù)包1/數(shù)據(jù)包2...數(shù)據(jù)包N】這個順序處理的。當(dāng)處理完以后,處理結(jié)果將按照【數(shù)據(jù)包結(jié)果1/數(shù)據(jù)包結(jié)果2...數(shù)據(jù)包結(jié)果N】順序發(fā)送給客戶端的接收緩沖區(qū)??蛻舳酥恍枰獜慕邮站彌_區(qū)中讀取數(shù)據(jù),并保存到請求數(shù)據(jù)包上,即可。這樣我們就可以將發(fā)送和接收分離開來。一個協(xié)程只負(fù)責(zé)發(fā)送,一個協(xié)程只負(fù)責(zé)接收,互相不用等待。關(guān)鍵在于保證發(fā)送和接收的順序是相同的設(shè)計邏輯圖如下:

代碼路徑redis/client/client.go整個代碼也就是200多行,結(jié)合上圖非常容易理解

創(chuàng)建客戶端

type RedisClent struct {
	// socket連接
	conn net.Conn
	addr string
	// 客戶端當(dāng)前狀態(tài)
	connStatus atomic.Int32
	// heartbeat
	ticker time.Ticker
	// buffer cache
	waitSend   chan *request
	waitResult chan *request
	// 有請求正在處理中...
	working sync.WaitGroup
}
// 創(chuàng)建redis客戶端socket
func NewRedisClient(addr string) (*RedisClent, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		returnnil, err
	}
	rc := RedisClent{}
	rc.conn = conn
	rc.waitSend = make(chan *request, maxChanSize)
	rc.waitResult = make(chan *request, maxChanSize)
	rc.addr = addr
	return &rc, nil
}
// 啟動
func (rc *RedisClent) Start() error {
	rc.ticker = *time.NewTicker(heartBeatInterval)
	// 將waitSend緩沖區(qū)進行發(fā)送
	go rc.execSend()
	// 獲取服務(wù)端結(jié)果
	go rc.execReceive()
	// 定時發(fā)送心跳
	go rc.execHeardBeat()
	rc.connStatus.Store(connRunning) // 啟動狀態(tài)
	returnnil
}

發(fā)送Redis命令

command [][]byte保存到緩沖區(qū) rc.waitSend

// 將redis命令保存到 waitSend 中
func (rc *RedisClent) Send(command [][]byte) (protocol.Reply, error) {
	// 已關(guān)閉
	if rc.connStatus.Load() == connClosed {
		returnnil, errors.New("client closed")
	}
	req := &request{
		command: command,
		wait:    wait.Wait{},
	}
	// 單個請求
	req.wait.Add(1)
	// 所有請求
	rc.working.Add(1)
	defer rc.working.Done()
	// 將數(shù)據(jù)保存到緩沖中
	rc.waitSend <- req
	// 等待處理結(jié)束
	if req.wait.WaitWithTimeOut(maxWait) {
		returnnil, errors.New("time out")
	}
	// 出錯
	if req.err != nil {
		err := req.err
		returnnil, err
	}
	// 正常
	return req.reply, nil
}

發(fā)送Redis命令到服務(wù)端

// 將waitSend緩沖區(qū)進行發(fā)送
func (rc *RedisClent) execSend() {
	for req := range rc.waitSend {
		rc.sendReq(req)
	}
}
func (rc *RedisClent) sendReq(req *request) {
	// 無效請求
	if req == nil || len(req.command) == 0 {
		return
	}
	var err error
	// 網(wǎng)絡(luò)請求(重試3次)
	for i := 0; i < 3; i++ {
		_, err = rc.conn.Write(req.Bytes())
		// 發(fā)送成功 or 發(fā)送錯誤(除了超時錯誤和deadline錯誤)跳出
		if err == nil ||
			(!strings.Contains(err.Error(), "timeout") && // only retry timeout
				!strings.Contains(err.Error(), "deadline exceeded")) {
			break
		}
	}
	if err == nil { // 發(fā)送成功,異步等待結(jié)果
		rc.waitResult <- req
	} else { // 發(fā)送失敗,請求直接失敗
		req.err = err
		req.wait.Done()
	}
}

從服務(wù)端讀取數(shù)據(jù)

func (rc *RedisClent) execReceive() {
	ch := parser.ParseStream(rc.conn)
	for payload := range ch {
		if payload.Err != nil {
			if rc.connStatus.Load() == connClosed { // 連接已關(guān)閉
				return
			}
			// 否則,重新連接(可能因為網(wǎng)絡(luò)抖動臨時斷開了)
			rc.reconnect()
			return
		}
		// 說明一切正常
		rc.handleResult(payload.Reply)
	}
}
func (rc *RedisClent) handleResult(reply protocol.Reply) {
	// 從rc.waitResult 獲取一個等待中的請求,將結(jié)果保存進去
	req := <-rc.waitResult
	if req == nil {
		return
	}
	req.reply = reply
	req.wait.Done() // 通知已經(jīng)獲取到結(jié)果
}

斷線重連

因為網(wǎng)絡(luò)抖動可能存在連接斷開的情況,所以需要有重連的功能

func (rc *RedisClent) reconnect() {
	logger.Info("redis client reconnect...")
	rc.conn.Close()
	var conn net.Conn
	// 重連(重試3次)
	for i := 0; i < 3; i++ {
		var err error
		conn, err = net.Dial("tcp", rc.addr)
		if err != nil {
			logger.Error("reconnect error: " + err.Error())
			time.Sleep(time.Second)
			continue
		} else {
			break
		}
	}
	// 服務(wù)端連不上,說明服務(wù)可能掛了(or 網(wǎng)絡(luò)問題 and so on...)
	if conn == nil { 
		rc.Stop()
		return
	}
	// 這里關(guān)閉沒問題,因為rc.conn.Close已經(jīng)關(guān)閉,函數(shù)Send中保存的請求因為發(fā)送不成功,不會寫入到waitResult
	close(rc.waitResult)
	// 清理 waitResult(因為連接重置,新連接上只能處理新請求,老的請求的數(shù)據(jù)結(jié)果在老連接上,老連接已經(jīng)關(guān)了,新連接上肯定是沒有結(jié)果的)
	for req := range rc.waitResult {
		req.err = errors.New("connect reset")
		req.wait.Done()
	}
	// 新連接(新氣象)
	rc.waitResult = make(chan *request, maxWait)
	rc.conn = conn
	// 重新啟動接收協(xié)程
	go rc.execReceive()
}

項目代碼地址: https://github.com/gofish2020/easyredis 

以上就是Golang實現(xiàn)自己的Redis(pipeline客戶端)實例探索的詳細(xì)內(nèi)容,更多關(guān)于Golang Redis pipeline客戶端的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Golang實現(xiàn)Trie(前綴樹)的示例

    Golang實現(xiàn)Trie(前綴樹)的示例

    本文主要介紹了Golang實現(xiàn)Trie(前綴樹)的示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-01-01
  • 淺析Golang開發(fā)中g(shù)oroutine的正確使用姿勢

    淺析Golang開發(fā)中g(shù)oroutine的正確使用姿勢

    很多初級的Gopher在學(xué)習(xí)了goroutine之后,在項目中其實使用率不高,所以這篇文章小編主要來帶大家深入了解一下goroutine的常見使用方法,希望對大家有所幫助
    2024-03-03
  • golang實現(xiàn)各種情況的get請求操作

    golang實現(xiàn)各種情況的get請求操作

    這篇文章主要介紹了golang實現(xiàn)各種情況的get請求操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • GO語言實現(xiàn)簡單的目錄復(fù)制功能

    GO語言實現(xiàn)簡單的目錄復(fù)制功能

    這篇文章主要介紹了GO語言實現(xiàn)簡單的目錄復(fù)制功能,通過新建及復(fù)制內(nèi)容等操作最終實現(xiàn)復(fù)制目錄的功能效果,具有一定的參考借鑒價值,需要的朋友可以參考下
    2014-12-12
  • Go語言對字符串進行SHA1哈希運算的方法

    Go語言對字符串進行SHA1哈希運算的方法

    這篇文章主要介紹了Go語言對字符串進行SHA1哈希運算的方法,實例分析了Go語言針對字符串操作的技巧,具有一定參考借鑒價值,需要的朋友可以參考下
    2015-03-03
  • 深入解析Sync.Pool如何提升Go程序性能

    深入解析Sync.Pool如何提升Go程序性能

    在并發(fā)編程中,資源的分配和回收是一個很重要的問題。Go?語言的?Sync.Pool?是一個可以幫助我們優(yōu)化這個問題的工具。本篇文章將會介紹?Sync.Pool?的用法、原理以及如何在項目中正確使用它,希望對大家有所幫助
    2023-05-05
  • Go語言:打造優(yōu)雅數(shù)據(jù)庫單元測試的實戰(zhàn)指南

    Go語言:打造優(yōu)雅數(shù)據(jù)庫單元測試的實戰(zhàn)指南

    Go語言數(shù)據(jù)庫單元測試入門:聚焦高效、可靠的數(shù)據(jù)庫代碼驗證!想要確保您的Go應(yīng)用數(shù)據(jù)層堅如磐石嗎?本指南將手把手教您如何利用Go進行數(shù)據(jù)庫單元測試,輕松揪出隱藏的bug,打造無懈可擊的數(shù)據(jù)處理邏輯,一起來探索吧!
    2024-01-01
  • Go http client 連接池不復(fù)用的問題

    Go http client 連接池不復(fù)用的問題

    這篇文章主要介紹了Go http client 連接池不復(fù)用的問題,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-01-01
  • golang 之import和package的使用

    golang 之import和package的使用

    這篇文章主要介紹了golang 之import和package的使用,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-02-02
  • goland遠(yuǎn)程調(diào)試k8s上容器的實現(xiàn)

    goland遠(yuǎn)程調(diào)試k8s上容器的實現(xiàn)

    本文主要介紹了goland遠(yuǎn)程調(diào)試k8s上容器的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02

最新評論