Go?channel實現(xiàn)批量讀取數(shù)據(jù)
有時候批量積攢一批數(shù)據(jù)集中處理,是一個高效的提高程序性能的方法,比如我們可以批量寫入數(shù)據(jù)庫,批量發(fā)送消息到 kafka,批量寫入網(wǎng)絡(luò)數(shù)據(jù)等等。 批量把數(shù)據(jù)收集出來,我們常用 channel 類型,此時 channel 的功能就是一個 buffer,多個生產(chǎn)者把數(shù)據(jù)寫入到 channel 中,消費(fèi)者從 channel 中讀取數(shù)據(jù),但是 Go 的 channel 并沒有提供批量讀取的方法,我們需要自己實現(xiàn)一個。
github.com/smallnest/exp/chanx 庫
當(dāng)然我已經(jīng)實現(xiàn)了一個 batch 庫,你可以直接拿來用,本文主要介紹它的功能、使用方法以及設(shè)計原理和考量:github.com/smallnest/exp/chanx[1]。
我們可以使用這個庫的Batch
方法來批量讀取數(shù)據(jù),它的定義如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))
- 第一個參數(shù)是
Context
,可以讓調(diào)用者主動取消或者超時控制 - 第二個參數(shù)是 channel,我們從這個 channel 中讀取數(shù)據(jù)。channel 可以在外部被關(guān)閉
- 第三個參數(shù)是批處理的大小,也就是我們從 channel 中讀取一批數(shù)據(jù)的最大量
- 第四個參數(shù)是一個函數(shù),我們把從 channel 中讀取的一批數(shù)據(jù)傳遞給這個函數(shù),由這個函數(shù)來處理這批數(shù)據(jù)
舉一個例子:
func TestBatch(t *testing.T) { ch := make(chan int, 10) for i := 0; i < 10; i++ { ch <- i } count := 0 go Batch[int](context.Background( "int"), ch, 5, func(batch []int) { if len(batch) != 5 { assert.Fail(t, "expected batch size 5, got %d", len(batch)) } count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 10, count) }
這個例子一開始我們把 10 個數(shù)據(jù)寫入到一個 channel 中,然后我們從 channel 中批量讀取,每次讀取 5 個,然后把這 5 個數(shù)據(jù)傳遞給一個函數(shù)來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數(shù)據(jù)。
我們還可以使用FlatBatch
方法來批量讀取批量數(shù)據(jù),它的定義如下:
func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))
這個函數(shù)和Batch
類似,只不過它的 channel 中的數(shù)據(jù)是一個切片,每次從 channel 中讀取到一個切片后,把這個切片中的數(shù)據(jù)展開放入到一批數(shù)據(jù)中,最后再傳遞給處理函數(shù)。所以它有Flat
和Batch
兩個功能。
舉一個例子:
func TestFlatBatch(t *testing.T) { ch := make(chan []int, 10) for i := 0; i < 10; i++ { ch <- []int{i, i} } count := 0 go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) { assert.NotEmpty(t, batch) count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 20, count) }
在這個例子中,我們把 10 個切片寫入到 channel 中,每個切片中有兩個元素,然后我們從 channel 中批量讀取并展開,放入到一個 batch 中,如果 batch 中的數(shù)據(jù)大于貨等于 5 個,就把這 5 個數(shù)據(jù)傳遞給一個函數(shù)來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數(shù)據(jù)。
實現(xiàn)原理和考量
想要從 channel 中批量讀取數(shù)據(jù),我們需要考慮以下幾個問題:
- 我們需要設(shè)定一個批處理的大小,不能無限制的讀取而不處理,否則會把消費(fèi)者餓死,內(nèi)存也會爆表
- 從 channel 中讀取數(shù)據(jù)的時候,如果 channel 中沒有數(shù)據(jù),我們需要等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉。
- 不能無限制的等待,或者長時間的等待,否則消費(fèi)者會饑餓,而且時延太長業(yè)務(wù)不允許
我先舉一個簡單但是不太好的實現(xiàn)方式,我們在它的基礎(chǔ)上做優(yōu)化:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
這個實現(xiàn)中我們使用了一個batch
變量來保存從 channel 中讀取的數(shù)據(jù),當(dāng)batch
中的數(shù)據(jù)量達(dá)到batchSize
時,我們就把這個batch
傳遞給處理函數(shù),然后清空batch
,繼續(xù)讀取數(shù)據(jù)。
這個實現(xiàn)的一個最大的問題就是,如果 channel 中沒有數(shù)據(jù),并且當(dāng)前 batch 的數(shù)量還未達(dá)到預(yù)期, 我們就會一直等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,這樣會導(dǎo)致消費(fèi)者饑餓。
我們可以使用select
語句來解決這個問題,我們可以在select
語句中加入一個default
分支,當(dāng) channel 中沒有數(shù)據(jù)的時候,就會執(zhí)行default
分支以便在 channel 中沒有數(shù)據(jù)的時候,我們能夠把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
這個實現(xiàn)貌似解決了消費(fèi)者饑餓的問題,但是也會帶來一個新的問題,如果 channel 中總是沒有數(shù)據(jù),那么我們總是落入default
分支中,導(dǎo)致 CPU 空轉(zhuǎn),這個 goroutine 可能導(dǎo)致 CPU 占用 100%, 這樣也不行。
有些人會使用time.After
來解決這個問題,我們可以在select
語句中加入一個time.After
分支,當(dāng) channel 中沒有數(shù)據(jù)的時候,就會執(zhí)行time.After
分支,這樣我們就可以在 channel 中沒有數(shù)據(jù)的時候,等待一段時間,如果還是沒有數(shù)據(jù),就把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } case <-time.After(100 * time.Millisecond): if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
這樣貌似解決了 CPU 空轉(zhuǎn)的問題,如果你測試這個實現(xiàn),生產(chǎn)者在生產(chǎn)數(shù)據(jù)很慢的時候,程序的 CPU 的確不會占用 100%。 但是正如有經(jīng)驗的 Gopher 意識到的那樣,這個實現(xiàn)還是有問題的,如果生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度很快,而消費(fèi)者處理數(shù)據(jù)的速度很慢,那么我們就會產(chǎn)生大量的Timer
,這些 Timer 不能及時的被回收,可能導(dǎo)致大量的內(nèi)存占用,而且如果有大量的 Timer,也會導(dǎo)致 Go 運(yùn)行時處理 Timer 的性能。
這里我提出一個新的解決辦法,在這個庫中實現(xiàn)了,我們不應(yīng)該使用time.After
,因為time.After
既帶來了性能的問題,還可能導(dǎo)致它在休眠的時候不能及時讀取 channel 中的數(shù)據(jù),導(dǎo)致業(yè)務(wù)時延增加。
最終的實現(xiàn)如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { // partial fn(batch) batch = make([]T, 0, batchSize) // reset } else { // empty // wait for more select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { return } batch = append(batch, v) } } } } }
這個實現(xiàn)的巧妙之處在于default
出來。
如果代碼運(yùn)行落入到default
分支,說明當(dāng)前 channel 中沒有數(shù)據(jù)可讀。那么它會檢查當(dāng)前的batch
中是否有數(shù)據(jù),如果有,就把這個batch
傳遞給處理函數(shù),然后清空batch
,繼續(xù)讀取數(shù)據(jù)。這樣已讀取的數(shù)據(jù)能夠及時得到處理。
如果當(dāng)前的batch
中沒有數(shù)據(jù),那么它會再次進(jìn)入select
語句,等待 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx
被取消。如果 channel 中沒有數(shù)據(jù),那么它會被阻塞,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx
被取消。這樣就能夠及時的讀取 channel 中的數(shù)據(jù),而不會導(dǎo)致 CPU 空轉(zhuǎn)。
通過在default
分支中的特殊處理,我們就可以低時延高效的從 channel 中批量讀取數(shù)據(jù)了。
以上就是Go channel實現(xiàn)批量讀取數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Go channel讀取數(shù)據(jù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
通過手機(jī)案例理解Go設(shè)計模式之裝飾器模式的功能屬性
這篇文章主要為大家介紹了Go設(shè)計模式之裝飾器模式的功能屬性,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05golang基礎(chǔ)之waitgroup用法以及使用要點
WaitGroup是Golang并發(fā)的兩種方式之一,一個是Channel,另一個是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點的相關(guān)資料,需要的朋友可以參考下2023-01-01