Go語言通過chan進(jìn)行數(shù)據(jù)傳遞的方法詳解
1、并發(fā)范式-管道
通道可以分為兩個方向,一個是讀,另一個是寫,假如一個函數(shù)的輸入?yún)?shù)和輸出參數(shù)都是相同的 chan 類型,則
該函數(shù)可以調(diào)用自己,最終形成一個調(diào)用鏈。當(dāng)然多個具有相同參數(shù)類型的函數(shù)也能組成一個調(diào)用鏈,這很像
UNIX系統(tǒng)的管道,是一個有類型的管道。下面通過具體的示例演示Go程序這種鏈?zhǔn)教幚砟芰Α?/p>
package main
import (
"fmt"
)
// chain函數(shù)輸入?yún)?shù)和輸入?yún)?shù)類型相同都是chan int類型
// chain函數(shù)功能是將chan內(nèi)的數(shù)據(jù)統(tǒng)一加1
func chain(in chan int) chan int {
out := make(chan int)
go func() {
for v := range in {
out <- 1 + v
}
close(out)
}()
return out
}
func main() {
in := make(chan int)
//初始化輸入?yún)?shù)
go func() {
for i := 0; i < 10; i++ {
in <- i
}
close(in)
}()
//連續(xù)調(diào)用3次chan,想當(dāng)與把in中的每個元素都加3
out := chain(chain(chain(in)))
for v := range out {
fmt.Println(v)
}
}程序輸出
3
4
5
6
7
8
9
10
11
12
2、一個線程寫數(shù)據(jù)一個線程讀數(shù)據(jù)
一個線程往管道里寫數(shù)據(jù)、另一個線程從管道里讀數(shù)據(jù)示例。
package main
import (
"fmt"
)
// writerData
func writerData(intChan chan int) {
for i := 1; i <= 10; i++ {
//放入數(shù)據(jù)
intChan <- i
fmt.Printf("writeData寫到數(shù)據(jù)=%v\n", i)
}
// 關(guān)閉
close(intChan)
}
// readData
func readData(intChan chan int, exitChan chan bool) {
for {
v, ok := <-intChan
if !ok {
break
}
fmt.Printf("readData讀到數(shù)據(jù)=%v\n", v)
}
// readData讀完數(shù)據(jù)后,即任務(wù)完成
exitChan <- true
close(exitChan)
}
func main() {
// 創(chuàng)建兩個管道
intChan := make(chan int, 10)
// 判斷子進(jìn)程是否結(jié)束
exitChan := make(chan bool, 1)
go writerData(intChan)
go readData(intChan, exitChan)
// 注意主線程退出,兩個線程直接退出
for {
// 等待讀取
_, ok := <-exitChan
// 沒讀到就退出
if !ok {
break
}
}
}程序輸出
writeData寫到數(shù)據(jù)=1
writeData寫到數(shù)據(jù)=2
writeData寫到數(shù)據(jù)=3
writeData寫到數(shù)據(jù)=4
writeData寫到數(shù)據(jù)=5
writeData寫到數(shù)據(jù)=6
writeData寫到數(shù)據(jù)=7
writeData寫到數(shù)據(jù)=8
writeData寫到數(shù)據(jù)=9
writeData寫到數(shù)據(jù)=10
readData讀到數(shù)據(jù)=1
readData讀到數(shù)據(jù)=2
readData讀到數(shù)據(jù)=3
readData讀到數(shù)據(jù)=4
readData讀到數(shù)據(jù)=5
readData讀到數(shù)據(jù)=6
readData讀到數(shù)據(jù)=7
readData讀到數(shù)據(jù)=8
readData讀到數(shù)據(jù)=9
readData讀到數(shù)據(jù)=10
3、多線程讀寫數(shù)據(jù)
求每個數(shù)字的累加和:
package main
import (
"fmt"
)
// 寫入2000個數(shù)據(jù)到numChan中
func inputNum(numChan chan int) {
for i := 1; i <= 2000; i++ {
numChan <- i
}
// 寫完數(shù)據(jù)關(guān)閉channel
close(numChan)
}
// 計算每個數(shù)字的累加
// 讀取數(shù)據(jù)并且存入到resChan中,exitChan做協(xié)程標(biāo)記
func getNum(numChan chan int, resChan chan int, exitChan chan bool) {
for {
res := 0
n, ok := <-numChan
// 值被取完
if !ok {
break
}
for i := 1; i <= n; i++ {
res += i
}
// 存入到resChan
resChan <- res
}
// 標(biāo)記退出
exitChan <- true
}
func main() {
// 創(chuàng)建三個管道分別為讀、寫、退出標(biāo)記
numChan := make(chan int, 2000)
resChan := make(chan int, 2000)
exitChan := make(chan bool, 8)
// 啟動多線程
go inputNum(numChan)
for i := 1; i <= 8; i++ {
go getNum(numChan, resChan, exitChan)
}
// 再啟動一個線程取出exitChan
go func() {
for i := 0; i < 8; i++ {
// 從exitChan管道取出即可
<-exitChan
}
// 全部取出說明讀取numChan數(shù)據(jù)完畢,關(guān)閉resChan
close(resChan)
}()
// 讀取resChan數(shù)據(jù)
for v := range resChan {
fmt.Println(v)
}
}程序輸出
1
3
6
10
15
21
28
......
多線程求素數(shù):
package main
import (
"fmt"
"math"
)
// 放入數(shù)據(jù)
func putNum(intChan chan int) {
for i := 2; i < 8000; i++ {
intChan <- i
}
close(intChan)
}
// 判斷素數(shù)并放入到primeChan中
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
for {
// 從管道取出并判斷是不是素數(shù)
num, ok := <-intChan
flag := true
if !ok {
// 取出失敗
break
}
for i := 2; i < int(math.Sqrt(float64(num))); i++ {
// 不是素數(shù),退出
if num%i == 0 {
flag = false
break
}
}
if flag {
// 是素數(shù),加入到管道中
primeChan <- num
}
}
// 標(biāo)記管道退出
exitChan <- true
}
func main() {
intChan := make(chan int, 1000)
// 放入結(jié)果
primeChan := make(chan int, 1000)
// 標(biāo)記退出
exitChan := make(chan bool, 4)
// 開啟協(xié)程,放入1-8000
go putNum(intChan)
// 開啟四個協(xié)程,并判斷是否為素數(shù)
for i := 0; i < 4; i++ {
go primeNum(intChan, primeChan, exitChan)
}
go func() {
for i := 0; i < 4; i++ {
// 只從管道里把內(nèi)容取出來
<-exitChan
}
close(primeChan)
}()
for v := range primeChan {
fmt.Println("素數(shù)有", v)
}
}程序輸出
素數(shù)有 2
素數(shù)有 3
素數(shù)有 4
素數(shù)有 5
.....
4、并發(fā)范式-生成器
本節(jié)通過具體的程序示例來演示Go語言強(qiáng)大的并發(fā)處理能力,每個示例代表一個并發(fā)處理范式,這些范式具有典
型的特征,在真實的程序中稍加改造就能使用。
在應(yīng)用系統(tǒng)編程中,常見的應(yīng)用場景就是調(diào)用一個統(tǒng)一的全局的生成器服務(wù),用于生成全局事務(wù)號、訂單號、序列
號和隨機(jī)數(shù)等。Go對這種場景的支持非常簡單,下面以一個隨機(jī)數(shù)生成器為例來說明。
4.1 最簡單的帶緩沖的生成器
package main
import (
"fmt"
"math/rand"
)
func GenerateIntA() chan int {
ch := make(chan int, 5)
// 啟動一個goroutine用于生成隨機(jī)數(shù),函數(shù)返回一個通道用于獲取隨機(jī)數(shù)
go func() {
for {
ch <- rand.Int()
}
}()
return ch
}
func main() {
// 啟動生成器
ch := GenerateIntA()
fmt.Println(<-ch)
fmt.Println(<-ch)
}程序輸出
5577006791947779410
8674665223082153551
4.2 多個goroutine增強(qiáng)型生成器
package main
import (
"fmt"
"math/rand"
)
func GenerateIntA() chan int {
ch := make(chan int, 5)
// 啟動一個goroutine用于生成隨機(jī)數(shù),函數(shù)返回一個通道用于獲取隨機(jī)數(shù)
go func() {
for {
ch <- rand.Int()
}
}()
return ch
}
func GenerateIntB() chan int {
ch := make(chan int, 5)
// 啟動一個goroutine用于生成隨機(jī)數(shù),函數(shù)返回一個通道用于獲取隨機(jī)數(shù)
go func() {
for {
ch <- rand.Int()
}
}()
return ch
}
func GenerateInt() chan int {
ch := make(chan int, 10)
go func() {
for {
// 使用select的扇入技術(shù)增加生成的隨機(jī)源
select {
case ch <- <-GenerateIntA():
case ch <- <-GenerateIntB():
}
}
}()
return ch
}
func main() {
// 啟動生成器
ch := GenerateInt()
// 獲取生成器資源
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
}程序輸出
5577006791947779410
2933568871211445515
7981306761429961588
3337066551442961397
2050257992909156333
837825985403119657
8267293389953062911
7660323324116104765
273669266008440571
2282476590775666788
4.3 自動退出
有時希望生成器能夠自動退出,可以借助Go通道的退出通知機(jī)制(close channel to broadcast)實現(xiàn)。
package main
import (
"fmt"
"math/rand"
)
// done接收通知推出信號
func GenerateIntA(done chan struct{}) chan int {
ch := make(chan int)
go func() {
Lable:
for {
select {
case ch <- rand.Int():
case <-done:
break Lable
}
}
close(ch)
}()
return ch
}
func main() {
// 創(chuàng)建一個作為接收推出信號的chan
done := make(chan struct{})
// 啟動生成器
ch := GenerateIntA(done)
fmt.Println(<-ch)
fmt.Println(<-ch)
// 不在需要生成器,通過close chan發(fā)送一個通知給生成器
close(done)
// 獲取生成器資源
for v := range ch {
fmt.Println(v)
}
}程序結(jié)果
5577006791947779410
8674665223082153551
4.4 多重特性的生成器
一個融合了并發(fā)、緩沖、退出通知等多重特性的生成器。
package main
import (
"fmt"
"math/rand"
)
//done 接收通知推出信號
func GenerateIntA(done chan struct{}) chan int {
ch := make(chan int, 5)
go func() {
Lable:
for {
select {
case ch <- rand.Int():
case <-done:
break Lable
}
}
close(ch)
}()
return ch
}
//done 接收通知推出信號
func GenerateIntB(done chan struct{}) chan int {
ch := make(chan int, 10)
go func() {
Lable:
for {
select {
case ch <- rand.Int():
case <-done:
break Lable
}
}
close(ch)
}()
return ch
}
// 通過select做了扇入(Fan In)
func GenerateInt(done chan struct{}) chan int {
ch := make(chan int)
send := make(chan struct{})
go func() {
Lable:
for {
select {
case ch <- <-GenerateIntA(send):
case ch <- <-GenerateIntB(send):
case <-done:
send <- struct{}{}
send <- struct{}{}
break Lable
}
}
close(ch)
}()
return ch
}
func main() {
//創(chuàng)建一個作為接收推出信號的chan
done := make(chan struct{})
//啟動生成器
ch := GenerateInt(done)
//獲取生成器資源
for i := 0; i < 10; i++ {
fmt.Println(<-ch)
}
//通知生產(chǎn)者停止生產(chǎn)
done <- struct{}{}
fmt.Println("stop gernarate")
}程序輸出
5577006791947779410
2015796113853353331
4893789450120281907
1687184559264975024
9093919513921919021
2202916659517317514
26222426471854123
8999011805617471788
4534277910591376951
6607332037155172840
stop gernarate
5、固定worker工作池
服務(wù)器編程中使用最多的就是通過線程池來提升服務(wù)的并發(fā)處理能力。在Go語言編程中,一樣可以輕松地構(gòu)建固
定數(shù)目的 goroutines 作為工作線程池。下面還是以計算多個整數(shù)的和為例來說明這種并發(fā)范式。
程序中除了主要的 main goroutine ,還開啟了如下幾類 goroutine:
(1)、初始化任務(wù)的 goroutine。
(2)、分發(fā)任務(wù)的 goroutine。
(3)、等待所有 worker 結(jié)束通知,然后關(guān)閉結(jié)果通道的 goroutine。
main 函數(shù)負(fù)責(zé)拉起上述 goroutine,并從結(jié)果通道獲取最終的結(jié)果。
程序采用三個通道,分別是:
(1)、傳遞 task 任務(wù)的通道。
(2)、傳遞 task 結(jié)果的通道。
(3)、接收 worker 處理完任務(wù)后所發(fā)送通知的通道。
package main
import (
"fmt"
)
// 工作池的goroutine數(shù)目
const (
NUMBER = 10
)
// 工作任務(wù)
type task struct {
begin int
end int
result chan<- int
}
// 初始化待處理task chan
func InitTask(taskChan chan<- task, r chan int, p int) {
qu := p / 10
mod := p % 10
high := qu * 10
for j := 0; j < qu; j++ {
b := 10*j + 1
e := 10 * (j + 1)
// 1-10
// 11-20
// ......
tsk := task{
begin: b,
end: e,
result: r,
}
taskChan <- tsk
}
if mod != 0 {
tsk := task{
begin: high + 1,
end: p,
result: r,
}
taskChan <- tsk
}
close(taskChan)
}
//讀取task chan分發(fā)到worker goroutine處理,workers的總的數(shù)量是workers
func DistributeTask(taskChan <-chan task, workers int, done chan struct{}) {
for i := 0; i < workers; i++ {
go ProcessTask(taskChan, done)
}
}
//工作goroutine處理具體工作,并將處理結(jié)構(gòu)發(fā)送到結(jié)果chan
func ProcessTask(taskChan <-chan task, done chan struct{}) {
for t := range taskChan {
t.do()
}
done <- struct{}{}
}
// 任務(wù)處理:計算begin到end的和
// 執(zhí)行結(jié)果寫入到結(jié)果chan result中
func (t *task) do() {
sum := 0
for i := t.begin; i <= t.end; i++ {
sum += i
}
t.result <- sum
}
// 通過done channel來同步等待所有工作goroutine的結(jié)束,然后關(guān)閉結(jié)果chan
func CloseResult(done chan struct{}, resultChan chan int, workers int) {
for i := 0; i < workers; i++ {
<-done
}
close(done)
close(resultChan)
}
// 讀取結(jié)果通道,匯總結(jié)果
func ProcessResult(resultChan chan int) int {
sum := 0
for r := range resultChan {
sum += r
}
return sum
}
func main() {
// 多線程數(shù)目
workers := NUMBER
// 工作通道
taskChan := make(chan task, 10)
// 結(jié)果通道
resultChan := make(chan int, 10)
// worker信號通道
done := make(chan struct{}, 10)
// 初始化task的goroutine,計算1000個自然數(shù)之和
go InitTask(taskChan, resultChan, 1000)
//分發(fā)任務(wù)在NUMBER個goroutine池
DistributeTask(taskChan, workers, done)
//獲取各個goroutine處理完任務(wù)的通知,并關(guān)閉結(jié)果通道
go CloseResult(done, resultChan, workers)
//通過結(jié)果通道處理結(jié)果
sum := ProcessResult(resultChan)
fmt.Println("sum=", sum)
}程序輸出
sum= 500500
程序的邏輯分析:
(1)、構(gòu)建 task 并發(fā)送到 task 通道中。
(2)、分別啟動 n 個工作線程,不停地從 task 通道中獲取任務(wù),然后將結(jié)果寫入結(jié)果通道。如果任務(wù)通道被關(guān)閉,
則負(fù)責(zé)向收斂結(jié)果的 goroutine 發(fā)送通知,告訴其當(dāng)前 worker 已經(jīng)完成工作。
(3)、收斂結(jié)果的 goroutine 接收到所有 task 已經(jīng)處理完畢的信號后,主動關(guān)閉結(jié)果通道。
(4)、 main 中的函數(shù) ProcessResult 讀取并統(tǒng)計所有的結(jié)果。
6、使用 WaitGroup實現(xiàn)固工作池
package main
import (
"fmt"
"sync"
)
// 工作任務(wù)
type task struct {
begin int
end int
result chan<- int
}
// 構(gòu)建task并寫入task通道
func InitTask(taskChan chan<- task, r chan int, p int) {
qu := p / 10
mod := p % 10
high := qu * 10
for j := 0; j < qu; j++ {
b := 10*j + 1
e := 10 * (j + 1)
tsk := task{
begin: b,
end: e,
result: r,
}
taskChan <- tsk
}
if mod != 0 {
tsk := task{
begin: high + 1,
end: p,
result: r,
}
taskChan <- tsk
}
close(taskChan)
}
//讀取task chan,每個task啟動一個worker goroutine進(jìn)行處理,并等待每個task運(yùn)行完,關(guān)閉結(jié)果通道
func DistributeTask(taskChan <-chan task, wait *sync.WaitGroup, result chan int) {
for v := range taskChan {
wait.Add(1)
go ProcessTask(v, wait)
}
wait.Wait()
close(result)
}
// goroutine處理具體工作,并將結(jié)果發(fā)送到結(jié)果通道
func ProcessTask(t task, wait *sync.WaitGroup) {
t.do()
wait.Done()
}
//任務(wù)執(zhí)行: 計算begin到end的和,執(zhí)行結(jié)果寫入結(jié)果chan result
func (t *task) do() {
sum := 0
for i := t.begin; i <= t.end; i++ {
sum += i
}
t.result <- sum
}
//讀取結(jié)果通道,匯總結(jié)果
func ProcessResult(resultChan chan int) int {
sum := 0
for r := range resultChan {
sum += r
}
return sum
}
func main() {
// 創(chuàng)建任務(wù)通道
taskChan := make(chan task, 10)
// 創(chuàng)建結(jié)果通道
resultChan := make(chan int, 10)
// wait用于同步等待任務(wù)的執(zhí)行
wait := &sync.WaitGroup{}
// 初始化task的goroutine,計算100個自然數(shù)之和
go InitTask(taskChan, resultChan, 100)
// 每個task啟動一個goroutine進(jìn)行處理
go DistributeTask(taskChan, wait, resultChan)
//通過結(jié)果通道獲取結(jié)果并匯總
sum := ProcessResult(resultChan)
fmt.Println("sum=", sum)
}程序輸出
sum= 5050
程序的邏輯分析:
(1)、InitTask 函數(shù)構(gòu)建 task 并發(fā)送到 task 通道中。
(2)、分發(fā)任務(wù)函數(shù) DistributeTask 為每個 task 啟動一個 goroutine 處理任務(wù),等待其處理完成,然后關(guān)閉結(jié)果通道。
(3)、ProcessResult 函數(shù)讀取并統(tǒng)計所有的結(jié)果。
這幾個函數(shù)分別在不同的 goroutine 中運(yùn)行,它們通過通道和 sync.WaitGroup 進(jìn)行通信和同步。
以上就是Go語言通過chan進(jìn)行數(shù)據(jù)傳遞的方法詳解的詳細(xì)內(nèi)容,更多關(guān)于Go chan數(shù)據(jù)傳遞的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang實現(xiàn)IP地址轉(zhuǎn)整數(shù)的方法詳解
在 Go 語言中,將 IP 地址轉(zhuǎn)換為整數(shù)涉及到解析 IP 地址并處理其字節(jié)表示,本文給大家介紹了Golang實現(xiàn)IP地址轉(zhuǎn)整數(shù)的方法,文中有詳細(xì)的代碼示例供大家參考,需要的朋友可以參考下2024-02-02
Golang實現(xiàn)短網(wǎng)址/短鏈服務(wù)的開發(fā)筆記分享
這篇文章主要為大家詳細(xì)介紹了如何使用Golang實現(xiàn)短網(wǎng)址/短鏈服務(wù),文中的示例代碼講解詳細(xì),具有一定的學(xué)習(xí)價值,感興趣的小伙伴可以了解一下2023-05-05
如何使用Go語言實現(xiàn)基于泛型的Jaccard相似度算法
這篇文章主要介紹了如何使用Go語言實現(xiàn)基于泛型的Jaccard相似度算法,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2024-08-08
golang并發(fā)之使用sync.Pool優(yōu)化性能
在Go提供如何實現(xiàn)對象的緩存池功能,常用一種實現(xiàn)方式是sync.Pool,?其旨在緩存已分配但未使用的項目以供以后重用,從而減輕垃圾收集器(GC)的壓力,下面我們就來看看具體操作吧2023-10-10
go本地環(huán)境配置及vscode go插件安裝的詳細(xì)教程
這篇文章主要介紹了go本地環(huán)境配置及vscode go插件安裝的詳細(xì)教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-05-05

