Golang?實現(xiàn)Redis?協(xié)議解析器的解決方案
本文是 《用 Golang 實現(xiàn)一個 Redis》系列文章第二篇,本文將分別介紹Redis 通信協(xié)議 以及 協(xié)議解析器 的實現(xiàn),若您對協(xié)議有所了解可以直接閱讀協(xié)議解析器部分。
Redis 通信協(xié)議
Redis 自 2.0 版本起使用了統(tǒng)一的協(xié)議 RESP (REdis Serialization Protocol),該協(xié)議易于實現(xiàn),計算機可以高效的進行解析且易于被人類讀懂。
RESP 是一個二進制安全的文本協(xié)議,工作于 TCP 協(xié)議上。RESP 以行作為單位,客戶端和服務(wù)器發(fā)送的命令或數(shù)據(jù)一律以 \r\n (CRLF)作為換行符。
二進制安全是指允許協(xié)議中出現(xiàn)任意字符而不會導(dǎo)致故障。比如 C 語言的字符串以 \0
作為結(jié)尾不允許字符串中間出現(xiàn)\0
, 而 Go 語言的 string 則允許出現(xiàn) \0
,我們說 Go 語言的 string 是二進制安全的,而 C 語言字符串不是二進制安全的。
RESP 的二進制安全性允許我們在 key 或者 value 中包含 \r
或者 \n
這樣的特殊字符。在使用 redis 存儲 protobuf、msgpack 等二進制數(shù)據(jù)時,二進制安全性尤為重要。
RESP 定義了5種格式:
- 簡單字符串(Simple String): 服務(wù)器用來返回簡單的結(jié)果,比如"OK"。非二進制安全,且不允許換行。
- 錯誤信息(Error): 服務(wù)器用來返回簡單的錯誤信息,比如"ERR Invalid Synatx"。非二進制安全,且不允許換行。
- 整數(shù)(Integer): llen、scard 等命令的返回值, 64位有符號整數(shù)
- 字符串(Bulk String): 二進制安全字符串, 比如 get 等命令的返回值
- 數(shù)組(Array, 又稱 Multi Bulk Strings): Bulk String 數(shù)組,客戶端發(fā)送指令以及 lrange 等命令響應(yīng)的格式
RESP 通過第一個字符來表示格式:
- 簡單字符串:以"+" 開始, 如:"+OK\r\n"
- 錯誤:以"-" 開始,如:"-ERR Invalid Synatx\r\n"
- 整數(shù):以":"開始,如:":1\r\n"
- 字符串:以
$
開始 - 數(shù)組:以
*
開始
Bulk String有兩行,第一行為 $
+正文長度,第二行為實際內(nèi)容。如:
$3\r\nSET\r\n
Bulk String 是二進制安全的可以包含任意字節(jié),就是說可以在 Bulk String 內(nèi)部包含 "\r\n" 字符(行尾的CRLF被隱藏):
$4a\r\nb
$-1
表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應(yīng)即為$-1
。
Array 格式第一行為 "*"+數(shù)組長度,其后是相應(yīng)數(shù)量的 Bulk String。如, ["foo", "bar"]
的報文:
*2 $3 foo $3 bar
客戶端也使用 Array 格式向服務(wù)端發(fā)送指令。命令本身將作為第一個參數(shù),如 SET key value
指令的RESP報文:
*3 $3 SET $3 key $5 value
將換行符打印出來:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
協(xié)議解析器
我們在 實現(xiàn)TCP服務(wù)器 一文中已經(jīng)介紹過TCP服務(wù)器的實現(xiàn),協(xié)議解析器將實現(xiàn)其 Handler 接口充當應(yīng)用層服務(wù)器。
協(xié)議解析器將接收 Socket 傳來的數(shù)據(jù),并將其數(shù)據(jù)還原為 [][]byte
格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n"
將被還原為 ['SET', 'key', 'value']
。
本文完整代碼: github.com/hdt3213/godis/redis/parser
來自客戶端的請求均為數(shù)組格式,它在第一行中標記報文的總行數(shù)并使用CRLF
作為分行符。
bufio
標準庫可以將從 reader 讀到的數(shù)據(jù)緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n')
來保證每次讀取到完整的一行。
需要注意的是RESP是二進制安全
的協(xié)議,它允許在正文中使用CRLF
字符。舉例來說 Redis 可以正確接收并執(zhí)行SET "a\r\nb" 1
指令, 這條指令的正確報文是這樣的:
*3 $3 SET $4 a\r\nb $7 myvalue
當 ReadBytes
讀取到第五行 "a\r\nb\r\n"時會將其誤認為兩行:
*3 $3 SET $4 a // 錯誤的分行 b // 錯誤的分行 $7 myvalue
因此當讀取到第四行$4
后, 不應(yīng)該繼續(xù)使用 ReadBytes('\n')
讀取下一行, 應(yīng)使用 io.ReadFull(reader, msg)
方法來讀取指定長度的內(nèi)容。
msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2 _, err = io.ReadFull(reader, msg)
首先我們來定義解析器的接口:
// Payload stores redis.Reply or error type Payload struct { Data redis.Reply Err error } // ParseStream 通過 io.Reader 讀取數(shù)據(jù)并將結(jié)果通過 channel 將結(jié)果返回給調(diào)用者 // 流式處理的接口適合供客戶端/服務(wù)端使用 func ParseStream(reader io.Reader) <-chan *Payload { ch := make(chan *Payload) go parse0(reader, ch) return ch } // ParseOne 解析 []byte 并返回 redis.Reply func ParseOne(data []byte) (redis.Reply, error) { ch := make(chan *Payload) reader := bytes.NewReader(data) go parse0(reader, ch) payload := <-ch // parse0 will close the channel if payload == nil { return nil, errors.New("no reply") } return payload.Data, payload.Err }
接下來我們可以看一下解析器核心流程的偽代碼,您可以在parser.go看到完整代碼:
func parse0(reader io.Reader, ch chan<- *Payload) { // 初始化讀取狀態(tài) readingMultiLine := false expectedArgsCount := 0 var args [][]byte var bulkLen int64 for { // 上文中我們提到 RESP 是以行為單位的 // 因為行分為簡單字符串和二進制安全的BulkString,我們需要封裝一個 readLine 函數(shù)來兼容 line, err = readLine(reader, bulkLen) if err != nil { // 處理錯誤 return } // 接下來我們對剛剛讀取的行進行解析 // 我們簡單的將 Reply 分為兩類: // 單行: StatusReply, IntReply, ErrorReply // 多行: BulkReply, MultiBulkReply if !readingMultiLine { if isMulitBulkHeader(line) { // 我們收到了 MulitBulkReply 的第一行 // 獲得 MulitBulkReply 中 BulkString 的個數(shù) expectedArgsCount = parseMulitBulkHeader(line) // 等待 MulitBulkReply 后續(xù)行 readingMultiLine = true } else if isBulkHeader(line) { // 我們收到了 BulkReply 的第一行 // 獲得 BulkReply 第二行的長度, 通過 bulkLen 告訴 readLine 函數(shù)下一行 BulkString 的長度 bulkLen = parseBulkHeader() // 這個 Reply 中一共有 1 個 BulkString expectedArgsCount = 1 // 等待 BulkReply 后續(xù)行 readingMultiLine = true } else { // 處理 StatusReply, IntReply, ErrorReply 等單行 Reply reply := parseSingleLineReply(line) // 通過 ch 返回結(jié)果 emitReply(ch) } } else { // 進入此分支說明我們正在等待 MulitBulkReply 或 BulkReply 的后續(xù)行 // MulitBulkReply 的后續(xù)行有兩種,BulkHeader 或者 BulkString if isBulkHeader(line) { bulkLen = parseBulkHeader() } else { // 我們正在讀取一個 BulkString, 它可能是 MulitBulkReply 或 BulkReply args = append(args, line) } if len(args) == expectedArgsCount { // 我們已經(jīng)讀取了所有后續(xù)行 // 通過 ch 返回結(jié)果 emitReply(ch) // 重置狀態(tài), 準備解析下一條 Reply readingMultiLine = false expectedArgsCount = 0 args = nil bulkLen = 0 } } } }
貼一下工具函數(shù)的實現(xiàn):
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) { var msg []byte var err error if state.bulkLen == 0 { // read simple line msg, err = bufReader.ReadBytes('\n') if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' { return nil, false, errors.New("protocol error: " + string(msg)) } } else { // read bulk line (binary safe) msg = make([]byte, state.bulkLen+2) _, err = io.ReadFull(bufReader, msg) if err != nil { return nil, true, err } if len(msg) == 0 || msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' { return nil, false, errors.New("protocol error: " + string(msg)) } state.bulkLen = 0 } return msg, false, nil } func parseMultiBulkHeader(msg []byte, state *readState) error { var err error var expectedLine uint64 expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { return errors.New("protocol error: " + string(msg)) } if expectedLine == 0 { state.expectedArgsCount = 0 return nil } else if expectedLine > 0 { // first line of multi bulk reply state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = int(expectedLine) state.args = make([][]byte, 0, expectedLine) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseBulkHeader(msg []byte, state *readState) error { var err error state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64) if err != nil { return errors.New("protocol error: " + string(msg)) } if state.bulkLen == -1 { // null bulk return nil } else if state.bulkLen > 0 { state.msgType = msg[0] state.readingMultiLine = true state.expectedArgsCount = 1 state.args = make([][]byte, 0, 1) return nil } else { return errors.New("protocol error: " + string(msg)) } } func parseSingleLineReply(msg []byte) (redis.Reply, error) { str := strings.TrimSuffix(string(msg), "\n") str = strings.TrimSuffix(str, "\r") var result redis.Reply switch msg[0] { case '+': // status reply result = reply.MakeStatusReply(str[1:]) case '-': // err reply result = reply.MakeErrReply(str[1:]) case ':': // int reply val, err := strconv.ParseInt(str[1:], 10, 64) if err != nil { return nil, errors.New("protocol error: " + string(msg)) } result = reply.MakeIntReply(val) default: // parse as text protocol strs := strings.Split(str, " ") args := make([][]byte, len(strs)) for i, s := range strs { args[i] = []byte(s) } result = reply.MakeMultiBulkReply(args) } return result, nil }
到此這篇關(guān)于Golang 實現(xiàn) Redis 協(xié)議解析器的文章就介紹到這了,更多相關(guān)go redis 協(xié)議解析器內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
淺談Go語言多態(tài)的實現(xiàn)與interface使用
如果大家系統(tǒng)的學(xué)過C++、Java等語言以及面向?qū)ο蟮脑?,相信?yīng)該對多態(tài)不會陌生。多態(tài)是面向?qū)ο蠓懂牣斨薪?jīng)常使用并且非常好用的一個功能,它主要是用在強類型語言當中,像是Python這樣的弱類型語言,變量的類型可以隨意變化,也沒有任何限制,其實區(qū)別不是很大2021-06-06