golang并發(fā)工具M(jìn)apReduce降低服務(wù)響應(yīng)時(shí)間
前言
在微服務(wù)中開發(fā)中,api網(wǎng)關(guān)扮演對外提供restful api的角色,而api的數(shù)據(jù)往往會依賴其他服務(wù),復(fù)雜的api更是會依賴多個(gè)甚至數(shù)十個(gè)服務(wù)。雖然單個(gè)被依賴服務(wù)的耗時(shí)一般都比較低,但如果多個(gè)服務(wù)串行依賴的話那么整個(gè)api的耗時(shí)將會大大增加。
那么通過什么手段來優(yōu)化呢?我們首先想到的是通過并發(fā)來的方式來處理依賴,這樣就能降低整個(gè)依賴的耗時(shí),Go基礎(chǔ)庫中為我們提供了 WaitGroup 工具用來進(jìn)行并發(fā)控制,但實(shí)際業(yè)務(wù)場景中多個(gè)依賴如果有一個(gè)出錯(cuò)我們期望能立即返回而不是等所有依賴都執(zhí)行完再返回結(jié)果,而且WaitGroup中對變量的賦值往往需要加鎖,每個(gè)依賴函數(shù)都需要添加Add和Done對于新手來說比較容易出錯(cuò)
基于以上的背景,go-zero框架中為我們提供了并發(fā)處理工具M(jìn)apReduce,該工具開箱即用,不需要做什么初始化,我們通過下圖看下使用MapReduce和沒使用的耗時(shí)對比:
相同的依賴,串行處理的話需要200ms,使用MapReduce后的耗時(shí)等于所有依賴中最大的耗時(shí)為100ms,可見MapReduce可以大大降低服務(wù)耗時(shí),而且隨著依賴的增加效果就會越明顯,減少處理耗時(shí)的同時(shí)并不會增加服務(wù)器壓力
并發(fā)處理工具M(jìn)apReduce
MapReduce是Google提出的一個(gè)軟件架構(gòu),用于大規(guī)模數(shù)據(jù)集的并行運(yùn)算,go-zero中的MapReduce工具正是借鑒了這種架構(gòu)思想
go-zero框架中的MapReduce工具主要用來對批量數(shù)據(jù)進(jìn)行并發(fā)的處理,以此來提升服務(wù)的性能
MapReduce的用法演示
MapReduce主要有三個(gè)參數(shù),第一個(gè)參數(shù)為generate用以生產(chǎn)數(shù)據(jù),第二個(gè)參數(shù)為mapper用以對數(shù)據(jù)進(jìn)行處理,第三個(gè)參數(shù)為reducer用以對mapper后的數(shù)據(jù)做聚合返回,還可以通過opts選項(xiàng)設(shè)置并發(fā)處理的線程數(shù)量
場景一:
某些功能的結(jié)果往往需要依賴多個(gè)服務(wù),比如商品詳情的結(jié)果往往會依賴用戶服務(wù)、庫存服務(wù)、訂單服務(wù)等等,一般被依賴的服務(wù)都是以rpc的形式對外提供,為了降低依賴的耗時(shí)我們往往需要對依賴做并行處理
func productDetail(uid, pid int64) (*ProductDetail, error) { var pd ProductDetail err := mr.Finish(func() (err error) { pd.User, err = userRpc.User(uid) return }, func() (err error) { pd.Store, err = storeRpc.Store(pid) return }, func() (err error) { pd.Order, err = orderRpc.Order(pid) return }) if err != nil { log.Printf("product detail error: %v", err) return nil, err } return &pd, nil }
該示例中返回商品詳情依賴了多個(gè)服務(wù)獲取數(shù)據(jù),因此做并發(fā)的依賴處理,對接口的性能有很大的提升
場景二:
很多時(shí)候我們需要對一批數(shù)據(jù)進(jìn)行處理,比如對一批用戶id,效驗(yàn)每個(gè)用戶的合法性并且效驗(yàn)過程中有一個(gè)出錯(cuò)就認(rèn)為效驗(yàn)失敗,返回的結(jié)果為效驗(yàn)合法的用戶id
func checkLegal(uids []int64) ([]int64, error) { r, err := mr.MapReduce(func(source chan<- interface{}) { for _, uid := range uids { source <- uid } }, func(item interface{}, writer mr.Writer, cancel func(error)) { uid := item.(int64) ok, err := check(uid) if err != nil { cancel(err) } if ok { writer.Write(uid) } }, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) { var uids []int64 for p := range pipe { uids = append(uids, p.(int64)) } writer.Write(uids) }) if err != nil { log.Printf("check error: %v", err) return nil, err } return r.([]int64), nil } func check(uid int64) (bool, error) { // do something check user legal return true, nil }
該示例中,如果check過程出現(xiàn)錯(cuò)誤則通過cancel方法結(jié)束效驗(yàn)過程,并返回error整個(gè)效驗(yàn)過程結(jié)束,如果某個(gè)uid效驗(yàn)結(jié)果為false則最終結(jié)果不返回該uid
MapReduce使用注意事項(xiàng)
mapper和reducer中都可以調(diào)用cancel,參數(shù)為error,調(diào)用后立即返回,返回結(jié)果為nil, error
mapper中如果不調(diào)用writer.Write則item最終不會被reducer聚合
reducer中如果不調(diào)用writer.Wirte則返回結(jié)果為nil, ErrReduceNoOutput
reducer為單線程,所有mapper出來的結(jié)果在這里串行聚合
實(shí)現(xiàn)原理分析:
MapReduce中首先通過buildSource方法通過執(zhí)行g(shù)enerate(參數(shù)為無緩沖channel)產(chǎn)生數(shù)據(jù),并返回?zé)o緩沖的channel,mapper會從該channel中讀取數(shù)據(jù)
func buildSource(generate GenerateFunc) chan interface{} { source := make(chan interface{}) go func() { defer close(source) generate(source) }() return source }
在MapReduceWithSource方法中定義了cancel方法,mapper和reducer中都可以調(diào)用該方法,調(diào)用后主線程收到close信號會立馬返回
cancel := once(func(err error) { if err != nil { retErr.Set(err) } else { // 默認(rèn)的error retErr.Set(ErrCancelWithNil) } drain(source) // 調(diào)用close(ouput)主線程收到Done信號,立馬返回 finish() })
在mapperDispatcher方法中調(diào)用了executeMappers,executeMappers消費(fèi)buildSource產(chǎn)生的數(shù)據(jù),每一個(gè)item都會起一個(gè)goroutine單獨(dú)處理,默認(rèn)最大并發(fā)數(shù)為16,可以通過WithWorkers進(jìn)行設(shè)置
var wg sync.WaitGroup defer func() { wg.Wait() // 保證所有的item都處理完成 close(collector) }() pool := make(chan lang.PlaceholderType, workers) writer := newGuardedWriter(collector, done) // 將mapper處理完的數(shù)據(jù)寫入collector for { select { case <-done: // 當(dāng)調(diào)用了cancel會觸發(fā)立即返回 return case pool <- lang.Placeholder: // 控制最大并發(fā)數(shù) item, ok := <-input if !ok { <-pool return } wg.Add(1) go func() { defer func() { wg.Done() <-pool }() mapper(item, writer) // 對item進(jìn)行處理,處理完調(diào)用writer.Write把結(jié)果寫入collector對應(yīng)的channel中 }() } }
reducer單goroutine對數(shù)mapper寫入collector的數(shù)據(jù)進(jìn)行處理,如果reducer中沒有手動(dòng)調(diào)用writer.Write則最終會執(zhí)行finish方法對output進(jìn)行close避免死鎖
go func() { defer func() { if r := recover(); r != nil { cancel(fmt.Errorf("%v", r)) } else { finish() } }() reducer(collector, writer, cancel) }()
在該工具包中還提供了許多針對不同業(yè)務(wù)場景的方法,實(shí)現(xiàn)原理與MapReduce大同小異,感興趣的同學(xué)可以查看源碼學(xué)習(xí)
MapReduceVoid
功能和MapReduce類似但沒有結(jié)果返回只返回error
Finish
處理固定數(shù)量的依賴,返回error,有一個(gè)error立即返回
FinishVoid
和Finish方法功能類似,沒有返回值
Map
只做generate和mapper處理,返回channel
MapVoid
和Map功能類似,無返回
文末
項(xiàng)目地址 https://github.com/zeromicro/go-zero
本文主要介紹了go-zero框架中的MapReduce工具,在實(shí)際的項(xiàng)目中非常實(shí)用。用好工具對于提升服務(wù)性能和開發(fā)效率都有很大的幫助,希望本篇文章能給大家?guī)硪恍┦斋@,更多關(guān)于golang并發(fā)MapReduce服務(wù)響應(yīng)的資料請關(guān)注腳本之家其它相關(guān)文章!
- Go語言實(shí)現(xiàn)MapReduce的示例代碼
- 使用golang實(shí)現(xiàn)一個(gè)MapReduce的示例代碼
- Golang編程并發(fā)工具庫MapReduce使用實(shí)踐
- golang如何實(shí)現(xiàn)mapreduce單進(jìn)程版本詳解
- MongoDB中MapReduce的使用方法詳解
- Mongodb中MapReduce實(shí)現(xiàn)數(shù)據(jù)聚合方法詳解
- MongoDB學(xué)習(xí)筆記之MapReduce使用示例
- MongoDB中的MapReduce簡介
- MongoDB中MapReduce編程模型使用實(shí)例
- Go通用的?MapReduce?工具函數(shù)詳解
相關(guān)文章

golang默認(rèn)Logger日志庫在項(xiàng)目中使用Zap日志庫

Goland IDEA項(xiàng)目多開設(shè)置方式

Go語言實(shí)現(xiàn)有規(guī)律的數(shù)字版本號的排序工具

Golang使用cobra實(shí)現(xiàn)命令行程序的示例代碼