欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Go通用的?MapReduce?工具函數(shù)詳解

 更新時間:2024年09月12日 10:20:23   作者:fishjam  
本文介紹了使用Go語言實現(xiàn)的MapReduce框架,特別是在AWSS3?SDK的MultiPartUpload功能中的應(yīng)用,包括并發(fā)上傳和錯誤處理策略,詳細解釋了如何通過并發(fā)goroutines提高上傳效率,并通過MapReduce模型優(yōu)化代碼結(jié)構(gòu)和處理流程,感興趣的朋友跟隨小編一起看看吧

前言

最近在測試學(xué)習(xí) aws s3 sdk 中的 Multi Part Upload 功能,其基本步驟就是 CreateMultipartUpload 后, 串行或并行地 UploadPart ,最后 CompleteMultipartUploadAbortMultipartUpload 收尾。為了最高效率地完成整個傳輸,中間的 UploadPart 部分使用多個 goroutine 并發(fā)地上傳是最快地。因此嘗試著寫了一下,并完美地實現(xiàn)。

擴展

雖然已經(jīng)完成對應(yīng)功能的開發(fā)和測試,但仔細分析一下,發(fā)現(xiàn)有大量的模式代碼,比如:

  • 創(chuàng)建指定個數(shù)的 goroutine, 并使用 sync.WaitGroup 管理和同步.
  • 使用 chan 提供待處理數(shù)據(jù),并接受處理結(jié)果
  • 看起來整個處理流程就是典型的 map-reduce 結(jié)構(gòu) 或者 說是 Java Stream/ParallelStream 中的 Map, Reduce.

網(wǎng)上搜索一下, 發(fā)現(xiàn)很多人也有這個需求,也寫了不少庫,但實測了一下,發(fā)現(xiàn)根本不好用。于是決定自己再造一個“自己覺得比較好的”輪子,因此有了 mapreduce 和本篇文章。

主要功能函數(shù)

  • func Map[T any, R any](ctx context.Context, inputs []T, mapper MapperFunc[T, R]) ResultsMap[T, R]
  • 這是最簡單的同步 Map, 通過泛型的 T 和 R 支持任意類型的數(shù)據(jù)轉(zhuǎn)換
  • func ParallelMap[T any, R any](ctx context.Context, inputs []T, concurrency int, name string, mapper MapperFunc[T, R]) ResultsMap[T, R]
  • 這是并發(fā)的Map,內(nèi)部回啟動最多 2+concurrency 個 goroutine, 并發(fā)的處理完 inputs 中的所有數(shù)據(jù). 并且結(jié)果可以按照輸入的順序排序。
  • func StreamMap[T any, R any](ctx context.Context, concurrency int, name string, startIndex int64, chInput chan T, mapper MapperFunc[T, R]) chan *OutputItem[T, R]

類似纖程池的形態(tài), 可以無限地處理 chInput 中的數(shù)據(jù),并將結(jié)果寫入 OutputItem.

額外說明

錯誤處理

作為一個并發(fā)處理框架,對于錯誤情況也應(yīng)該能很好的支持,有的時候, 一項元素處理失敗了不影響整體的流程處理, 但有的時候其中一項失敗, 就不需要繼續(xù)進行(比如 S3 的 multi part upload, 如果其中一部分失敗, 那其他的部分再上傳也沒有意義了)。因此代碼中定義了 OperationType 類型, 其值分為
ContinueStop , 框架只根據(jù)這個值確認是否繼續(xù)處理, 而不是根據(jù) mapper 函數(shù)是否返回 error.

結(jié)果返回

并發(fā)處理時, 每個 Item 的處理時長/順序等是不同的,而且有可能因為錯誤造成部分輸入元素尚未處理即結(jié)束,因此返回的結(jié)果默認情況下不一定能和輸入順序一一對應(yīng),因此采用了 Map 的方式保存輸入序號 => 結(jié)果。

排序

s3 的 multi part upload 在調(diào)用 CompleteMultipartUpload 時參數(shù) Parts 需要是排好序的,因此通過 ConvertResult 函數(shù)對結(jié)果進行排序。

測試代碼

