使用golang實現(xiàn)一個MapReduce的示例代碼
背景
在日常業(yè)務(wù)開發(fā)中,我們經(jīng)常遇到需要并發(fā)處理的場景。例如:
- 依據(jù)id列表查詢db,獲取數(shù)據(jù)。為了保證查詢性能,單次查詢的id列表長度最好不要超過50(依據(jù)業(yè)務(wù)來判斷),當(dāng)id列表長度超過50時,拆分成并發(fā)請求,減少耗時和提高性能,返回聚合后的結(jié)果
- 外部提供的接口不支持批量寫入/讀取數(shù)據(jù),當(dāng)需要批量處理數(shù)據(jù)時,為了減少耗時和提高性能,并發(fā)請求外部接口
以上處理數(shù)據(jù)的場景,都可以分成兩個階段:
- 請求階段?;径际荌O操作,請求db,或者是調(diào)用外部接口
- 處理階段。對返回的數(shù)據(jù)進(jìn)行轉(zhuǎn)換,過濾,聚合等操作
同步調(diào)用,調(diào)用耗時增長明顯
并發(fā)調(diào)用,可以減少調(diào)用耗時
分析
上面說的處理數(shù)據(jù)的場景,都可以分成兩個階段:
- 請求階段。IO操作,可以并發(fā)的去進(jìn)行,互不干擾
- 處理階段。同步進(jìn)行,保證聚合結(jié)果的正確性
這種是一種特殊的MapReduce
為了處理這類場景,我們需要明確以下幾個部分:
- 列表長度。代表有多少數(shù)據(jù)需要進(jìn)行處理
- map函數(shù)。并發(fā)處理的函數(shù),互不干擾
- reduce函數(shù)。同步處理的函數(shù)
- 最大并發(fā)數(shù)。決定需要開多少線程/協(xié)程來處理
- 拆分長度。列表長度 / 拆分長度 = 子任務(wù)數(shù)
由于我在日常開發(fā)中常使用golang語言,下面梳理下使用golang來解決這類問題的一個思路
函數(shù)簽名
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int)
核心邏輯:
當(dāng)最大并發(fā)數(shù) <= 1 或者子任務(wù)數(shù)(列表長度 / 拆分長度) <= 1時,同步執(zhí)行map函數(shù)和reduce函數(shù)即可
其余情況,并發(fā)處理map函數(shù),同步執(zhí)行reduce函數(shù)
- 獲取并發(fā)處理的子任務(wù)數(shù)量:lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
- 通過sync.Mutex保證reduce同步執(zhí)行
- 通過sync.WaitGroup保證等待子任務(wù)全部執(zhí)行完成
- 通過chan控制最大并發(fā)數(shù)
代碼實現(xiàn)
package test import ( "math" "sync" ) func ChunkProcess(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) { if length < 1 { return } if maxConcurrent <= 1 || length <= chunkSize { doChunkProcessSerially(length, procedure, reduce, chunkSize) } else { doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize) } } // 同步處理 func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) { // 拆分的子任務(wù)數(shù) chunkNums := int(math.Ceil(float64(length) / float64(chunkSize))) for i := 0; i < chunkNums; i++ { func(chunkIndex int) { defer func() { if err := recover(); err != nil { // 自定義錯誤處理 } }() start := chunkIndex * chunkSize end := start + chunkSize if end > length { end = length } // 執(zhí)行map response, err := procedure(start, end) // 執(zhí)行reduce if reduce != nil { reduce(response, err, start, end) } }(i) } } // 并發(fā)處理 func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error), reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) { index := 0 chunkIndex := 0 // 拆分的子任務(wù)數(shù) lengthTask := int(math.Ceil(float64(length) / float64(chunkSize))) // 保證reduce同步執(zhí)行 var lock sync.Mutex // 保證子任務(wù)全部執(zhí)行完成 var wg sync.WaitGroup wg.Add(lengthTask) // 控制并發(fā)數(shù) throttleChan := make(chan struct{}, maxConcurrent) for { start := index end := index + chunkSize if end > length { end = length } throttleChan <- struct{}{} go func(chunkIndex int) { defer func() { <-throttleChan if err := recover(); err != nil { // 自定義錯誤處理 } wg.Done() }() // 執(zhí)行map response, err := procedure(start, end) // 執(zhí)行reduce if reduce != nil { lock.Lock() defer lock.Unlock() reduce(response, err, start, end) } }(chunkIndex) chunkIndex++ index = index + chunkSize if index >= length { break } } wg.Wait() close(throttleChan) }
測試:
func TestChunkProcess(t *testing.T) { trackIDs := []int64{123, 456, 789} results := make([]int64, 0) ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) { result := trackIDs[start] + 100 return result, nil }, func(partialResult interface{}, partialErr error, start, end int) { results = append(results, partialResult.(int64)) }, 2, 1) fmt.Println(results) }
總結(jié)
多對業(yè)務(wù)場景進(jìn)行抽象分析,為這一類場景提供解決方案
以上就是使用golang實現(xiàn)一個MapReduce的詳細(xì)內(nèi)容,更多關(guān)于golang實現(xiàn)MapReduce的資料請關(guān)注腳本之家其它相關(guān)文章!
- Go語言實現(xiàn)MapReduce的示例代碼
- golang并發(fā)工具M(jìn)apReduce降低服務(wù)響應(yīng)時間
- Golang編程并發(fā)工具庫MapReduce使用實踐
- golang如何實現(xiàn)mapreduce單進(jìn)程版本詳解
- MongoDB中MapReduce的使用方法詳解
- Mongodb中MapReduce實現(xiàn)數(shù)據(jù)聚合方法詳解
- MongoDB學(xué)習(xí)筆記之MapReduce使用示例
- MongoDB中的MapReduce簡介
- MongoDB中MapReduce編程模型使用實例
- Go通用的?MapReduce?工具函數(shù)詳解
相關(guān)文章
一文搞懂Golang 時間和日期相關(guān)函數(shù)
這篇文章主要介紹了Golang 時間和日期相關(guān)函數(shù),本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-12-12淺談golang package中init方法的多處定義及運(yùn)行順序問題
這篇文章主要介紹了淺談golang package中init方法的多處定義及運(yùn)行順序問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件
這篇文章主要介紹了Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12Go語言基于HTTP的內(nèi)存緩存服務(wù)的實現(xiàn)
這篇文章主要介紹了Go語言基于HTTP的內(nèi)存緩存服務(wù),本程序采用REST接口,支持設(shè)置(Set)、獲取(Get)和刪除(Del)這3個基本操作,同時還支持對緩存服務(wù)狀態(tài)進(jìn)行查詢,需要的朋友可以參考下2022-08-08