欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go?channel實現(xiàn)批量讀取數(shù)據(jù)

 更新時間:2023年12月26日 11:03:33   作者:鳥窩聊技術(shù)  
Go中的?channel?其實并沒有提供批量讀取數(shù)據(jù)的方法,需要我們自己實現(xiàn)一個,使用本文就來為大家大家介紹一下如何通過Go?channel實現(xiàn)批量讀取數(shù)據(jù)吧

有時候批量積攢一批數(shù)據(jù)集中處理,是一個高效的提高程序性能的方法,比如我們可以批量寫入數(shù)據(jù)庫,批量發(fā)送消息到 kafka,批量寫入網(wǎng)絡(luò)數(shù)據(jù)等等。 批量把數(shù)據(jù)收集出來,我們常用 channel 類型,此時 channel 的功能就是一個 buffer,多個生產(chǎn)者把數(shù)據(jù)寫入到 channel 中,消費(fèi)者從 channel 中讀取數(shù)據(jù),但是 Go 的 channel 并沒有提供批量讀取的方法,我們需要自己實現(xiàn)一個。

github.com/smallnest/exp/chanx 庫

當(dāng)然我已經(jīng)實現(xiàn)了一個 batch 庫,你可以直接拿來用,本文主要介紹它的功能、使用方法以及設(shè)計原理和考量:github.com/smallnest/exp/chanx[1]。

我們可以使用這個庫的Batch方法來批量讀取數(shù)據(jù),它的定義如下:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))
  • 第一個參數(shù)是Context,可以讓調(diào)用者主動取消或者超時控制
  • 第二個參數(shù)是 channel,我們從這個 channel 中讀取數(shù)據(jù)。channel 可以在外部被關(guān)閉
  • 第三個參數(shù)是批處理的大小,也就是我們從 channel 中讀取一批數(shù)據(jù)的最大量
  • 第四個參數(shù)是一個函數(shù),我們把從 channel 中讀取的一批數(shù)據(jù)傳遞給這個函數(shù),由這個函數(shù)來處理這批數(shù)據(jù)

舉一個例子:

func TestBatch(t *testing.T) {
 ch := make(chan int, 10)
 for i := 0; i < 10; i++ {
  ch <- i
 }

 count := 0
 go Batch[int](context.Background( "int"), ch, 5, func(batch []int) {
  if len(batch) != 5 {
   assert.Fail(t, "expected batch size 5, got %d", len(batch))
  }
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 10, count)
}

這個例子一開始我們把 10 個數(shù)據(jù)寫入到一個 channel 中,然后我們從 channel 中批量讀取,每次讀取 5 個,然后把這 5 個數(shù)據(jù)傳遞給一個函數(shù)來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數(shù)據(jù)。

我們還可以使用FlatBatch方法來批量讀取批量數(shù)據(jù),它的定義如下:

func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))

這個函數(shù)和Batch類似,只不過它的 channel 中的數(shù)據(jù)是一個切片,每次從 channel 中讀取到一個切片后,把這個切片中的數(shù)據(jù)展開放入到一批數(shù)據(jù)中,最后再傳遞給處理函數(shù)。所以它有FlatBatch兩個功能。

舉一個例子:

func TestFlatBatch(t *testing.T) {
 ch := make(chan []int, 10)
 for i := 0; i < 10; i++ {
  ch <- []int{i, i}
 }

 count := 0
 go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) {
  assert.NotEmpty(t, batch)
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 20, count)
}

在這個例子中,我們把 10 個切片寫入到 channel 中,每個切片中有兩個元素,然后我們從 channel 中批量讀取并展開,放入到一個 batch 中,如果 batch 中的數(shù)據(jù)大于貨等于 5 個,就把這 5 個數(shù)據(jù)傳遞給一個函數(shù)來處理,我們可以看到,我們讀取了兩次,每次讀取 5 個,總共讀取了 10 個數(shù)據(jù)。

