Golang無限緩存channel的設計與實現(xiàn)解析
一.引言
Go語言的Channel有兩種類型,一種是無緩存的channle,一個種是有緩存的channel,但是對于有緩存的channle來說,其緩存長度在創(chuàng)建時就已經(jīng)固定了,中間也不能擴縮容,這導致對某些特定的業(yè)務場景來說不太方便
業(yè)務場景如下 :
爬蟲場景,想爬取某個URL頁面上可達的所有URL
一個channle中存在待處理的URL
一堆worker groutine從channle中讀取URL,下載解析網(wǎng)頁并且提取URL,再把URL放入channle
這種場景下,使用消息隊列或sync包可以解決這個問題,但是比較復雜,如果有一個可以無限緩存的Channle或許是比較好的解決方案
二.設計
基于以上特定的業(yè)務場景,我們的無限緩存Channle應該滿足以下要求:
緩存無限,最核心的基本要求。
不能阻塞寫,普通channle的寫操作之所以阻塞,是因為緩存滿了,無限緩存的channle不應該存在這個問題。
無數(shù)據(jù)時阻塞讀,此特性保持和普通channle一樣。
讀寫都應通過channle操作 :通過channle的 <- 和 ->,第一個是方便,仍遵循普通channle的語法,第二是不能暴露內(nèi)部緩存
channle被關閉后,未讀取的數(shù)據(jù)應該仍然可讀,此特性和普通channle保持一致
可基于數(shù)據(jù)量自動擴縮容,在數(shù)據(jù)量很大的時候要求可以自適應的擴容,在數(shù)據(jù)量變小后,為了避免內(nèi)存浪費,要求可以自適應的縮容
針對以上要求,設計思想如下:
內(nèi)部含有兩個普通channle,分別用于讀寫,我們將其稱作In和Out,往In中寫入數(shù)據(jù),然后從Out中讀取數(shù)據(jù)
內(nèi)部有一個可以自適應擴縮容的buf,當寫channle滿了寫不了之后,寫入到此buf中
內(nèi)部含有一個工作goroutine,總是In中數(shù)據(jù)放入到Out或者buf中
內(nèi)部的自適應擴縮容buf可以采用雙向環(huán)形鏈表
和采用數(shù)組實現(xiàn)相比,優(yōu)點如下:
數(shù)組大小是有限制的,語言層面就做不到真正的無限緩存
數(shù)組擴容代價大,而采用雙向環(huán)形鏈表則只用增加節(jié)點即可,縮容同樣
type T interface{} type UnlimitSizeChan struct { bufCount int64 // 統(tǒng)計元素數(shù)量,原子操作 In chan<- T // 寫入channle Out <-chan T // 讀取channle buffer *RingBuffer // 自適應擴縮容Buf }
雙向環(huán)形鏈表 如何寫入和讀取數(shù)據(jù),并且做到自適應擴縮容?
雙向環(huán)形鏈表buf其結構類似于一個手串,手串上的珠子就可以當做是一個節(jié)點,每個節(jié)點可以是一個固定大小的數(shù)組
雙向環(huán)形鏈表buf上分別有兩個讀寫指針readCell和writeCell,指向將要進行讀寫操作的cell,負責進行數(shù)據(jù)讀寫
readCell永遠追趕writeCell,當追上時,代表寫滿了,進行擴容操作
擴容操作即在寫指針的后面插入一個新建的空閑cell
當buf中沒有數(shù)據(jù)時,代表此時的流量高峰應該已經(jīng)過去了,應該進行縮容操作
縮容操作修改鏈表指向即可,讓buf恢復原樣,僅保持兩個cell即可,其他cell由于不再被引用,會被GC自動回收
cell上也有兩個讀寫指針r和w,分別負責進行cell上的讀寫,也是r讀指針永遠追趕w寫指針
type cell struct { Data []T // 數(shù)據(jù)部分 fullFlag bool // cell滿的標志 next *cell // 指向后一個cellBuffer pre *cell // 指向前一個cellBuffer r int // 下一個要讀的指針 w int // 下一個要下的指針 } type RingBuffer struct { cellCount int // cell 數(shù)量統(tǒng)計 readCell *cell // 下一個要讀的cell writeCell *cell // 下一個要寫的cell }
數(shù)據(jù)FIFO原則是如何保證的?
無限緩存Channle內(nèi)部的Goroutine,我們稱其為Worker
當Out channle還沒有滿時并且Buf中沒有數(shù)據(jù)時,Worker將讀取In中數(shù)據(jù),將其放入Out,直到Out滿
當Buf中有數(shù)據(jù)時,無論Out是否滿,都將將In中讀到的數(shù)據(jù),直接寫入到Buf中,目的就是為了保證數(shù)據(jù)的FIFO原則
當cell標記為滿時,就算此cell中已經(jīng)被讀取了一部分數(shù)據(jù)了,此cell在讀取完所有數(shù)據(jù)之前也不能用于寫,目的也是為了保證數(shù)據(jù)的FIFO原則
三.實現(xiàn)
1.雙向環(huán)形鏈表實現(xiàn)
package unlimitSizeChan import ( "errors" "fmt" ) var ErrRingIsEmpty = errors.New("ringbuffer is empty") // CellInitialSize cell的初始容量 var CellInitialSize = 1024 // CellInitialCount 初始化cell數(shù)量 var CellInitialCount = 2 type cell struct { Data []T // 數(shù)據(jù)部分 fullFlag bool // cell滿的標志 next *cell // 指向后一個cellBuffer pre *cell // 指向前一個cellBuffer r int // 下一個要讀的指針 w int // 下一個要下的指針 } type RingBuffer struct { cellCount int // cell 數(shù)量統(tǒng)計 readCell *cell // 下一個要讀的cell writeCell *cell // 下一個要寫的cell } // NewRingBuffer 新建一個ringbuffe,包含兩個cell func NewRingBuffer() *RingBuffer { rootCell := &cell{ Data: make([]T, CellInitialSize), } lastCell := &cell{ Data: make([]T, CellInitialSize), } rootCell.pre = lastCell lastCell.pre = rootCell rootCell.next = lastCell lastCell.next = rootCell return &RingBuffer{ cellCount: CellInitialCount, readCell: rootCell, writeCell: rootCell, } } // Read 讀取數(shù)據(jù) func (r *RingBuffer) Read() (T, error) { // 無數(shù)據(jù) if r.IsEmpty() { return nil, ErrRingIsEmpty } // 讀取數(shù)據(jù),并將讀指針向右移動一位 value := r.readCell.Data[r.readCell.r] r.readCell.r++ // 此cell已經(jīng)讀完 if r.readCell.r == CellInitialSize { // 讀指針歸零,并將該cell狀態(tài)置為非滿 r.readCell.r = 0 r.readCell.fullFlag = false // 將readCell指向下一個cell r.readCell = r.readCell.next } return value, nil } // Pop 讀一個元素,讀完后移動指針 func (r *RingBuffer) Pop() T { value, err := r.Read() if err != nil { panic(err.Error()) } return value } // Peek 窺視 讀一個元素,僅讀但不移動指針 func (r *RingBuffer) Peek() T { if r.IsEmpty() { panic(ErrRingIsEmpty.Error()) } // 僅讀 value := r.readCell.Data[r.readCell.r] return value } // Write 寫入數(shù)據(jù) func (r *RingBuffer) Write(value T) { // 在 r.writeCell.w 位置寫入數(shù)據(jù),指針向右移動一位 r.writeCell.Data[r.writeCell.w] = value r.writeCell.w++ // 當前cell寫滿了 if r.writeCell.w == CellInitialSize { // 指針置0,將該cell標記為已滿,并指向下一個cell r.writeCell.w = 0 r.writeCell.fullFlag = true r.writeCell = r.writeCell.next } // 下一個cell也已滿,擴容 if r.writeCell.fullFlag == true { r.grow() } } // grow 擴容 func (r *RingBuffer) grow() { // 新建一個cell newCell := &cell{ Data: make([]T, CellInitialSize), } // 總共三個cell,writeCell,preCell,newCell // 本來關系: preCell <===> writeCell // 現(xiàn)在將newcell插入:preCell <===> newCell <===> writeCell pre := r.writeCell.pre pre.next = newCell newCell.pre = pre newCell.next = r.writeCell r.writeCell.pre = newCell // 將writeCell指向新建的cell r.writeCell = r.writeCell.pre // cell 數(shù)量加一 r.cellCount++ } // IsEmpty 判斷ringbuffer是否為空 func (r *RingBuffer) IsEmpty() bool { // readCell和writeCell指向同一個cell,并且該cell的讀寫指針也指向同一個位置,并且cell狀態(tài)為非滿 if r.readCell == r.writeCell && r.readCell.r == r.readCell.w && r.readCell.fullFlag == false { return true } return false } // Capacity ringBuffer容量 func (r *RingBuffer) Capacity() int { return r.cellCount * CellInitialSize } // Reset 重置為僅指向兩個cell的ring func (r *RingBuffer) Reset() { lastCell := r.readCell.next lastCell.w = 0 lastCell.r = 0 r.readCell.r = 0 r.readCell.w = 0 r.cellCount = CellInitialCount lastCell.next = r.readCell }
2.無限緩存Channle實現(xiàn)
package unlimitSizeChan import "sync/atomic" type T interface{} // UnlimitSizeChan 無限緩存的Channle type UnlimitSizeChan struct { bufCount int64 // 統(tǒng)計元素數(shù)量,原子操作 In chan<- T // 寫入channle Out <-chan T // 讀取channle buffer *RingBuffer // 自適應擴縮容Buf } // Len uc中總共的元素數(shù)量 func (uc UnlimitSizeChan) Len() int { return len(uc.In) + uc.BufLen() + len(uc.Out) } // BufLen uc的buf中的元素數(shù)量 func (uc UnlimitSizeChan) BufLen() int { return int(atomic.LoadInt64(&uc.bufCount)) } // NewUnlimitSizeChan 新建一個無限緩存的Channle,并指定In和Out大小(In和Out設置得一樣大) func NewUnlimitSizeChan(initCapacity int) *UnlimitSizeChan { return NewUnlitSizeChanSize(initCapacity, initCapacity) } // NewUnlitSizeChanSize 新建一個無限緩存的Channle,并指定In和Out大小(In和Out設置得不一樣大) func NewUnlitSizeChanSize(initInCapacity, initOutCapacity int) *UnlimitSizeChan { in := make(chan T, initInCapacity) out := make(chan T, initOutCapacity) ch := UnlimitSizeChan{In: in, Out: out, buffer: NewRingBuffer()} go process(in, out, &ch) return &ch } // 內(nèi)部Worker Groutine實現(xiàn) func process(in, out chan T, ch *UnlimitSizeChan) { defer close(out) // in 關閉,數(shù)據(jù)讀取后也把out關閉 // 不斷從in中讀取數(shù)據(jù)放入到out或者ringbuf中 loop: for { // 第一步:從in中讀取數(shù)據(jù) value, ok := <-in if !ok { // in 關閉了,退出loop break loop } // 第二步:將數(shù)據(jù)存儲到out或者buf中 if atomic.LoadInt64(&ch.bufCount) > 0 { // 當buf中有數(shù)據(jù)時,新數(shù)據(jù)優(yōu)先存放到buf中,確保數(shù)據(jù)FIFO原則 ch.buffer.Write(value) atomic.AddInt64(&ch.bufCount, 1) } else { // out 沒有滿,數(shù)據(jù)放入out中 select { case out <- value: continue default: } // out 滿了,數(shù)據(jù)放入buf中 ch.buffer.Write(value) atomic.AddInt64(&ch.bufCount, 1) } // 第三步:處理buf,一直嘗試把buf中的數(shù)據(jù)放入到out中,直到buf中沒有數(shù)據(jù) for !ch.buffer.IsEmpty() { select { // 為了避免阻塞in,還要嘗試從in中讀取數(shù)據(jù) case val, ok := <-in: if !ok { // in 關閉了,退出loop break loop } // 因為這個時候out是滿的,新數(shù)據(jù)直接放入buf中 ch.buffer.Write(val) atomic.AddInt64(&ch.bufCount, 1) // 將buf中數(shù)據(jù)放入out case out <- ch.buffer.Peek(): ch.buffer.Pop() atomic.AddInt64(&ch.bufCount, -1) if ch.buffer.IsEmpty() { // 避免內(nèi)存泄露 ch.buffer.Reset() atomic.StoreInt64(&ch.bufCount, 0) } } } } // in被關閉退出loop后,buf中還有可能有未處理的數(shù)據(jù),將他們?nèi)雘ut中,并重置buf for !ch.buffer.IsEmpty() { out <- ch.buffer.Pop() atomic.AddInt64(&ch.bufCount, -1) } ch.buffer.Reset() atomic.StoreInt64(&ch.bufCount, 0) }
四.使用
ch := NewUnlimitSizeChan(1000) // or ch := NewUnlitSizeChanSize(100,200) go func() { ? ? for ...... { ? ? ? ? ... ? ? ? ? ch.In <- ... // send values ? ? ? ? ... ? ? } ? ? close(ch.In) // close In channel }() for v := range ch.Out { // read values ? ? fmt.Println(v) }
以上就是Golang無限緩存channel的設計與實現(xiàn)解析的詳細內(nèi)容,更多關于Golang無限緩存channel的資料請關注腳本之家其它相關文章!
相關文章
Go集成swagger實現(xiàn)在線接口文檔的教程指南
wagger是一個用于設計,構建和文檔化API的開源框架,在Go語言中,Swagger可以幫助后端開發(fā)人員快速創(chuàng)建和定義RESTful API,并提供自動生成接口文檔的功能,所以本文給大家介紹了Go集成swagger實現(xiàn)在線接口文檔的方法,需要的朋友可以參考下2024-11-11golang中struct和interface的基礎使用教程
Go不同于一般的面向對象語言,需要我們好好的學習研究,下面這篇文章主要給大家介紹了關于golang中struct和interface的基礎使用的相關資料,需要的朋友可以參考借鑒,下面隨著小編來一起學習學習吧。2018-03-03