Golang并發(fā)繞不開的重要組件之Channel詳解
在上一篇文章中有介紹Golang實現并發(fā)的重要關鍵字 go
,通過這個我們可以方便快速地啟動Goroutinue協程。協程之間一定會有通信的需求,而Golang的核心設計思想為:不通過共享內存的方式進行通信,而應該通過通信來共享內存。與其他通過共享內存來進行數據傳遞的編程語言略有差異,而實現這一方案的正是 Channel。
Channel是一個提供可接收和發(fā)送特定類型值的用于并發(fā)函數通信的數據類型,滿足FIFO(先進先出)原則的隊列類型。FIFO在數據類型與操作上都有體現:
- Channel類型的元素是先進先出的,先發(fā)送到Channel的元素會先被接收
- 先向channel發(fā)送數據的Goroutinue會優(yōu)先執(zhí)行
- 先從channel接收數據的Goroutinue會優(yōu)先執(zhí)行
Channel使用
語法
channel是Golang中的一種數據類型,相關語法也非常簡單
ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType
chan為channel類型關鍵字
<- 操作符用于channel中數據的收發(fā),在聲明時用于表示channel數據流動的方向
- chan 默認為雙向傳遞,即channel既可以接收數據也可以發(fā)送數據
- chan<- 僅可以發(fā)送數據的channel
- <-chan 僅可以接受數據的channel
ElementType 代表元素類型,例如 int、string...
初始化
channel數據類型是一種引用類型,類似于map和slice,所以channel的初始化需要使用內建函數make():
make(ChannelType, Capacity) ch := make(chan int) var ch = make(chan int) ch := make(chan int, 10) ch := make(<-chan int) ch := make(chan<- int, 10)
- ChannelType就是前面介紹的類型
- Capacity代表緩沖容量。省略時就是為默認0,表示無緩沖的Channel
如果不使用make()函數來初始化channel,則不能執(zhí)行收發(fā)通信操作,并且會造成阻塞,進而造成Goroutinue泄露,示例:
func main() { defer func() { fmt.Println("goroutines: ", runtime.NumGoroutine()) }() var ch chan int go func() { <-ch }() time.Sleep(time.Second) }
代碼執(zhí)行結果為:
goroutines: 2
可以看到,直到程序退出,Goroutinue數量仍然為2,原因就是channel沒有正確的使用make()進行初始化,channel變量實際為nil,進而造成了內存泄露。
數據的接收與發(fā)送
channel中數據的接收與發(fā)送是通過操作符 <- 來進行操作的:
// 接收數據 ch <- Expression ch <- 111 ch <- struct{}{} // 發(fā)送數據 <- ch v := <- ch f(<-ch)
除了操作符 <- 外,我們還可以使用 for range
持續(xù)地從channel中接收數據:
for e := range ch { // e逐個讀取ch中的元素值 }
持續(xù)接收操作與 <- 沒有很大區(qū)別:
- 如果ch為nil channel就會阻塞
- 如果ch沒有發(fā)送元素也會阻塞
for 會持續(xù)讀取直到channel執(zhí)行關閉,關閉后for會將剩余元素全部讀取之后結束。那么對已經關閉的channel進行數據的收發(fā)會怎樣呢?
channel的關閉
channel使用過后要使用內置函數close()來關閉channel。關閉channel的意思是記錄該Channel不能再被發(fā)送任何元素了,而不是銷毀該Channel的意思。也就意味著關閉的Channel是可以繼續(xù)接收值的
- 如果向已經關閉的channel發(fā)送數據會引發(fā)panic
- 關閉 nil channel 會引發(fā)panic
- 關閉已經關閉的 channel 會引發(fā)panic
- 如果讀取已經關閉的channel值,可以接收關閉前發(fā)送的全部值;關閉前的值接收完會返回類型的零值和一個false,不會阻塞及panic
以上幾種情況可以自己編寫一個簡單的代碼來測試一下。
channel分類
前面有提到,在make一個channel時,第二個參數就代表了緩沖區(qū)大小,如果沒有第二個參數就為默認的無緩沖channel。具體用法:
- 緩沖Channel,make(chan T, cap),cap是大于0的值。
- 無緩沖Channel, make(chan T), make(chan T, 0)
無緩沖channel
無緩沖的channel也稱為同步Channel,只有當發(fā)送方和接收方都準備就緒時,通信才會成功。
同步操作示例:
func ChannelSync() { // 初始化數據 ch := make(chan int) wg := sync.WaitGroup{} // 間隔發(fā)送 wg.Add(1) go func() { defer wg.Done() for i := 0; i < 5; i++ { ch <- i println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999")) // 間隔時間 time.Sleep(1 * time.Second) } close(ch) }() // 間隔接收 wg.Add(1) go func() { defer wg.Done() for v := range ch { println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999")) // 間隔時間,注意與send的間隔時間不同 time.Sleep(3 * time.Second) } }() wg.Wait() }
執(zhí)行結果:
Send 0 . Now: 17:54:27.772773
Received 0 . Now: 17:54:27.772795
Received 1 . Now: 17:54:30.773878
Send 1 . Now: 17:54:30.773959
Received 2 . Now: 17:54:33.775132
Send 2 . Now: 17:54:33.775208
Received 3 . Now: 17:54:36.775816
Send 3 . Now: 17:54:36.775902
Received 4 . Now: 17:54:39.776408
Send 4 . Now: 17:54:39.776456
代碼中,采用同步channel,使用兩個goroutine完成發(fā)送和接收。每次發(fā)送和接收的時間間隔不同。我們分別打印發(fā)送和接收的值和時間。可以看到執(zhí)行結果:發(fā)送和接收時間一致;間隔以長的為準,可見發(fā)送和接收操作為同步操作。因此,同步Channel適合在gotoutine間用做同步的信號
緩沖Channel
緩沖Channel也稱為異步Channel,接收和發(fā)送方不用等待雙方就緒即可成功。緩沖Channel會存在一個容量為cap的緩沖空間。當使用緩沖Channel通信時,接收和發(fā)送操作是在操作Channel的Buffer,是典型的隊列操作:
- 接收時,從緩沖中接收元素,只要緩沖不為空,不會阻塞。反之,緩沖為空,會阻塞,goroutine掛起
- 發(fā)送時,向緩沖中發(fā)送元素,只要緩沖未滿,不會阻塞。反之,緩沖滿了,會阻塞,goroutine掛起
操作示例:
func main() { // 初始化數據 ch := make(chan int, 5) wg := sync.WaitGroup{} // 間隔發(fā)送 wg.Add(1) go func() { defer wg.Done() for i := 0; i < 5; i++ { ch <- i println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999")) // 間隔時間 time.Sleep(1 * time.Second) } }() // 間隔接收 wg.Add(1) go func() { defer wg.Done() for v := range ch { println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999")) // 間隔時間,注意與send的間隔時間不同 time.Sleep(3 * time.Second) } }() wg.Wait() }
執(zhí)行結果:
Send 0 . Now: 17:59:32.990698
Received 0 . Now: 17:59:32.99071
Send 1 . Now: 17:59:33.992127
Send 2 . Now: 17:59:34.992832
Received 1 . Now: 17:59:35.991488
Send 3 . Now: 17:59:35.993155
Send 4 . Now: 17:59:36.993445
Received 2 . Now: 17:59:38.991663
Received 3 . Now: 17:59:41.99184
Received 4 . Now: 17:59:44.992214
代碼中,與同步channel一致,只是采用了容量為5的緩沖channel,使用兩個goroutine完成發(fā)送和接收。每次發(fā)送和接收的時間間隔不同。我們分別打印發(fā)送和接收的值和時間。可以看到執(zhí)行結果:發(fā)送和接收時間不同;發(fā)送和接收操作不會阻塞,可見發(fā)送和接收操作為異步操作。因此,緩沖channel非常適合做goroutine之間的數據通信
Channel原理
源碼
在源碼包中的 runtime/chan.go 可以看到Channel實現源碼:
type hchan struct { qcount uint // 元素個數,通過len()獲取 dataqsiz uint // 緩沖隊列的長度,即容量,通過cap()獲取 buf unsafe.Pointer // 緩沖隊列指針,無緩沖隊列則為nil elemsize uint16 // 元素大小 closed uint32 // 關閉標志 elemtype \*\_type // 元素類型 sendx uint // 發(fā)送元素索引 recvx uint // 接收元素索引 recvq waitq // 接收Goroutinue隊列 sendq waitq // 發(fā)送Goroutinue隊列 // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex // 鎖 }
buf 可以理解為一個環(huán)形數組,用來緩存Channel中的元素。為何使用環(huán)形數組而不使用普通數組呢?因為普通數組更適合指定的空間,彈出元素時,普通數組需要全部都前移,而使用環(huán)形數組+下標索引的方式可以在不移動元素的情況下實現數據的高效讀寫。
sendx與recvx 當下標超過數組容量后會回到第一個位置,所以需要有兩個字段記錄當前讀和寫的下標位置。
recvq與sendq 用于記錄等待接收和發(fā)送的goroutine隊列,當基于某channel的接收或發(fā)送的goroutine無法理解執(zhí)行時,也就是需要阻塞時,會被記錄到Channel的等待隊列中。當channel可以完成相應的接收或發(fā)送操作時,從等待隊列中喚醒goroutine進行操作。
等待隊列實際是一個雙向鏈表結構
生命周期
創(chuàng)建策略
- 無緩沖的直接分配內存
- 有緩沖的不包含指針,為hchan和底層數組分配連續(xù)的地址
- 有緩沖的channel且包含元素指針,會為hchan和底層數組分配地址
發(fā)送策略
- 發(fā)送操作編譯時轉換為 runtime.chansend函數
- 阻塞式:block=true;非阻塞式:block=false
- 向channel中發(fā)送數據分為檢查和數據發(fā)送兩塊,數據發(fā)送:
- 如果channel的讀等待隊列存在接受者goroutinue
- 將數據直接發(fā)送給第一個等待的goroutinue,喚醒接收的goroutinue
- 如果channel讀等待隊列不存在接收者goroutinue
- 如果循環(huán)數組buf未滿,則將數據發(fā)送到循環(huán)數組buf的隊尾
- 如果循環(huán)數組buf已滿,這時就會走阻塞發(fā)送的流程,將當前goroutinue加入寫等待隊列,并掛起等待喚醒
- 如果channel的讀等待隊列存在接受者goroutinue
接收策略
- 接收操作編譯是轉換為 runtime.chanrecv 函數
- 阻塞式:block=true;非阻塞式:block=false
- 向channel中接收數據數據接收:
- 如果channel的寫等待隊列存在發(fā)送者goroutinue:
- 如果是無緩沖channel,直接從第一個發(fā)送者goroutinue將數據拷貝給接收變量,喚醒發(fā)送的goroutinue
- 如果是有緩沖channel(已滿),將循環(huán)數組buf的隊首元素拷貝給接收變量,將第一個發(fā)送者goroutinue的數據拷貝到buf循環(huán)數組,喚醒發(fā)送的goroutinue
- 如果channel的寫等待不存在發(fā)送者goroutinue
- 如果循環(huán)數組buf非空,將循環(huán)數組buf的隊首元素拷貝給接收變量
- 如果循環(huán)數組buf為空,這個時候就會走阻塞接收的流程,將當前 goroutine 加入讀等待隊列,并掛起等待喚醒
- 如果channel的寫等待隊列存在發(fā)送者goroutinue:
關閉
調用 runtime.closechan 函數
簡單的對Channel一些基礎用法及原理做了一個解釋,可以多寫一寫并發(fā)代碼以及閱讀源碼來加深對Channel的理解。
以上就是Golang并發(fā)繞不開的重要組件之Channel詳解的詳細內容,更多關于Golang Channel的資料請關注腳本之家其它相關文章!
相關文章
解決Goland 提示 Unresolved reference 錯誤的問題
這篇文章主要介紹了解決Goland 提示 Unresolved reference 錯誤的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12