實現(xiàn)原理和考量

想要從 channel 中批量讀取數(shù)據(jù),我們需要考慮以下幾個問題:

  • 我們需要設(shè)定一個批處理的大小,不能無限制的讀取而不處理,否則會把消費(fèi)者餓死,內(nèi)存也會爆表
  • 從 channel 中讀取數(shù)據(jù)的時候,如果 channel 中沒有數(shù)據(jù),我們需要等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉。
  • 不能無限制的等待,或者長時間的等待,否則消費(fèi)者會饑餓,而且時延太長業(yè)務(wù)不允許

我先舉一個簡單但是不太好的實現(xiàn)方式,我們在它的基礎(chǔ)上做優(yōu)化:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }

   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  }
 }
}

這個實現(xiàn)中我們使用了一個batch變量來保存從 channel 中讀取的數(shù)據(jù),當(dāng)batch中的數(shù)據(jù)量達(dá)到batchSize時,我們就把這個batch傳遞給處理函數(shù),然后清空batch,繼續(xù)讀取數(shù)據(jù)。

這個實現(xiàn)的一個最大的問題就是,如果 channel 中沒有數(shù)據(jù),并且當(dāng)前 batch 的數(shù)量還未達(dá)到預(yù)期, 我們就會一直等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,這樣會導(dǎo)致消費(fèi)者饑餓。

我們可以使用select語句來解決這個問題,我們可以在select語句中加入一個default分支,當(dāng) channel 中沒有數(shù)據(jù)的時候,就會執(zhí)行default分支以便在 channel 中沒有數(shù)據(jù)的時候,我們能夠把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        default:
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}

這個實現(xiàn)貌似解決了消費(fèi)者饑餓的問題,但是也會帶來一個新的問題,如果 channel 中總是沒有數(shù)據(jù),那么我們總是落入default分支中,導(dǎo)致 CPU 空轉(zhuǎn),這個 goroutine 可能導(dǎo)致 CPU 占用 100%, 這樣也不行。

有些人會使用time.After來解決這個問題,我們可以在select語句中加入一個time.After分支,當(dāng) channel 中沒有數(shù)據(jù)的時候,就會執(zhí)行time.After分支,這樣我們就可以在 channel 中沒有數(shù)據(jù)的時候,等待一段時間,如果還是沒有數(shù)據(jù),就把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        case <-time.After(100 * time.Millisecond):
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}

這樣貌似解決了 CPU 空轉(zhuǎn)的問題,如果你測試這個實現(xiàn),生產(chǎn)者在生產(chǎn)數(shù)據(jù)很慢的時候,程序的 CPU 的確不會占用 100%。 但是正如有經(jīng)驗的 Gopher 意識到的那樣,這個實現(xiàn)還是有問題的,如果生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度很快,而消費(fèi)者處理數(shù)據(jù)的速度很慢,那么我們就會產(chǎn)生大量的Timer,這些 Timer 不能及時的被回收,可能導(dǎo)致大量的內(nèi)存占用,而且如果有大量的 Timer,也會導(dǎo)致 Go 運(yùn)行時處理 Timer 的性能。

這里我提出一個新的解決辦法,在這個庫中實現(xiàn)了,我們不應(yīng)該使用time.After,因為time.After既帶來了性能的問題,還可能導(dǎo)致它在休眠的時候不能及時讀取 channel 中的數(shù)據(jù),導(dǎo)致業(yè)務(wù)時延增加。

最終的實現(xiàn)如下:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }

   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  default:
   if len(batch) > 0 { // partial
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   } else { // empty
    // wait for more
    select {
    case <-ctx.Done():
     if len(batch) > 0 {
      fn(batch)
     }
     return
    case v, ok := <-ch:
     if !ok {
      return
     }

     batch = append(batch, v)
    }

   }
  }
 }
}

這個實現(xiàn)的巧妙之處在于default出來。

