欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端

 更新時(shí)間:2021年07月11日 09:27:20   作者:Finley  
pipeline 模式的 redis 客戶端需要有兩個(gè)后臺(tái)協(xié)程負(fù)責(zé) tcp 通信,調(diào)用方通過 channel 向后臺(tái)協(xié)程發(fā)送指令,并阻塞等待直到收到響應(yīng),本文是使用 golang 實(shí)現(xiàn) redis 系列的第六篇, 將介紹如何實(shí)現(xiàn)一個(gè) Pipeline 模式的 Redis 客戶端。

本文的完整代碼在github.com/hdt3213/godis/redis/client

通常 TCP 客戶端的通信模式都是阻塞式的: 客戶端發(fā)送請(qǐng)求 -> 等待服務(wù)端響應(yīng) -> 發(fā)送下一個(gè)請(qǐng)求。因?yàn)樾枰却W(wǎng)絡(luò)傳輸數(shù)據(jù),完成一次請(qǐng)求循環(huán)需要等待較多時(shí)間。

我們能否不等待服務(wù)端響應(yīng)直接發(fā)送下一條請(qǐng)求呢?答案是肯定的。

TCP 作為全雙工協(xié)議可以同時(shí)進(jìn)行上行和下行通信,不必?fù)?dān)心客戶端和服務(wù)端同時(shí)發(fā)包會(huì)導(dǎo)致沖突。

p.s. 打電話的時(shí)候兩個(gè)人同時(shí)講話就會(huì)沖突聽不清,只能輪流講。這種通信方式稱為半雙工。廣播只能由電臺(tái)發(fā)送到收音機(jī)不能反向傳輸,這種方式稱為單工。

我們?yōu)槊恳粋€(gè) tcp 連接分配了一個(gè) goroutine 可以保證先收到的請(qǐng)求先先回復(fù)。另一個(gè)方面,tcp 協(xié)議會(huì)保證數(shù)據(jù)流的有序性,同一個(gè) tcp 連接上先發(fā)送的請(qǐng)求服務(wù)端先接收,先回復(fù)的響應(yīng)客戶端先收到。因此我們不必?fù)?dān)心混淆響應(yīng)所對(duì)應(yīng)的請(qǐng)求。

這種在服務(wù)端未響應(yīng)時(shí)客戶端繼續(xù)向服務(wù)端發(fā)送請(qǐng)求的模式稱為 Pipeline 模式。因?yàn)闇p少等待網(wǎng)絡(luò)傳輸?shù)臅r(shí)間,Pipeline 模式可以極大的提高吞吐量,減少所需使用的 tcp 鏈接數(shù)。

pipeline 模式的 redis 客戶端需要有兩個(gè)后臺(tái)協(xié)程程負(fù)責(zé) tcp 通信,調(diào)用方通過 channel 向后臺(tái)協(xié)程發(fā)送指令,并阻塞等待直到收到響應(yīng),這是一個(gè)典型的異步編程模式。

我們先來定義 client 的結(jié)構(gòu):

type Client struct {
    conn        net.Conn // 與服務(wù)端的 tcp 連接
    pendingReqs chan *Request // 等待發(fā)送的請(qǐng)求
    waitingReqs chan *Request // 等待服務(wù)器響應(yīng)的請(qǐng)求
    ticker      *time.Ticker // 用于觸發(fā)心跳包的計(jì)時(shí)器
    addr        string

    ctx        context.Context
    cancelFunc context.CancelFunc
    writing    *sync.WaitGroup // 有請(qǐng)求正在處理不能立即停止,用于實(shí)現(xiàn) graceful shutdown
}

type Request struct {
    id        uint64 // 請(qǐng)求id
    args      [][]byte // 上行參數(shù)
    reply     redis.Reply // 收到的返回值
    heartbeat bool // 標(biāo)記是否是心跳請(qǐng)求
    waiting   *wait.Wait // 調(diào)用協(xié)程發(fā)送請(qǐng)求后通過 waitgroup 等待請(qǐng)求異步處理完成
    err       error
}

調(diào)用者將請(qǐng)求發(fā)送給后臺(tái)協(xié)程,并通過 wait group 等待異步處理完成:

