再次探討go實(shí)現(xiàn)無(wú)限 buffer 的 channel方法
前言
總所周知,go 里面只有兩種 channel,一種是 unbuffered channel, 其聲明方式為
ch := make(chan interface{})
另一種是 buffered channel,其聲明方式為
bufferSize := 5
ch := make(chan interface{},bufferSize)
對(duì)于一個(gè) buffered channel,無(wú)論它的 buffer 有多大,它終究是有極限的。這個(gè)極限就是該 channel 最初被 make 時(shí),所指定的 bufferSize 。
jojo,buffer channel 的大小是有極限的,我不做 channel 了。
一旦 channel 滿了的話,再往里面添加元素的話,將會(huì)阻塞。
so how can we make a infinite buffer channel?
本文參考了 medinum 上面的一篇文章,有興趣的同學(xué)可以直接閱讀原文。
實(shí)現(xiàn)
接口的設(shè)計(jì)
首先當(dāng)然是建一個(gè) struct,在百度翻譯的幫助下,我們將這個(gè) struct 取名為 InfiniteChannel
type InfiniteChannel struct {
}
思考一下 channel 的核心行為,實(shí)際上就兩個(gè),一個(gè)流入(Fan in),一個(gè)流出(Fan out),因此我們添加如下幾個(gè) method。
func (c *InfiniteChannel) In(val interface{}) {
// todo
}
func (c *InfiniteChannel) Out() interface{} {
// todo
}
內(nèi)部實(shí)現(xiàn)
通過(guò) In() 接收的數(shù)據(jù),總得需要一個(gè)地方來(lái)存放。我們可以用一個(gè) slice 來(lái)存放,就算用 In() 往里面添加了很多元素,也可以通過(guò) append() 來(lái)拓展 slice,slice 的容量可以無(wú)限拓展下去(內(nèi)存足夠的話),所以 channel 也是 infinite 。 InfiniteChannel 的第一個(gè)成員就這么敲定下來(lái)的。
type InfiniteChannel struct {
data []interface{}
}
用戶調(diào)用 In() 和 Out() 時(shí),可能是并發(fā)的環(huán)境,在 go 中如何進(jìn)行并發(fā)編程,最容易想到的肯定是 channel 了,因此我們?cè)趦?nèi)部準(zhǔn)備兩個(gè) channel,一個(gè) inChan,一個(gè) outChan,用 inChan 來(lái)接收數(shù)據(jù),用 outChan 來(lái)流出數(shù)據(jù)。
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
}
func (c *InfiniteChannel) In(val interface{}) {
c.inChan <- val
}
func (c *InfiniteChannel) Out() interface{} {
return <-c.outChan
}
其中, inChan 和 outChan 都是 unbuffered channel。
此外,也肯定是需要一個(gè) select 來(lái)處理來(lái)自 inChan 和 outChan 身上的事件。因此我們另起一個(gè)協(xié)程,在里面做 select 操作。
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChan <- c.pop(): // pop() 將取出隊(duì)列的首個(gè)元素
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := &InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
}
go c.background() // 注意這里另起了一個(gè)協(xié)程
return c
}
ps:感覺(jué)這也算是 go 并發(fā)編程的一個(gè)套路了。即
- 在 new struct 的時(shí)候,順手 go 一個(gè) select 協(xié)程,select 協(xié)程內(nèi)執(zhí)行一個(gè) for 循環(huán),不停的 select,監(jiān)聽一個(gè)或者多個(gè) channel 的事件。
- struct 對(duì)外提供的 method,只會(huì)操作 struct 內(nèi)的 channel(在本例中就是 inChan 和 outChan),不會(huì)操作 struct 內(nèi)的其他數(shù)據(jù)(在本例中,In() 和 Out() 都沒(méi)有直接操作 data)。
- 觸發(fā) channel 的事件后,由 select 協(xié)程進(jìn)行數(shù)據(jù)的更新(在本例中就是 data )。因?yàn)橹挥?select 協(xié)程對(duì)除 channel 外的數(shù)據(jù)成員進(jìn)行讀寫操作,且 go 保證了對(duì)于 channel 的并發(fā)讀寫是安全的,所以代碼是并發(fā)安全的。
- 如果 struct 是 exported ,用戶或許會(huì)越過(guò) new ,直接手動(dòng) make 一個(gè) struct,可以考慮將 struct 設(shè)置為 unexported,把它的首字母小寫即可。
pop() 的實(shí)現(xiàn)也非常簡(jiǎn)單。
// 取出隊(duì)列的首個(gè)元素,如果隊(duì)列為空,將會(huì)返回一個(gè) nil
func (c *InfiniteChannel) pop() interface{} {
if len(c.data) == 0 {
return nil
}
val := c.data[0]
c.data = c.data[1:]
return val
}
測(cè)試一下
用一個(gè)協(xié)程每秒鐘生產(chǎn)一條數(shù)據(jù),另一個(gè)協(xié)程每半秒消費(fèi)一條數(shù)據(jù),并打印。
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i < 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
}()
for i := 0; i < 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
// out <nil>0<nil>1<nil>23<nil>4<nil><nil>5<nil>67<nil><nil>89<nil><nil>1011<nil>12<nil>13<nil>14<nil>15<nil>16<nil>17<nil><nil>1819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> Process finished with the exit code 0
可以看到,將 InfiniteChannel 內(nèi)沒(méi)有數(shù)據(jù)可供消費(fèi)時(shí),調(diào)用 Out() 將會(huì)返回一個(gè) nil,不過(guò)這也在我們的意料之中,原因是 pop() 在隊(duì)列為空時(shí),將會(huì)返回 nil。
目前 InfiniteChannel 的行為與標(biāo)準(zhǔn)的 channel 的行為是有出入的,go 中的 channel,在沒(méi)有數(shù)據(jù)卻仍要取數(shù)據(jù)時(shí)會(huì)被阻塞,如何實(shí)現(xiàn)這個(gè)效果?
優(yōu)化
我認(rèn)為此處是是整篇文章最有技巧的地方,我第一次看到時(shí)忍不住拍案叫絕。
首先把原來(lái)的 background() 摘出來(lái)
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChan <- c.pop():
}
}
}
對(duì) outChan 進(jìn)行一個(gè)簡(jiǎn)單封裝
func (c *InfiniteChannel) background() {
for true {
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChanWrapper() <- c.pop():
}
}
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
return c.outChan
}
目前為止,一切照舊。
點(diǎn)睛之筆來(lái)了:
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
if len(c.data) == 0 {
return nil
}
return c.outChan
}
在 c.data 為空的時(shí)候,返回一個(gè) nil
在 background() 中,當(dāng)執(zhí)行到 case c.outChan <- c.pop(): 時(shí),實(shí)際上將會(huì)變成:
case nil <- nil:
在 go 中,是無(wú)法往一個(gè) nil 的 channel 中發(fā)送元素的。例如
func main() {
var c chan interface{}
select {
case c <- 1:
}
}
// fatal error: all goroutines are asleep - deadlock!
func main() {
var c chan interface{}
select {
case c <- 1:
default:
fmt.Println("hello world")
}
}
// hello world
因此,對(duì)于
select {
case newVal := <-c.inChan:
c.data = append(c.data, newVal)
case c.outChanWrapper() <- c.pop():
}
將會(huì)一直阻塞在 select 那里,直到 inChan 來(lái)了數(shù)據(jù)。
再測(cè)試一下
012345678910111213141516171819fatal error: all goroutines are asleep - deadlock!
最后,程序 panic 了,因?yàn)樗梨i了。
補(bǔ)充
實(shí)際上 channel 除了 In() 和 Out() 外,還有一個(gè)行為,即 close(),如果 channel close 后,依舊從其中取元素的話,將會(huì)取出該類型的默認(rèn)值。
func main() {
c := make(chan interface{})
close(c)
for true {
v := <-c
fmt.Println(v)
time.Sleep(time.Second)
}
}
// output
// <nil>
// <nil>
// <nil>
// <nil>
func main() {
c := make(chan interface{})
close(c)
for true {
v, isOpen := <-c
fmt.Println(v, isOpen)
time.Sleep(time.Second)
}
}
// output
// <nil> false
// <nil> false
// <nil> false
// <nil> false
我們也需要實(shí)現(xiàn)相同的效果。
func (c *InfiniteChannel) Close() {
close(c.inChan)
}
func (c *InfiniteChannel) background() {
for true {
select {
case newVal, isOpen := <-c.inChan:
if isOpen {
c.data = append(c.data, newVal)
} else {
c.isOpen = false
}
case c.outChanWrapper() <- c.pop():
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := &InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
isOpen: true,
}
go c.background()
return c
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
// 這里添加了對(duì) c.isOpen 的判斷
if c.isOpen && len(c.data) == 0 {
return nil
}
return c.outChan
}
再測(cè)試一下
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i < 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
c.Close() // 這里調(diào)用了 Close
}()
for i := 0; i < 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
// output 012345678910111213141516171819<nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil><nil> Process finished with the exit code 0
符合預(yù)期
遺憾
目前看上去已經(jīng)很完美了,但是和標(biāo)準(zhǔn)的 channel 相比,仍然有差距。因?yàn)闃?biāo)準(zhǔn)的 channel 是有這種用法的
v,isOpen := <- ch
可以通過(guò) isOpen 變量來(lái)獲取 channel 的開閉情況。
因此 InfiniteChannel 也應(yīng)該提供一個(gè)類似的 method
func (c *InfiniteChannel) OutAndIsOpen() (interface{}, bool) {
// todo
}
可惜的是,要想得知 InfiniteChannel 是否是 Open 的,就必定要訪問(wèn) InfiniteChannel 內(nèi)的 isOpen 成員。
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
isOpen bool
}
而 isOpen 并非 channel 類型,根據(jù)之前的套路,這種非 channel 類型的成員只應(yīng)該被 select 協(xié)程訪問(wèn)。一旦有多個(gè)協(xié)程訪問(wèn),就會(huì)出現(xiàn)并發(fā)問(wèn)題,除非加鎖。
我不能接受!所以干脆不提供這個(gè) method 了,嘿嘿。
完整代碼
func main() {
c := NewInfiniteChannel()
go func() {
for i := 0; i < 20; i++ {
c.In(i)
time.Sleep(time.Second)
}
c.Close()
}()
for i := 0; i < 50; i++ {
val := c.Out()
fmt.Print(val)
time.Sleep(time.Millisecond * 500)
}
}
type InfiniteChannel struct {
inChan chan interface{}
outChan chan interface{}
data []interface{}
isOpen bool
}
func (c *InfiniteChannel) In(val interface{}) {
c.inChan <- val
}
func (c *InfiniteChannel) Out() interface{} {
return <-c.outChan
}
func (c *InfiniteChannel) Close() {
close(c.inChan)
}
func (c *InfiniteChannel) background() {
for true {
select {
case newVal, isOpen := <-c.inChan:
if isOpen {
c.data = append(c.data, newVal)
} else {
c.isOpen = false
}
case c.outChanWrapper() <- c.pop():
}
}
}
func NewInfiniteChannel() *InfiniteChannel {
c := &InfiniteChannel{
inChan: make(chan interface{}),
outChan: make(chan interface{}),
isOpen: true,
}
go c.background()
return c
}
// 取出隊(duì)列的首個(gè)元素,如果隊(duì)列為空,將會(huì)返回一個(gè) nil
func (c *InfiniteChannel) pop() interface{} {
if len(c.data) == 0 {
return nil
}
val := c.data[0]
c.data = c.data[1:]
return val
}
func (c *InfiniteChannel) outChanWrapper() chan interface{} {
if c.isOpen && len(c.data) == 0 {
return nil
}
return c.outChan
}
參考
https://medium.com/capital-one-tech/building-an-unbounded-channel-in-go-789e175cd2cd
以上就是再次探討go實(shí)現(xiàn)無(wú)限 buffer 的 channel方法的詳細(xì)內(nèi)容,更多關(guān)于go無(wú)限 buffer 的 channel的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Go語(yǔ)言for-range函數(shù)使用技巧實(shí)例探究
這篇文章主要為大家介紹了Go語(yǔ)言for-range函數(shù)使用技巧實(shí)例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
詳解Go語(yǔ)言中for循環(huán),break和continue的使用
這篇文章主要通過(guò)一些示例為大家介紹一下Go語(yǔ)言中for循環(huán)、break和continue的基本語(yǔ)法以及使用,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下2022-06-06
幾個(gè)小技巧幫你實(shí)現(xiàn)Golang永久阻塞
Go 的運(yùn)行時(shí)的當(dāng)前設(shè)計(jì),假定程序員自己負(fù)責(zé)檢測(cè)何時(shí)終止一個(gè) goroutine 以及何時(shí)終止該程序。有時(shí)候我們需要的是使程序阻塞在這一行,本文就來(lái)詳細(xì)的介紹一下,感興趣的可以了解一下2021-12-12
golang敏感詞過(guò)濾的實(shí)現(xiàn)
本文主要介紹了golang敏感詞過(guò)濾的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
Go語(yǔ)言操作Excel利器之excelize類庫(kù)詳解
Excelize是Go語(yǔ)言編寫的用于操作Office Excel文檔基礎(chǔ)庫(kù),基于ECMA-376,ISO/IEC 29500國(guó)際標(biāo)準(zhǔn),可以使用它來(lái)讀取、寫入由Excel 2007及以上版本創(chuàng)建的電子表格文檔,下面這篇文章主要給大家介紹了關(guān)于Go語(yǔ)言操作Excel利器之excelize類庫(kù)的相關(guān)資料,需要的朋友可以參考下2022-10-10

