Go語言構(gòu)建流數(shù)據(jù)pipeline的示例詳解
什么是pipeline
pipeline在Go中并沒有書面的定義,只是眾多并發(fā)程序中的一種。非正式地,pipeline由一系列stage組成。每個stage是運(yùn)行著同一個function的協(xié)程組。在每個stage,協(xié)程們
- 通過inbound channel從上游獲取數(shù)據(jù)
- 在data上進(jìn)行運(yùn)算,通常會產(chǎn)生新的值
- 通過outbound channel向下游發(fā)送數(shù)據(jù)
每個Stage都有數(shù)個inbound channel和outbound channel,除了第一個和最后一個Stage,分別只有outbound和inbound channel。第一個Stage通常叫做Source或Producer。最后一個Stage通常叫做Sink或Consumer。
我們將從一個簡單的示例pipeline開始來解釋這些想法和技術(shù)。 稍后,我們將提供一個更實際的例子。
Squaring numbers 平方數(shù)
考慮一個有著三個階段的流水線。
第一階段,gen,是個將整數(shù)列表轉(zhuǎn)換為一個發(fā)射列表中整數(shù)的channel的函數(shù)。gen函數(shù)啟動一個go routine,用來發(fā)送channel中的整數(shù),然后當(dāng)所有的整數(shù)都被發(fā)出后,將channel關(guān)閉:
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
第二階段,sq從上面的channel中接收數(shù)據(jù),返回一個發(fā)射對應(yīng)整數(shù)平方數(shù)的channel。當(dāng)inbound channel關(guān)閉后,并且這一階段將所有的value發(fā)送到下游后,再將這個outbound channel關(guān)閉
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
main函數(shù)組織整個pipeline,并且運(yùn)行最終的stage:從第二個stage中接收數(shù)據(jù)然后逐個打印,直到channel被關(guān)閉
func main() {
// Set up the pipeline
c := gen(2, 3)
out := sq(c)
// Consume the output
// 4
fmt.Println(<-out)
// 9
fmt.Println(<-out)
}
既然sq的inbound channel和outbound channel類型相同,我們可以將其進(jìn)行任意數(shù)量的組合。我們還可以將main函數(shù)重寫為循環(huán),就像在其他Stage中做的那樣一樣。
func main() {
// Set up the pipeline and consume the output.
for n := range sq(sq(gen(2, 3))) {
fmt.Println(n) // 16 then 81
}
}
扇入和扇出
許多函數(shù)可以從一個channel中獲取數(shù)據(jù)直到channel被關(guān)閉,這被叫做扇出。這提供了一種在worker之間分配工作以并行化 CPU 使用和 I/O 的方法。
一個函數(shù)可以通過將多個input channel多路復(fù)用到同一個channel,當(dāng)所有的channel關(guān)閉時,該多路復(fù)用channel才關(guān)閉。從而達(dá)到從多個input獲取數(shù)據(jù)并處理,直到所有input channel都關(guān)閉才停止的效果。這叫做扇入。
我們可以將我們的流水線改為運(yùn)行兩個sq,每個都從相同的channel讀取數(shù)據(jù)。我們引入一個新的函數(shù)merge,來做扇入的工作
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the merged output from c1 and c2.
for n := range merge(c1, c2) {
fmt.Println(n) // 4 then 9, or 9 then 4
}
}
merge函數(shù)通過對每個channel開啟一個協(xié)程,把數(shù)據(jù)拷貝到另一個out channel中,實現(xiàn)將channel列表轉(zhuǎn)換為一個channel的效果。當(dāng)所有send操作完成后,再將out channel關(guān)閉。
向一個已經(jīng)關(guān)閉上的channel發(fā)送數(shù)據(jù)會導(dǎo)致panic,所以保證發(fā)送完所有再關(guān)閉channel至關(guān)重要。sync.WaitGroup提供了一個簡單地方式來編排這個同步
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
短暫的停頓
我們的pipeline函數(shù)有這樣的模式:
- 當(dāng)發(fā)送任務(wù)結(jié)束后,關(guān)閉發(fā)送output channel
- 直到input channel關(guān)閉前,一直從input channel中接收消息
這個模式下,每個階段都可以用協(xié)程+for循環(huán)的模式來書寫,保證每個數(shù)據(jù)發(fā)送到下游后再關(guān)閉所有協(xié)程。
但是在實際的pipeline中,階段并不總是接收所有來自inbound channel的數(shù)據(jù)。通常,如果inbound的值出現(xiàn)了錯誤,pipeline會提前退出。 在任何一種情況下,接收者都不必等待剩余值到達(dá),并且我們希望fast fail(較早階段的Stage盡早停止后期Stage不需要的值)。
在我們的示例pipeline中,如果一個Stage未能消費(fèi)所有inbound值,則嘗試計算后并發(fā)送這些值的 goroutine 將無限期阻塞:
// Consume the first value from the output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Since we didn't receive the second value from out,
// one of the output goroutines is hung attempting to send it.
}
這就導(dǎo)致了資源泄漏:協(xié)程消耗內(nèi)存、運(yùn)行資源,并且在協(xié)程棧內(nèi)的golang堆引用導(dǎo)致垃圾無法回收。協(xié)程只能自己退出,不能由垃圾回收機(jī)制回收。
即使下游的Stage無法接收所有inbound value,我們也需要把上游的協(xié)程退出。如果把上游的協(xié)程改為有buffer的,可以解決上面的問題。如果Buffer中還有空間,則發(fā)送操作可以立刻完成
c := make(chan int, 2) // buffer size 2 c <- 1 // succeeds immediately c <- 2 // succeeds immediately c <- 3 // blocks until another goroutine does <-c and receives 1
當(dāng)要發(fā)送的數(shù)目可以在channel創(chuàng)建時知道時,buffer可以簡化代碼。舉個例子,讓我們來使用buffer channel,不開辟新的協(xié)程來重寫gen方法:
func gen(nums ...int) <-chan int {
out := make(chan int, len(nums))
for _, n := range nums {
out <- n
}
close(out)
return out
}
在我們的pipeline中,我們就需要在merge方法中使用的channel添加buffer:
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// ... 其余的沒有變更 ...
盡管上面這個方案修復(fù)了阻塞的問題,但它是很差的方案。這里有一個對1的硬編碼,這太脆弱了?你真的能預(yù)料到有多少個值不能被正常發(fā)送嗎?一旦兩個值不能正常發(fā)送,你的協(xié)程又阻塞了。
作為替代,我們需要給下游階段提供一個機(jī)制,知會下游階段,發(fā)送者已經(jīng)停止發(fā)送了。
Explicity cancellation 顯示取消
當(dāng)main函數(shù)決定不從out處接收所有數(shù)據(jù),而是退出時,它必須知會上游階段的協(xié)程放棄接下來的發(fā)送。它通過向一個名叫done的channel發(fā)送數(shù)據(jù)來完成這個動作。因為發(fā)送方有兩個,所以 向done發(fā)送兩次數(shù)據(jù)。
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
done := make(chan struct{}, 2)
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// Tell the remaining senders we're leaving.
done <- struct{}{}
done <- struct{}{}
}
發(fā)送到out channel的發(fā)送者把原來的邏輯替換成一個select操作,select或者發(fā)送一個數(shù)據(jù),抑或從done處接收到數(shù)據(jù)。因為done中數(shù)據(jù)值的類型根本不重要,主要是接收到值這個事件本身很重要,所以done channel的類型時struct {}。output循環(huán)繼續(xù)在inbound channel上執(zhí)行,所以上游的階段并沒有被阻塞。(我們稍后會討論如何讓循環(huán)迅速返回。)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed or it receives a value
// from done, then output calls wg.Done.
output := func(c <-chan int) {
for n := range c {
select {
case out <- n:
case <-done:
}
}
wg.Done()
}
// ... the rest is unchanged ...
這個方法有一個問題:每一個下游接收者都需要知道可能阻塞的上游發(fā)送者總數(shù)。維護(hù)它們的數(shù)目,是一個瑣碎又容易出錯的事情。
我們需要一個機(jī)制來讓不可知的、無界的發(fā)送協(xié)程來停止發(fā)送到下游的值。在Go,我們可以通過關(guān)閉channel來完成這件事,因為在已經(jīng)關(guān)閉的channel上執(zhí)行receive操作,會立刻返回該元素的零值。
這說明main函數(shù)可以簡單地通過關(guān)閉done channel來讓所有的發(fā)送者不阻塞。關(guān)閉操作是一個高效的廣播。我們把pipeline中的每個函數(shù)都接受done作為參數(shù),并把done在defer語句中關(guān)閉, 這樣,如果在main函數(shù)中返回,都會通知pipeline中的階段退出。
func main() {
// Set up a done channel that's shared by the whole pipeline,
// and close that channel when this pipeline exits, as a signal
// for all the goroutines we started to exit.
done := make(chan struct{})
defer close(done)
in := gen(done, 2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(done, in)
c2 := sq(done, in)
// Consume the first value from output.
out := merge(done, c1, c2)
fmt.Println(<-out) // 4 or 9
// done will be closed by the deferred call.
}
現(xiàn)在當(dāng)donechannel關(guān)閉后,接收到close信息的階段,都可以直接退出了。merge函數(shù)中的outout協(xié)程可以不從inbound channel中取數(shù)據(jù)直接退出,因為它知道,上游的發(fā)送sq,接收到close信息,也會直接退出。output通過defer語句來保證wg.Done()一定被調(diào)用。(譯者注:來關(guān)閉out channel)
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c or done is closed, then calls
// wg.Done.
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
select {
case out <- n:
case <-done:
return
}
}
}
// ... the rest is unchanged ...
相似的,當(dāng)接收到close信號時,sq函數(shù)也可以立刻返回。sq通過defer語句來保證outchannel一定被關(guān)閉。
這是給構(gòu)建pipeline的一些指導(dǎo):
- 當(dāng)所有的發(fā)送操作完成后,關(guān)閉outbound channel
- 如果發(fā)送發(fā)不阻塞,或是channel沒有關(guān)閉,接收者會一直從channel中接收數(shù)據(jù)
Pipeline通過如下兩個方式來解除發(fā)送者的阻塞
- 確保channel的buffer足夠大
- 顯示知會發(fā)送者,接收者已經(jīng)放棄接收
Digesting a tree 對樹進(jìn)行摘要
讓我們來考慮一個更實際的pipeline
MD5 是一種消息摘要算法,可用作文件校驗和。 命令行實用程序 md5sum 打印文件列表的摘要值。
% md5sum *.go d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的示例程序類似于 md5sum,但將單個目錄作為參數(shù)并打印該目錄下每個常規(guī)文件的摘要值,按路徑名排序。
% go run serial.go . d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go ee869afd31f83cbb2d10ee81b2b831dc parallel.go b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
我們的主函數(shù)調(diào)MD5All這個輔助函數(shù),返回路徑名和摘要值的map,main函數(shù)再將它們排序打印
MD5All函數(shù)是我們討論的重點。在如下串行化的實現(xiàn)中,沒有使用并發(fā)技術(shù),只是簡單對文件進(jìn)行了遍歷
func main() {
// Calculate the MD5 sum of all files under the specified directory,
// then print the results sorted by path name.
m, err := MD5All(os.Args[1])
if err != nil {
fmt.Println(err)
return
}
var paths []string
for path := range m {
paths = append(paths, path)
}
sort.Strings(paths)
for _, path := range paths {
fmt.Printf("%x %s\n", m[path], path)
}
}
并行計算摘要
在并行的解法中,我們將MD5All分割為兩個階段的pipeline。第一個階段,sumFiles,遍歷文件樹,針對每個文件,在新的協(xié)程中計算摘要,然后把結(jié)果發(fā)送到channel中,這是result的類型
type result struct {
path string
sum [md5.Size]byte
err error
}
sumFiles返回兩個channel:一個是result channel,另一個是filepath.Walk中產(chǎn)生的錯誤。walk函數(shù)針對每個文件啟動一個新的協(xié)程來處理,然后檢查donechannel。如果done已經(jīng)被關(guān)閉,walk函數(shù)會立刻停止:
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
// For each regular file, start a goroutine that sums the file and
// sends the result on c.
// Send the result of the walk on errc.
c := make(chan result)
errc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
// If any error occurred, walk method will return
err := filepath.Walk(root, func(path string, info fs.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
wg.Add(1)
go func() {
data, err := ioutil.ReadFile(path)
select {
case c <- result{
path: path,
sum: md5.Sum(data),
err: err,
}:
case <-done:
}
wg.Done()
}()
// Abort the walk if done is closed.
select {
case <-done:
return errors.New("walk canceled")
default:
return nil
}
})
// Walk has returned, so all calls to wg.Add are done.
// Start a goroutine to close c once all the sends are done.
// No select needed here, since errc is buffered.
errc <- err
}()
return c, errc
}
MD5All從c中接收到摘要數(shù)據(jù)。當(dāng)發(fā)生錯誤時,MD5All會迅速返回,通過defer語句來關(guān)閉done channel
func MD5All(root string) (map[string][md5.Size]byte, error) {
// MD5All closes the done channel when it returns; it may do so before
// receiving all the values from c and errc.
done := make(chan struct{})
defer close(done)
c, errc := sumFiles(done, root)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
有界的并行
parallel.go 中的 MD5All 實現(xiàn)為每個文件啟動一個新的 goroutine。 在包含許多大文件的目錄中,這可能會分配比機(jī)器上可用的內(nèi)存更多的內(nèi)存。
我們可以通過限制并行讀取的文件數(shù)量來限制這些分配。 在新的解決方式中,我們通過創(chuàng)建固定數(shù)量的 goroutine 來讀取文件來做到這一點。 我們的pipeline現(xiàn)在分為三個階段:遍歷樹、讀取并計算文件摘要以及收集摘要。
第一階段 walkFiles 發(fā)射出文件樹中常規(guī)文件的路徑:
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
paths := make(chan string)
errc := make(chan error, 1)
go func() {
// Close the paths channel after Walk returns.
defer close(paths)
// No select needed for this send, since errc is buffered.
errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.Mode().IsRegular() {
return nil
}
select {
case paths <- path:
case <-done:
return errors.New("walk canceled")
}
return nil
})
}()
return paths, errc
}
第二階段啟動固定數(shù)量的協(xié)程來計算文件摘要,然后發(fā)送到c channel中
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
for path := range paths {
data, err := ioutil.ReadFile(path)
select {
case c <- result{path, md5.Sum(data), err}:
case <-done:
return
}
}
}
和之前的示例不同,因為多個協(xié)程都在共享channel上發(fā)送數(shù)據(jù),digester函數(shù)并沒有關(guān)閉output channel。作為替代,當(dāng)所有的digesters跑完之后,MD5All會關(guān)閉channel
// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
go func() {
digester(done, paths, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
這里也可以針對每個digester開啟獨立的channel,不過到時候就要對channel進(jìn)行扇入處理。
最終階段從c中取得所有結(jié)果,并且檢查errc中的錯誤。此檢查不能更早發(fā)生,因為在此之前,walkFiles 可能會阻塞:
(譯者注:要保證檢查errc的錯誤,發(fā)生在filePath.Walk啟動后,done不會再次發(fā)送了,協(xié)程就不會退出)
m := make(map[string][md5.Size]byte)
for r := range c {
if r.err != nil {
return nil, r.err
}
m[r.path] = r.sum
}
// Check whether the Walk failed.
if err := <-errc; err != nil {
return nil, err
}
return m, nil
}
總結(jié)
本文介紹了在 Go 中構(gòu)建流數(shù)據(jù)pipeline的技術(shù)。 處理此類pipeline中的故障很棘手,因為pipeline中的每個階段可能會阻止嘗試向下游發(fā)送值,并且下游階段可能不再關(guān)心傳入的數(shù)據(jù)。 我們展示了關(guān)閉通道如何向管道啟動的所有 goroutine 廣播“done”信號,并定義了正確構(gòu)建管道的指南。
以上就是Go語言構(gòu)建流數(shù)據(jù)pipeline的示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Go構(gòu)建流數(shù)據(jù)pipeline的資料請關(guān)注腳本之家其它相關(guān)文章
相關(guān)文章
goland?-sync/atomic原子操作小結(jié)
這篇文章主要介紹了goland?-sync/atomic原子操作,原子操作能夠保證執(zhí)行期間是連續(xù)且不會被中斷(變量不會被其他修改,mutex可能存在被其他修改的情況),本文給大家介紹的非常詳細(xì),需要的朋友參考下2022-08-08
Golang中數(shù)據(jù)結(jié)構(gòu)Queue的實現(xiàn)方法詳解
這篇文章主要給大家介紹了關(guān)于Golang中數(shù)據(jù)結(jié)構(gòu)Queue的實現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09
Ruby序列化和持久化存儲(Marshal、Pstore)操作方法詳解
這篇文章主要介紹了Ruby序列化和持久化存儲(Marshal、Pstore)操作方法詳解,包括Ruby Marshal序列化,Ruby Pstore存儲,需要的朋友可以參考下2022-04-04

