Go語言高效I/O并發(fā)處理雙緩沖和Exchanger模式實(shí)例探索
雙緩沖(double buffering)
雙緩沖(double buffering)是高效處理 I/O 操作的一種并發(fā)技術(shù),它使用兩個(gè) buffer,一個(gè) goroutine 使用其中一個(gè) buffer 進(jìn)行寫,而另一個(gè) goroutine 使用另一個(gè) buffer 進(jìn)行讀,然后進(jìn)行交換。這樣兩個(gè) goroutine 可能并發(fā)的執(zhí)行,減少它們之間的等待和阻塞。
本文還提供了一個(gè)類似 Java 的java.util.concurrent.Exchanger[1]的 Go 并發(fā)原語,它可以用來在兩個(gè) goroutine 之間交換數(shù)據(jù),快速實(shí)現(xiàn)雙緩沖的模式。 這個(gè)并發(fā)原語可以在github.com/smallnest/exp/sync/Exchanger[2]找到。
double buffering 并發(fā)模式
雙緩沖(double buffering)設(shè)計(jì)方式雖然在一些領(lǐng)域中被廣泛的應(yīng)用,但是我還沒有看到它在并發(fā)模式中專門列出了,或者專門列為一種模式。這里我們不妨把它稱之為雙緩存模式。
這是一種在 I/O 處理領(lǐng)域廣泛使用的用來提速的編程技術(shù),它使用兩個(gè)緩沖區(qū)來加速計(jì)算機(jī),該計(jì)算機(jī)可以重疊 I/O 和處理。一個(gè)緩沖區(qū)中的數(shù)據(jù)正在處理,而下一組數(shù)據(jù)被讀入另一個(gè)緩沖區(qū)。 在流媒體應(yīng)用程序中,一個(gè)緩沖區(qū)中的數(shù)據(jù)被發(fā)送到聲音或圖形卡,而另一個(gè)緩沖區(qū)則被來自源(Internet、本地服務(wù)器等)的更多數(shù)據(jù)填充。 當(dāng)視頻顯示在屏幕上時(shí),一個(gè)緩沖區(qū)中的數(shù)據(jù)被填充,而另一個(gè)緩沖區(qū)中的數(shù)據(jù)正在顯示。當(dāng)在緩沖區(qū)之間移動(dòng)數(shù)據(jù)的功能是在硬件電路中實(shí)現(xiàn)的,而不是由軟件執(zhí)行時(shí),全動(dòng)態(tài)視頻的速度會加快,不但速度被加快,而且可以減少黑屏閃爍的可能。
在這個(gè)模式中,兩個(gè) goroutine 并發(fā)的執(zhí)行,一個(gè) goroutine 使用一個(gè) buffer 進(jìn)行寫(不妨稱為 buffer1),而另一個(gè) goroutine 使用另一個(gè) buffer 進(jìn)行讀(不妨稱為 buffer2)。如圖所示。 當(dāng)左邊的 writer 寫滿它當(dāng)前使用的 buffer1 后,它申請和右邊的 goroutine 的 buffer2 進(jìn)行交換,這會出現(xiàn)兩種情況:
右邊的 reader 已經(jīng)讀完了它當(dāng)前使用的 buffer2,那么它會立即交換,這樣左邊的 writer 可以繼續(xù)寫 buffer2,而右邊的 reader 可以繼續(xù)讀 buffer1。
右邊的 reader 還沒有讀完 buffer2,那么左邊的 writer 就會阻塞,直到右邊的 reader 讀完 buffer2,然后交換。 周而復(fù)始。
同樣右邊的 goroutine 也是同樣的處理,當(dāng)它讀完 buffer2 后,它會申請和左邊的 goroutine 的 buffer1 進(jìn)行交換,這會出現(xiàn)兩種情況:
左邊的 writer 已經(jīng)寫完了它當(dāng)前使用的 buffer1,那么它會立即交換,這樣右邊的 reader 可以繼續(xù)讀 buffer1,而左邊的 writer 可以繼續(xù)寫 buffer2。
左邊的 writer 還沒有寫完 buffer1,那么右邊的 reader 就會阻塞,直到左邊的 writer 寫完 buffer1,然后交換。 周而復(fù)始。
這樣兩個(gè) goroutine 就可以并發(fā)的執(zhí)行,而不用等待對方的讀寫操作。這樣可以提高并發(fā)處理的效率。
不僅僅如此, double buffering 其實(shí)可以應(yīng)用于更多的場景, 不僅僅是 buffer 的場景,如 Java 的垃圾回收機(jī)制中,HotSpot JVM 把年輕代分為了三部分:1 個(gè) Eden 區(qū)和 2 個(gè) Survivor 區(qū)(分別叫 from 和 to,或者 s0 和 s1),在 GC 開始的時(shí)候,對象只會存在于 Eden 區(qū)和名為“From”的 Survivor 區(qū),Survivor 區(qū)“To”是空的。緊接著進(jìn)行 GC,Eden 區(qū)中所有存活的對象都會被復(fù)制到“To”,而在“From”區(qū)中,仍存活的對象會根據(jù)他們的年齡值來決定去向。年齡達(dá)到一定值的對象會被移動(dòng)到年老代中,沒有達(dá)到閾值的對象會被復(fù)制到“To”區(qū)域。經(jīng)過這次 GC 后,Eden 區(qū)和 From 區(qū)已經(jīng)被清空。這個(gè)時(shí)候,“From”和“To”會交換(exchange)他們的角色,也就是新的“To”就是上次 GC 前的“From”,新的“From”就是上次 GC 前的“To”。不管怎樣,都會保證名為 To 的 Survivor 區(qū)域是空的。Minor GC 會一直重復(fù)這樣的過程,直到“To”區(qū)被填滿,“To”區(qū)被填滿之后,會將所有對象移動(dòng)到年老代中。
Exchanger 的實(shí)現(xiàn)
既然有這樣的場景,有這樣的需求,所以我們需要針對這樣場景的一個(gè)同步原語。Java 給我們做了一個(gè)很好的師范,接下來我們使用實(shí)現(xiàn)相應(yīng)的 Go,但是我們的實(shí)現(xiàn)和 Java 的實(shí)現(xiàn)完全不同,我們要基于 Go 既有的同步原語來實(shí)現(xiàn)。
基于 Java 實(shí)現(xiàn)的 Exchanger 的功能,我們也實(shí)現(xiàn)一個(gè)Exchanger
, 我們期望它的功能如下:
只用作兩個(gè) goroutine 之間的數(shù)據(jù)交換,不支持多個(gè) goroutine 之間的數(shù)據(jù)交換。
可以重用。交換完之后還可以繼續(xù)交換
支持泛型,可以交換任意類型的數(shù)據(jù)
如果對端還沒有準(zhǔn)備交換,就阻塞等待
在交換完之前,阻塞的 goroutine 不可能調(diào)用
Exchange
方法兩次Go 內(nèi)存模型補(bǔ)充: 同一次交換, 一個(gè) goroutine 在調(diào)用
Exchange
方法的完成,一定happens after另一個(gè) goroutine 調(diào)用Exchange
方法的開始。
如果你非常熟悉 Go 的各種同步原語,你可以很快的組合出這樣一個(gè)同步原語。如果你還不是那么熟悉,建議你閱讀《深入理解 Go 并發(fā)編程》這本書,京東有售。 下面是一個(gè)簡單的實(shí)現(xiàn),代碼在Exchanger[3]。 我們只用left
、right
指代這兩個(gè) goroutine, goroutine 是 Go 語言中的并發(fā)單元,我們期望的就是這兩個(gè) goroutine 發(fā)生關(guān)系。
為了跟蹤這兩個(gè) goroutine,我們需要使用 goroutine id 來標(biāo)記這兩個(gè) goroutine,這樣避免了第三者插入。
type Exchanger[T any] struct { leftGoID, rightGoID int64 left, right chan T }
你必須使用 NewExchanger
創(chuàng)建一個(gè)Exchanger
,它會返回一個(gè)Exchanger
的指針。 初始化的時(shí)候我們把 left 和 right 的 id 都設(shè)置為-1,表示還沒有 goroutine 使用它們,并且不會和所有的 goroutine 的 id 沖突。 同時(shí)我們創(chuàng)建兩個(gè) channel,一個(gè)用來左邊的 goroutine 寫,右邊的 goroutine 讀,另一個(gè)用來右邊的 goroutine 寫,左邊的 goroutine 讀。channel 的 buffer 設(shè)置為 1,這樣可以避免死鎖。
func NewExchanger[T any]( "T any") *Exchanger[T] { return &Exchanger[T]{ leftGoID: -1, rightGoID: -1, left: make(chan T, 1), right: make(chan T, 1), } }
Exchange
方法是核心方法,它用來交換數(shù)據(jù),它的實(shí)現(xiàn)如下:
func (e *Exchanger[T]) Exchange(value T) T { goid := goroutine.ID() // left goroutine isLeft := atomic.CompareAndSwapInt64(&e.leftGoID, -1, goid) if !isLeft { isLeft = atomic.LoadInt64(&e.leftGoID) == goid } if isLeft { e.right <- value // send value to right return <-e.left // wait for value from right } // right goroutine isRight := atomic.CompareAndSwapInt64(&e.rightGoID, -1, goid) if !isRight { isRight = atomic.LoadInt64(&e.rightGoID) == goid } if isRight { e.left <- value // send value to left return <-e.right // wait for value from left } // other goroutine panic("sync: exchange called from neither left nor right goroutine") }
當(dāng)一個(gè) goroutine 調(diào)用的時(shí)候,首先我們嘗試把它設(shè)置為left
,如果成功,那么它就是left
。 如果不成功,我們就判斷它是不是先前已經(jīng)是left
,如果是,那么它就是left
。 如果先前,或者此時(shí)left
已經(jīng)被另一個(gè) goroutine 占用了,它還有機(jī)會成為right
,同樣的邏輯檢查和設(shè)置right
。
如果既不是left
也不是right
,那么就是第三者插入了,我們需要 panic,因?yàn)槲覀儾幌M谌卟遄恪?/p>
如果它是left
,那么它就會把數(shù)據(jù)發(fā)送到right
,然后等待right
發(fā)送數(shù)據(jù)過來。 如果它是right
,那么它就會把數(shù)據(jù)發(fā)送到left
,然后等待left
發(fā)送數(shù)據(jù)過來。
這樣就實(shí)現(xiàn)了數(shù)據(jù)的交換。
Exchanger 的使用
我們使用一個(gè)簡單的雙緩沖例子來說明如何使用Exchanger
,我們創(chuàng)建兩個(gè) goroutine,一個(gè) goroutine 負(fù)責(zé)寫,另一個(gè) goroutine 負(fù)責(zé)讀,它們之間通過Exchanger
來交換數(shù)據(jù)。
buf1 := bytes.NewBuffer(make([]byte, 1024)) buf2 := bytes.NewBuffer(make([]byte, 1024)) exchanger := syncx.NewExchanger[*bytes.Buffer]( "*bytes.Buffer") var wg sync.WaitGroup wg.Add(2) expect := 0 go func() { // g1 defer wg.Done() buf := buf1 for i := 0; i < 10; i++ { for j := 0; j < 1024; j++ { buf.WriteByte(byte(j / 256)) expect += j / 256 } buf = exchanger.Exchange(buf) } }() var got int go func() { // g2 defer wg.Done() buf := buf2 for i := 0; i < 10; i++ { buf = exchanger.Exchange(buf) for _, b := range buf.Bytes() { got += int(b) } buf.Reset() } }() wg.Wait() fmt.Println(got) fmt.Println(expect == got)
在這個(gè)例子中 g1
負(fù)責(zé)寫,每個(gè) buffer 的容量是 1024,寫滿就交給另外一個(gè)讀 g2,并從讀 g2 中交換過來一個(gè)空的 buffer 繼續(xù)寫。 交換 10 次之后,兩個(gè) goroutine 都退出了,我們檢查寫入的數(shù)據(jù)和讀取的數(shù)據(jù)是否一致,如果一致,那么就說明我們的Exchanger
實(shí)現(xiàn)是正確的。
總結(jié)
文本介紹了一種類似 Java 的Exchanger
的同步原語的實(shí)現(xiàn),這個(gè)同步原語可以在雙緩沖的場景中使用,提高并發(fā)處理的性能。
參考資料
[1]java.util.concurrent.Exchanger: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Exchanger.html
[2]github.com/smallnest/exp/sync/Exchanger: https://pkg.go.dev/github.com/smallnest/exp@v0.2.2/sync#Exchanger
[3]Exchanger: https://pkg.go.dev/github.com/smallnest/exp@v0.2.2/sync#Exchanger
以上就是Go語言高效I/O并發(fā)處理雙緩沖和Exchanger模式實(shí)例探索的詳細(xì)內(nèi)容,更多關(guān)于Go I/O并發(fā)處理的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語言實(shí)現(xiàn)IP段范圍校驗(yàn)示例
這篇文章主要介紹了Go語言實(shí)現(xiàn)IP段范圍校驗(yàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09利用Go語言實(shí)現(xiàn)流量回放工具的示例代碼
今天給大家推薦一款使用Go語言編寫的流量回放工具?--?goreplay;工作中你一定遇到過需要在服務(wù)器上抓包的場景,有了這個(gè)工具就可以助你一臂之力,廢話不多,我們接下來來看一看這個(gè)工具2022-09-09Go在GoLand中引用github.com中的第三方包具體步驟
這篇文章主要給大家介紹了關(guān)于Go在GoLand中引用github.com中第三方包的具體步驟,文中通過圖文介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用Go具有一定的參考價(jià)值,需要的朋友可以參考下2024-01-01k8s容器互聯(lián)-flannel?host-gw原理篇
這篇文章主要為大家介紹了k8s容器互聯(lián)-flannel?host-gw原理篇,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04Golang性能提升利器之SectionReader的用法詳解
本文將介紹 Go 語言中的 SectionReader,包括 SectionReader的基本使用方法、實(shí)現(xiàn)原理、使用注意事項(xiàng),感興趣的小伙伴可以了解一下2023-07-07Golang編程實(shí)現(xiàn)生成n個(gè)從a到b不重復(fù)隨機(jī)數(shù)的方法
這篇文章主要介紹了Golang編程實(shí)現(xiàn)生成n個(gè)從a到b不重復(fù)隨機(jī)數(shù)的方法,結(jié)合實(shí)例形式分析了Go語言字符串操作及隨機(jī)數(shù)生成的相關(guān)操作技巧,需要的朋友可以參考下2017-01-01