注意: 并發(fā)處理帶錯誤數(shù)據(jù)的時候, 由于錯誤項的處理順序比較隨機, 因此我使用了 concurrency: 1 的方式保證 UT 能順利判斷。如果將 concurrency 更改為大于1的情況, 其 want 不一定能滿足. 比如: “error with stop” 時, 如果 concurrency > 1, 結(jié)果有可能就不是 [1 2 3 0] 而是 [1 2 3 0 4 5] 了, 這種屬于正?,F(xiàn)象(自己更改測試一下即可理解 )

func TestMap(t *testing.T) {
	type args struct {
		ctx         context.Context
		inputs      []string
		concurrency int
		mapper      MapperFunc[string, int]
	}
	tests := []struct {
		name     string
		args     args
		want     []int
		wantErrs []error
		opType   OperationType
	}{
		{
			name: "all successful",
			args: args{ctx: context.Background(),
				inputs: []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}, concurrency: runtime.NumCPU(), mapper: convertStopFunc,
			},
			want:     []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
			wantErrs: []error{nil, nil, nil, nil, nil, nil, nil, nil, nil, nil},
			opType:   Continue,
		},
		{
			name:     "error with continue",
			args:     args{ctx: context.Background(), inputs: []string{"1", "not", "3"}, concurrency: 1, mapper: convertContinueFunc},
			want:     []int{1, 0, 3},
			wantErrs: []error{nil, numberErrHelper("not"), nil},
			opType:   Continue, // 出現(xiàn)過錯誤,但忽略了. 如果采用 Continue 的方式來處理錯誤, 則只能自己遍歷 ResultsMap 的結(jié)果集才知道是否有錯誤
		},
		{
			// 注意: 如果并發(fā)度 concurrency > 1, 則結(jié)果個數(shù)不確定, 但肯定至少有一個錯誤的
			name: "error with stop",
			args: args{ctx: context.Background(), inputs: []string{"1", "2", "3", "not_exist", "4", "5", "6"},
				concurrency: 1, mapper: convertStopFunc},
			want:     []int{1, 2, 3, 0},
			wantErrs: []error{nil, nil, nil, numberErrHelper("not_exist")},
			opType:   Stop,
		},
		{
			name: "error last", // 最后一個數(shù)據(jù)出錯時,其返回的結(jié)果數(shù)組長度和輸入數(shù)組的長度相同. 因此不能依靠數(shù)組長度來判斷是否有問題.
			args: args{ctx: context.Background(), inputs: []string{"1", "2", "3", "not_exist"},
				concurrency: 1, mapper: convertStopFunc},
			want:     []int{1, 2, 3, 0},
			wantErrs: []error{nil, nil, nil, numberErrHelper("not_exist")},
			opType:   Stop,
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if true {
				//使用 Map 串行轉(zhuǎn)換
				got := Map(tt.args.ctx, tt.args.inputs, tt.args.mapper)
				//flog.Infof("Map name=%s, got=%+v", tt.name, got)
				realResult, errs, opType := got.ConvertResult()
				du.GoAssertEqual(t, tt.want, realResult, "want")
				du.GoAssertEqual(t, tt.wantErrs, errs, "wantErrs")
				du.GoAssertEqual(t, tt.opType, opType, "opType")
			}
			if true {
				//使用 ParallelMap 并行轉(zhuǎn)換
				got := ParallelMap(tt.args.ctx, tt.args.inputs, tt.args.concurrency, tt.name, tt.args.mapper)
				//flog.Infof("ParallelMap name=%s, got=%+v", tt.name, got)
				realResult, errs, opType := got.ConvertResult()
				du.GoAssertEqual(t, tt.want, realResult, "want")
				du.GoAssertEqual(t, tt.wantErrs, errs, "wantErrs")
				du.GoAssertEqual(t, tt.opType, opType, "opType")
			}
		})
	}
}
func TestStreamMap(t *testing.T) {
	ctx := context.Background()
	inItemCount := 10000
	chInput := make(chan string)
	go func() {
		for i := 0; i < inItemCount; i++ {
			idx := rand.Intn(100)
			chInput <- fmt.Sprintf("%d", idx)
		}
		close(chInput)
	}()
	//啟動 100 個 纖程并行處理 inItemCount(10000) 個數(shù)據(jù)的轉(zhuǎn)換
	chOutput := StreamMap(ctx, 100, "testStreamMap", 100, chInput, convertContinueFunc)
	mapResultCount := 0
	for outItem := range chOutput {
		mapResultCount++
		flog.Debugf("outItem=%v", outItem)
	}
	du.GoAssertEqual(t, inItemCount, mapResultCount, "inItemCount")
}

