Go語言并發(fā)控制之semaphore的原理與使用
今天我們來介紹一個 Go 官方庫 x 提供的擴展并發(fā)原語 semaphore
,譯為“信號量”。因為它就像一個信號一樣控制多個 goroutine 之間協(xié)作。
概念講解
我先簡單介紹下信號量的概念,為不熟悉的讀者作為補充知識。
一個生活中的例子:假設(shè)一個餐廳總共有 10 張餐桌,每來 1 位顧客占用 1 張餐桌,那么同一時間共計可以有 10 人在就餐,超過 10 人則需要排隊等位;如果有 1 位顧客就餐完成,則可以讓排隊等待的第 1 位顧客來就餐。
如果加入信號量,則可以這樣理解:
- 10 個餐桌是有限的資源,即信號量初始值為 10。
- 當(dāng)顧客進入餐廳,如果餐桌有空,顧客可以被分配 1 個餐桌,信號量的值減 1。
- 如果沒有空餐桌,顧客需要排隊等待,直到有空餐桌為止(信號量值為 0 時,新的顧客必須等待)。
- 有顧客就餐完成后,餐桌空出,信號量的值加 1,新的顧客(排隊等待中的顧客,或者無人排隊時新來的顧客)可以被分配餐桌。
這就是信號量的應(yīng)用場景,信號量幫助管理餐桌的數(shù)量,確保餐廳在任何時刻不會接待超過可容納的顧客數(shù)量。
如果放在 Go 程序中,我們就可以使用信號量來實現(xiàn)任務(wù)池的功能,控制有限個 goroutine 來并發(fā)執(zhí)行任務(wù)。這個我們放在后面來實現(xiàn),先看下 semaphore
源碼是如何實現(xiàn)的。
源碼解讀
我們可以在 semaphore
文檔中看到其定義和實現(xiàn)的 exported 方法:
pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#pkg-index
type Weighted func NewWeighted(n int64) *Weighted func (s *Weighted) Acquire(ctx context.Context, n int64) error func (s *Weighted) Release(n int64) func (s *Weighted) TryAcquire(n int64) bool
semaphore.Weighted
是一個結(jié)構(gòu)體表示信號量對象,NewWeighted
是它的構(gòu)造函數(shù),用于實例化一個包含 n
個資源的信號量對象,即信號量的初始值為 n
。
它實現(xiàn)了 3 個方法,分別是:
Acquire
:用來請求n
個資源,即對信號量的值進行減n
操作。如果資源不足,則阻塞等待,直到有足夠的資源數(shù),或者ctx
被取消。Release
:用來釋放n
個資源,即對信號量的值進行加n
操作。TryAcquire
:用來請求n
個資源,即對信號量的值進行減n
操作。與Acquire
不同的是,TryAcquire
不會阻塞等待,成功返回true
,失敗返回false
。
現(xiàn)在,你是否能將 semaphore.Weighted
對象和它實現(xiàn)的幾個方法與前文中餐廳就餐的例子對應(yīng)上了呢?
接下來我們看一下 Weighted
結(jié)構(gòu)體是如何定義的:
github.com/golang/sync/blob/v0.9.0/semaphore/semaphore.go
// Weighted 信號量結(jié)構(gòu)體 type Weighted struct { size int64 // 資源總數(shù)量 cur int64 // 當(dāng)前已經(jīng)使用的資源數(shù) mu sync.Mutex // 互斥鎖,保證對其他屬性的操作并發(fā)安全 waiters list.List // 等待者隊列,使用列表實現(xiàn) }
Weighted
結(jié)構(gòu)體包含 4 個字段:
size
是資源總數(shù)。在餐廳的例子中就是 10 張餐桌。cur
記錄了當(dāng)前已經(jīng)使用的資源數(shù)。在餐廳的例子中就是已經(jīng)被顧客占用的餐桌數(shù),size - cur
就是剩余資源數(shù),即空餐桌數(shù)。mu
是互斥鎖,用于保證對其他屬性的操作是并發(fā)安全的。waiters
是等待者隊列,記錄所有排隊的等待者。在餐廳的例子中,當(dāng) 10 張餐桌都被占滿,新來的顧客就要進入等待者隊列。
構(gòu)造函數(shù) NewWeighted
實現(xiàn)如下:
// NewWeighted 構(gòu)造一個信號量對象 func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w }
此外,semaphore
包還為等待者定義了一個結(jié)構(gòu)體 waiter
:
// 等待者結(jié)構(gòu)體 type waiter struct { n int64 // 請求資源數(shù) ready chan<- struct{} // 當(dāng)獲取到資源時被關(guān)閉,用于喚醒當(dāng)前等待者 }
waiter
結(jié)構(gòu)體包含 2 個字段:
n
記錄了當(dāng)前等待者請求的資源數(shù)。ready
是一個channel
類型,當(dāng)資源滿足,ready
會被close
掉,等待者就會被喚醒。
稍后你將看到它的用處。
我們回過頭來看下 Weighted
結(jié)構(gòu)體的第一個方法 Acquire
的實現(xiàn):
// Acquire 請求 n 個資源 // 如果資源不足,則阻塞等待,直到有足夠的資源數(shù),或者 ctx 被取消 // 成功返回 nil,失敗返回 ctx.Err() 并且不改變資源數(shù) func (s *Weighted) Acquire(ctx context.Context, n int64) error { done := ctx.Done() s.mu.Lock() // 加鎖保證并發(fā)安全 // 如果在分配資源前 ctx 已經(jīng)取消,則直接返回 ctx.Err() select { case <-done: s.mu.Unlock() return ctx.Err() default: } // 如果資源數(shù)足夠,且不存在其他等待者,則請求資源成功,將 cur 加上 n,并返回 if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } // 如果請求的資源數(shù)大于資源總數(shù),不可能滿足,則阻塞等待 ctx 取消,并返回 ctx.Err() if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-done return ctx.Err() } // 資源不夠或者存在其他等待者,則繼續(xù)執(zhí)行 // 加入等待隊列 ready := make(chan struct{}) // 創(chuàng)建一個 channel 作為一個屬性記錄到等待者對象 waiter 中,用于后續(xù)通知其喚醒 w := waiter{n: n, ready: ready} // 構(gòu)造一個等待者對象 waiter elem := s.waiters.PushBack(w) // 將 waiter 追加到等待者隊列 s.mu.Unlock() // 使用 select 實現(xiàn)阻塞等待 select { case <-done: // 檢查 ctx 是否被取消 s.mu.Lock() select { case <-ready: // 檢查當(dāng)前 waiter 是否被喚醒 // 進入這里,說明是 ctx 被取消后 waiter 被喚醒 s.cur -= n // 那么就當(dāng)作 waiter 沒有被喚醒,將請求的資源數(shù)還回去 s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數(shù)是否滿足 default: // 將當(dāng)前 waiter 從等待者隊列中移除 isFront := s.waiters.Front() == elem // 當(dāng)前 waiter 是否為第一個等待者 s.waiters.Remove(elem) // 從隊列中移除 // 如果當(dāng)前 waiter 是隊列中第一個等待者,并且還有剩余的資源數(shù) if isFront && s.size > s.cur { s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數(shù)是否滿足 } } s.mu.Unlock() return ctx.Err() case <-ready: // 被喚醒 select { case <-done: // 再次檢查 ctx 是否被取消 // 進入這里,說明 waiter 被喚醒后 ctx 卻被取消了,當(dāng)作未被喚醒來處理 s.Release(n) // 釋放資源 return ctx.Err() default: } return nil // 成功返回 nil } }
這是 Weighted
結(jié)構(gòu)體實現(xiàn)最為復(fù)雜的一個方法,不過我在代碼中寫了非常詳細的注釋,來幫助你理解。
Acquire
主要邏輯為:
- 先檢查
ctx
是否被取消,如果在分配資源前ctx
已經(jīng)取消,則直接返回ctx.Err()
。 - 檢查當(dāng)前剩余資源數(shù)是否滿足請求的資源數(shù),如果資源數(shù)足夠,且不存在其他等待者,則請求資源成功,將
cur
加上n
,并返回。 - 校驗請求的資源數(shù)是否合法,如果請求的資源數(shù)大于資源總數(shù),則不可能被滿足,此時會阻塞等待
ctx
被取消,并返回ctx.Err()
。 - 如果目前資源不夠,或者存在其他等待者,則代碼將繼續(xù)執(zhí)行,進入阻塞等待邏輯:
- 首先構(gòu)造一個等待者對象
waiter
,并將其加入等待者隊列。 - 接著,使用
select
實現(xiàn)阻塞等待。此時又分兩種情況,ctx
被取消,或者waiter
被喚醒。- 如果是
ctx
被取消,還會再檢查一下waiter
是否被喚醒,如果被喚醒,則還是以ctx
被取消為準(zhǔn),會當(dāng)作waiter
沒有被喚醒,將請求的資源數(shù)還回去,并調(diào)用s.notifyWaiters()
通知等待者隊列,檢查隊列中下一個waiter
資源數(shù)是否滿足;如果沒被喚醒,則將當(dāng)前waiter
從等待者隊列中移除,如果當(dāng)前waiter
是隊列中第一個等待者,并且還有剩余的資源數(shù),則還會調(diào)用s.notifyWaiters()
通知等待者隊列,檢查隊列中下一個waiter
資源數(shù)是否滿足。 - 如果是
waiter
被喚醒,還會再檢查一下ctx
是否被取消,如果被取消,則以ctx
被取消為準(zhǔn),會調(diào)用s.Release(n)
釋放掉當(dāng)前waiter
請求的資源;如果沒被取消,則返回nil
表示請求資源成功。
- 如果是
- 首先構(gòu)造一個等待者對象
那么接下來,我們看看釋放資源的 Release
方法邏輯:
// Release 釋放 n 個資源 func (s *Weighted) Release(n int64) { s.mu.Lock() // 加鎖保證并發(fā)安全 s.cur -= n // 釋放資源 if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數(shù)是否滿足 s.mu.Unlock() }
這里的邏輯就簡單多了,首先執(zhí)行 s.cur -= n
減少當(dāng)前已經(jīng)使用的資源數(shù),即這一步就是釋放資源操作。注意這里還對 s.cur
是否小于 0 進行了判斷,所以使用時一定是申請多少資源就釋放多少資源,不要用錯。接著同樣調(diào)用 s.notifyWaiters()
通知等待者隊列,檢查隊列中下一個 waiter
資源數(shù)是否滿足。
現(xiàn)在,是時候看看 notifyWaiters
方法是如何實現(xiàn)的了:
// 檢查隊列中下一個 waiter 資源數(shù)是否滿足 func (s *Weighted) notifyWaiters() { // 循環(huán)檢查下一個 waiter 請求的資源數(shù)是否滿足,滿足則出隊,不滿足則終止循環(huán) for { next := s.waiters.Front() // 獲取隊首元素 if next == nil { break // 沒有 waiter 了,隊列為空終止循環(huán) } w := next.Value.(waiter) if s.size-s.cur < w.n { // 當(dāng)前 waiter 資源數(shù)不滿足,退出循環(huán) // 不繼續(xù)查找隊列中后續(xù) waiter 請求資源是否滿足,避免產(chǎn)生饑餓 break } // 資源數(shù)滿足,喚醒 waiter s.cur += w.n // 記錄使用的資源數(shù) s.waiters.Remove(next) // 從隊列中移除 waiter close(w.ready) // 利用關(guān)閉 channel 的操作,來喚醒 waiter } }
notifyWaiters
方法內(nèi)部的核心邏輯是:循環(huán)檢查下一個 waiter
請求的資源數(shù)是否滿足,滿足則出隊,不滿足則終止循環(huán)。
可以發(fā)現(xiàn),next
是隊首元素,所以說等待者隊列是先進先出(FIFO
)的。
這里還有一個要注意的點是,當(dāng)前 waiter
資源數(shù)不滿足時,直接退出循環(huán),而不再繼續(xù)查找隊列中后續(xù) waiter
請求資源是否滿足。比如當(dāng)前可用資源數(shù)為 5,等待者隊列中有兩個等待者 waiter1
和 waiter2
,waiter1
請求的資源數(shù)是 10,waiter2
請求的資源數(shù)是 1,此時就會退出循環(huán),waiter1
和 waiter2
都繼續(xù)等待,而不會先將資源分配給 waiter2
。這樣做是為了避免之后的 waiter3
、waiter4
... 總是比 waiter1
請求的資源數(shù)小,而導(dǎo)致 waiter1
長時間阻塞,從而產(chǎn)生饑餓。
Weighted
還有最后一個方法 TryAcquire
我們一起來看看是如何實現(xiàn)的:
// TryAcquire 嘗試請求 n 個資源 // 不阻塞,成功返回 true,失敗返回 false 并且不改變資源數(shù) func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() // 加鎖保證并發(fā)安全 // 剩余資源數(shù)足夠,且不存在其他等待者,則請求資源成功 success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n // 記錄當(dāng)前已經(jīng)使用的資源數(shù) } s.mu.Unlock() return success }
這個方法實現(xiàn)也很簡單,就是檢查剩余資源數(shù)是否足夠,且不存在其他等待者,如果滿足,則請求資源成功,增加當(dāng)前已經(jīng)使用的資源數(shù) s.cur += n
,然后返回 true
,否則返回 false
。
至此,semaphore
包的源碼就講解完成了。
使用示例
熟悉了 semaphore
包的源碼,那么如何使用就不在話下了。如下是 semaphore
包文檔中提供的 worker pool
模式示例,演示如何使用信號量來限制并行任務(wù)中運行的 goroutine 數(shù)量。代碼如下:
pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#example-package-WorkerPool
package main import ( "context" "fmt" "log" "runtime" "golang.org/x/sync/semaphore" ) func main() { ctx := context.TODO() var ( maxWorkers = runtime.GOMAXPROCS(0) // worker pool 支持的最大 worker 數(shù)量,取當(dāng)前機器 CPU 核心數(shù) sem = semaphore.NewWeighted(int64(maxWorkers)) // 信號量,資源總數(shù)即為最大 worker 數(shù)量 out = make([]int, 32) // 總?cè)蝿?wù)數(shù)量 ) // 一次最多啟動 maxWorkers 數(shù)量個 goroutine 計算輸出 for i := range out { // 當(dāng)最大工作數(shù) maxWorkers 個 goroutine 正在執(zhí)行時,Acquire 會阻塞直到其中一個 goroutine 完成 if err := sem.Acquire(ctx, 1); err != nil { // 請求資源 log.Printf("Failed to acquire semaphore: %v", err) break } // 開啟新的 goroutine 執(zhí)行計算任務(wù) go func(i int) { defer sem.Release(1) // 任務(wù)執(zhí)行完成后釋放資源 out[i] = collatzSteps(i + 1) // 執(zhí)行 Collatz 步驟計算 }(i) } // 獲取所有的 tokens 以等待全部 goroutine 執(zhí)行完成 if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil { log.Printf("Failed to acquire semaphore: %v", err) } fmt.Println(out) } // collatzSteps computes the number of steps to reach 1 under the Collatz // conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.) func collatzSteps(n int) (steps int) { if n <= 0 { panic("nonpositive input") } for ; n > 1; steps++ { if steps < 0 { panic("too many steps") } if n%2 == 0 { n /= 2 continue } const maxInt = int(^uint(0) >> 1) if n > (maxInt-1)/3 { panic("overflow") } n = 3*n + 1 } return steps }
這段代碼利用信號量(semaphore
)實現(xiàn)一個工作池(worker pool
),來限制最大并發(fā)協(xié)程(goroutine
)數(shù)。
通過 runtime.GOMAXPROCS(0)
可以獲取當(dāng)前機器 CPU 核心數(shù)(比如在我的 Apple M1 Pro 機器中這個值是 10),以此作為 worker pool
支持的最大 worker
數(shù)量,這也是信號量的資源總數(shù)。out
切片長度為 32,即總?cè)蝿?wù)數(shù)為 32。使用 for
循環(huán)來開啟新的 goroutine 執(zhí)行計算任務(wù) collatzSteps(i + 1)
,不過在開啟新的 goroutine 前,會調(diào)用 sem.Acquire(ctx, 1)
向 worker pool
申請一個資源,當(dāng)最大工作數(shù) maxWorkers
個 goroutine 正在執(zhí)行時,Acquire
會阻塞直到其中一個 goroutine 完成,任務(wù)執(zhí)行完成后在 defer
語句中調(diào)用 sem.Release(1)
釋放資源。
NOTE:
關(guān)于計算任務(wù) collatzSteps
我們其實不必深究,只需要知道這是一個耗時任務(wù)即可。簡單來說,Collatz Conjecture 是一個數(shù)學(xué)猜想,提出了一個看似簡單的整數(shù)序列規(guī)則,對于任意正整數(shù) n
執(zhí)行操作,如果 n
是偶數(shù),則將 n
除以 2,如果 n
是奇數(shù),則將 n
乘以 3 再加 1(即 3n + 1
),反復(fù)執(zhí)行這些操作后,最終所有整數(shù)都將達到 1。collatzSteps
函數(shù)實現(xiàn)了計算在 Collatz 猜想下一個正整數(shù) n
需要多少步才能達到 1。
值得注意的是,在 for
循環(huán)提交完所有任務(wù)后,使用 sem.Acquire(ctx, int64(maxWorkers))
獲取了信號量中全部資源,這樣就能夠確保所有任務(wù)執(zhí)行完成后才會退出,它的作用類似 sync.WaitGroup.Wait()
。
那么接下來我們使用 sync.WaitGroup
來實現(xiàn)同樣的功能:
github.com/jianghushinian/blog-go-example/blob/main/x/sync/semaphore/waitgroup/main.go
func main() { var ( maxWorkers = runtime.GOMAXPROCS(0) // 獲取系統(tǒng)可用的最大 CPU 核心數(shù) out = make([]int, 32) // 存儲 Collatz 結(jié)果 wg sync.WaitGroup // 用于等待 goroutine 完成 sem = make(chan struct{}, maxWorkers) // 用于限制最大并發(fā)數(shù) ) for i := range out { // 通過 sem 管理并發(fā),確保最多只有 maxWorkers 個 goroutine 同時執(zhí)行 sem <- struct{}{} // 如果 sem 已滿,這里會阻塞,直到有空閑槽位 // 增加 WaitGroup 計數(shù) wg.Add(1) go func(i int) { defer wg.Done() // goroutine 完成時,減少 WaitGroup 計數(shù) defer func() { <-sem }() // goroutine 完成時,從 sem 中釋放一個槽位 // 執(zhí)行 Collatz 步驟計算 out[i] = collatzSteps(i + 1) }(i) } // 等待所有 goroutine 完成 wg.Wait() // 輸出結(jié)果 fmt.Println(out) }
現(xiàn)在對比一下使用 sync.WaitGroup
和 semaphore
實現(xiàn)的代碼,是否能加深你對信號量這一功能的理解呢?
最后,我們再來留個作業(yè),請使用 errgroup
實現(xiàn)一遍這個示例程序。
NOTE:
如果你對 sync.WaitGroup
或 errgroup
不熟悉,可以參考我的文章「Go 并發(fā)控制:sync.WaitGroup 詳解」和「Go 并發(fā)控制:errgroup 詳解」。
總結(jié)
本文對 Go 中的擴展并發(fā)原語 semaphore
進行了講解,并帶你看了其源碼的實現(xiàn),以及介紹了如何使用。
不知道你有沒有發(fā)現(xiàn),在初始化 semaphore
時,傳遞的 n
如果為 1,那么這個信號量其實就相當(dāng)于互斥鎖 sync.Mutex
。
semaphore
可以用來實現(xiàn) worker pool
模式,并且使用套路或場景與 sync.WaitGroup
也比較相似,你可以對比學(xué)習(xí)。
以上就是Go語言并發(fā)控制之semaphore的原理與使用的詳細內(nèi)容,更多關(guān)于Go semaphore的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang動態(tài)數(shù)組的實現(xiàn)示例
動態(tài)數(shù)組能自動調(diào)整大小,與靜態(tài)數(shù)組不同,其大小不固定,可根據(jù)需求變化,實現(xiàn)通常依賴于數(shù)據(jù)結(jié)構(gòu)如鏈表或數(shù)組加額外信息,本文就來介紹一下Golang動態(tài)數(shù)組的實現(xiàn)示例,感興趣的可以了解一下2024-10-10Go?tablewriter庫提升命令行輸出專業(yè)度實例詳解
命令行工具大家都用過,如果是運維人員可能會編寫命令行工具來完成各種任務(wù),命令行輸出的美觀和易讀性往往容易被忽視,很爛的輸出會讓人感覺不專業(yè),本文將介紹Go語言中牛逼的實戰(zhàn)工具tablewriter庫,使你在命令行輸出中展現(xiàn)出專業(yè)的一面2023-11-11