欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go?channel實(shí)現(xiàn)批量讀取數(shù)據(jù)

 更新時(shí)間:2023年12月26日 11:03:33   作者:鳥(niǎo)窩聊技術(shù)  
Go中的?channel?其實(shí)并沒(méi)有提供批量讀取數(shù)據(jù)的方法,需要我們自己實(shí)現(xiàn)一個(gè),使用本文就來(lái)為大家大家介紹一下如何通過(guò)Go?channel實(shí)現(xiàn)批量讀取數(shù)據(jù)吧

有時(shí)候批量積攢一批數(shù)據(jù)集中處理,是一個(gè)高效的提高程序性能的方法,比如我們可以批量寫(xiě)入數(shù)據(jù)庫(kù),批量發(fā)送消息到 kafka,批量寫(xiě)入網(wǎng)絡(luò)數(shù)據(jù)等等。 批量把數(shù)據(jù)收集出來(lái),我們常用 channel 類(lèi)型,此時(shí) channel 的功能就是一個(gè) buffer,多個(gè)生產(chǎn)者把數(shù)據(jù)寫(xiě)入到 channel 中,消費(fèi)者從 channel 中讀取數(shù)據(jù),但是 Go 的 channel 并沒(méi)有提供批量讀取的方法,我們需要自己實(shí)現(xiàn)一個(gè)。

github.com/smallnest/exp/chanx 庫(kù)

當(dāng)然我已經(jīng)實(shí)現(xiàn)了一個(gè) batch 庫(kù),你可以直接拿來(lái)用,本文主要介紹它的功能、使用方法以及設(shè)計(jì)原理和考量:github.com/smallnest/exp/chanx[1]。

我們可以使用這個(gè)庫(kù)的Batch方法來(lái)批量讀取數(shù)據(jù),它的定義如下:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any"))
  • 第一個(gè)參數(shù)是Context,可以讓調(diào)用者主動(dòng)取消或者超時(shí)控制
  • 第二個(gè)參數(shù)是 channel,我們從這個(gè) channel 中讀取數(shù)據(jù)。channel 可以在外部被關(guān)閉
  • 第三個(gè)參數(shù)是批處理的大小,也就是我們從 channel 中讀取一批數(shù)據(jù)的最大量
  • 第四個(gè)參數(shù)是一個(gè)函數(shù),我們把從 channel 中讀取的一批數(shù)據(jù)傳遞給這個(gè)函數(shù),由這個(gè)函數(shù)來(lái)處理這批數(shù)據(jù)

舉一個(gè)例子:

