欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

使用golang實現(xiàn)一個MapReduce的示例代碼

 更新時間:2023年09月21日 10:57:26   作者:寫代碼的lorre  
這篇文章主要給大家介紹了關(guān)于如何使用golang實現(xiàn)一個MapReduce,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

背景

在日常業(yè)務(wù)開發(fā)中,我們經(jīng)常遇到需要并發(fā)處理的場景。例如:

  • 依據(jù)id列表查詢db,獲取數(shù)據(jù)。為了保證查詢性能,單次查詢的id列表長度最好不要超過50(依據(jù)業(yè)務(wù)來判斷),當(dāng)id列表長度超過50時,拆分成并發(fā)請求,減少耗時和提高性能,返回聚合后的結(jié)果
  • 外部提供的接口不支持批量寫入/讀取數(shù)據(jù),當(dāng)需要批量處理數(shù)據(jù)時,為了減少耗時和提高性能,并發(fā)請求外部接口

以上處理數(shù)據(jù)的場景,都可以分成兩個階段:

  • 請求階段?;径际荌O操作,請求db,或者是調(diào)用外部接口
  • 處理階段。對返回的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,過濾,聚合等操作

同步調(diào)用,調(diào)用耗時增長明顯

并發(fā)調(diào)用,可以減少調(diào)用耗時

分析

上面說的處理數(shù)據(jù)的場景,都可以分成兩個階段:

  • 請求階段。IO操作,可以并發(fā)的去進(jìn)行,互不干擾
  • 處理階段。同步進(jìn)行,保證聚合結(jié)果的正確性

這種是一種特殊的MapReduce

為了處理這類場景,我們需要明確以下幾個部分:

  • 列表長度。代表有多少數(shù)據(jù)需要進(jìn)行處理
  • map函數(shù)。并發(fā)處理的函數(shù),互不干擾
  • reduce函數(shù)。同步處理的函數(shù)
  • 最大并發(fā)數(shù)。決定需要開多少線程/協(xié)程來處理
  • 拆分長度。列表長度 / 拆分長度 = 子任務(wù)數(shù)

由于我在日常開發(fā)中常使用golang語言,下面梳理下使用golang來解決這類問題的一個思路

函數(shù)簽名

func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) 

核心邏輯:

  • 當(dāng)最大并發(fā)數(shù) <= 1 或者子任務(wù)數(shù)(列表長度 / 拆分長度) <= 1時,同步執(zhí)行map函數(shù)和reduce函數(shù)即可

  • 其余情況,并發(fā)處理map函數(shù),同步執(zhí)行reduce函數(shù)

    • 獲取并發(fā)處理的子任務(wù)數(shù)量:lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
    • 通過sync.Mutex保證reduce同步執(zhí)行
    • 通過sync.WaitGroup保證等待子任務(wù)全部執(zhí)行完成
    • 通過chan控制最大并發(fā)數(shù)

代碼實現(xiàn)

package test
import (
   "math"
   "sync"
)
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
   if length < 1 {
      return
   }
   if maxConcurrent <= 1 || length <= chunkSize {
      doChunkProcessSerially(length, procedure, reduce, chunkSize)
   } else {
      doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize)
   }
}
// 同步處理
func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) {
   // 拆分的子任務(wù)數(shù)
   chunkNums := int(math.Ceil(float64(length) / float64(chunkSize)))
   for i := 0; i < chunkNums; i++ {
      func(chunkIndex int) {
         defer func() {
            if err := recover(); err != nil {
               // 自定義錯誤處理
            }
         }()
         start := chunkIndex * chunkSize
         end := start + chunkSize
         if end > length {
            end = length
         }
         // 執(zhí)行map
         response, err := procedure(start, end)
         // 執(zhí)行reduce
         if reduce != nil {
            reduce(response, err, start, end)
         }
      }(i)
   }
}
// 并發(fā)處理
func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error),
   reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
   index := 0
   chunkIndex := 0
   // 拆分的子任務(wù)數(shù)
   lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
   // 保證reduce同步執(zhí)行
   var lock sync.Mutex
   // 保證子任務(wù)全部執(zhí)行完成
   var wg sync.WaitGroup
   wg.Add(lengthTask)
   // 控制并發(fā)數(shù)
   throttleChan := make(chan struct{}, maxConcurrent)
   for {
      start := index
      end := index + chunkSize
      if end > length {
         end = length
      }
      throttleChan <- struct{}{}
      go func(chunkIndex int) {
         defer func() {
            <-throttleChan
            if err := recover(); err != nil {
               // 自定義錯誤處理
            }
            wg.Done()
         }()
         // 執(zhí)行map
         response, err := procedure(start, end)
         // 執(zhí)行reduce
         if reduce != nil {
            lock.Lock()
            defer lock.Unlock()
            reduce(response, err, start, end)
         }
      }(chunkIndex)
      chunkIndex++
      index = index + chunkSize
      if index >= length {
         break
      }
   }
   wg.Wait()
   close(throttleChan)
}

