Go語(yǔ)言通過(guò)chan進(jìn)行數(shù)據(jù)傳遞的方法詳解
1、并發(fā)范式-管道
通道可以分為兩個(gè)方向,一個(gè)是讀,另一個(gè)是寫,假如一個(gè)函數(shù)的輸入?yún)?shù)和輸出參數(shù)都是相同的 chan 類型,則
該函數(shù)可以調(diào)用自己,最終形成一個(gè)調(diào)用鏈。當(dāng)然多個(gè)具有相同參數(shù)類型的函數(shù)也能組成一個(gè)調(diào)用鏈,這很像
UNIX系統(tǒng)的管道,是一個(gè)有類型的管道。下面通過(guò)具體的示例演示Go程序這種鏈?zhǔn)教幚砟芰Α?/p>
package main import ( "fmt" ) // chain函數(shù)輸入?yún)?shù)和輸入?yún)?shù)類型相同都是chan int類型 // chain函數(shù)功能是將chan內(nèi)的數(shù)據(jù)統(tǒng)一加1 func chain(in chan int) chan int { out := make(chan int) go func() { for v := range in { out <- 1 + v } close(out) }() return out } func main() { in := make(chan int) //初始化輸入?yún)?shù) go func() { for i := 0; i < 10; i++ { in <- i } close(in) }() //連續(xù)調(diào)用3次chan,想當(dāng)與把in中的每個(gè)元素都加3 out := chain(chain(chain(in))) for v := range out { fmt.Println(v) } }
程序輸出
3
4
5
6
7
8
9
10
11
12
2、一個(gè)線程寫數(shù)據(jù)一個(gè)線程讀數(shù)據(jù)
一個(gè)線程往管道里寫數(shù)據(jù)、另一個(gè)線程從管道里讀數(shù)據(jù)示例。
package main import ( "fmt" ) // writerData func writerData(intChan chan int) { for i := 1; i <= 10; i++ { //放入數(shù)據(jù) intChan <- i fmt.Printf("writeData寫到數(shù)據(jù)=%v\n", i) } // 關(guān)閉 close(intChan) } // readData func readData(intChan chan int, exitChan chan bool) { for { v, ok := <-intChan if !ok { break } fmt.Printf("readData讀到數(shù)據(jù)=%v\n", v) } // readData讀完數(shù)據(jù)后,即任務(wù)完成 exitChan <- true close(exitChan) } func main() { // 創(chuàng)建兩個(gè)管道 intChan := make(chan int, 10) // 判斷子進(jìn)程是否結(jié)束 exitChan := make(chan bool, 1) go writerData(intChan) go readData(intChan, exitChan) // 注意主線程退出,兩個(gè)線程直接退出 for { // 等待讀取 _, ok := <-exitChan // 沒(méi)讀到就退出 if !ok { break } } }
程序輸出
writeData寫到數(shù)據(jù)=1
writeData寫到數(shù)據(jù)=2
writeData寫到數(shù)據(jù)=3
writeData寫到數(shù)據(jù)=4
writeData寫到數(shù)據(jù)=5
writeData寫到數(shù)據(jù)=6
writeData寫到數(shù)據(jù)=7
writeData寫到數(shù)據(jù)=8
writeData寫到數(shù)據(jù)=9
writeData寫到數(shù)據(jù)=10
readData讀到數(shù)據(jù)=1
readData讀到數(shù)據(jù)=2
readData讀到數(shù)據(jù)=3
readData讀到數(shù)據(jù)=4
readData讀到數(shù)據(jù)=5
readData讀到數(shù)據(jù)=6
readData讀到數(shù)據(jù)=7
readData讀到數(shù)據(jù)=8
readData讀到數(shù)據(jù)=9
readData讀到數(shù)據(jù)=10
3、多線程讀寫數(shù)據(jù)
求每個(gè)數(shù)字的累加和:
package main import ( "fmt" ) // 寫入2000個(gè)數(shù)據(jù)到numChan中 func inputNum(numChan chan int) { for i := 1; i <= 2000; i++ { numChan <- i } // 寫完數(shù)據(jù)關(guān)閉channel close(numChan) } // 計(jì)算每個(gè)數(shù)字的累加 // 讀取數(shù)據(jù)并且存入到resChan中,exitChan做協(xié)程標(biāo)記 func getNum(numChan chan int, resChan chan int, exitChan chan bool) { for { res := 0 n, ok := <-numChan // 值被取完 if !ok { break } for i := 1; i <= n; i++ { res += i } // 存入到resChan resChan <- res } // 標(biāo)記退出 exitChan <- true } func main() { // 創(chuàng)建三個(gè)管道分別為讀、寫、退出標(biāo)記 numChan := make(chan int, 2000) resChan := make(chan int, 2000) exitChan := make(chan bool, 8) // 啟動(dòng)多線程 go inputNum(numChan) for i := 1; i <= 8; i++ { go getNum(numChan, resChan, exitChan) } // 再啟動(dòng)一個(gè)線程取出exitChan go func() { for i := 0; i < 8; i++ { // 從exitChan管道取出即可 <-exitChan } // 全部取出說(shuō)明讀取numChan數(shù)據(jù)完畢,關(guān)閉resChan close(resChan) }() // 讀取resChan數(shù)據(jù) for v := range resChan { fmt.Println(v) } }
程序輸出
1
3
6
10
15
21
28
......
多線程求素?cái)?shù):
package main import ( "fmt" "math" ) // 放入數(shù)據(jù) func putNum(intChan chan int) { for i := 2; i < 8000; i++ { intChan <- i } close(intChan) } // 判斷素?cái)?shù)并放入到primeChan中 func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) { for { // 從管道取出并判斷是不是素?cái)?shù) num, ok := <-intChan flag := true if !ok { // 取出失敗 break } for i := 2; i < int(math.Sqrt(float64(num))); i++ { // 不是素?cái)?shù),退出 if num%i == 0 { flag = false break } } if flag { // 是素?cái)?shù),加入到管道中 primeChan <- num } } // 標(biāo)記管道退出 exitChan <- true } func main() { intChan := make(chan int, 1000) // 放入結(jié)果 primeChan := make(chan int, 1000) // 標(biāo)記退出 exitChan := make(chan bool, 4) // 開啟協(xié)程,放入1-8000 go putNum(intChan) // 開啟四個(gè)協(xié)程,并判斷是否為素?cái)?shù) for i := 0; i < 4; i++ { go primeNum(intChan, primeChan, exitChan) } go func() { for i := 0; i < 4; i++ { // 只從管道里把內(nèi)容取出來(lái) <-exitChan } close(primeChan) }() for v := range primeChan { fmt.Println("素?cái)?shù)有", v) } }
程序輸出
素?cái)?shù)有 2
素?cái)?shù)有 3
素?cái)?shù)有 4
素?cái)?shù)有 5
.....
4、并發(fā)范式-生成器
本節(jié)通過(guò)具體的程序示例來(lái)演示Go語(yǔ)言強(qiáng)大的并發(fā)處理能力,每個(gè)示例代表一個(gè)并發(fā)處理范式,這些范式具有典
型的特征,在真實(shí)的程序中稍加改造就能使用。
在應(yīng)用系統(tǒng)編程中,常見的應(yīng)用場(chǎng)景就是調(diào)用一個(gè)統(tǒng)一的全局的生成器服務(wù),用于生成全局事務(wù)號(hào)、訂單號(hào)、序列
號(hào)和隨機(jī)數(shù)等。Go對(duì)這種場(chǎng)景的支持非常簡(jiǎn)單,下面以一個(gè)隨機(jī)數(shù)生成器為例來(lái)說(shuō)明。
4.1 最簡(jiǎn)單的帶緩沖的生成器
package main import ( "fmt" "math/rand" ) func GenerateIntA() chan int { ch := make(chan int, 5) // 啟動(dòng)一個(gè)goroutine用于生成隨機(jī)數(shù),函數(shù)返回一個(gè)通道用于獲取隨機(jī)數(shù) go func() { for { ch <- rand.Int() } }() return ch } func main() { // 啟動(dòng)生成器 ch := GenerateIntA() fmt.Println(<-ch) fmt.Println(<-ch) }
程序輸出
5577006791947779410
8674665223082153551
4.2 多個(gè)goroutine增強(qiáng)型生成器
package main import ( "fmt" "math/rand" ) func GenerateIntA() chan int { ch := make(chan int, 5) // 啟動(dòng)一個(gè)goroutine用于生成隨機(jī)數(shù),函數(shù)返回一個(gè)通道用于獲取隨機(jī)數(shù) go func() { for { ch <- rand.Int() } }() return ch } func GenerateIntB() chan int { ch := make(chan int, 5) // 啟動(dòng)一個(gè)goroutine用于生成隨機(jī)數(shù),函數(shù)返回一個(gè)通道用于獲取隨機(jī)數(shù) go func() { for { ch <- rand.Int() } }() return ch } func GenerateInt() chan int { ch := make(chan int, 10) go func() { for { // 使用select的扇入技術(shù)增加生成的隨機(jī)源 select { case ch <- <-GenerateIntA(): case ch <- <-GenerateIntB(): } } }() return ch } func main() { // 啟動(dòng)生成器 ch := GenerateInt() // 獲取生成器資源 for i := 0; i < 10; i++ { fmt.Println(<-ch) } }
程序輸出
5577006791947779410
2933568871211445515
7981306761429961588
3337066551442961397
2050257992909156333
837825985403119657
8267293389953062911
7660323324116104765
273669266008440571
2282476590775666788
4.3 自動(dòng)退出
有時(shí)希望生成器能夠自動(dòng)退出,可以借助Go通道的退出通知機(jī)制(close channel to broadcast)實(shí)現(xiàn)。
package main import ( "fmt" "math/rand" ) // done接收通知推出信號(hào) func GenerateIntA(done chan struct{}) chan int { ch := make(chan int) go func() { Lable: for { select { case ch <- rand.Int(): case <-done: break Lable } } close(ch) }() return ch } func main() { // 創(chuàng)建一個(gè)作為接收推出信號(hào)的chan done := make(chan struct{}) // 啟動(dòng)生成器 ch := GenerateIntA(done) fmt.Println(<-ch) fmt.Println(<-ch) // 不在需要生成器,通過(guò)close chan發(fā)送一個(gè)通知給生成器 close(done) // 獲取生成器資源 for v := range ch { fmt.Println(v) } }
程序結(jié)果
5577006791947779410
8674665223082153551
4.4 多重特性的生成器
一個(gè)融合了并發(fā)、緩沖、退出通知等多重特性的生成器。
package main import ( "fmt" "math/rand" ) //done 接收通知推出信號(hào) func GenerateIntA(done chan struct{}) chan int { ch := make(chan int, 5) go func() { Lable: for { select { case ch <- rand.Int(): case <-done: break Lable } } close(ch) }() return ch } //done 接收通知推出信號(hào) func GenerateIntB(done chan struct{}) chan int { ch := make(chan int, 10) go func() { Lable: for { select { case ch <- rand.Int(): case <-done: break Lable } } close(ch) }() return ch } // 通過(guò)select做了扇入(Fan In) func GenerateInt(done chan struct{}) chan int { ch := make(chan int) send := make(chan struct{}) go func() { Lable: for { select { case ch <- <-GenerateIntA(send): case ch <- <-GenerateIntB(send): case <-done: send <- struct{}{} send <- struct{}{} break Lable } } close(ch) }() return ch } func main() { //創(chuàng)建一個(gè)作為接收推出信號(hào)的chan done := make(chan struct{}) //啟動(dòng)生成器 ch := GenerateInt(done) //獲取生成器資源 for i := 0; i < 10; i++ { fmt.Println(<-ch) } //通知生產(chǎn)者停止生產(chǎn) done <- struct{}{} fmt.Println("stop gernarate") }
程序輸出
5577006791947779410
2015796113853353331
4893789450120281907
1687184559264975024
9093919513921919021
2202916659517317514
26222426471854123
8999011805617471788
4534277910591376951
6607332037155172840
stop gernarate
5、固定worker工作池
服務(wù)器編程中使用最多的就是通過(guò)線程池來(lái)提升服務(wù)的并發(fā)處理能力。在Go語(yǔ)言編程中,一樣可以輕松地構(gòu)建固
定數(shù)目的 goroutines 作為工作線程池。下面還是以計(jì)算多個(gè)整數(shù)的和為例來(lái)說(shuō)明這種并發(fā)范式。
程序中除了主要的 main goroutine ,還開啟了如下幾類 goroutine:
(1)、初始化任務(wù)的 goroutine。
(2)、分發(fā)任務(wù)的 goroutine。
(3)、等待所有 worker 結(jié)束通知,然后關(guān)閉結(jié)果通道的 goroutine。
main 函數(shù)負(fù)責(zé)拉起上述 goroutine,并從結(jié)果通道獲取最終的結(jié)果。
程序采用三個(gè)通道,分別是:
(1)、傳遞 task 任務(wù)的通道。
(2)、傳遞 task 結(jié)果的通道。
(3)、接收 worker 處理完任務(wù)后所發(fā)送通知的通道。
package main import ( "fmt" ) // 工作池的goroutine數(shù)目 const ( NUMBER = 10 ) // 工作任務(wù) type task struct { begin int end int result chan<- int } // 初始化待處理task chan func InitTask(taskChan chan<- task, r chan int, p int) { qu := p / 10 mod := p % 10 high := qu * 10 for j := 0; j < qu; j++ { b := 10*j + 1 e := 10 * (j + 1) // 1-10 // 11-20 // ...... tsk := task{ begin: b, end: e, result: r, } taskChan <- tsk } if mod != 0 { tsk := task{ begin: high + 1, end: p, result: r, } taskChan <- tsk } close(taskChan) } //讀取task chan分發(fā)到worker goroutine處理,workers的總的數(shù)量是workers func DistributeTask(taskChan <-chan task, workers int, done chan struct{}) { for i := 0; i < workers; i++ { go ProcessTask(taskChan, done) } } //工作goroutine處理具體工作,并將處理結(jié)構(gòu)發(fā)送到結(jié)果chan func ProcessTask(taskChan <-chan task, done chan struct{}) { for t := range taskChan { t.do() } done <- struct{}{} } // 任務(wù)處理:計(jì)算begin到end的和 // 執(zhí)行結(jié)果寫入到結(jié)果chan result中 func (t *task) do() { sum := 0 for i := t.begin; i <= t.end; i++ { sum += i } t.result <- sum } // 通過(guò)done channel來(lái)同步等待所有工作goroutine的結(jié)束,然后關(guān)閉結(jié)果chan func CloseResult(done chan struct{}, resultChan chan int, workers int) { for i := 0; i < workers; i++ { <-done } close(done) close(resultChan) } // 讀取結(jié)果通道,匯總結(jié)果 func ProcessResult(resultChan chan int) int { sum := 0 for r := range resultChan { sum += r } return sum } func main() { // 多線程數(shù)目 workers := NUMBER // 工作通道 taskChan := make(chan task, 10) // 結(jié)果通道 resultChan := make(chan int, 10) // worker信號(hào)通道 done := make(chan struct{}, 10) // 初始化task的goroutine,計(jì)算1000個(gè)自然數(shù)之和 go InitTask(taskChan, resultChan, 1000) //分發(fā)任務(wù)在NUMBER個(gè)goroutine池 DistributeTask(taskChan, workers, done) //獲取各個(gè)goroutine處理完任務(wù)的通知,并關(guān)閉結(jié)果通道 go CloseResult(done, resultChan, workers) //通過(guò)結(jié)果通道處理結(jié)果 sum := ProcessResult(resultChan) fmt.Println("sum=", sum) }
程序輸出
sum= 500500
程序的邏輯分析:
(1)、構(gòu)建 task 并發(fā)送到 task 通道中。
(2)、分別啟動(dòng) n 個(gè)工作線程,不停地從 task 通道中獲取任務(wù),然后將結(jié)果寫入結(jié)果通道。如果任務(wù)通道被關(guān)閉,
則負(fù)責(zé)向收斂結(jié)果的 goroutine 發(fā)送通知,告訴其當(dāng)前 worker 已經(jīng)完成工作。
(3)、收斂結(jié)果的 goroutine 接收到所有 task 已經(jīng)處理完畢的信號(hào)后,主動(dòng)關(guān)閉結(jié)果通道。
(4)、 main 中的函數(shù) ProcessResult 讀取并統(tǒng)計(jì)所有的結(jié)果。
6、使用 WaitGroup實(shí)現(xiàn)固工作池
package main import ( "fmt" "sync" ) // 工作任務(wù) type task struct { begin int end int result chan<- int } // 構(gòu)建task并寫入task通道 func InitTask(taskChan chan<- task, r chan int, p int) { qu := p / 10 mod := p % 10 high := qu * 10 for j := 0; j < qu; j++ { b := 10*j + 1 e := 10 * (j + 1) tsk := task{ begin: b, end: e, result: r, } taskChan <- tsk } if mod != 0 { tsk := task{ begin: high + 1, end: p, result: r, } taskChan <- tsk } close(taskChan) } //讀取task chan,每個(gè)task啟動(dòng)一個(gè)worker goroutine進(jìn)行處理,并等待每個(gè)task運(yùn)行完,關(guān)閉結(jié)果通道 func DistributeTask(taskChan <-chan task, wait *sync.WaitGroup, result chan int) { for v := range taskChan { wait.Add(1) go ProcessTask(v, wait) } wait.Wait() close(result) } // goroutine處理具體工作,并將結(jié)果發(fā)送到結(jié)果通道 func ProcessTask(t task, wait *sync.WaitGroup) { t.do() wait.Done() } //任務(wù)執(zhí)行: 計(jì)算begin到end的和,執(zhí)行結(jié)果寫入結(jié)果chan result func (t *task) do() { sum := 0 for i := t.begin; i <= t.end; i++ { sum += i } t.result <- sum } //讀取結(jié)果通道,匯總結(jié)果 func ProcessResult(resultChan chan int) int { sum := 0 for r := range resultChan { sum += r } return sum } func main() { // 創(chuàng)建任務(wù)通道 taskChan := make(chan task, 10) // 創(chuàng)建結(jié)果通道 resultChan := make(chan int, 10) // wait用于同步等待任務(wù)的執(zhí)行 wait := &sync.WaitGroup{} // 初始化task的goroutine,計(jì)算100個(gè)自然數(shù)之和 go InitTask(taskChan, resultChan, 100) // 每個(gè)task啟動(dòng)一個(gè)goroutine進(jìn)行處理 go DistributeTask(taskChan, wait, resultChan) //通過(guò)結(jié)果通道獲取結(jié)果并匯總 sum := ProcessResult(resultChan) fmt.Println("sum=", sum) }
程序輸出
sum= 5050
程序的邏輯分析:
(1)、InitTask 函數(shù)構(gòu)建 task 并發(fā)送到 task 通道中。
(2)、分發(fā)任務(wù)函數(shù) DistributeTask 為每個(gè) task 啟動(dòng)一個(gè) goroutine 處理任務(wù),等待其處理完成,然后關(guān)閉結(jié)果通道。
(3)、ProcessResult 函數(shù)讀取并統(tǒng)計(jì)所有的結(jié)果。
這幾個(gè)函數(shù)分別在不同的 goroutine 中運(yùn)行,它們通過(guò)通道和 sync.WaitGroup 進(jìn)行通信和同步。
以上就是Go語(yǔ)言通過(guò)chan進(jìn)行數(shù)據(jù)傳遞的方法詳解的詳細(xì)內(nèi)容,更多關(guān)于Go chan數(shù)據(jù)傳遞的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang實(shí)現(xiàn)IP地址轉(zhuǎn)整數(shù)的方法詳解
在 Go 語(yǔ)言中,將 IP 地址轉(zhuǎn)換為整數(shù)涉及到解析 IP 地址并處理其字節(jié)表示,本文給大家介紹了Golang實(shí)現(xiàn)IP地址轉(zhuǎn)整數(shù)的方法,文中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下2024-02-02Golang實(shí)現(xiàn)短網(wǎng)址/短鏈服務(wù)的開發(fā)筆記分享
這篇文章主要為大家詳細(xì)介紹了如何使用Golang實(shí)現(xiàn)短網(wǎng)址/短鏈服務(wù),文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價(jià)值,感興趣的小伙伴可以了解一下2023-05-05如何使用Go語(yǔ)言實(shí)現(xiàn)基于泛型的Jaccard相似度算法
這篇文章主要介紹了如何使用Go語(yǔ)言實(shí)現(xiàn)基于泛型的Jaccard相似度算法,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2024-08-08golang中snappy的使用場(chǎng)合實(shí)例詳解
在java 和go語(yǔ)言 大字符傳達(dá)的時(shí)候, 采用snappy 壓縮 解壓縮是最好的方案。下面這篇文章主要給大家介紹了關(guān)于golang中snappy使用場(chǎng)合的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),需要的朋友可以參考下。2017-12-12golang并發(fā)之使用sync.Pool優(yōu)化性能
在Go提供如何實(shí)現(xiàn)對(duì)象的緩存池功能,常用一種實(shí)現(xiàn)方式是sync.Pool,?其旨在緩存已分配但未使用的項(xiàng)目以供以后重用,從而減輕垃圾收集器(GC)的壓力,下面我們就來(lái)看看具體操作吧2023-10-10Go語(yǔ)言定時(shí)任務(wù)的實(shí)現(xiàn)示例
本文主要介紹了Go語(yǔ)言定時(shí)任務(wù)的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-05-05go本地環(huán)境配置及vscode go插件安裝的詳細(xì)教程
這篇文章主要介紹了go本地環(huán)境配置及vscode go插件安裝的詳細(xì)教程,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05