##補充信息

  • 因為眾所周知的原因, 以后 go-library 的代碼將只更新 https://gitee.com/fishjam/go-library, 不再更新 github 上的版本.
  • S3 的 multi upload 不需要大家自己寫, manager.NewUploader 已經(jīng)提供了完整的實現(xiàn), 比大多數(shù)人實現(xiàn)得更好。

到此這篇關(guān)于Go通用的 MapReduce 工具函數(shù)的文章就介紹到這了,更多相關(guān)Go MapReduce 工具函數(shù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • 深入理解Go中defer的機制

    深入理解Go中defer的機制

    本文主要介紹了Go中defer的機制,包括執(zhí)行順序、參數(shù)預(yù)計算、閉包和與返回值的交互,具有一定的參考價值,感興趣的可以了解一下
    2025-02-02
  • Go?文件讀取和寫入操作全面講解

    Go?文件讀取和寫入操作全面講解

    這篇文章主要為大家介紹了Go文件的讀取和寫入操作示例的全面詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-11-11
  • 一文帶你深入理解Golang中的泛型

    一文帶你深入理解Golang中的泛型

    Go?在泛型方面一直被詬病,因為它在這方面相對比較落后。但是,在?Go?1.18?版本中,泛型已經(jīng)被正式引入,成為了?Go?語言中一個重要的特性。本文將會詳細介紹?Go?泛型的相關(guān)概念,語法和用法,希望能夠幫助大家更好地理解和應(yīng)用這一特性
    2023-05-05
  • Go語言使用sqlx操作數(shù)據(jù)庫的示例詳解

    Go語言使用sqlx操作數(shù)據(jù)庫的示例詳解

    sqlx?是?Go?語言中一個流行的第三方包,它提供了對?Go?標(biāo)準(zhǔn)庫?database/sql?的擴展,本文重點講解?sqlx?在?database/sql?基礎(chǔ)上擴展的功能,希望對大家有所幫助
    2023-06-06
  • 一篇文章帶你輕松搞懂Golang的error處理

    一篇文章帶你輕松搞懂Golang的error處理

    在進行后臺開發(fā)的時候,錯誤處理是每個程序員都會遇到的問題,下面這篇文章主要給大家介紹了關(guān)于Golang中error處理的相關(guān)資料,文中通過實例代碼介紹的非常詳細,需要的朋友可以參考下
    2022-07-07
  • 淺析如何利用Go的plugin機制實現(xiàn)熱更新

    淺析如何利用Go的plugin機制實現(xiàn)熱更新

    熱更新,或稱熱重載或動態(tài)更新,是一種軟件更新技術(shù),允許程序在運行時,不停機更新代碼或資源,本文主要來討論下GO語言是否可以利用plugin機制實現(xiàn)熱更新,感興趣的可以了解下
    2024-04-04
  • golang并發(fā)編程中Goroutine 協(xié)程的實現(xiàn)

    golang并發(fā)編程中Goroutine 協(xié)程的實現(xiàn)

    Go語言中的協(xié)程是一種輕量級線程,通過在函數(shù)前加go關(guān)鍵字來并發(fā)執(zhí)行,具有動態(tài)棧、快速啟動和低內(nèi)存使用等特點,本文就來詳細的介紹一下,感興趣的可以了解一下
    2024-10-10
  • Go語言中websocket的使用demo分享

    Go語言中websocket的使用demo分享

    WebSocket是一種在單個TCP連接上進行全雙工通信的協(xié)議。這篇文章主要和大家分享了一個Go語言中websocket的使用demo,需要的可以參考一下
    2022-12-12
  • Golang迭代如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解

    Golang迭代如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解

    這篇文章主要為大家介紹了Golang迭代之如何在Go中循環(huán)數(shù)據(jù)結(jié)構(gòu)使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-10-10
  • Go語言題解LeetCode35搜索插入位置示例詳解

    Go語言題解LeetCode35搜索插入位置示例詳解

    這篇文章主要為大家介紹了Go語言題解LeetCode35搜索插入位置示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-12-12

最新評論