Go channel實現(xiàn)原理分析
channel
單純地將函數(shù)并發(fā)執(zhí)行是沒有意義的。函數(shù)與函數(shù)間需要交換數(shù)據(jù)才能體現(xiàn)并發(fā)執(zhí)行函數(shù)的意義。
雖然可以使用共享內(nèi)存進行數(shù)據(jù)交換,但是共享內(nèi)存在不同的goroutine中容易發(fā)生競態(tài)問題。為了保證數(shù)據(jù)交換的正確性,必須使用互斥量對內(nèi)存進行加鎖,這種做法勢必造成性能問題。
Go語言的并發(fā)模型是CSP(Communicating Sequential Processes),提倡通過通信共享內(nèi)存而不是通過共享內(nèi)存而實現(xiàn)通信。
如果說goroutine是Go程序并發(fā)的執(zhí)行體,channel就是它們之間的連接。channel是可以讓一個goroutine發(fā)送特定值到另一個goroutine的通信機制。
Go 語言中的通道(channel)是一種特殊的類型。通道像一個傳送帶或者隊列,總是遵循先入先出(First In First Out)的規(guī)則,保證收發(fā)數(shù)據(jù)的順序。每一個通道都是一個具體類型的導(dǎo)管,也就是聲明channel的時候需要為其指定元素類型。
channel(管道)底層是一個環(huán)形隊列(先進先出),send(插入)和recv(取走)從同一個位置沿同一個方向順序執(zhí)行。sendx表示最后一次插入元素的位置,recvx表示最后一次取走元素的位置。
var ch chan int //管道的聲明 ch = make(chan int, 8) //管道的初始化,環(huán)形隊列里可容納8個int ch <- 1 //往管道里寫入(send)數(shù)據(jù) ch <- 2 ch <- 3 ch <- 4 ch <- 5 v := <-ch //從管道里取走(recv)數(shù)據(jù) fmt.Println(v) v = <-ch fmt.Println(v)
channel類型
channel是一種類型,一種引用類型。聲明通道類型的格式如下:
var 變量 chan 元素類型
舉幾個例子:
var ch1 chan int // 聲明一個傳遞整型的通道 var ch2 chan bool // 聲明一個傳遞布爾型的通道 var ch3 chan []int // 聲明一個傳遞int切片的通道
創(chuàng)建channel
通道是引用類型,通道類型的空值是nil。
var ch chan int fmt.Println(ch) // <nil>
聲明的通道后需要使用make函數(shù)初始化之后才能使用。
創(chuàng)建channel的格式如下:
make(chan 元素類型, [緩沖大小])
channel的緩沖大小是可選的。
舉幾個例子:
ch4 := make(chan int) ch5 := make(chan bool) ch6 := make(chan []int)
channel操作
通道有發(fā)送(send)、接收(receive)和關(guān)閉(close)三種操作。
發(fā)送和接收都使用<-符號。
現(xiàn)在我們先使用以下語句定義一個通道:
ch := make(chan int)
發(fā)送
將一個值發(fā)送到通道中。
ch <- 10 // 把10發(fā)送到ch中
接收
從一個通道中接收值。
x := <- ch // 從ch中接收值并賦值給變量x <-ch // 從ch中接收值,忽略結(jié)果
關(guān)閉
我們通過調(diào)用內(nèi)置的close函數(shù)來關(guān)閉通道。
close(ch)
關(guān)于關(guān)閉通道需要注意的事情是,只有在通知接收方goroutine所有的數(shù)據(jù)都發(fā)送完畢的時候才需要關(guān)閉通道。通道是可以被垃圾回收機制回收的,它和關(guān)閉文件是不一樣的,在結(jié)束操作之后關(guān)閉文件是必須要做的,但關(guān)閉通道不是必須的。
關(guān)閉后的通道有以下特點:
- 對一個關(guān)閉的通道再發(fā)送值就會導(dǎo)致panic。
- 對一個關(guān)閉的通道進行接收會一直獲取值直到通道為空。
- 對一個關(guān)閉的并且沒有值的通道執(zhí)行接收操作會得到對應(yīng)類型的零值。
- 關(guān)閉一個已經(jīng)關(guān)閉的通道會導(dǎo)致panic。
無緩沖的通道
無緩沖的通道又稱為阻塞的通道。我們來看一下下面的代碼:
func main() { ch := make(chan int) ch <- 10 fmt.Println("發(fā)送成功") }
上面這段代碼能夠通過編譯,但是執(zhí)行的時候會出現(xiàn)以下錯誤:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
.../src/github.com/pprof/studygo/day06/channel02/main.go:8 +0x54
為什么會出現(xiàn)deadlock錯誤呢?
因為我們使用ch := make(chan int)創(chuàng)建的是無緩沖的通道,無緩沖的通道只有在有人接收值的時候才能發(fā)送值。就像你住的小區(qū)沒有快遞柜和代收點,快遞員給你打電話必須要把這個物品送到你的手中,簡單來說就是無緩沖的通道必須有接收才能發(fā)送。
上面的代碼會阻塞在ch <- 10這一行代碼形成死鎖,那如何解決這個問題呢?
一種方法是啟用一個goroutine去接收值,例如:
func recv(c chan int) { ret := <-c fmt.Println("接收成功", ret) } func main() { ch := make(chan int) go recv(ch) // 啟用goroutine從通道接收值 ch <- 10 fmt.Println("發(fā)送成功") }
無緩沖通道上的發(fā)送操作會阻塞,直到另一個goroutine在該通道上執(zhí)行接收操作,這時值才能發(fā)送成功,兩個goroutine將繼續(xù)執(zhí)行。相反,如果接收操作先執(zhí)行,接收方的goroutine將阻塞,直到另一個goroutine在該通道上發(fā)送一個值。
使用無緩沖通道進行通信將導(dǎo)致發(fā)送和接收的goroutine同步化。因此,無緩沖通道也被稱為同步通道。
有緩沖的通道
解決上面問題的方法還有一種就是使用有緩沖區(qū)的通道。
我們可以在使用make函數(shù)初始化通道的時候為其指定通道的容量,例如:
func main() { ch := make(chan int, 1) // 創(chuàng)建一個容量為1的有緩沖區(qū)通道 ch <- 10 fmt.Println("發(fā)送成功") }
只要通道的容量大于零,那么該通道就是有緩沖的通道,通道的容量表示通道中能存放元素的數(shù)量。就像你小區(qū)的快遞柜只有那么個多格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個快遞員就能往里面放一個。
我們可以使用內(nèi)置的len函數(shù)獲取通道內(nèi)元素的數(shù)量,使用cap函數(shù)獲取通道的容量,雖然我們很少會這么做。
close()
可以通過內(nèi)置的close()函數(shù)關(guān)閉channel(如果你的管道不往里存值或者取值的時候一定記得關(guān)閉管道)
package main import "fmt" func main() { c := make(chan int) go func() { for i := 0; i < 5; i++ { c <- i } close(c) }() for { if data, ok := <-c; ok { fmt.Println(data) } else { break } } fmt.Println("main結(jié)束") }
如何優(yōu)雅的從通道循環(huán)取值
當(dāng)通過通道發(fā)送有限的數(shù)據(jù)時,我們可以通過close函數(shù)關(guān)閉通道來告知從該通道接收值的goroutine停止等待。當(dāng)通道被關(guān)閉時,往該通道發(fā)送值會引發(fā)panic,從該通道里接收的值一直都是類型零值。那如何判斷一個通道是否被關(guān)閉了呢?
我們來看下面這個例子:
// channel 練習(xí) func main() { ch1 := make(chan int) ch2 := make(chan int) // 開啟goroutine將0~100的數(shù)發(fā)送到ch1中 go func() { for i := 0; i < 100; i++ { ch1 <- i } close(ch1) }() // 開啟goroutine從ch1中接收值,并將該值的平方發(fā)送到ch2中 go func() { for { i, ok := <-ch1 // 通道關(guān)閉后再取值ok=false if !ok { break } ch2 <- i * i } close(ch2) }() // 在主goroutine中從ch2中接收值打印 for i := range ch2 { // 通道關(guān)閉后會退出for range循環(huán) fmt.Println(i) } }
從上面的例子中我們看到有兩種方式在接收值的時候判斷通道是否被關(guān)閉,我們通常使用的是for range的方式。
單向通道
有的時候我們會將通道作為參數(shù)在多個任務(wù)函數(shù)間傳遞,很多時候我們在不同的任務(wù)函數(shù)中使用通道都會對其進行限制,比如限制通道在函數(shù)中只能發(fā)送或只能接收。
Go語言中提供了單向通道來處理這種情況。例如,我們把上面的例子改造如下:
func counter(out chan<- int) { for i := 0; i < 100; i++ { out <- i } close(out) } func squarer(out chan<- int, in <-chan int) { for i := range in { out <- i * i } close(out) } func printer(in <-chan int) { for i := range in { fmt.Println(i) } } func main() { ch1 := make(chan int) ch2 := make(chan int) go counter(ch1) go squarer(ch2, ch1) printer(ch2) }
其中,
1.chan<- int是一個只能發(fā)送的通道,可以發(fā)送但是不能接收;
2.<-chan int是一個只能接收的通道,可以接收但是不能發(fā)送。
在函數(shù)傳參及任何賦值操作中將雙向通道轉(zhuǎn)換為單向通道是可以的,但反過來是不可以的。
read_only := make (<-chan int) //定義只讀的channel write_only := make (chan<- int) //定義只寫的channel
定義只讀和只寫的channel意義不大,一般用于在參數(shù)傳遞中。
//只能向channel里寫數(shù)據(jù) func send(c chan<- int) { c <- 1 } //只能取channel中的數(shù)據(jù) func recv(c <-chan int) { _ = <-c } //返回一個只讀channel func (c *Context) Done() <-chan struct{} { return nil }
通道遍歷
可以通過for range的方式遍歷管道,遍歷前必須先關(guān)閉管道,禁止再寫入元素。
close(ch) //遍歷前必須先關(guān)閉管道,禁止再寫入元素 //遍歷管道里剩下的元素 for ele := range ch { fmt.Println(ele) }
slice、map和channel是go語言里的3種引用類型,都可以通過make函數(shù)來進行初始化(申請內(nèi)存分配)。因為它們都包含一個指向底層數(shù)據(jù)結(jié)構(gòu)的指針,所以稱之為“引用”類型。引用類型未初始化時都是nil,可以對它們執(zhí)行l(wèi)en()函數(shù),返回0。
異步通道
異步管道
asynChann := make(chan int, 8)
channel底層維護一個環(huán)形隊列(先進先出),make初始化時指定隊列的長度。隊列滿時,寫阻塞;隊列空時,讀阻塞。sendx指向下一次寫入的位置, recvx指向下一次讀取的位置。 recvq維護因讀管道而被阻塞的協(xié)程,sendq維護因?qū)懝艿蓝蛔枞膮f(xié)程。
同步管道可以認(rèn)為隊列容量為0,當(dāng)讀協(xié)程和寫協(xié)程同時就緒時它們才會彼此幫對方解除阻塞。
syncChann := make(chan int)
channel僅作為協(xié)程間同步的工具,不需要傳遞具體的數(shù)據(jù),管道類型可以用struct{}??战Y(jié)構(gòu)體變量的內(nèi)存占用為0,因此struct{}類型的管道比bool類型的管道還要省內(nèi)存。
sc := make(chan struct{}) sc <- struct{}{}
關(guān)于channel的死鎖與阻塞
- Channel滿了,就阻塞寫;Channel空了,就阻塞讀。
- 阻塞之后會交出cpu,去執(zhí)行其他協(xié)程,希望其他協(xié)程能幫自己解除阻塞。
- 如果阻塞發(fā)生在main協(xié)程里,并且沒有其他子協(xié)程可以執(zhí)行,那就可以確定“希望永遠等不來”,自已把自己殺掉,報一個fatal error:deadlock出來。
- 如果阻塞發(fā)生在子協(xié)程里,就不會發(fā)生死鎖,因為至少main協(xié)程是一個值得等待的“希望”,會一直等(阻塞)下去。
package main import ( "fmt" "time" ) func main() { ch := make(chan struct{}, 1) ch <- struct{}{} //有1個緩沖可以用,無需阻塞,可以立即執(zhí)行 go func() { //子協(xié)程1 time.Sleep(5 * time.Second) //sleep一個很長的時間 <-ch //如果把本行代碼注釋掉,main協(xié)程5秒鐘后會報fatal error fmt.Println("sub routine 1 over") }() ch <- struct{}{} //由于子協(xié)程1已經(jīng)啟動,寄希望于子協(xié)程1幫自己解除阻塞,所以會一直等子協(xié)程1執(zhí)行結(jié)束。如果子協(xié)程1執(zhí)行結(jié)束后沒幫自己解除阻塞,則希望完全破滅,報出deadlock fmt.Println("send to channel in main routine") go func() { //子協(xié)程2 time.Sleep(2 * time.Second) ch <- struct{}{} //channel已滿,子協(xié)程2會一直阻塞在這一行 fmt.Println("sub routine 2 over") }() time.Sleep(3 * time.Second) fmt.Println("main routine exit") }
send to channel in main routine
sub routine 1 over
main routine exit
關(guān)閉channel
- 只有當(dāng)管道關(guān)閉時,才能通過range遍歷管道里的數(shù)據(jù),否則會發(fā)生fatal error。
- 管道關(guān)閉后讀操作會立即返回,如果緩沖已空會返回“0值”。
- ele, ok := <-ch ok==true代表ele是管道里的真實數(shù)據(jù)。
- 向已關(guān)閉的管道里send數(shù)據(jù)會發(fā)生panic。
- 不能重復(fù)關(guān)閉管道,不能關(guān)閉值為nil的管道,否則都會panic。
package main import ( "fmt" "time" ) var cloch = make(chan int, 1) var cloch2 = make(chan int, 1) func traverseChannel() { for ele := range cloch { fmt.Printf("receive %d\n", ele) } fmt.Println() } func traverseChannel2() { for { if ele, ok := <-cloch2; ok { //ok==true代表管道還沒有close fmt.Printf("receive %d\n", ele) } else { //管道關(guān)閉后,讀操作會立即返回“0值” fmt.Printf("channel have been closed, receive %d\n", ele) break } } } func main() { cloch <- 1 close(cloch) traverseChannel() //如果不close就直接通過range遍歷管道,會發(fā)生fatal error: all goroutines are asleep - deadlock! fmt.Println("==================") go traverseChannel2() cloch2 <- 1 close(cloch2) time.Sleep(10 * time.Millisecond) }
channel在并發(fā)編程中有多種玩法,經(jīng)常用channel來實現(xiàn)協(xié)程間的同步。
package main import ( "fmt" "time" ) func upstream(ch chan struct{}) { time.Sleep(15 * time.Millisecond) fmt.Println("一個上游協(xié)程執(zhí)行結(jié)束") ch <- struct{}{} } func downstream(ch chan struct{}) { <-ch fmt.Println("下游協(xié)程開始執(zhí)行") } func main() { upstreamNum := 4 //上游協(xié)程的數(shù)量 downstreamNum := 5 //下游協(xié)程的數(shù)量 upstreamCh := make(chan struct{}, upstreamNum) downstreamCh := make(chan struct{}, downstreamNum) //啟動上游協(xié)程和下游協(xié)程,實際下游協(xié)程會先阻塞 for i := 0; i < upstreamNum; i++ { go upstream(upstreamCh) } for i := 0; i < downstreamNum; i++ { go downstream(downstreamCh) } //同步點 for i := 0; i < upstreamNum; i++ { <-upstreamCh } //通過管道讓下游協(xié)程開始執(zhí)行 for i := 0; i < downstreamNum; i++ { downstreamCh <- struct{}{} } time.Sleep(10 * time.Millisecond) //等下游協(xié)程執(zhí)行結(jié)束 }
通道總結(jié)
channel常見的異常總結(jié),如下圖:
注意:關(guān)閉已經(jīng)關(guān)閉的channel也會引發(fā)panic。
到此這篇關(guān)于Go channel實現(xiàn)原理分析的文章就介紹到這了,更多相關(guān)Go channel內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang限流器time/rate設(shè)計與實現(xiàn)詳解
在?Golang?庫中官方給我們提供了限流器的實現(xiàn)golang.org/x/time/rate,它是基于令牌桶算法(Token?Bucket)設(shè)計實現(xiàn)的,下面我們就來看看他的具體使用吧2024-03-03go 下載非標(biāo)準(zhǔn)庫包(部份包被墻了)到本地使用的方法
今天小編就為大家分享一篇go 下載非標(biāo)準(zhǔn)庫包(部份包被墻了)到本地使用的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-06-06go開源Hugo站點構(gòu)建三步曲之集結(jié)渲染
這篇文章主要為大家介紹了go開源Hugo站點構(gòu)建三步曲之集結(jié)渲染詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-02-02深入解析Go語言的io.ioutil標(biāo)準(zhǔn)庫使用
這篇文章主要介紹了Go語言的io.ioutil標(biāo)準(zhǔn)庫使用,是Golang入門學(xué)習(xí)中的基礎(chǔ)知識,需要的朋友可以參考下2015-10-10golang socket斷點續(xù)傳大文件的實現(xiàn)方法
今天小編就為大家分享一篇golang socket斷點續(xù)傳大文件的實現(xiàn)方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-07-07