淺談Go語言中高效并發(fā)模式
一、并發(fā)編程基礎(chǔ)回顧
Go語言以其輕量級(jí)的Goroutine和簡潔的并發(fā)控制機(jī)制(如Channel和Select)聞名,非常適合構(gòu)建高并發(fā)、高吞吐量的系統(tǒng)。
1.1 Goroutine
Go語言中的Goroutine是輕量級(jí)線程,由Go運(yùn)行時(shí)調(diào)度。啟動(dòng)一個(gè)Goroutine非常簡單:
go func() {
fmt.Println("Hello from goroutine")
}()
1.2 Channel
Channel是Go語言中用于Goroutine之間通信的機(jī)制,支持同步與數(shù)據(jù)傳遞:
ch := make(chan int)
go func() { ch <- 42 }()
value := <-ch
fmt.Println(value)
1.3 Select
Select用于在多個(gè)Channel操作中進(jìn)行選擇,常用于超時(shí)控制、多路復(fù)用等場景:
select {
case msg := <-ch1:
fmt.Println("Got from ch1:", msg)
case msg := <-ch2:
fmt.Println("Got from ch2:", msg)
case <-time.After(1 * time.Second):
fmt.Println("Timeout")
}
二、常用并發(fā)模式詳解
2.1 Worker Pool(工作池模式)
適用場景:任務(wù)分發(fā)與并行處理,如爬蟲、批量任務(wù)處理。
案例:使用固定數(shù)量的Worker處理任務(wù)隊(duì)列。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for j := range jobs {
fmt.Printf("Worker %d started job %d\n", id, j)
time.Sleep(time.Second) // 模擬耗時(shí)任務(wù)
fmt.Printf("Worker %d finished job %d\n", id, j)
results <- j * 2
}
}
func main() {
const numJobs = 5
const numWorkers = 3
jobs := make(chan int, numJobs)
results := make(chan int, numJobs)
var wg sync.WaitGroup
// 啟動(dòng)3個(gè)worker
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 發(fā)送5個(gè)任務(wù)
for j := 1; j <= numJobs; j++ {
jobs <- j
}
close(jobs)
// 等待所有worker完成
wg.Wait()
close(results)
// 收集結(jié)果
for result := range results {
fmt.Println("Result:", result)
}
}
2.2 Fan-in / Fan-out(扇入/扇出模式)
適用場景:將多個(gè)輸入合并到一個(gè)輸出(Fan-in),或?qū)⒁粋€(gè)輸入分發(fā)到多個(gè)處理單元(Fan-out)。
案例:
package main
import (
"fmt"
"sync"
"time"
)
func producer(id int, out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 3; i++ {
fmt.Printf("Producer %d produced %d\n", id, i)
out <- id*10 + i
time.Sleep(200 * time.Millisecond)
}
}
func consumer(in <-chan int, done chan<- bool) {
for value := range in {
fmt.Printf("Consumed: %d\n", value)
}
done <- true
}
func main() {
ch := make(chan int)
done := make(chan bool)
var wg sync.WaitGroup
// 啟動(dòng)3個(gè)producer
for i := 1; i <= 3; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}
// 啟動(dòng)1個(gè)consumer
go consumer(ch, done)
// 等待所有producer完成
go func() {
wg.Wait()
close(ch)
}()
// 等待consumer完成
<-done
fmt.Println("All done!")
}
2.3 Pipeline(流水線模式)
適用場景:數(shù)據(jù)流處理,分階段處理任務(wù),如ETL流程。
案例:模擬數(shù)據(jù)經(jīng)過多個(gè)階段的處理。
package main
import (
"fmt"
"sync"
)
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
// 構(gòu)建pipeline: generator -> square -> print
input := generator(1, 2, 3, 4, 5)
squares := square(input)
// 打印結(jié)果
for result := range squares {
fmt.Println(result)
}
}
2.4 Timeout / Cancel(超時(shí)與取消模式)
適用場景:控制任務(wù)執(zhí)行時(shí)間,防止阻塞或資源浪費(fèi)。
案例:使用Context實(shí)現(xiàn)超時(shí)控制。
package main
import (
"context"
"fmt"
"time"
)
func longRunningTask(ctx context.Context) {
select {
case <-time.After(3 * time.Second):
fmt.Println("Task completed")
case <-ctx.Done():
fmt.Println("Task canceled:", ctx.Err())
}
}
func main() {
// 帶超時(shí)的context
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
go longRunningTask(ctx)
// 等待足夠長時(shí)間觀察結(jié)果
time.Sleep(4 * time.Second)
}
2.5 Future / Promise(異步結(jié)果模式)
適用場景:異步執(zhí)行任務(wù)并獲取結(jié)果,常用于HTTP請(qǐng)求、數(shù)據(jù)庫查詢等。
案例:模擬異步任務(wù)并獲取結(jié)果。
package main
import (
"fmt"
"time"
)
type Future struct {
result chan int
}
func NewFuture() (*Future, func(int)) {
f := &Future{result: make(chan int)}
return f, func(r int) { f.result <- r }
}
func (f *Future) Get() int {
return <-f.result
}
func asyncCompute(complete func(int)) {
go func() {
time.Sleep(2 * time.Second)
complete(42)
}()
}
func main() {
future, complete := NewFuture()
asyncCompute(complete)
fmt.Println("Waiting for result...")
result := future.Get()
fmt.Println("Result:", result)
}
三、進(jìn)階并發(fā)模式
3.1 Or-Done 模式
適用場景:監(jiān)聽多個(gè)Channel,任意一個(gè)完成即退出。
package main
import (
"fmt"
"time"
)
func orDone(channels ...<-chan interface{}) <-chan interface{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-orDone(channels[3:]...):
}
}
}()
return orDone
}
func main() {
sig := make(chan interface{})
time.AfterFunc(2*time.Second, func() { close(sig) })
start := time.Now()
<-orDone(sig, time.After(5*time.Second))
fmt.Printf("Done after %v\n", time.Since(start))
}
3.2 Rate Limiting(限流模式)
適用場景:控制請(qǐng)求頻率,防止系統(tǒng)過載。
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 5)
for i := 1; i <= 5; i++ {
requests <- i
}
close(requests)
// 限制每200ms處理一個(gè)請(qǐng)求
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter
fmt.Println("Request", req, time.Now())
}
// 突發(fā)限制模式
burstyLimiter := make(chan time.Time, 3)
for i := 0; i < 3; i++ {
burstyLimiter <- time.Now()
}
go func() {
for t := range time.Tick(200 * time.Millisecond) {
burstyLimiter <- t
}
}()
burstyRequests := make(chan int, 5)
for i := 1; i <= 5; i++ {
burstyRequests <- i
}
close(burstyRequests)
for req := range burstyRequests {
<-burstyLimiter
fmt.Println("Bursty request", req, time.Now())
}
}
四、注意點(diǎn)
4.1 使用說明
- 每個(gè)示例都是獨(dú)立的Go程序,可以直接保存為
.go文件運(yùn)行 - 所有示例都包含了必要的包導(dǎo)入和錯(cuò)誤處理
- 每個(gè)示例都有詳細(xì)的注釋說明關(guān)鍵步驟
- 可以通過調(diào)整參數(shù)(如worker數(shù)量、超時(shí)時(shí)間等)觀察不同行為
4.2 模式選擇對(duì)比
| 模式 | 適用場景 | 優(yōu)點(diǎn) | 缺點(diǎn) |
|---|---|---|---|
| Worker Pool | 任務(wù)并行處理 | 控制并發(fā)數(shù),防止資源耗盡 | 需要任務(wù)隊(duì)列管理 |
| Fan-in/Fan-out | 數(shù)據(jù)合并/分發(fā) | 提高吞吐量 | 需注意Channel關(guān)閉時(shí)機(jī) |
| Pipeline | 流式處理 | 模塊化、易擴(kuò)展 | 階段多時(shí)延遲高 |
| Timeout/Cancel | 任務(wù)超時(shí)控制 | 避免阻塞 | 需正確使用Context |
| Future/Promise | 異步結(jié)果獲取 | 非阻塞編程 | 結(jié)果獲取需同步 |
| Or-Done | 多事件監(jiān)聽 | 靈活組合 | 實(shí)現(xiàn)較復(fù)雜 |
| Rate Limiting | 流量控制 | 防止過載 | 需合理設(shè)置閾值 |
4.3 選擇建議
- 避免共享內(nèi)存,優(yōu)先使用Channel通信
- 使用
sync.WaitGroup等待Goroutine完成 - 合理使用
context管理生命周期 - 注意Channel的關(guān)閉,避免panic
- 使用
select+default實(shí)現(xiàn)非阻塞操作 - 控制Goroutine數(shù)量,防止資源泄漏
到此這篇關(guān)于淺談Go語言中高效并發(fā)模式 的文章就介紹到這了,更多相關(guān)Go語言高效并發(fā)模式內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go語言開發(fā)環(huán)境配置(sublime text3+gosublime)
網(wǎng)上google了下go的開發(fā)工具,大都推薦sublime text3+gosublime,本文就介紹了go語言開發(fā)環(huán)境配置(sublime text3+gosublime),具有一定的參考價(jià)值,感興趣的可以了解一下2022-01-01
Go?數(shù)據(jù)結(jié)構(gòu)之二叉樹詳情
這篇文章主要介紹了?Go?數(shù)據(jù)結(jié)構(gòu)之二叉樹詳情,二叉樹是一種數(shù)據(jù)結(jié)構(gòu),在每個(gè)節(jié)點(diǎn)下面最多存在兩個(gè)其他節(jié)點(diǎn)。即一個(gè)節(jié)點(diǎn)要么連接至一個(gè)、兩個(gè)節(jié)點(diǎn)或不連接其他節(jié)點(diǎn),下文基于GO語言展開二叉樹結(jié)構(gòu)詳情,需要的朋友可以參考一下2022-05-05
Goland 2020或2019軟件版本去掉a...或fmt...提示的方法
這篇文章主要介紹了Goland 2020或2019軟件版本去掉a...或fmt...提示的方法,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-10-10
Go中string與[]byte高效互轉(zhuǎn)的方法實(shí)例
string與[]byte經(jīng)常需要互相轉(zhuǎn)化,普通轉(zhuǎn)化會(huì)發(fā)生底層數(shù)據(jù)的復(fù)制,下面這篇文章主要給大家介紹了關(guān)于Go中string與[]byte高效互轉(zhuǎn)的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2021-09-09

