使用golang實現(xiàn)一個MapReduce的示例代碼
背景
在日常業(yè)務(wù)開發(fā)中,我們經(jīng)常遇到需要并發(fā)處理的場景。例如:
- 依據(jù)id列表查詢db,獲取數(shù)據(jù)。為了保證查詢性能,單次查詢的id列表長度最好不要超過50(依據(jù)業(yè)務(wù)來判斷),當(dāng)id列表長度超過50時,拆分成并發(fā)請求,減少耗時和提高性能,返回聚合后的結(jié)果
- 外部提供的接口不支持批量寫入/讀取數(shù)據(jù),當(dāng)需要批量處理數(shù)據(jù)時,為了減少耗時和提高性能,并發(fā)請求外部接口
以上處理數(shù)據(jù)的場景,都可以分成兩個階段:
- 請求階段?;径际荌O操作,請求db,或者是調(diào)用外部接口
- 處理階段。對返回的數(shù)據(jù)進行轉(zhuǎn)換,過濾,聚合等操作
同步調(diào)用,調(diào)用耗時增長明顯

并發(fā)調(diào)用,可以減少調(diào)用耗時

分析
上面說的處理數(shù)據(jù)的場景,都可以分成兩個階段:
- 請求階段。IO操作,可以并發(fā)的去進行,互不干擾
- 處理階段。同步進行,保證聚合結(jié)果的正確性
這種是一種特殊的MapReduce

為了處理這類場景,我們需要明確以下幾個部分:
- 列表長度。代表有多少數(shù)據(jù)需要進行處理
- map函數(shù)。并發(fā)處理的函數(shù),互不干擾
- reduce函數(shù)。同步處理的函數(shù)
- 最大并發(fā)數(shù)。決定需要開多少線程/協(xié)程來處理
- 拆分長度。列表長度 / 拆分長度 = 子任務(wù)數(shù)
由于我在日常開發(fā)中常使用golang語言,下面梳理下使用golang來解決這類問題的一個思路
函數(shù)簽名
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) 核心邏輯:
當(dāng)最大并發(fā)數(shù) <= 1 或者子任務(wù)數(shù)(列表長度 / 拆分長度) <= 1時,同步執(zhí)行map函數(shù)和reduce函數(shù)即可
其余情況,并發(fā)處理map函數(shù),同步執(zhí)行reduce函數(shù)
- 獲取并發(fā)處理的子任務(wù)數(shù)量:lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
- 通過sync.Mutex保證reduce同步執(zhí)行
- 通過sync.WaitGroup保證等待子任務(wù)全部執(zhí)行完成
- 通過chan控制最大并發(fā)數(shù)
代碼實現(xiàn)
package test
import (
"math"
"sync"
)
func ChunkProcess(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
if length < 1 {
return
}
if maxConcurrent <= 1 || length <= chunkSize {
doChunkProcessSerially(length, procedure, reduce, chunkSize)
} else {
doChunkProcessConcurrently(length, procedure, reduce, maxConcurrent, chunkSize)
}
}
// 同步處理
func doChunkProcessSerially(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), chunkSize int) {
// 拆分的子任務(wù)數(shù)
chunkNums := int(math.Ceil(float64(length) / float64(chunkSize)))
for i := 0; i < chunkNums; i++ {
func(chunkIndex int) {
defer func() {
if err := recover(); err != nil {
// 自定義錯誤處理
}
}()
start := chunkIndex * chunkSize
end := start + chunkSize
if end > length {
end = length
}
// 執(zhí)行map
response, err := procedure(start, end)
// 執(zhí)行reduce
if reduce != nil {
reduce(response, err, start, end)
}
}(i)
}
}
// 并發(fā)處理
func doChunkProcessConcurrently(length int, procedure func(start, end int) (interface{}, error),
reduce func(partialResult interface{}, partialErr error, start, end int), maxConcurrent int, chunkSize int) {
index := 0
chunkIndex := 0
// 拆分的子任務(wù)數(shù)
lengthTask := int(math.Ceil(float64(length) / float64(chunkSize)))
// 保證reduce同步執(zhí)行
var lock sync.Mutex
// 保證子任務(wù)全部執(zhí)行完成
var wg sync.WaitGroup
wg.Add(lengthTask)
// 控制并發(fā)數(shù)
throttleChan := make(chan struct{}, maxConcurrent)
for {
start := index
end := index + chunkSize
if end > length {
end = length
}
throttleChan <- struct{}{}
go func(chunkIndex int) {
defer func() {
<-throttleChan
if err := recover(); err != nil {
// 自定義錯誤處理
}
wg.Done()
}()
// 執(zhí)行map
response, err := procedure(start, end)
// 執(zhí)行reduce
if reduce != nil {
lock.Lock()
defer lock.Unlock()
reduce(response, err, start, end)
}
}(chunkIndex)
chunkIndex++
index = index + chunkSize
if index >= length {
break
}
}
wg.Wait()
close(throttleChan)
}測試:
func TestChunkProcess(t *testing.T) {
trackIDs := []int64{123, 456, 789}
results := make([]int64, 0)
ChunkProcess(len(trackIDs), func(start, end int) (interface{}, error) {
result := trackIDs[start] + 100
return result, nil
}, func(partialResult interface{}, partialErr error, start, end int) {
results = append(results, partialResult.(int64))
}, 2, 1)
fmt.Println(results)
}總結(jié)
多對業(yè)務(wù)場景進行抽象分析,為這一類場景提供解決方案
以上就是使用golang實現(xiàn)一個MapReduce的詳細內(nèi)容,更多關(guān)于golang實現(xiàn)MapReduce的資料請關(guān)注腳本之家其它相關(guān)文章!
- Go語言實現(xiàn)MapReduce的示例代碼
- golang并發(fā)工具MapReduce降低服務(wù)響應(yīng)時間
- Golang編程并發(fā)工具庫MapReduce使用實踐
- golang如何實現(xiàn)mapreduce單進程版本詳解
- MongoDB中MapReduce的使用方法詳解
- Mongodb中MapReduce實現(xiàn)數(shù)據(jù)聚合方法詳解
- MongoDB學(xué)習(xí)筆記之MapReduce使用示例
- MongoDB中的MapReduce簡介
- MongoDB中MapReduce編程模型使用實例
- Go通用的?MapReduce?工具函數(shù)詳解
相關(guān)文章
一文搞懂Golang 時間和日期相關(guān)函數(shù)
這篇文章主要介紹了Golang 時間和日期相關(guān)函數(shù),本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-12-12
淺談golang package中init方法的多處定義及運行順序問題
這篇文章主要介紹了淺談golang package中init方法的多處定義及運行順序問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-05-05
Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件
這篇文章主要介紹了Golang 實現(xiàn)復(fù)制文件夾同時復(fù)制文件,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
Go語言基于HTTP的內(nèi)存緩存服務(wù)的實現(xiàn)
這篇文章主要介紹了Go語言基于HTTP的內(nèi)存緩存服務(wù),本程序采用REST接口,支持設(shè)置(Set)、獲取(Get)和刪除(Del)這3個基本操作,同時還支持對緩存服務(wù)狀態(tài)進行查詢,需要的朋友可以參考下2022-08-08

