Golang中ringbuffer的實現(xiàn)與應(yīng)用場景詳解
ringbuffer因為它能復(fù)用緩沖空間,通常用于網(wǎng)絡(luò)通信連接的讀寫,雖然市面上已經(jīng)有了go寫的諸多版本的ringbuffer組件,雖然諸多版本,實現(xiàn)ringbuffer的核心邏輯卻是不變的。但發(fā)現(xiàn)其內(nèi)部提供的方法并不能滿足我當(dāng)下的需求,所以還是自己造一個吧。
源碼已經(jīng)上傳到github:https://github.com/HobbyBear/ringbuffer
需求分析
我在基于epoll實現(xiàn)一個網(wǎng)絡(luò)框架時,需要預(yù)先定義好的和客戶端的通信協(xié)議,當(dāng)從連接讀取數(shù)據(jù)時需要判讀當(dāng)前連接是否擁有完整的協(xié)議(實際網(wǎng)絡(luò)環(huán)境中可能完整的協(xié)議字節(jié)只到達(dá)了部分),有才會將數(shù)據(jù)全部讀取出來,然后進(jìn)行處理,否則就等待下次連接可讀時,再判斷連接是否具有完整的協(xié)議。
由于在讀取時需要先判斷當(dāng)前連接是否有完整協(xié)議,所以讀取時不能移動讀指針的位置,因為萬一協(xié)議不完整的話,下次讀取還要從當(dāng)前的讀指針位置開始讀取。
所以對于ringbuffer 組件我會實現(xiàn)一個peek方法
func (r *RingBuffer) Peek(readOffsetBack, n int) ([]byte, error)
peek方法兩個參數(shù),n代表要讀取的字節(jié)數(shù), readOffsetBack 代表讀取是要在當(dāng)前讀位置偏移的字節(jié)數(shù),因為在設(shè)計協(xié)議時,往往協(xié)議不是那么簡單(可能是由多個固定長度的數(shù)據(jù)構(gòu)成) ,比如下面這樣的協(xié)議格式。
完整的協(xié)議有三段構(gòu)成,每段開頭都會有一個4字節(jié)的大小代表每段的長度,在判斷協(xié)議是否完整時,就必須看著3段的數(shù)據(jù)是否都全部到達(dá)。 所以在判斷第二段數(shù)據(jù)是否完整時,會跳過前面3個字節(jié)去判斷,此時readOffsetBack 將會是3。
此外我還需要一個通過分割符獲取字節(jié)的方法,因為有時候協(xié)議不是固定長度的數(shù)組了,而是通過某個分割符判斷某段協(xié)議是否結(jié)束,比如換行符。
func (r *RingBuffer) PeekBytes(readOffsetBack int, delim byte) ([]byte, error)
接著,還需要提供一個更新讀位置的方法,因為一旦判斷是一個完整的協(xié)議后,我會將協(xié)議數(shù)據(jù)全部讀取出來,此時應(yīng)該要更新讀指針的位置,以便下次讀取新的請求。
func (r *RingBuffer) AddReadPosition(n int)
n 便是代表需要將讀指針往后偏移的n個字節(jié)。
ringbuffer 原理解析
接著,我們再來看看實際上ringbuffer的實現(xiàn)原理是什么。
首先來看下一個ringbuffer應(yīng)該有的屬性
type RingBuffer struct { buf []byte reader io.Reader r int // 標(biāo)記下次讀取開始的位置 unReadSize int // 緩沖區(qū)中未讀數(shù)據(jù)大小 }
buf 用作連接讀取的緩沖區(qū),reader 代表了原鏈接,r代表讀取ringbuffer時應(yīng)該從字節(jié)數(shù)組的哪個位置開始讀取,unReadSize 代表緩沖區(qū)當(dāng)中還有多少數(shù)據(jù)沒有讀取,因為你可能一次性從reader里讀取了很多數(shù)據(jù)到buf里,但是上層應(yīng)用只取buf里的部分?jǐn)?shù)據(jù),剩余的未讀數(shù)據(jù)就留在了buf里,等待下次被應(yīng)用層繼續(xù)讀取。
我們用一個5字節(jié)的字節(jié)數(shù)組當(dāng)做緩沖區(qū), 首先從ringbuffer讀取數(shù)據(jù)時,由于ringbuffer內(nèi)部沒有數(shù)據(jù),所以需要從連接中讀取數(shù)據(jù)然后寫到ringbuffer里。
如下圖所示:
假設(shè)ringBuffer規(guī)定每次向原網(wǎng)絡(luò)連接讀取時 按4字節(jié)讀取到緩沖區(qū)中(實際情況為了減少系統(tǒng)調(diào)用開銷,這個值會更多,盡可能會一次性讀取更多數(shù)據(jù)到緩沖區(qū)) write pos 指向的位置則代表從reader讀取的數(shù)據(jù)應(yīng)該從哪個位置開始寫入到buf字節(jié)數(shù)組里。
writePos = (r + unReadSize) % len(buf)
接著,上層應(yīng)用只讀取了3個字節(jié),緩沖區(qū)中的讀指針r和未讀空間就會變成下面這樣
如果此時上層應(yīng)用還想再讀取3個字節(jié),那么ringbuffer就必須再向reader讀取字節(jié)填充到緩沖區(qū)上,我們假設(shè)這次向reader索取3個字節(jié)。緩沖區(qū)的空間就會變成下面這樣
此時已經(jīng)復(fù)用了首次向reader讀取數(shù)據(jù)時占據(jù)的緩沖空間了。
當(dāng)填充上字節(jié)后,應(yīng)用層繼續(xù)讀取3個字節(jié),那么ringBuffer會變成這樣
讀指針又指向了數(shù)組的開頭了,可以得出讀指針的計算公式
r = (r + n)% len(buf)
ringBuffer 代碼解析
有了前面的演示后,再來看代碼就比較容易了。用peek 方法舉例進(jìn)行分析,
func (r *RingBuffer) Peek(readOffsetBack, n int) ([]byte, error) { // 由于目前實現(xiàn)的ringBuffer還不具備自動擴(kuò)容,所以不支持讀取的字節(jié)數(shù)大于緩沖區(qū)的長度 if n > len(r.buf) { return nil, fmt.Errorf("the unReadSize is over range the buffer len") } peek: if n <= r.UnReadSize()-readOffsetBack { // 說明緩沖區(qū)中的未讀字節(jié)數(shù)有足夠長的n個字節(jié),從buf緩沖區(qū)直接讀取 readPos := (r.r + readOffsetBack) % len(r.buf) return r.dataByPos(readPos, (r.r+readOffsetBack+n-1)%len(r.buf)), nil } // 說明緩沖區(qū)中未讀字節(jié)數(shù)不夠n個字節(jié)那么長,還需要從reader里讀取數(shù)據(jù)到緩沖區(qū)中 err := r.fill() if err != nil { return nil, err } goto peek }
peek方法的大致邏輯是首先判斷要讀取的n個字節(jié)能不能從緩沖區(qū)buf里直接讀取,如果能則直接返回,如果不能,則需要從reader里繼續(xù)讀取數(shù)據(jù),直到buf緩沖區(qū)數(shù)據(jù)夠n個字節(jié)那么長。
dataByPos 方法是根據(jù)傳入的元素位置,從buf中讀取在這個位置區(qū)間內(nèi)的數(shù)據(jù)。
// dataByPos 返回索引值在start和end之間的數(shù)據(jù),閉區(qū)間 func (r *RingBuffer) dataByPos(start int, end int) []byte { // 因為環(huán)形緩沖區(qū)原因,所以末位置索引值有可能小于開始位置索引 if end < start { return append(r.buf[start:], r.buf[:end+1]...) } return r.buf[start : end+1] }
fill() 方法則是從reader中讀取數(shù)據(jù)到buf里。
fill 情況分析
reader填充新數(shù)據(jù)到buf后,未讀空間未跨越buf末尾
當(dāng)從reader讀取完數(shù)據(jù)后,如果 end := r.r + r.unReadSize + readBytes end指向了未讀空間的末尾,如果沒有超過buf的長度,那么將數(shù)據(jù)復(fù)制到buf里的邏輯很簡單,直接在當(dāng)前write pos的位置追加讀取到的字節(jié)就行。
// 此時writePos 沒有超過 len(buf) writePos = (r + unReadSize)
未讀 空間 本來就 已經(jīng)從頭覆蓋
當(dāng)未讀空間本來就重新覆蓋了buf頭部,和上面類似,這種情況也是直接在write pos 位置追加數(shù)據(jù)即可。
未讀空間未跨越buf末尾,當(dāng)從reader追加數(shù)據(jù)到buf后發(fā)現(xiàn)需要覆蓋buf頭部
這種情況需要將讀取的數(shù)據(jù)一部分覆蓋到buf的末尾
writePos := (r.r + r.unReadSize) % len(r.buf) n := copy(r.buf[writePos:], buf[:readBytes])
一部分覆蓋到buf的頭部
end := r.r + r.unReadSize + readBytes copy(r.buf[:end%len(r.buf)], buf[len(r.buf)-writePos:])
現(xiàn)在再來看fill的源碼就比較容易理解了。
func (r *RingBuffer) fill() error { if r.unReadSize == len(r.buf) { // 當(dāng)未讀數(shù)據(jù)填滿buf后 ,就應(yīng)該等待上層應(yīng)用把未讀數(shù)據(jù)讀取一部分再來填充緩沖區(qū) return fmt.Errorf("the unReadSize is over range the buffer len") } // batchFetchBytes 為每次向reader里讀取多少個字節(jié),如果此時buf的剩余空間比batchFetchBytes小,則應(yīng)該只向reader讀取剩余空間的字節(jié)數(shù) readLen := int(math.Min(float64(r.batchFetchBytes), float64(len(r.buf)-r.unReadSize))) buf := make([]byte, readLen) readBytes, err := r.reader.Read(buf) if readBytes > 0 { // 查看讀取readBytes個字節(jié)后,未讀空間有沒有超過buf末尾指針,如果超過了,在復(fù)制數(shù)據(jù)時需要特殊處理 end := r.r + r.unReadSize + readBytes if end < len(r.buf) { // 沒有超過末尾指針,直接將數(shù)據(jù)copy到writePos后面 copy(r.buf[r.r+r.unReadSize:], buf[:readBytes]) } else { // 超過了末尾指針,有兩種情況,看下圖分析 writePos := (r.r + r.unReadSize) % len(r.buf) n := copy(r.buf[writePos:], buf[:readBytes]) if n < readBytes { copy(r.buf[:end%len(r.buf)], buf[len(r.buf)-writePos:]) } } r.unReadSize += readBytes return nil } if err != nil { return err } return nil }
以上就是Golang中ringbuffer的實現(xiàn)與應(yīng)用場景詳解的詳細(xì)內(nèi)容,更多關(guān)于Golang ringbuffer的資料請關(guān)注腳本之家其它相關(guān)文章!