func TestBatch(t *testing.T) {
 ch := make(chan int, 10)
 for i := 0; i < 10; i++ {
  ch <- i
 }

 count := 0
 go Batch[int](context.Background( "int"), ch, 5, func(batch []int) {
  if len(batch) != 5 {
   assert.Fail(t, "expected batch size 5, got %d", len(batch))
  }
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 10, count)
}

這個(gè)例子一開(kāi)始我們把 10 個(gè)數(shù)據(jù)寫(xiě)入到一個(gè) channel 中,然后我們從 channel 中批量讀取,每次讀取 5 個(gè),然后把這 5 個(gè)數(shù)據(jù)傳遞給一個(gè)函數(shù)來(lái)處理,我們可以看到,我們讀取了兩次,每次讀取 5 個(gè),總共讀取了 10 個(gè)數(shù)據(jù)。

我們還可以使用FlatBatch方法來(lái)批量讀取批量數(shù)據(jù),它的定義如下:

func FlatBatch[T any](ctx context.Context, ch <-chan []T, batchSize int, fn func([]T "T any"))

這個(gè)函數(shù)和Batch類(lèi)似,只不過(guò)它的 channel 中的數(shù)據(jù)是一個(gè)切片,每次從 channel 中讀取到一個(gè)切片后,把這個(gè)切片中的數(shù)據(jù)展開(kāi)放入到一批數(shù)據(jù)中,最后再傳遞給處理函數(shù)。所以它有FlatBatch兩個(gè)功能。

舉一個(gè)例子:

func TestFlatBatch(t *testing.T) {
 ch := make(chan []int, 10)
 for i := 0; i < 10; i++ {
  ch <- []int{i, i}
 }

 count := 0
 go FlatBatch[int](context.Background( "int"), ch, 5, func(batch []int) {
  assert.NotEmpty(t, batch)
  count += len(batch)
 })
 time.Sleep(time.Second)
 close(ch)
 assert.Equal(t, 20, count)
}

在這個(gè)例子中,我們把 10 個(gè)切片寫(xiě)入到 channel 中,每個(gè)切片中有兩個(gè)元素,然后我們從 channel 中批量讀取并展開(kāi),放入到一個(gè) batch 中,如果 batch 中的數(shù)據(jù)大于貨等于 5 個(gè),就把這 5 個(gè)數(shù)據(jù)傳遞給一個(gè)函數(shù)來(lái)處理,我們可以看到,我們讀取了兩次,每次讀取 5 個(gè),總共讀取了 10 個(gè)數(shù)據(jù)。

實(shí)現(xiàn)原理和考量

想要從 channel 中批量讀取數(shù)據(jù),我們需要考慮以下幾個(gè)問(wèn)題:

  • 我們需要設(shè)定一個(gè)批處理的大小,不能無(wú)限制的讀取而不處理,否則會(huì)把消費(fèi)者餓死,內(nèi)存也會(huì)爆表
  • 從 channel 中讀取數(shù)據(jù)的時(shí)候,如果 channel 中沒(méi)有數(shù)據(jù),我們需要等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉。
  • 不能無(wú)限制的等待,或者長(zhǎng)時(shí)間的等待,否則消費(fèi)者會(huì)饑餓,而且時(shí)延太長(zhǎng)業(yè)務(wù)不允許

我先舉一個(gè)簡(jiǎn)單但是不太好的實(shí)現(xiàn)方式,我們?cè)谒幕A(chǔ)上做優(yōu)化:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }

   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  }
 }
}

這個(gè)實(shí)現(xiàn)中我們使用了一個(gè)batch變量來(lái)保存從 channel 中讀取的數(shù)據(jù),當(dāng)batch中的數(shù)據(jù)量達(dá)到batchSize時(shí),我們就把這個(gè)batch傳遞給處理函數(shù),然后清空batch,繼續(xù)讀取數(shù)據(jù)。

這個(gè)實(shí)現(xiàn)的一個(gè)最大的問(wèn)題就是,如果 channel 中沒(méi)有數(shù)據(jù),并且當(dāng)前 batch 的數(shù)量還未達(dá)到預(yù)期, 我們就會(huì)一直等待,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,這樣會(huì)導(dǎo)致消費(fèi)者饑餓。

我們可以使用select語(yǔ)句來(lái)解決這個(gè)問(wèn)題,我們可以在select語(yǔ)句中加入一個(gè)default分支,當(dāng) channel 中沒(méi)有數(shù)據(jù)的時(shí)候,就會(huì)執(zhí)行default分支以便在 channel 中沒(méi)有數(shù)據(jù)的時(shí)候,我們能夠把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        default:
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}

這個(gè)實(shí)現(xiàn)貌似解決了消費(fèi)者饑餓的問(wèn)題,但是也會(huì)帶來(lái)一個(gè)新的問(wèn)題,如果 channel 中總是沒(méi)有數(shù)據(jù),那么我們總是落入default分支中,導(dǎo)致 CPU 空轉(zhuǎn),這個(gè) goroutine 可能導(dǎo)致 CPU 占用 100%, 這樣也不行。