測試:

func TestChunkProcess(t *testing.T) {
   trackIDs := []int64{123, 456, 789}
   results := make([]int64, 0)
   ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) {
      result := trackIDs[start] + 100
      return result, nil
   }, func(partialResult interface{}, partialErr error, start, end int) {
      results = append(results, partialResult.(int64))
   }, 2, 1)
   fmt.Println(results)
}

總結(jié)

多對業(yè)務(wù)場景進(jìn)行抽象分析,為這一類場景提供解決方案

以上就是使用golang實現(xiàn)一個MapReduce的詳細(xì)內(nèi)容,更多關(guān)于golang實現(xiàn)MapReduce的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Go接口的用法詳解

    Go接口的用法詳解

    本文主要介紹了Go接口的用法詳解,包括定義接口、實現(xiàn)接口、使用接口、空接口等,通過接口,可以實現(xiàn)多態(tài)性,即一個對象可以實現(xiàn)多個接口,從而實現(xiàn)不同接口的行為,感興趣的可以了解一下
    2023-11-11
  • 一文搞懂Golang 時間和日期相關(guān)函數(shù)

    一文搞懂Golang 時間和日期相關(guān)函數(shù)

    這篇文章主要介紹了Golang 時間和日期相關(guān)函數(shù),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2021-12-12
  • Go 語言結(jié)構(gòu)實例分析

    Go 語言結(jié)構(gòu)實例分析

    在本篇文章里小編給大家整理的是一篇關(guān)于Go 語言結(jié)構(gòu)實例分析的相關(guān)知識點,有興趣的朋友們可以學(xué)習(xí)下。
    2021-07-07
  • 淺談golang package中init方法的多處定義及運(yùn)行順序問題

    淺談golang package中init方法的多處定義及運(yùn)行順序問題

    這篇文章主要介紹了淺談golang package中init方法的多處定義及運(yùn)行順序問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05
  • Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件

    Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件

    這篇文章主要介紹了Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-12-12
  • Go中過濾范型集合性能示例詳解

    Go中過濾范型集合性能示例詳解

    這篇文章主要為大家介紹了Go中過濾范型集合性能示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-03-03
  • Go語言基于HTTP的內(nèi)存緩存服務(wù)的實現(xiàn)

    Go語言基于HTTP的內(nèi)存緩存服務(wù)的實現(xiàn)

    這篇文章主要介紹了Go語言基于HTTP的內(nèi)存緩存服務(wù),本程序采用REST接口,支持設(shè)置(Set)、獲取(Get)和刪除(Del)這3個基本操作,同時還支持對緩存服務(wù)狀態(tài)進(jìn)行查詢,需要的朋友可以參考下
    2022-08-08
  • Golang斷言判斷值類型的實現(xiàn)方法

    Golang斷言判斷值類型的實現(xiàn)方法

    這篇文章主要介紹了Golang斷言判斷值類型的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2021-03-03
  • Go中Context使用源碼解析

    Go中Context使用源碼解析

    這篇文章主要為大家介紹了Go中Context使用源碼解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-04-04
  • Golang?RPC的原理與簡單調(diào)用詳解

    Golang?RPC的原理與簡單調(diào)用詳解

    RPC(Remote?Procedure?Call),主要是幫助我們屏蔽網(wǎng)絡(luò)編程細(xì)節(jié)?,使我們更專注于業(yè)務(wù)邏輯,所以本文主要來和大家聊聊RPC的原理與簡單調(diào)用,希望對大家有所幫助
    2023-05-05

最新評論