func (client *Client) Send(args [][]byte) redis.Reply {
	request := &request{
		args:      args,
		heartbeat: false,
		waiting:   &wait.Wait{},
	}
	request.waiting.Add(1)
	client.working.Add(1)
	defer client.working.Done()
	client.pendingReqs <- request // 請(qǐng)求入隊(duì)
	timeout := request.waiting.WaitWithTimeout(maxWait) // 等待響應(yīng)或者超時(shí)
	if timeout {
		return reply.MakeErrReply("server time out")
	}
	if request.err != nil {
		return reply.MakeErrReply("request failed")
	}
	return request.reply
}

client 的核心部分是后臺(tái)的讀寫協(xié)程。先從寫協(xié)程開始:

// 寫協(xié)程入口
func (client *Client) handleWrite() {
	for req := range client.pendingReqs {
		client.doRequest(req)
	}
}

// 發(fā)送請(qǐng)求
func (client *Client) doRequest(req *request) {
	if req == nil || len(req.args) == 0 {
		return
	}
    // 序列化請(qǐng)求
	re := reply.MakeMultiBulkReply(req.args)
	bytes := re.ToBytes()
	_, err := client.conn.Write(bytes)
	i := 0
    // 失敗重試
	for err != nil && i < 3 {
		err = client.handleConnectionError(err)
		if err == nil {
			_, err = client.conn.Write(bytes)
		}
		i++
	}
	if err == nil {
        // 發(fā)送成功等待服務(wù)器響應(yīng)
		client.waitingReqs <- req
	} else {
		req.err = err
		req.waiting.Done()
	}
}

讀協(xié)程是我們熟悉的協(xié)議解析器模板, 不熟悉的朋友可以到解析Redis Cluster原理了解更多。

// 收到服務(wù)端的響應(yīng)
func (client *Client) finishRequest(reply redis.Reply) {
	defer func() {
		if err := recover(); err != nil {
			debug.PrintStack()
			logger.Error(err)
		}
	}()
	request := <-client.waitingReqs
	if request == nil {
		return
	}
	request.reply = reply
	if request.waiting != nil {
		request.waiting.Done()
	}
}

// 讀協(xié)程是個(gè) RESP 協(xié)議解析器
func (client *Client) handleRead() error {
	ch := parser.ParseStream(client.conn)
	for payload := range ch {
		if payload.Err != nil {
			client.finishRequest(reply.MakeErrReply(payload.Err.Error()))
			continue
		}
		client.finishRequest(payload.Data)
	}
	return nil
}

最后編寫 client 的構(gòu)造器和啟動(dòng)異步協(xié)程的代碼:

func MakeClient(addr string) (*Client, error) {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        return nil, err
    }
    ctx, cancel := context.WithCancel(context.Background())
    return &Client{
        addr:        addr,
        conn:        conn,
        sendingReqs: make(chan *Request, chanSize),
        waitingReqs: make(chan *Request, chanSize),
        ctx:         ctx,
        cancelFunc:  cancel,
        writing:     &sync.WaitGroup{},
    }, nil
}

func (client *Client) Start() {
    client.ticker = time.NewTicker(10 * time.Second)
    go client.handleWrite()
    go func() {
        err := client.handleRead()
        logger.Warn(err)
    }()
    go client.heartbeat()
}

關(guān)閉 client 的時(shí)候記得等待請(qǐng)求完成:

func (client *Client) Close() {
    // 先阻止新請(qǐng)求進(jìn)入隊(duì)列
    close(client.sendingReqs)

    // 等待處理中的請(qǐng)求完成
    client.writing.Wait()

    // 釋放資源
    _ = client.conn.Close() // 關(guān)閉與服務(wù)端的連接,連接關(guān)閉后讀協(xié)程會(huì)退出
    client.cancelFunc() // 使用 context 關(guān)閉讀協(xié)程
    close(client.waitingReqs) // 關(guān)閉隊(duì)列
}

測(cè)試一下:

func TestClient(t *testing.T) {
    client, err := MakeClient("localhost:6379")
    if err != nil {
        t.Error(err)
    }
    client.Start()

    result = client.Send([][]byte{
        []byte("SET"),
        []byte("a"),
        []byte("a"),
    })
    if statusRet, ok := result.(*reply.StatusReply); ok {
        if statusRet.Status != "OK" {
            t.Error("`set` failed, result: " + statusRet.Status)
        }
    }

    result = client.Send([][]byte{
        []byte("GET"),
        []byte("a"),
    })
    if bulkRet, ok := result.(*reply.BulkReply); ok {
        if string(bulkRet.Arg) != "a" {
            t.Error("`get` failed, result: " + string(bulkRet.Arg))
        }
    }
}