有些人會(huì)使用time.After來(lái)解決這個(gè)問(wèn)題,我們可以在select語(yǔ)句中加入一個(gè)time.After分支,當(dāng) channel 中沒(méi)有數(shù)據(jù)的時(shí)候,就會(huì)執(zhí)行time.After分支,這樣我們就可以在 channel 中沒(méi)有數(shù)據(jù)的時(shí)候,等待一段時(shí)間,如果還是沒(méi)有數(shù)據(jù),就把已讀取到的數(shù)據(jù)也能交給函數(shù) fn 去處理。

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
    var batch = make([]T, 0, batchSize)
    for {
        select {
        case <-ctx.Done():
            if len(batch) > 0 {
                fn(batch)
            }
            return
        case v, ok := <-ch:
            if !ok { // closed
                fn(batch)
                return
            }

            batch = append(batch, v)
            if len(batch) == batchSize { // full
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        case <-time.After(100 * time.Millisecond):
            if len(batch) > 0 {
                fn(batch)
                batch = make([]T, 0, batchSize) // reset
            }
        }
    }
}

這樣貌似解決了 CPU 空轉(zhuǎn)的問(wèn)題,如果你測(cè)試這個(gè)實(shí)現(xiàn),生產(chǎn)者在生產(chǎn)數(shù)據(jù)很慢的時(shí)候,程序的 CPU 的確不會(huì)占用 100%。 但是正如有經(jīng)驗(yàn)的 Gopher 意識(shí)到的那樣,這個(gè)實(shí)現(xiàn)還是有問(wèn)題的,如果生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度很快,而消費(fèi)者處理數(shù)據(jù)的速度很慢,那么我們就會(huì)產(chǎn)生大量的Timer,這些 Timer 不能及時(shí)的被回收,可能導(dǎo)致大量的內(nèi)存占用,而且如果有大量的 Timer,也會(huì)導(dǎo)致 Go 運(yùn)行時(shí)處理 Timer 的性能。

這里我提出一個(gè)新的解決辦法,在這個(gè)庫(kù)中實(shí)現(xiàn)了,我們不應(yīng)該使用time.After,因?yàn)?code>time.After既帶來(lái)了性能的問(wèn)題,還可能導(dǎo)致它在休眠的時(shí)候不能及時(shí)讀取 channel 中的數(shù)據(jù),導(dǎo)致業(yè)務(wù)時(shí)延增加。

最終的實(shí)現(xiàn)如下:

func Batch[T any](ctx context.Context, ch <-chan T, batchSize int, fn func([]T "T any")) {
 var batch = make([]T, 0, batchSize)
 for {
  select {
  case <-ctx.Done():
   if len(batch) > 0 {
    fn(batch)
   }
   return
  case v, ok := <-ch:
   if !ok { // closed
    fn(batch)
    return
   }

   batch = append(batch, v)
   if len(batch) == batchSize { // full
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   }
  default:
   if len(batch) > 0 { // partial
    fn(batch)
    batch = make([]T, 0, batchSize) // reset
   } else { // empty
    // wait for more
    select {
    case <-ctx.Done():
     if len(batch) > 0 {
      fn(batch)
     }
     return
    case v, ok := <-ch:
     if !ok {
      return
     }

     batch = append(batch, v)
    }

   }
  }
 }
}

這個(gè)實(shí)現(xiàn)的巧妙之處在于default出來(lái)。

如果代碼運(yùn)行落入到default分支,說(shuō)明當(dāng)前 channel 中沒(méi)有數(shù)據(jù)可讀。那么它會(huì)檢查當(dāng)前的batch中是否有數(shù)據(jù),如果有,就把這個(gè)batch傳遞給處理函數(shù),然后清空batch,繼續(xù)讀取數(shù)據(jù)。這樣已讀取的數(shù)據(jù)能夠及時(shí)得到處理。

如果當(dāng)前的batch中沒(méi)有數(shù)據(jù),那么它會(huì)再次進(jìn)入select語(yǔ)句,等待 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx被取消。如果 channel 中沒(méi)有數(shù)據(jù),那么它會(huì)被阻塞,直到 channel 中有數(shù)據(jù),或者 channel 被關(guān)閉,或者ctx被取消。這樣就能夠及時(shí)的讀取 channel 中的數(shù)據(jù),而不會(huì)導(dǎo)致 CPU 空轉(zhuǎn)。

