Golang?實(shí)現(xiàn)Redis?協(xié)議解析器的解決方案
本文是 《用 Golang 實(shí)現(xiàn)一個(gè) Redis》系列文章第二篇,本文將分別介紹Redis 通信協(xié)議 以及 協(xié)議解析器 的實(shí)現(xiàn),若您對(duì)協(xié)議有所了解可以直接閱讀協(xié)議解析器部分。
Redis 通信協(xié)議
Redis 自 2.0 版本起使用了統(tǒng)一的協(xié)議 RESP (REdis Serialization Protocol),該協(xié)議易于實(shí)現(xiàn),計(jì)算機(jī)可以高效的進(jìn)行解析且易于被人類讀懂。
RESP 是一個(gè)二進(jìn)制安全的文本協(xié)議,工作于 TCP 協(xié)議上。RESP 以行作為單位,客戶端和服務(wù)器發(fā)送的命令或數(shù)據(jù)一律以 \r\n (CRLF)作為換行符。
二進(jìn)制安全是指允許協(xié)議中出現(xiàn)任意字符而不會(huì)導(dǎo)致故障。比如 C 語言的字符串以 \0 作為結(jié)尾不允許字符串中間出現(xiàn)\0, 而 Go 語言的 string 則允許出現(xiàn) \0,我們說 Go 語言的 string 是二進(jìn)制安全的,而 C 語言字符串不是二進(jìn)制安全的。
RESP 的二進(jìn)制安全性允許我們?cè)?key 或者 value 中包含 \r 或者 \n 這樣的特殊字符。在使用 redis 存儲(chǔ) protobuf、msgpack 等二進(jìn)制數(shù)據(jù)時(shí),二進(jìn)制安全性尤為重要。
RESP 定義了5種格式:
- 簡(jiǎn)單字符串(Simple String): 服務(wù)器用來返回簡(jiǎn)單的結(jié)果,比如"OK"。非二進(jìn)制安全,且不允許換行。
- 錯(cuò)誤信息(Error): 服務(wù)器用來返回簡(jiǎn)單的錯(cuò)誤信息,比如"ERR Invalid Synatx"。非二進(jìn)制安全,且不允許換行。
- 整數(shù)(Integer): llen、scard 等命令的返回值, 64位有符號(hào)整數(shù)
- 字符串(Bulk String): 二進(jìn)制安全字符串, 比如 get 等命令的返回值
- 數(shù)組(Array, 又稱 Multi Bulk Strings): Bulk String 數(shù)組,客戶端發(fā)送指令以及 lrange 等命令響應(yīng)的格式
RESP 通過第一個(gè)字符來表示格式:
- 簡(jiǎn)單字符串:以"+" 開始, 如:"+OK\r\n"
- 錯(cuò)誤:以"-" 開始,如:"-ERR Invalid Synatx\r\n"
- 整數(shù):以":"開始,如:":1\r\n"
- 字符串:以
$開始 - 數(shù)組:以
*開始
Bulk String有兩行,第一行為 $+正文長(zhǎng)度,第二行為實(shí)際內(nèi)容。如:
$3\r\nSET\r\n
Bulk String 是二進(jìn)制安全的可以包含任意字節(jié),就是說可以在 Bulk String 內(nèi)部包含 "\r\n" 字符(行尾的CRLF被隱藏):
$4a\r\nb
$-1 表示 nil, 比如使用 get 命令查詢一個(gè)不存在的key時(shí),響應(yīng)即為$-1。
Array 格式第一行為 "*"+數(shù)組長(zhǎng)度,其后是相應(yīng)數(shù)量的 Bulk String。如, ["foo", "bar"]的報(bào)文:
*2 $3 foo $3 bar
客戶端也使用 Array 格式向服務(wù)端發(fā)送指令。命令本身將作為第一個(gè)參數(shù),如 SET key value指令的RESP報(bào)文:
*3 $3 SET $3 key $5 value
將換行符打印出來:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
協(xié)議解析器
我們?cè)?a href="http://www.dbjr.com.cn/article/265861.htm" target="_blank"> 實(shí)現(xiàn)TCP服務(wù)器 一文中已經(jīng)介紹過TCP服務(wù)器的實(shí)現(xiàn),協(xié)議解析器將實(shí)現(xiàn)其 Handler 接口充當(dāng)應(yīng)用層服務(wù)器。
協(xié)議解析器將接收 Socket 傳來的數(shù)據(jù),并將其數(shù)據(jù)還原為 [][]byte 格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n" 將被還原為 ['SET', 'key', 'value']。
本文完整代碼: github.com/hdt3213/godis/redis/parser
來自客戶端的請(qǐng)求均為數(shù)組格式,它在第一行中標(biāo)記報(bào)文的總行數(shù)并使用CRLF作為分行符。
bufio 標(biāo)準(zhǔn)庫可以將從 reader 讀到的數(shù)據(jù)緩存到 buffer 中,直至遇到分隔符或讀取完畢后返回,所以我們使用 reader.ReadBytes('\n') 來保證每次讀取到完整的一行。
需要注意的是RESP是二進(jìn)制安全的協(xié)議,它允許在正文中使用CRLF字符。舉例來說 Redis 可以正確接收并執(zhí)行SET "a\r\nb" 1指令, 這條指令的正確報(bào)文是這樣的:
*3 $3 SET $4 a\r\nb $7 myvalue
當(dāng) ReadBytes 讀取到第五行 "a\r\nb\r\n"時(shí)會(huì)將其誤認(rèn)為兩行:
*3 $3 SET $4 a // 錯(cuò)誤的分行 b // 錯(cuò)誤的分行 $7 myvalue
因此當(dāng)讀取到第四行$4后, 不應(yīng)該繼續(xù)使用 ReadBytes('\n') 讀取下一行, 應(yīng)使用 io.ReadFull(reader, msg) 方法來讀取指定長(zhǎng)度的內(nèi)容。
msg = make([]byte, 4 + 2) // 正文長(zhǎng)度4 + 換行符長(zhǎng)度2 _, err = io.ReadFull(reader, msg)
首先我們來定義解析器的接口:
// Payload stores redis.Reply or error
type Payload struct {
Data redis.Reply
Err error
}
// ParseStream 通過 io.Reader 讀取數(shù)據(jù)并將結(jié)果通過 channel 將結(jié)果返回給調(diào)用者
// 流式處理的接口適合供客戶端/服務(wù)端使用
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}
// ParseOne 解析 []byte 并返回 redis.Reply
func ParseOne(data []byte) (redis.Reply, error) {
ch := make(chan *Payload)
reader := bytes.NewReader(data)
go parse0(reader, ch)
payload := <-ch // parse0 will close the channel
if payload == nil {
return nil, errors.New("no reply")
}
return payload.Data, payload.Err
}接下來我們可以看一下解析器核心流程的偽代碼,您可以在parser.go看到完整代碼:
func parse0(reader io.Reader, ch chan<- *Payload) {
// 初始化讀取狀態(tài)
readingMultiLine := false
expectedArgsCount := 0
var args [][]byte
var bulkLen int64
for {
// 上文中我們提到 RESP 是以行為單位的
// 因?yàn)樾蟹譃楹?jiǎn)單字符串和二進(jìn)制安全的BulkString,我們需要封裝一個(gè) readLine 函數(shù)來兼容
line, err = readLine(reader, bulkLen)
if err != nil {
// 處理錯(cuò)誤
return
}
// 接下來我們對(duì)剛剛讀取的行進(jìn)行解析
// 我們簡(jiǎn)單的將 Reply 分為兩類:
// 單行: StatusReply, IntReply, ErrorReply
// 多行: BulkReply, MultiBulkReply
if !readingMultiLine {
if isMulitBulkHeader(line) {
// 我們收到了 MulitBulkReply 的第一行
// 獲得 MulitBulkReply 中 BulkString 的個(gè)數(shù)
expectedArgsCount = parseMulitBulkHeader(line)
// 等待 MulitBulkReply 后續(xù)行
readingMultiLine = true
} else if isBulkHeader(line) {
// 我們收到了 BulkReply 的第一行
// 獲得 BulkReply 第二行的長(zhǎng)度, 通過 bulkLen 告訴 readLine 函數(shù)下一行 BulkString 的長(zhǎng)度
bulkLen = parseBulkHeader()
// 這個(gè) Reply 中一共有 1 個(gè) BulkString
expectedArgsCount = 1
// 等待 BulkReply 后續(xù)行
readingMultiLine = true
} else {
// 處理 StatusReply, IntReply, ErrorReply 等單行 Reply
reply := parseSingleLineReply(line)
// 通過 ch 返回結(jié)果
emitReply(ch)
}
} else {
// 進(jìn)入此分支說明我們正在等待 MulitBulkReply 或 BulkReply 的后續(xù)行
// MulitBulkReply 的后續(xù)行有兩種,BulkHeader 或者 BulkString
if isBulkHeader(line) {
bulkLen = parseBulkHeader()
} else {
// 我們正在讀取一個(gè) BulkString, 它可能是 MulitBulkReply 或 BulkReply
args = append(args, line)
}
if len(args) == expectedArgsCount { // 我們已經(jīng)讀取了所有后續(xù)行
// 通過 ch 返回結(jié)果
emitReply(ch)
// 重置狀態(tài), 準(zhǔn)備解析下一條 Reply
readingMultiLine = false
expectedArgsCount = 0
args = nil
bulkLen = 0
}
}
}
}貼一下工具函數(shù)的實(shí)現(xiàn):
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
var msg []byte
var err error
if state.bulkLen == 0 { // read simple line
msg, err = bufReader.ReadBytes('\n')
if err != nil {
return nil, true, err
}
if len(msg) == 0 || msg[len(msg)-2] != '\r' {
return nil, false, errors.New("protocol error: " + string(msg))
}
} else { // read bulk line (binary safe)
msg = make([]byte, state.bulkLen+2)
_, err = io.ReadFull(bufReader, msg)
if err != nil {
return nil, true, err
}
if len(msg) == 0 ||
msg[len(msg)-2] != '\r' ||
msg[len(msg)-1] != '\n' {
return nil, false, errors.New("protocol error: " + string(msg))
}
state.bulkLen = 0
}
return msg, false, nil
}
func parseMultiBulkHeader(msg []byte, state *readState) error {
var err error
var expectedLine uint64
expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if expectedLine == 0 {
state.expectedArgsCount = 0
return nil
} else if expectedLine > 0 {
// first line of multi bulk reply
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = int(expectedLine)
state.args = make([][]byte, 0, expectedLine)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
func parseBulkHeader(msg []byte, state *readState) error {
var err error
state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
if err != nil {
return errors.New("protocol error: " + string(msg))
}
if state.bulkLen == -1 { // null bulk
return nil
} else if state.bulkLen > 0 {
state.msgType = msg[0]
state.readingMultiLine = true
state.expectedArgsCount = 1
state.args = make([][]byte, 0, 1)
return nil
} else {
return errors.New("protocol error: " + string(msg))
}
}
func parseSingleLineReply(msg []byte) (redis.Reply, error) {
str := strings.TrimSuffix(string(msg), "\n")
str = strings.TrimSuffix(str, "\r")
var result redis.Reply
switch msg[0] {
case '+': // status reply
result = reply.MakeStatusReply(str[1:])
case '-': // err reply
result = reply.MakeErrReply(str[1:])
case ':': // int reply
val, err := strconv.ParseInt(str[1:], 10, 64)
if err != nil {
return nil, errors.New("protocol error: " + string(msg))
}
result = reply.MakeIntReply(val)
default:
// parse as text protocol
strs := strings.Split(str, " ")
args := make([][]byte, len(strs))
for i, s := range strs {
args[i] = []byte(s)
}
result = reply.MakeMultiBulkReply(args)
}
return result, nil
}到此這篇關(guān)于Golang 實(shí)現(xiàn) Redis 協(xié)議解析器的文章就介紹到這了,更多相關(guān)go redis 協(xié)議解析器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
詳解Golang如何優(yōu)雅接入多個(gè)遠(yuǎn)程配置中心
這篇文章主要為大家為大家介紹了Golang如何優(yōu)雅接入多個(gè)遠(yuǎn)程配置中心詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
淺談Go語言多態(tài)的實(shí)現(xiàn)與interface使用
如果大家系統(tǒng)的學(xué)過C++、Java等語言以及面向?qū)ο蟮脑挘嘈艖?yīng)該對(duì)多態(tài)不會(huì)陌生。多態(tài)是面向?qū)ο蠓懂牣?dāng)中經(jīng)常使用并且非常好用的一個(gè)功能,它主要是用在強(qiáng)類型語言當(dāng)中,像是Python這樣的弱類型語言,變量的類型可以隨意變化,也沒有任何限制,其實(shí)區(qū)別不是很大2021-06-06