如果代碼運(yùn)行落入到default分支,說明當(dāng)前 channel 中沒有數(shù)據(jù)可讀。那么它會檢查當(dāng)前的batch中是否有數(shù)據(jù),如果有,就把這個batch傳遞給處理函數(shù),然后清空batch,繼續(xù)讀取數(shù)據(jù)。這樣已讀取的數(shù)據(jù)能夠及時得到處理。

如果當(dāng)前的batch中沒有數(shù)據(jù),那么它會再次進(jìn)入select語句,等待 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx被取消。如果 channel 中沒有數(shù)據(jù),那么它會被阻塞,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx被取消。這樣就能夠及時的讀取 channel 中的數(shù)據(jù),而不會導(dǎo)致 CPU 空轉(zhuǎn)。

通過在default分支中的特殊處理,我們就可以低時延高效的從 channel 中批量讀取數(shù)據(jù)了。

以上就是Go channel實現(xiàn)批量讀取數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Go channel讀取數(shù)據(jù)的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • go語言題解LeetCode1275找出井字棋的獲勝者示例

    go語言題解LeetCode1275找出井字棋的獲勝者示例

    這篇文章主要為大家介紹了go語言題解LeetCode1275找出井字棋的獲勝者示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01
  • Go語言中字符串賦值中的問題與解決方法

    Go語言中字符串賦值中的問題與解決方法

    這篇文章主要為大家詳細(xì)介紹了Go語言中字符串賦值會出現(xiàn)的一些問題以及解決方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以參考一下
    2024-12-12
  • Go ORM的封裝解決方式詳解

    Go ORM的封裝解決方式詳解

    這篇文章主要為大家介紹了Go ORM的封裝解決方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01
  • 通過手機(jī)案例理解Go設(shè)計模式之裝飾器模式的功能屬性

    通過手機(jī)案例理解Go設(shè)計模式之裝飾器模式的功能屬性

    這篇文章主要為大家介紹了Go設(shè)計模式之裝飾器模式的功能屬性,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • go使用Gin框架利用阿里云實現(xiàn)短信驗證碼功能

    go使用Gin框架利用阿里云實現(xiàn)短信驗證碼功能

    這篇文章主要介紹了go使用Gin框架利用阿里云實現(xiàn)短信驗證碼,使用json配置文件及配置文件解析,編寫路由controller層,本文通過代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2021-08-08
  • golang基礎(chǔ)之waitgroup用法以及使用要點

    golang基礎(chǔ)之waitgroup用法以及使用要點

    WaitGroup是Golang并發(fā)的兩種方式之一,一個是Channel,另一個是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點的相關(guān)資料,需要的朋友可以參考下
    2023-01-01
  • 詳解Golang中Channel的用法

    詳解Golang中Channel的用法

    如果說goroutine是Go語言程序的并發(fā)體的話,那么channels則是它們之間的通信機(jī)制。這篇文章主要介紹Golang中Channel的用法,需要的朋友可以參考下
    2020-11-11
  • 使用Golang獲取音視頻時長信息的示例代碼

    使用Golang獲取音視頻時長信息的示例代碼

    這篇文章主要介紹了如何使用Golang獲取音視頻時長信息,文中通過代碼示例講解的非常詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2024-03-03
  • golang中tar壓縮和解壓文件詳情

    golang中tar壓縮和解壓文件詳情

    這篇文章主要給大家介紹golang中tar壓縮和解壓文件,文章以查看官方文檔自帶的給大家演習(xí)一下golang的archive/tar壓縮和解壓功能,需要的朋友可以參考一下
    2021-11-11
  • GoFrame框架Scan類型轉(zhuǎn)換實例

    GoFrame框架Scan類型轉(zhuǎn)換實例

    這篇文章主要為大家介紹了GoFrame框架Scan類型轉(zhuǎn)換的實例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06

最新評論