Go語(yǔ)言異步API設(shè)計(jì)的扇入扇出模式詳解
前言
扇出/扇入模式是更高級(jí) API 集成的主要內(nèi)容。這些應(yīng)用程序并不總是表現(xiàn)出相同的可用性或性能特征。
扇出是從電子工程中借用的一個(gè)術(shù)語(yǔ),它描述了輸入的邏輯門連接到另一個(gè)輸出門的數(shù)量。輸出需要提供足夠的電流來(lái)驅(qū)動(dòng)所有連接的輸入。在事務(wù)處理系統(tǒng)中,用來(lái)描述為了服務(wù)一個(gè)輸入請(qǐng)求而需要做的請(qǐng)求總數(shù)。
扇入是指為邏輯單元的輸入方程提供輸入信號(hào)的最大數(shù)量。扇入是定義單個(gè)邏輯門可以接受的最大數(shù)字輸入數(shù)量的術(shù)語(yǔ)。大多數(shù)晶體管-晶體管邏輯 (TTL) 門有一個(gè)或兩個(gè)輸入,盡管有些有兩個(gè)以上。典型的邏輯門具有 1 或 2 的扇入。
扇入/扇出服務(wù)
我們舉一個(gè)現(xiàn)實(shí)世界的例子,一個(gè)電子商務(wù)網(wǎng)站將自己與一個(gè)第三方支付網(wǎng)關(guān)整合在一起。 這里,網(wǎng)站使用支付網(wǎng)關(guān)的 API 來(lái)彈出支付屏幕并輸入安全證書。同時(shí),網(wǎng)站可能會(huì)調(diào)用另一個(gè)稱為分析的 API 來(lái)記錄支付的嘗試。這種將一個(gè)請(qǐng)求分叉成多個(gè)請(qǐng)求的過程被稱為 fan-out 扇出。在現(xiàn)實(shí)世界中,一個(gè)客戶請(qǐng)求可能涉及許多扇出服務(wù)。
另一個(gè)例子是 MapReduce。Map 是一個(gè)扇入的操作,而 Reduce 是一個(gè)扇出的 操作。一個(gè)服務(wù)器可以將一個(gè)信息扇出到下一組服務(wù)(API),并忽略結(jié)果?;蛘呖梢缘鹊竭@些服務(wù)器的所有響應(yīng)都返回。如 如下圖所示,一個(gè)傳入的請(qǐng)求被服務(wù)器復(fù)用為轉(zhuǎn)換成兩個(gè)傳出的請(qǐng)求:
扇入 fan-in 是一種操作,即兩個(gè)或更多傳入的請(qǐng)求會(huì)聚成一個(gè)請(qǐng)求。這種情況下,API如何聚合來(lái)自多個(gè)后端服務(wù)的結(jié)果,并將結(jié)果即時(shí)返回給客戶。
例如,想想一個(gè)酒店價(jià)格聚合器或航班票務(wù)聚合器,它從不同的數(shù)據(jù)提供者那里獲取關(guān)于多個(gè)酒店或航班的請(qǐng)求信息并顯示出來(lái)。
下圖顯示了扇出操作是如何結(jié)合多個(gè)請(qǐng)求并準(zhǔn)備一個(gè)最終的響應(yīng),由客戶端消費(fèi)的。
客戶端也可以是一個(gè)服務(wù)器,為更多的客戶提供服務(wù)。如上圖所示,左側(cè)的服務(wù)器正在收集來(lái)自酒店 A、酒店 B 和 航空公司供應(yīng)商 A,并為不同的客戶準(zhǔn)備另一個(gè)響應(yīng)。
因此,扇入和扇出操作并不總是完全相互獨(dú)立的。大多數(shù)情況下,它將是一個(gè)混合場(chǎng)景,扇入和扇出操作都是相互配合的。
請(qǐng)記住,對(duì)下一組服務(wù)器的扇出操作可以是異步的。也是如此。對(duì)于扇入請(qǐng)求來(lái)說,這可能不是真的。扇入操作有時(shí)被稱為 API 調(diào)用。
Go 語(yǔ)言實(shí)現(xiàn)扇入/扇出模式
Fan-out:多個(gè) goroutine 從同一個(gè)通道讀取數(shù)據(jù),直到該通道關(guān)閉。OUT 是一種張開的模式,所以又被稱為扇出,可以用來(lái)分發(fā)任務(wù)。
Fan-in:1 個(gè) goroutine 從多個(gè)通道讀取數(shù)據(jù),直到這些通道關(guān)閉。IN 是一種收斂的模式,所以又被稱為扇入,用來(lái)收集處理的結(jié)果。
package main import ( "context" "log" "sync" "time" ) // Task 包含任務(wù)編號(hào)及任務(wù)所需時(shí)長(zhǎng) type Task struct { Number int Cost time.Duration } // task channel 生成器 func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task { taskCh := make(chan Task) go func() { defer close(taskCh) for _, task := range taskList { select { case <-ctx.Done(): return case taskCh <- task: } } }() return taskCh } // doTask 處理并返回已處理的任務(wù)編號(hào)作為通道的函數(shù) func doTask(ctx context.Context, taskCh <-chan Task) <-chan int { doneTaskCh := make(chan int) go func() { defer close(doneTaskCh) for task := range taskCh { select { case <-ctx.Done(): return default: log.Printf("do task number: %d\n", task.Number) // task 任務(wù)處理 // 根據(jù)任務(wù)耗時(shí)休眠 time.Sleep(task.Cost) doneTaskCh <- task.Number // 已處理任務(wù)的編號(hào)放入通道 } } }() return doneTaskCh } // `fan-in` 意味著將多個(gè)數(shù)據(jù)流復(fù)用或合并成一個(gè)流。 // merge 函數(shù)接收參數(shù)傳遞的多個(gè)通道 “taskChs”,并返回單個(gè)通道 “<-chan int” func merge(ctx context.Context, taskChs []<-chan int) <-chan int { var wg sync.WaitGroup mergedTaskCh := make(chan int) mergeTask := func(taskCh <-chan int) { defer wg.Done() for t := range taskCh { select { case <-ctx.Done(): return case mergedTaskCh <- t: } } } wg.Add(len(taskChs)) for _, taskCh := range taskChs { go mergeTask(taskCh) } // 等待所有任務(wù)處理完畢 go func() { wg.Wait() close(mergedTaskCh) }() return mergedTaskCh } func main() { start := time.Now() // 使用 context 來(lái)防止 goroutine 泄漏,即使在處理過程中被中斷 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // taskList 定義每個(gè)任務(wù)及其成本 taskList := []Task{ Task{1, 1 * time.Second}, Task{2, 7 * time.Second}, Task{3, 2 * time.Second}, Task{4, 3 * time.Second}, Task{5, 5 * time.Second}, Task{6, 3 * time.Second}, } // taskChannelGerenator 是一個(gè)函數(shù),它接收一個(gè) taskList 并將其轉(zhuǎn)換為 Task 類型的通道 // 執(zhí)行結(jié)果(int slice channel)存儲(chǔ)在 worker 中 // 由于 doTask 的結(jié)果是一個(gè)通道,被分給了多個(gè) worker,這就對(duì)應(yīng)了 fan-out 處理 taskCh := taskChannelGerenator(ctx, taskList) numWorkers := 4 workers := make([]<-chan int, numWorkers) for i := 0; i < numWorkers; i++ { workers[i] = doTask(ctx, taskCh) // doTask 處理并返回已處理的任務(wù)編號(hào)作為通道的函數(shù) } count := 0 for d := range merge(ctx, workers) { // merge 從中讀取已處理的任務(wù)編號(hào) count++ log.Printf("done task number: %d\n", d) } log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds()) }
參考鏈接:
Understanding the Fan-Out/Fan-In API Integration Pattern
以上就是Go語(yǔ)言異步API設(shè)計(jì)的扇入扇出模式詳解的詳細(xì)內(nèi)容,更多關(guān)于Go異步API扇入扇出模式的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語(yǔ)言題解LeetCode1051高度檢查器示例詳解
這篇文章主要為大家介紹了Go語(yǔ)言題解LeetCode1051高度檢查器示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-12-12一文帶你感受Go語(yǔ)言空結(jié)構(gòu)體的魔力
在?Go?語(yǔ)言中,有一種特殊的用法可能讓許多人感到困惑,那就是空結(jié)構(gòu)體,本文將對(duì)Go空結(jié)構(gòu)體進(jìn)行詳解,準(zhǔn)備一杯你最喜歡的飲料或茶,隨著本文一探究竟吧2023-05-05Go并發(fā)編程之sync.Once使用實(shí)例詳解
sync.Once使用起來(lái)很簡(jiǎn)單, 下面是一個(gè)簡(jiǎn)單的使用案例,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2021-11-11go語(yǔ)言開發(fā)環(huán)境安裝及第一個(gè)go程序(推薦)
這篇文章主要介紹了go語(yǔ)言開發(fā)環(huán)境安裝及第一個(gè)go程序,這篇通過實(shí)例代碼給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02Go語(yǔ)言Web編程實(shí)現(xiàn)Get和Post請(qǐng)求發(fā)送與解析的方法詳解
這篇文章主要介紹了Go語(yǔ)言Web編程實(shí)現(xiàn)Get和Post請(qǐng)求發(fā)送與解析的方法,結(jié)合實(shí)例形式分析了Go語(yǔ)言客戶端、服務(wù)器端結(jié)合實(shí)現(xiàn)web數(shù)據(jù)get、post發(fā)送與接收數(shù)據(jù)的相關(guān)操作技巧,需要的朋友可以參考下2017-06-06golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)
WaitGroup是Golang并發(fā)的兩種方式之一,一個(gè)是Channel,另一個(gè)是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點(diǎn)的相關(guān)資料,需要的朋友可以參考下2023-01-01