Go?channel實(shí)現(xiàn)批量讀取數(shù)據(jù)
有時(shí)候批量積攢一批數(shù)據(jù)集中處理,是一個(gè)高效的提高程序性能的方法,比如我們可以批量寫入數(shù)據(jù)庫,批量發(fā)送消息到 kafka,批量寫入網(wǎng)絡(luò)數(shù)據(jù)等等。 批量把數(shù)據(jù)收集出來,我們常用 channel 類型,此時(shí) channel 的功能就是一個(gè) buffer,多個(gè)生產(chǎn)者把數(shù)據(jù)寫入到 channel 中,消費(fèi)者從 channel 中讀取數(shù)據(jù),但是 Go 的 channel 并沒有提供批量讀取的方法,我們需要自己實(shí)現(xiàn)一個(gè)。

github.com/smallnest/exp/chanx 庫
當(dāng)然我已經(jīng)實(shí)現(xiàn)了一個(gè) batch 庫,你可以直接拿來用,本文主要介紹它的功能、使用方法以及設(shè)計(jì)原理和考量:github.com/smallnest/exp/chanx[1]。
我們可以使用這個(gè)庫的Batch方法來批量讀取數(shù)據(jù),它的定義如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))
- 第一個(gè)參數(shù)是
Context,可以讓調(diào)用者主動(dòng)取消或者超時(shí)控制 - 第二個(gè)參數(shù)是 channel,我們從這個(gè) channel 中讀取數(shù)據(jù)。channel 可以在外部被關(guān)閉
- 第三個(gè)參數(shù)是批處理的大小,也就是我們從 channel 中讀取一批數(shù)據(jù)的最大量
- 第四個(gè)參數(shù)是一個(gè)函數(shù),我們把從 channel 中讀取的一批數(shù)據(jù)傳遞給這個(gè)函數(shù),由這個(gè)函數(shù)來處理這批數(shù)據(jù)
舉一個(gè)例子:
func TestBatch(t *testing.T) {
ch := make(chan int, 10)
for i := 0; i < 10; i++ {
ch <- i
}
count := 0
go Batch[int](context.Background( "int"), ch, 5, func(batch []int) {
if len(batch) != 5 {
assert.Fail(t, "expected batch size 5, got %d", len(batch))
}
count += len(batch)
})
time.Sleep(time.Second)
close(ch)
assert.Equal(t, 10, count)
}
這個(gè)例子一開始我們把 10 個(gè)數(shù)據(jù)寫入到一個(gè) channel 中,然后我們從 channel 中批量讀取,每次讀取 5 個(gè),然后把這 5 個(gè)數(shù)據(jù)傳遞給一個(gè)函數(shù)來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個(gè),總共讀取了 10 個(gè)數(shù)據(jù)。
我們還可以使用FlatBatch方法來批量讀取批量數(shù)據(jù),它的定義如下:
func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))
這個(gè)函數(shù)和Batch類似,只不過它的 channel 中的數(shù)據(jù)是一個(gè)切片,每次從 channel 中讀取到一個(gè)切片后,把這個(gè)切片中的數(shù)據(jù)展開放入到一批數(shù)據(jù)中,最后再傳遞給處理函數(shù)。所以它有Flat和Batch兩個(gè)功能。
舉一個(gè)例子:
func TestFlatBatch(t *testing.T) {
ch := make(chan []int, 10)
for i := 0; i < 10; i++ {
ch <- []int{i, i}
}
count := 0
go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) {
assert.NotEmpty(t, batch)
count += len(batch)
})
time.Sleep(time.Second)
close(ch)
assert.Equal(t, 20, count)
}
在這個(gè)例子中,我們把 10 個(gè)切片寫入到 channel 中,每個(gè)切片中有兩個(gè)元素,然后我們從 channel 中批量讀取并展開,放入到一個(gè) batch 中,如果 batch 中的數(shù)據(jù)大于貨等于 5 個(gè),就把這 5 個(gè)數(shù)據(jù)傳遞給一個(gè)函數(shù)來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個(gè),總共讀取了 10 個(gè)數(shù)據(jù)。
實(shí)現(xiàn)原理和考量
想要從 channel 中批量讀取數(shù)據(jù),我們需要考慮以下幾個(gè)問題:
- 我們需要設(shè)定一個(gè)批處理的大小,不能無限制的讀取而不處理,否則會(huì)把消費(fèi)者餓死,內(nèi)存也會(huì)爆表
- 從 channel 中讀取數(shù)據(jù)的時(shí)候,如果 channel 中沒有數(shù)據(jù),我們需要等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉。
- 不能無限制的等待,或者長時(shí)間的等待,否則消費(fèi)者會(huì)饑餓,而且時(shí)延太長業(yè)務(wù)不允許
我先舉一個(gè)簡單但是不太好的實(shí)現(xiàn)方式,我們在它的基礎(chǔ)上做優(yōu)化:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}
這個(gè)實(shí)現(xiàn)中我們使用了一個(gè)batch變量來保存從 channel 中讀取的數(shù)據(jù),當(dāng)batch中的數(shù)據(jù)量達(dá)到batchSize時(shí),我們就把這個(gè)batch傳遞給處理函數(shù),然后清空batch,繼續(xù)讀取數(shù)據(jù)。
這個(gè)實(shí)現(xiàn)的一個(gè)最大的問題就是,如果 channel 中沒有數(shù)據(jù),并且當(dāng)前 batch 的數(shù)量還未達(dá)到預(yù)期, 我們就會(huì)一直等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,這樣會(huì)導(dǎo)致消費(fèi)者饑餓。
我們可以使用select語句來解決這個(gè)問題,我們可以在select語句中加入一個(gè)default分支,當(dāng) channel 中沒有數(shù)據(jù)的時(shí)候,就會(huì)執(zhí)行default分支以便在 channel 中沒有數(shù)據(jù)的時(shí)候,我們能夠把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
default:
if len(batch) > 0 {
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}
這個(gè)實(shí)現(xiàn)貌似解決了消費(fèi)者饑餓的問題,但是也會(huì)帶來一個(gè)新的問題,如果 channel 中總是沒有數(shù)據(jù),那么我們總是落入default分支中,導(dǎo)致 CPU 空轉(zhuǎn),這個(gè) goroutine 可能導(dǎo)致 CPU 占用 100%, 這樣也不行。
有些人會(huì)使用time.After來解決這個(gè)問題,我們可以在select語句中加入一個(gè)time.After分支,當(dāng) channel 中沒有數(shù)據(jù)的時(shí)候,就會(huì)執(zhí)行time.After分支,這樣我們就可以在 channel 中沒有數(shù)據(jù)的時(shí)候,等待一段時(shí)間,如果還是沒有數(shù)據(jù),就把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
case <-time.After(100 * time.Millisecond):
if len(batch) > 0 {
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
}
}
}
這樣貌似解決了 CPU 空轉(zhuǎn)的問題,如果你測試這個(gè)實(shí)現(xiàn),生產(chǎn)者在生產(chǎn)數(shù)據(jù)很慢的時(shí)候,程序的 CPU 的確不會(huì)占用 100%。 但是正如有經(jīng)驗(yàn)的 Gopher 意識到的那樣,這個(gè)實(shí)現(xiàn)還是有問題的,如果生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度很快,而消費(fèi)者處理數(shù)據(jù)的速度很慢,那么我們就會(huì)產(chǎn)生大量的Timer,這些 Timer 不能及時(shí)的被回收,可能導(dǎo)致大量的內(nèi)存占用,而且如果有大量的 Timer,也會(huì)導(dǎo)致 Go 運(yùn)行時(shí)處理 Timer 的性能。
這里我提出一個(gè)新的解決辦法,在這個(gè)庫中實(shí)現(xiàn)了,我們不應(yīng)該使用time.After,因?yàn)?code>time.After既帶來了性能的問題,還可能導(dǎo)致它在休眠的時(shí)候不能及時(shí)讀取 channel 中的數(shù)據(jù),導(dǎo)致業(yè)務(wù)時(shí)延增加。
最終的實(shí)現(xiàn)如下:
func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
var batch = make([]T, 0, batchSize)
for {
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok { // closed
fn(batch)
return
}
batch = append(batch, v)
if len(batch) == batchSize { // full
fn(batch)
batch = make([]T, 0, batchSize) // reset
}
default:
if len(batch) > 0 { // partial
fn(batch)
batch = make([]T, 0, batchSize) // reset
} else { // empty
// wait for more
select {
case <-ctx.Done():
if len(batch) > 0 {
fn(batch)
}
return
case v, ok := <-ch:
if !ok {
return
}
batch = append(batch, v)
}
}
}
}
}
這個(gè)實(shí)現(xiàn)的巧妙之處在于default出來。
如果代碼運(yùn)行落入到default分支,說明當(dāng)前 channel 中沒有數(shù)據(jù)可讀。那么它會(huì)檢查當(dāng)前的batch中是否有數(shù)據(jù),如果有,就把這個(gè)batch傳遞給處理函數(shù),然后清空batch,繼續(xù)讀取數(shù)據(jù)。這樣已讀取的數(shù)據(jù)能夠及時(shí)得到處理。
如果當(dāng)前的batch中沒有數(shù)據(jù),那么它會(huì)再次進(jìn)入select語句,等待 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx被取消。如果 channel 中沒有數(shù)據(jù),那么它會(huì)被阻塞,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx被取消。這樣就能夠及時(shí)的讀取 channel 中的數(shù)據(jù),而不會(huì)導(dǎo)致 CPU 空轉(zhuǎn)。
通過在default分支中的特殊處理,我們就可以低時(shí)延高效的從 channel 中批量讀取數(shù)據(jù)了。
以上就是Go channel實(shí)現(xiàn)批量讀取數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Go channel讀取數(shù)據(jù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
通過手機(jī)案例理解Go設(shè)計(jì)模式之裝飾器模式的功能屬性
這篇文章主要為大家介紹了Go設(shè)計(jì)模式之裝飾器模式的功能屬性,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
go使用Gin框架利用阿里云實(shí)現(xiàn)短信驗(yàn)證碼功能
這篇文章主要介紹了go使用Gin框架利用阿里云實(shí)現(xiàn)短信驗(yàn)證碼,使用json配置文件及配置文件解析,編寫路由controller層,本文通過代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下2021-08-08
golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)
WaitGroup是Golang并發(fā)的兩種方式之一,一個(gè)是Channel,另一個(gè)是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)的相關(guān)資料,需要的朋友可以參考下2023-01-01
GoFrame框架Scan類型轉(zhuǎn)換實(shí)例
這篇文章主要為大家介紹了GoFrame框架Scan類型轉(zhuǎn)換的實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06