通過(guò)在default分支中的特殊處理,我們就可以低時(shí)延高效的從 channel 中批量讀取數(shù)據(jù)了。

以上就是Go channel實(shí)現(xiàn)批量讀取數(shù)據(jù)的詳細(xì)內(nèi)容,更多關(guān)于Go channel讀取數(shù)據(jù)的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • go語(yǔ)言題解LeetCode1275找出井字棋的獲勝者示例

    go語(yǔ)言題解LeetCode1275找出井字棋的獲勝者示例

    這篇文章主要為大家介紹了go語(yǔ)言題解LeetCode1275找出井字棋的獲勝者示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01
  • Go語(yǔ)言中字符串賦值中的問(wèn)題與解決方法

    Go語(yǔ)言中字符串賦值中的問(wèn)題與解決方法

    這篇文章主要為大家詳細(xì)介紹了Go語(yǔ)言中字符串賦值會(huì)出現(xiàn)的一些問(wèn)題以及解決方法,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以參考一下
    2024-12-12
  • Go ORM的封裝解決方式詳解

    Go ORM的封裝解決方式詳解

    這篇文章主要為大家介紹了Go ORM的封裝解決方式詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-01-01
  • 通過(guò)手機(jī)案例理解Go設(shè)計(jì)模式之裝飾器模式的功能屬性

    通過(guò)手機(jī)案例理解Go設(shè)計(jì)模式之裝飾器模式的功能屬性

    這篇文章主要為大家介紹了Go設(shè)計(jì)模式之裝飾器模式的功能屬性,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-05-05
  • go使用Gin框架利用阿里云實(shí)現(xiàn)短信驗(yàn)證碼功能

    go使用Gin框架利用阿里云實(shí)現(xiàn)短信驗(yàn)證碼功能

    這篇文章主要介紹了go使用Gin框架利用阿里云實(shí)現(xiàn)短信驗(yàn)證碼,使用json配置文件及配置文件解析,編寫(xiě)路由controller層,本文通過(guò)代碼給大家介紹的非常詳細(xì),需要的朋友可以參考下
    2021-08-08
  • golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)

    golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)

    WaitGroup是Golang并發(fā)的兩種方式之一,一個(gè)是Channel,另一個(gè)是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)的相關(guān)資料,需要的朋友可以參考下
    2023-01-01
  • 詳解Golang中Channel的用法

    詳解Golang中Channel的用法

    如果說(shuō)goroutine是Go語(yǔ)言程序的并發(fā)體的話(huà),那么channels則是它們之間的通信機(jī)制。這篇文章主要介紹Golang中Channel的用法,需要的朋友可以參考下
    2020-11-11
  • 使用Golang獲取音視頻時(shí)長(zhǎng)信息的示例代碼

    使用Golang獲取音視頻時(shí)長(zhǎng)信息的示例代碼

    這篇文章主要介紹了如何使用Golang獲取音視頻時(shí)長(zhǎng)信息,文中通過(guò)代碼示例講解的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下
    2024-03-03
  • golang中tar壓縮和解壓文件詳情

    golang中tar壓縮和解壓文件詳情

    這篇文章主要給大家介紹golang中tar壓縮和解壓文件,文章以查看官方文檔自帶的給大家演習(xí)一下golang的archive/tar壓縮和解壓功能,需要的朋友可以參考一下
    2021-11-11
  • GoFrame框架Scan類(lèi)型轉(zhuǎn)換實(shí)例

    GoFrame框架Scan類(lèi)型轉(zhuǎn)換實(shí)例

    這篇文章主要為大家介紹了GoFrame框架Scan類(lèi)型轉(zhuǎn)換的實(shí)例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06

最新評(píng)論