Go通用的?MapReduce?工具函數(shù)詳解
前言
最近在測試學(xué)習(xí) aws s3 sdk 中的 Multi Part Upload 功能,其基本步驟就是 CreateMultipartUpload
后, 串行或并行地 UploadPart
,最后 CompleteMultipartUpload
或 AbortMultipartUpload
收尾。為了最高效率地完成整個傳輸,中間的 UploadPart 部分使用多個 goroutine 并發(fā)地上傳是最快地。因此嘗試著寫了一下,并完美地實現(xiàn)。
擴展
雖然已經(jīng)完成對應(yīng)功能的開發(fā)和測試,但仔細分析一下,發(fā)現(xiàn)有大量的模式代碼,比如:
- 創(chuàng)建指定個數(shù)的 goroutine, 并使用
sync.WaitGroup
管理和同步. - 使用 chan 提供待處理數(shù)據(jù),并接受處理結(jié)果
- 看起來整個處理流程就是典型的 map-reduce 結(jié)構(gòu) 或者 說是 Java Stream/ParallelStream 中的 Map, Reduce.
網(wǎng)上搜索一下, 發(fā)現(xiàn)很多人也有這個需求,也寫了不少庫,但實測了一下,發(fā)現(xiàn)根本不好用。于是決定自己再造一個“自己覺得比較好的”輪子,因此有了 mapreduce 和本篇文章。
主要功能函數(shù)
func Map[T any, R any](ctx context.Context, inputs []T, mapper MapperFunc[T, R]) ResultsMap[T, R]
- 這是最簡單的同步 Map, 通過泛型的 T 和 R 支持任意類型的數(shù)據(jù)轉(zhuǎn)換
func ParallelMap[T any, R any](ctx context.Context, inputs []T, concurrency int, name string, mapper MapperFunc[T, R]) ResultsMap[T, R]
- 這是并發(fā)的Map,內(nèi)部回啟動最多 2+concurrency 個 goroutine, 并發(fā)的處理完
inputs
中的所有數(shù)據(jù). 并且結(jié)果可以按照輸入的順序排序。 func StreamMap[T any, R any](ctx context.Context, concurrency int, name string, startIndex int64, chInput chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]
類似纖程池的形態(tài), 可以無限地處理 chInput
中的數(shù)據(jù),并將結(jié)果寫入 OutputItem
.
額外說明
錯誤處理
作為一個并發(fā)處理框架,對于錯誤情況也應(yīng)該能很好的支持,有的時候, 一項元素處理失敗了不影響整體的流程處理, 但有的時候其中一項失敗, 就不需要繼續(xù)進行(比如 S3 的 multi part upload, 如果其中一部分失敗, 那其他的部分再上傳也沒有意義了)。因此代碼中定義了 OperationType
類型, 其值分為Continue
或 Stop
, 框架只根據(jù)這個值確認是否繼續(xù)處理, 而不是根據(jù) mapper 函數(shù)是否返回 error.
結(jié)果返回
并發(fā)處理時, 每個 Item 的處理時長/順序等是不同的,而且有可能因為錯誤造成部分輸入元素尚未處理即結(jié)束,因此返回的結(jié)果默認情況下不一定能和輸入順序一一對應(yīng),因此采用了 Map 的方式保存輸入序號 => 結(jié)果。
排序
s3 的 multi part upload 在調(diào)用 CompleteMultipartUpload
時參數(shù) Parts
需要是排好序的,因此通過 ConvertResult
函數(shù)對結(jié)果進行排序。
測試代碼
注意: 并發(fā)處理帶錯誤數(shù)據(jù)的時候, 由于錯誤項的處理順序比較隨機, 因此我使用了 concurrency: 1
的方式保證 UT 能順利判斷。如果將 concurrency
更改為大于1的情況, 其 want 不一定能滿足. 比如: “error with stop” 時, 如果 concurrency
> 1, 結(jié)果有可能就不是 [1 2 3 0]
而是 [1 2 3 0 4 5]
了, 這種屬于正?,F(xiàn)象(自己更改測試一下即可理解 )
func TestMap(t *testing.T) { type args struct { ctx context.Context inputs []string concurrency int mapper MapperFunc[string, int] } tests := []struct { name string args args want []int wantErrs []error opType OperationType }{ { name: "all successful", args: args{ctx: context.Background(), inputs: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}, concurrency: runtime.NumCPU(), mapper: convertStopFunc, }, want: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, wantErrs: []error{nil, nil, nil, nil, nil, nil, nil, nil, nil, nil}, opType: Continue, }, { name: "error with continue", args: args{ctx: context.Background(), inputs: []string{"1", "not", "3"}, concurrency: 1, mapper: convertContinueFunc}, want: []int{1, 0, 3}, wantErrs: []error{nil, numberErrHelper("not"), nil}, opType: Continue, // 出現(xiàn)過錯誤,但忽略了. 如果采用 Continue 的方式來處理錯誤, 則只能自己遍歷 ResultsMap 的結(jié)果集才知道是否有錯誤 }, { // 注意: 如果并發(fā)度 concurrency > 1, 則結(jié)果個數(shù)不確定, 但肯定至少有一個錯誤的 name: "error with stop", args: args{ctx: context.Background(), inputs: []string{"1", "2", "3", "not_exist", "4", "5", "6"}, concurrency: 1, mapper: convertStopFunc}, want: []int{1, 2, 3, 0}, wantErrs: []error{nil, nil, nil, numberErrHelper("not_exist")}, opType: Stop, }, { name: "error last", // 最后一個數(shù)據(jù)出錯時,其返回的結(jié)果數(shù)組長度和輸入數(shù)組的長度相同. 因此不能依靠數(shù)組長度來判斷是否有問題. args: args{ctx: context.Background(), inputs: []string{"1", "2", "3", "not_exist"}, concurrency: 1, mapper: convertStopFunc}, want: []int{1, 2, 3, 0}, wantErrs: []error{nil, nil, nil, numberErrHelper("not_exist")}, opType: Stop, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if true { //使用 Map 串行轉(zhuǎn)換 got := Map(tt.args.ctx, tt.args.inputs, tt.args.mapper) //flog.Infof("Map name=%s, got=%+v", tt.name, got) realResult, errs, opType := got.ConvertResult() du.GoAssertEqual(t, tt.want, realResult, "want") du.GoAssertEqual(t, tt.wantErrs, errs, "wantErrs") du.GoAssertEqual(t, tt.opType, opType, "opType") } if true { //使用 ParallelMap 并行轉(zhuǎn)換 got := ParallelMap(tt.args.ctx, tt.args.inputs, tt.args.concurrency, tt.name, tt.args.mapper) //flog.Infof("ParallelMap name=%s, got=%+v", tt.name, got) realResult, errs, opType := got.ConvertResult() du.GoAssertEqual(t, tt.want, realResult, "want") du.GoAssertEqual(t, tt.wantErrs, errs, "wantErrs") du.GoAssertEqual(t, tt.opType, opType, "opType") } }) } } func TestStreamMap(t *testing.T) { ctx := context.Background() inItemCount := 10000 chInput := make(chan string) go func() { for i := 0; i < inItemCount; i++ { idx := rand.Intn(100) chInput <- fmt.Sprintf("%d", idx) } close(chInput) }() //啟動 100 個 纖程并行處理 inItemCount(10000) 個數(shù)據(jù)的轉(zhuǎn)換 chOutput := StreamMap(ctx, 100, "testStreamMap", 100, chInput, convertContinueFunc) mapResultCount := 0 for outItem := range chOutput { mapResultCount++ flog.Debugf("outItem=%v", outItem) } du.GoAssertEqual(t, inItemCount, mapResultCount, "inItemCount") }
##補充信息
- 因為眾所周知的原因, 以后 go-library 的代碼將只更新 https://gitee.com/fishjam/go-library, 不再更新 github 上的版本.
- S3 的 multi upload 不需要大家自己寫,
manager.NewUploader
已經(jīng)提供了完整的實現(xiàn), 比大多數(shù)人實現(xiàn)得更好。
到此這篇關(guān)于Go通用的 MapReduce 工具函數(shù)的文章就介紹到這了,更多相關(guān)Go MapReduce 工具函數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Go語言實現(xiàn)MapReduce的示例代碼
- 使用golang實現(xiàn)一個MapReduce的示例代碼
- golang并發(fā)工具MapReduce降低服務(wù)響應(yīng)時間
- Golang編程并發(fā)工具庫MapReduce使用實踐
- golang如何實現(xiàn)mapreduce單進程版本詳解
- MongoDB中MapReduce的使用方法詳解
- Mongodb中MapReduce實現(xiàn)數(shù)據(jù)聚合方法詳解
- MongoDB學(xué)習(xí)筆記之MapReduce使用示例
- MongoDB中的MapReduce簡介
- MongoDB中MapReduce編程模型使用實例
相關(guān)文章
golang并發(fā)編程中Goroutine 協(xié)程的實現(xiàn)
Go語言中的協(xié)程是一種輕量級線程,通過在函數(shù)前加go關(guān)鍵字來并發(fā)執(zhí)行,具有動態(tài)棧、快速啟動和低內(nèi)存使用等特點,本文就來詳細的介紹一下,感興趣的可以了解一下2024-10-10Golang迭代如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解
這篇文章主要為大家介紹了Golang迭代之如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-10-10