Keep working, we will find a way out.This is Finley, welcome to join us.

到此這篇關(guān)于Golang 實(shí)現(xiàn) Redis系列(六)如何實(shí)現(xiàn) pipeline 模式的 redis 客戶端的文章就介紹到這了,更多相關(guān)Golang實(shí)現(xiàn)pipeline模式的redis客戶端內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

您可能感興趣的文章:

相關(guān)文章

  • Golang中切片的用法與本質(zhì)詳解

    Golang中切片的用法與本質(zhì)詳解

    Go的切片類型為處理同類型數(shù)據(jù)序列提供一個(gè)方便而高效的方式,下面這篇文章就來給大家介紹了關(guān)于Golang中切片的用法與本質(zhì)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下
    2018-07-07
  • GO語言的IO方法實(shí)例小結(jié)

    GO語言的IO方法實(shí)例小結(jié)

    這篇文章主要介紹了GO語言的IO方法實(shí)例小結(jié),Docker的火爆促成了當(dāng)下新興的Go語言人氣的大幅攀升,需要的朋友可以參考下
    2015-10-10
  • golang兩種調(diào)用rpc的方法

    golang兩種調(diào)用rpc的方法

    這篇文章主要介紹了golang兩種調(diào)用rpc的方法,結(jié)合實(shí)例形式分析了Go語言調(diào)用rpc的原理與實(shí)現(xiàn)方法,需要的朋友可以參考下
    2016-07-07
  • Golang中channel的原理解讀(推薦)

    Golang中channel的原理解讀(推薦)

    channel主要是為了實(shí)現(xiàn)go的并發(fā)特性,用于并發(fā)通信的,也就是在不同的協(xié)程單元goroutine之間同步通信。接下來通過本文給大家介紹Golang中channel的原理解讀,感興趣的朋友一起看看吧
    2021-10-10
  • 使用Golang的gomail庫(kù)實(shí)現(xiàn)郵件發(fā)送功能

    使用Golang的gomail庫(kù)實(shí)現(xiàn)郵件發(fā)送功能

    本篇博客詳細(xì)介紹了如何使用Golang語言中的gomail庫(kù)來實(shí)現(xiàn)郵件發(fā)送的功能,首先,需要準(zhǔn)備工作,包括安裝Golang環(huán)境、gomail庫(kù),以及申請(qǐng)126郵箱的SMTP服務(wù)和獲取授權(quán)碼,其次,介紹了在config文件中配置SMTP服務(wù)器信息的步驟
    2024-10-10
  • 重學(xué)Go語言之?dāng)?shù)組的具體使用詳解

    重學(xué)Go語言之?dāng)?shù)組的具體使用詳解

    Go的數(shù)組是一種復(fù)合數(shù)據(jù)類型,在平時(shí)開發(fā)中并不常用,更常用的是切片(slice),可以把切片看作是能動(dòng)態(tài)擴(kuò)容的數(shù)組,切片的底層數(shù)據(jù)結(jié)構(gòu)就是數(shù)組,所以數(shù)組雖不常用,但仍然有必要掌握
    2023-02-02
  • Hugo 游樂場(chǎng)內(nèi)容初始化示例詳解

    Hugo 游樂場(chǎng)內(nèi)容初始化示例詳解

    這篇文章主要為大家介紹了Hugo 游樂場(chǎng)內(nèi)容初始化示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-02-02
  • golang數(shù)組-----尋找數(shù)組中缺失的整數(shù)方法

    golang數(shù)組-----尋找數(shù)組中缺失的整數(shù)方法

    這篇文章主要介紹了golang數(shù)組-----尋找數(shù)組中缺失的整數(shù)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • golang中一種不常見的switch語句寫法示例詳解

    golang中一種不常見的switch語句寫法示例詳解

    這篇文章主要介紹了golang中一種不常見的switch語句寫法,本文通過示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2023-05-05
  • go語言中的map如何解決散列性能下降

    go語言中的map如何解決散列性能下降

    近期對(duì)go語言的map進(jìn)行深入了解和探究,其中關(guān)于map解決大量沖突的擴(kuò)容操作設(shè)計(jì)的十分巧妙,所以筆者特地整理了這篇文章來探討一下go語言中map如何解決散列性能下降,文中有相關(guān)的代碼示例供大家參考,需要的朋友可以參考下
    2024-03-03

最新評(píng)論