Go?channel實(shí)現(xiàn)批量讀取數(shù)據(jù)
有時(shí)候批量積攢一批數(shù)據(jù)集中處理,是一個(gè)高效的提高程序性能的方法,比如我們可以批量寫(xiě)入數(shù)據(jù)庫(kù),批量發(fā)送消息到 kafka,批量寫(xiě)入網(wǎng)絡(luò)數(shù)據(jù)等等。 批量把數(shù)據(jù)收集出來(lái),我們常用 channel 類(lèi)型,此時(shí) channel 的功能就是一個(gè) buffer,多個(gè)生產(chǎn)者把數(shù)據(jù)寫(xiě)入到 channel 中,消費(fèi)者從 channel 中讀取數(shù)據(jù),但是 Go 的 channel 并沒(méi)有提供批量讀取的方法,我們需要自己實(shí)現(xiàn)一個(gè)。
github.com/smallnest/exp/chanx 庫(kù)
當(dāng)然我已經(jīng)實(shí)現(xiàn)了一個(gè) batch 庫(kù),你可以直接拿來(lái)用,本文主要介紹它的功能、使用方法以及設(shè)計(jì)原理和考量:github.com/smallnest/exp/chanx[1]。
我們可以使用這個(gè)庫(kù)的Batch
方法來(lái)批量讀取數(shù)據(jù),它的定義如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))
- 第一個(gè)參數(shù)是
Context
,可以讓調(diào)用者主動(dòng)取消或者超時(shí)控制 - 第二個(gè)參數(shù)是 channel,我們從這個(gè) channel 中讀取數(shù)據(jù)。channel 可以在外部被關(guān)閉
- 第三個(gè)參數(shù)是批處理的大小,也就是我們從 channel 中讀取一批數(shù)據(jù)的最大量
- 第四個(gè)參數(shù)是一個(gè)函數(shù),我們把從 channel 中讀取的一批數(shù)據(jù)傳遞給這個(gè)函數(shù),由這個(gè)函數(shù)來(lái)處理這批數(shù)據(jù)
舉一個(gè)例子:
func TestBatch(t *testing.T) { ch := make(chan int, 10) for i := 0; i < 10; i++ { ch <- i } count := 0 go Batch[int](context.Background( "int"), ch, 5, func(batch []int) { if len(batch) != 5 { assert.Fail(t, "expected batch size 5, got %d", len(batch)) } count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 10, count) }
這個(gè)例子一開(kāi)始我們把 10 個(gè)數(shù)據(jù)寫(xiě)入到一個(gè) channel 中,然后我們從 channel 中批量讀取,每次讀取 5 個(gè),然后把這 5 個(gè)數(shù)據(jù)傳遞給一個(gè)函數(shù)來(lái)處理,我們可以看到,我們讀取了兩次,每次讀取 5 個(gè),總共讀取了 10 個(gè)數(shù)據(jù)。
我們還可以使用FlatBatch
方法來(lái)批量讀取批量數(shù)據(jù),它的定義如下:
func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))
這個(gè)函數(shù)和Batch
類(lèi)似,只不過(guò)它的 channel 中的數(shù)據(jù)是一個(gè)切片,每次從 channel 中讀取到一個(gè)切片后,把這個(gè)切片中的數(shù)據(jù)展開(kāi)放入到一批數(shù)據(jù)中,最后再傳遞給處理函數(shù)。所以它有Flat
和Batch
兩個(gè)功能。
舉一個(gè)例子:
func TestFlatBatch(t *testing.T) { ch := make(chan []int, 10) for i := 0; i < 10; i++ { ch <- []int{i, i} } count := 0 go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) { assert.NotEmpty(t, batch) count += len(batch) }) time.Sleep(time.Second) close(ch) assert.Equal(t, 20, count) }
在這個(gè)例子中,我們把 10 個(gè)切片寫(xiě)入到 channel 中,每個(gè)切片中有兩個(gè)元素,然后我們從 channel 中批量讀取并展開(kāi),放入到一個(gè) batch 中,如果 batch 中的數(shù)據(jù)大于貨等于 5 個(gè),就把這 5 個(gè)數(shù)據(jù)傳遞給一個(gè)函數(shù)來(lái)處理,我們可以看到,我們讀取了兩次,每次讀取 5 個(gè),總共讀取了 10 個(gè)數(shù)據(jù)。
實(shí)現(xiàn)原理和考量
想要從 channel 中批量讀取數(shù)據(jù),我們需要考慮以下幾個(gè)問(wèn)題:
- 我們需要設(shè)定一個(gè)批處理的大小,不能無(wú)限制的讀取而不處理,否則會(huì)把消費(fèi)者餓死,內(nèi)存也會(huì)爆表
- 從 channel 中讀取數(shù)據(jù)的時(shí)候,如果 channel 中沒(méi)有數(shù)據(jù),我們需要等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉。
- 不能無(wú)限制的等待,或者長(zhǎng)時(shí)間的等待,否則消費(fèi)者會(huì)饑餓,而且時(shí)延太長(zhǎng)業(yè)務(wù)不允許
我先舉一個(gè)簡(jiǎn)單但是不太好的實(shí)現(xiàn)方式,我們?cè)谒幕A(chǔ)上做優(yōu)化:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
這個(gè)實(shí)現(xiàn)中我們使用了一個(gè)batch
變量來(lái)保存從 channel 中讀取的數(shù)據(jù),當(dāng)batch
中的數(shù)據(jù)量達(dá)到batchSize
時(shí),我們就把這個(gè)batch
傳遞給處理函數(shù),然后清空batch
,繼續(xù)讀取數(shù)據(jù)。
這個(gè)實(shí)現(xiàn)的一個(gè)最大的問(wèn)題就是,如果 channel 中沒(méi)有數(shù)據(jù),并且當(dāng)前 batch 的數(shù)量還未達(dá)到預(yù)期, 我們就會(huì)一直等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,這樣會(huì)導(dǎo)致消費(fèi)者饑餓。
我們可以使用select
語(yǔ)句來(lái)解決這個(gè)問(wèn)題,我們可以在select
語(yǔ)句中加入一個(gè)default
分支,當(dāng) channel 中沒(méi)有數(shù)據(jù)的時(shí)候,就會(huì)執(zhí)行default
分支以便在 channel 中沒(méi)有數(shù)據(jù)的時(shí)候,我們能夠把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
這個(gè)實(shí)現(xiàn)貌似解決了消費(fèi)者饑餓的問(wèn)題,但是也會(huì)帶來(lái)一個(gè)新的問(wèn)題,如果 channel 中總是沒(méi)有數(shù)據(jù),那么我們總是落入default
分支中,導(dǎo)致 CPU 空轉(zhuǎn),這個(gè) goroutine 可能導(dǎo)致 CPU 占用 100%, 這樣也不行。
有些人會(huì)使用time.After
來(lái)解決這個(gè)問(wèn)題,我們可以在select
語(yǔ)句中加入一個(gè)time.After
分支,當(dāng) channel 中沒(méi)有數(shù)據(jù)的時(shí)候,就會(huì)執(zhí)行time.After
分支,這樣我們就可以在 channel 中沒(méi)有數(shù)據(jù)的時(shí)候,等待一段時(shí)間,如果還是沒(méi)有數(shù)據(jù),就把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } case <-time.After(100 * time.Millisecond): if len(batch) > 0 { fn(batch) batch = make([]T, 0, batchSize) // reset } } } }
這樣貌似解決了 CPU 空轉(zhuǎn)的問(wèn)題,如果你測(cè)試這個(gè)實(shí)現(xiàn),生產(chǎn)者在生產(chǎn)數(shù)據(jù)很慢的時(shí)候,程序的 CPU 的確不會(huì)占用 100%。 但是正如有經(jīng)驗(yàn)的 Gopher 意識(shí)到的那樣,這個(gè)實(shí)現(xiàn)還是有問(wèn)題的,如果生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度很快,而消費(fèi)者處理數(shù)據(jù)的速度很慢,那么我們就會(huì)產(chǎn)生大量的Timer
,這些 Timer 不能及時(shí)的被回收,可能導(dǎo)致大量的內(nèi)存占用,而且如果有大量的 Timer,也會(huì)導(dǎo)致 Go 運(yùn)行時(shí)處理 Timer 的性能。
這里我提出一個(gè)新的解決辦法,在這個(gè)庫(kù)中實(shí)現(xiàn)了,我們不應(yīng)該使用time.After
,因?yàn)?code>time.After既帶來(lái)了性能的問(wèn)題,還可能導(dǎo)致它在休眠的時(shí)候不能及時(shí)讀取 channel 中的數(shù)據(jù),導(dǎo)致業(yè)務(wù)時(shí)延增加。
最終的實(shí)現(xiàn)如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) { var batch = make([]T, 0, batchSize) for { select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { // closed fn(batch) return } batch = append(batch, v) if len(batch) == batchSize { // full fn(batch) batch = make([]T, 0, batchSize) // reset } default: if len(batch) > 0 { // partial fn(batch) batch = make([]T, 0, batchSize) // reset } else { // empty // wait for more select { case <-ctx.Done(): if len(batch) > 0 { fn(batch) } return case v, ok := <-ch: if !ok { return } batch = append(batch, v) } } } } }
這個(gè)實(shí)現(xiàn)的巧妙之處在于default
出來(lái)。
如果代碼運(yùn)行落入到default
分支,說(shuō)明當(dāng)前 channel 中沒(méi)有數(shù)據(jù)可讀。那么它會(huì)檢查當(dāng)前的batch
中是否有數(shù)據(jù),如果有,就把這個(gè)batch
傳遞給處理函數(shù),然后清空batch
,繼續(xù)讀取數(shù)據(jù)。這樣已讀取的數(shù)據(jù)能夠及時(shí)得到處理。
如果當(dāng)前的batch
中沒(méi)有數(shù)據(jù),那么它會(huì)再次進(jìn)入select
語(yǔ)句,等待 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx
被取消。如果 channel 中沒(méi)有數(shù)據(jù),那么它會(huì)被阻塞,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx
被取消。這樣就能夠及時(shí)的讀取 channel 中的數(shù)據(jù),而不會(huì)導(dǎo)致 CPU 空轉(zhuǎn)。
通過(guò)在default
分支中的特殊處理,我們就可以低時(shí)延高效的從 channel 中批量讀取數(shù)據(jù)了。
以上就是Go channel實(shí)現(xiàn)批量讀取數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Go channel讀取數(shù)據(jù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
go語(yǔ)言題解LeetCode1275找出井字棋的獲勝者示例
這篇文章主要為大家介紹了go語(yǔ)言題解LeetCode1275找出井字棋的獲勝者示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-01-01通過(guò)手機(jī)案例理解Go設(shè)計(jì)模式之裝飾器模式的功能屬性
這篇文章主要為大家介紹了Go設(shè)計(jì)模式之裝飾器模式的功能屬性,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05go使用Gin框架利用阿里云實(shí)現(xiàn)短信驗(yàn)證碼功能
這篇文章主要介紹了go使用Gin框架利用阿里云實(shí)現(xiàn)短信驗(yàn)證碼,使用json配置文件及配置文件解析,編寫(xiě)路由controller層,本文通過(guò)代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-08-08golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)
WaitGroup是Golang并發(fā)的兩種方式之一,一個(gè)是Channel,另一個(gè)是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)的相關(guān)資料,需要的朋友可以參考下2023-01-01使用Golang獲取音視頻時(shí)長(zhǎng)信息的示例代碼
這篇文章主要介紹了如何使用Golang獲取音視頻時(shí)長(zhǎng)信息,文中通過(guò)代碼示例講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-03-03GoFrame框架Scan類(lèi)型轉(zhuǎn)換實(shí)例
這篇文章主要為大家介紹了GoFrame框架Scan類(lèi)型轉(zhuǎn)換的實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06