Go語言并發(fā)控制之semaphore的原理與使用
今天我們來介紹一個 Go 官方庫 x 提供的擴展并發(fā)原語 semaphore,譯為“信號量”。因為它就像一個信號一樣控制多個 goroutine 之間協(xié)作。
概念講解
我先簡單介紹下信號量的概念,為不熟悉的讀者作為補充知識。
一個生活中的例子:假設一個餐廳總共有 10 張餐桌,每來 1 位顧客占用 1 張餐桌,那么同一時間共計可以有 10 人在就餐,超過 10 人則需要排隊等位;如果有 1 位顧客就餐完成,則可以讓排隊等待的第 1 位顧客來就餐。
如果加入信號量,則可以這樣理解:
- 10 個餐桌是有限的資源,即信號量初始值為 10。
- 當顧客進入餐廳,如果餐桌有空,顧客可以被分配 1 個餐桌,信號量的值減 1。
- 如果沒有空餐桌,顧客需要排隊等待,直到有空餐桌為止(信號量值為 0 時,新的顧客必須等待)。
- 有顧客就餐完成后,餐桌空出,信號量的值加 1,新的顧客(排隊等待中的顧客,或者無人排隊時新來的顧客)可以被分配餐桌。
這就是信號量的應用場景,信號量幫助管理餐桌的數(shù)量,確保餐廳在任何時刻不會接待超過可容納的顧客數(shù)量。
如果放在 Go 程序中,我們就可以使用信號量來實現(xiàn)任務池的功能,控制有限個 goroutine 來并發(fā)執(zhí)行任務。這個我們放在后面來實現(xiàn),先看下 semaphore 源碼是如何實現(xiàn)的。
源碼解讀
我們可以在 semaphore 文檔中看到其定義和實現(xiàn)的 exported 方法:
pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#pkg-index
type Weighted
func NewWeighted(n int64) *Weighted
func (s *Weighted) Acquire(ctx context.Context, n int64) error
func (s *Weighted) Release(n int64)
func (s *Weighted) TryAcquire(n int64) bool
semaphore.Weighted 是一個結構體表示信號量對象,NewWeighted 是它的構造函數(shù),用于實例化一個包含 n 個資源的信號量對象,即信號量的初始值為 n。
它實現(xiàn)了 3 個方法,分別是:
Acquire:用來請求n個資源,即對信號量的值進行減n操作。如果資源不足,則阻塞等待,直到有足夠的資源數(shù),或者ctx被取消。Release:用來釋放n個資源,即對信號量的值進行加n操作。TryAcquire:用來請求n個資源,即對信號量的值進行減n操作。與Acquire不同的是,TryAcquire不會阻塞等待,成功返回true,失敗返回false。
現(xiàn)在,你是否能將 semaphore.Weighted 對象和它實現(xiàn)的幾個方法與前文中餐廳就餐的例子對應上了呢?
接下來我們看一下 Weighted 結構體是如何定義的:
github.com/golang/sync/blob/v0.9.0/semaphore/semaphore.go
// Weighted 信號量結構體
type Weighted struct {
size int64 // 資源總數(shù)量
cur int64 // 當前已經(jīng)使用的資源數(shù)
mu sync.Mutex // 互斥鎖,保證對其他屬性的操作并發(fā)安全
waiters list.List // 等待者隊列,使用列表實現(xiàn)
}
Weighted 結構體包含 4 個字段:
size是資源總數(shù)。在餐廳的例子中就是 10 張餐桌。cur記錄了當前已經(jīng)使用的資源數(shù)。在餐廳的例子中就是已經(jīng)被顧客占用的餐桌數(shù),size - cur就是剩余資源數(shù),即空餐桌數(shù)。mu是互斥鎖,用于保證對其他屬性的操作是并發(fā)安全的。waiters是等待者隊列,記錄所有排隊的等待者。在餐廳的例子中,當 10 張餐桌都被占滿,新來的顧客就要進入等待者隊列。
構造函數(shù) NewWeighted 實現(xiàn)如下:
// NewWeighted 構造一個信號量對象
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
此外,semaphore 包還為等待者定義了一個結構體 waiter:
// 等待者結構體
type waiter struct {
n int64 // 請求資源數(shù)
ready chan<- struct{} // 當獲取到資源時被關閉,用于喚醒當前等待者
}
waiter 結構體包含 2 個字段:
n記錄了當前等待者請求的資源數(shù)。ready是一個channel類型,當資源滿足,ready會被close掉,等待者就會被喚醒。
稍后你將看到它的用處。
我們回過頭來看下 Weighted 結構體的第一個方法 Acquire 的實現(xiàn):
// Acquire 請求 n 個資源
// 如果資源不足,則阻塞等待,直到有足夠的資源數(shù),或者 ctx 被取消
// 成功返回 nil,失敗返回 ctx.Err() 并且不改變資源數(shù)
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
done := ctx.Done()
s.mu.Lock() // 加鎖保證并發(fā)安全
// 如果在分配資源前 ctx 已經(jīng)取消,則直接返回 ctx.Err()
select {
case <-done:
s.mu.Unlock()
return ctx.Err()
default:
}
// 如果資源數(shù)足夠,且不存在其他等待者,則請求資源成功,將 cur 加上 n,并返回
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 如果請求的資源數(shù)大于資源總數(shù),不可能滿足,則阻塞等待 ctx 取消,并返回 ctx.Err()
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-done
return ctx.Err()
}
// 資源不夠或者存在其他等待者,則繼續(xù)執(zhí)行
// 加入等待隊列
ready := make(chan struct{}) // 創(chuàng)建一個 channel 作為一個屬性記錄到等待者對象 waiter 中,用于后續(xù)通知其喚醒
w := waiter{n: n, ready: ready} // 構造一個等待者對象 waiter
elem := s.waiters.PushBack(w) // 將 waiter 追加到等待者隊列
s.mu.Unlock()
// 使用 select 實現(xiàn)阻塞等待
select {
case <-done: // 檢查 ctx 是否被取消
s.mu.Lock()
select {
case <-ready: // 檢查當前 waiter 是否被喚醒
// 進入這里,說明是 ctx 被取消后 waiter 被喚醒
s.cur -= n // 那么就當作 waiter 沒有被喚醒,將請求的資源數(shù)還回去
s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數(shù)是否滿足
default: // 將當前 waiter 從等待者隊列中移除
isFront := s.waiters.Front() == elem // 當前 waiter 是否為第一個等待者
s.waiters.Remove(elem) // 從隊列中移除
// 如果當前 waiter 是隊列中第一個等待者,并且還有剩余的資源數(shù)
if isFront && s.size > s.cur {
s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數(shù)是否滿足
}
}
s.mu.Unlock()
return ctx.Err()
case <-ready: // 被喚醒
select {
case <-done: // 再次檢查 ctx 是否被取消
// 進入這里,說明 waiter 被喚醒后 ctx 卻被取消了,當作未被喚醒來處理
s.Release(n) // 釋放資源
return ctx.Err()
default:
}
return nil // 成功返回 nil
}
}
這是 Weighted 結構體實現(xiàn)最為復雜的一個方法,不過我在代碼中寫了非常詳細的注釋,來幫助你理解。
Acquire 主要邏輯為:
- 先檢查
ctx是否被取消,如果在分配資源前ctx已經(jīng)取消,則直接返回ctx.Err()。 - 檢查當前剩余資源數(shù)是否滿足請求的資源數(shù),如果資源數(shù)足夠,且不存在其他等待者,則請求資源成功,將
cur加上n,并返回。 - 校驗請求的資源數(shù)是否合法,如果請求的資源數(shù)大于資源總數(shù),則不可能被滿足,此時會阻塞等待
ctx被取消,并返回ctx.Err()。 - 如果目前資源不夠,或者存在其他等待者,則代碼將繼續(xù)執(zhí)行,進入阻塞等待邏輯:
- 首先構造一個等待者對象
waiter,并將其加入等待者隊列。 - 接著,使用
select實現(xiàn)阻塞等待。此時又分兩種情況,ctx被取消,或者waiter被喚醒。- 如果是
ctx被取消,還會再檢查一下waiter是否被喚醒,如果被喚醒,則還是以ctx被取消為準,會當作waiter沒有被喚醒,將請求的資源數(shù)還回去,并調(diào)用s.notifyWaiters()通知等待者隊列,檢查隊列中下一個waiter資源數(shù)是否滿足;如果沒被喚醒,則將當前waiter從等待者隊列中移除,如果當前waiter是隊列中第一個等待者,并且還有剩余的資源數(shù),則還會調(diào)用s.notifyWaiters()通知等待者隊列,檢查隊列中下一個waiter資源數(shù)是否滿足。 - 如果是
waiter被喚醒,還會再檢查一下ctx是否被取消,如果被取消,則以ctx被取消為準,會調(diào)用s.Release(n)釋放掉當前waiter請求的資源;如果沒被取消,則返回nil表示請求資源成功。
- 如果是
- 首先構造一個等待者對象
那么接下來,我們看看釋放資源的 Release 方法邏輯:
// Release 釋放 n 個資源
func (s *Weighted) Release(n int64) {
s.mu.Lock() // 加鎖保證并發(fā)安全
s.cur -= n // 釋放資源
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters() // 通知等待隊列,檢查隊列中下一個 waiter 資源數(shù)是否滿足
s.mu.Unlock()
}
這里的邏輯就簡單多了,首先執(zhí)行 s.cur -= n 減少當前已經(jīng)使用的資源數(shù),即這一步就是釋放資源操作。注意這里還對 s.cur 是否小于 0 進行了判斷,所以使用時一定是申請多少資源就釋放多少資源,不要用錯。接著同樣調(diào)用 s.notifyWaiters() 通知等待者隊列,檢查隊列中下一個 waiter 資源數(shù)是否滿足。
現(xiàn)在,是時候看看 notifyWaiters 方法是如何實現(xiàn)的了:
// 檢查隊列中下一個 waiter 資源數(shù)是否滿足
func (s *Weighted) notifyWaiters() {
// 循環(huán)檢查下一個 waiter 請求的資源數(shù)是否滿足,滿足則出隊,不滿足則終止循環(huán)
for {
next := s.waiters.Front() // 獲取隊首元素
if next == nil {
break // 沒有 waiter 了,隊列為空終止循環(huán)
}
w := next.Value.(waiter)
if s.size-s.cur < w.n { // 當前 waiter 資源數(shù)不滿足,退出循環(huán)
// 不繼續(xù)查找隊列中后續(xù) waiter 請求資源是否滿足,避免產(chǎn)生饑餓
break
}
// 資源數(shù)滿足,喚醒 waiter
s.cur += w.n // 記錄使用的資源數(shù)
s.waiters.Remove(next) // 從隊列中移除 waiter
close(w.ready) // 利用關閉 channel 的操作,來喚醒 waiter
}
}
notifyWaiters 方法內(nèi)部的核心邏輯是:循環(huán)檢查下一個 waiter 請求的資源數(shù)是否滿足,滿足則出隊,不滿足則終止循環(huán)。
可以發(fā)現(xiàn),next 是隊首元素,所以說等待者隊列是先進先出(FIFO)的。
這里還有一個要注意的點是,當前 waiter 資源數(shù)不滿足時,直接退出循環(huán),而不再繼續(xù)查找隊列中后續(xù) waiter 請求資源是否滿足。比如當前可用資源數(shù)為 5,等待者隊列中有兩個等待者 waiter1 和 waiter2,waiter1 請求的資源數(shù)是 10,waiter2 請求的資源數(shù)是 1,此時就會退出循環(huán),waiter1 和 waiter2 都繼續(xù)等待,而不會先將資源分配給 waiter2。這樣做是為了避免之后的 waiter3、waiter4... 總是比 waiter1 請求的資源數(shù)小,而導致 waiter1 長時間阻塞,從而產(chǎn)生饑餓。
Weighted 還有最后一個方法 TryAcquire 我們一起來看看是如何實現(xiàn)的:
// TryAcquire 嘗試請求 n 個資源
// 不阻塞,成功返回 true,失敗返回 false 并且不改變資源數(shù)
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock() // 加鎖保證并發(fā)安全
// 剩余資源數(shù)足夠,且不存在其他等待者,則請求資源成功
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n // 記錄當前已經(jīng)使用的資源數(shù)
}
s.mu.Unlock()
return success
}
這個方法實現(xiàn)也很簡單,就是檢查剩余資源數(shù)是否足夠,且不存在其他等待者,如果滿足,則請求資源成功,增加當前已經(jīng)使用的資源數(shù) s.cur += n,然后返回 true,否則返回 false。
至此,semaphore 包的源碼就講解完成了。
使用示例
熟悉了 semaphore 包的源碼,那么如何使用就不在話下了。如下是 semaphore 包文檔中提供的 worker pool 模式示例,演示如何使用信號量來限制并行任務中運行的 goroutine 數(shù)量。代碼如下:
pkg.go.dev/golang.org/x/sync@v0.10.0/semaphore#example-package-WorkerPool
package main
import (
"context"
"fmt"
"log"
"runtime"
"golang.org/x/sync/semaphore"
)
func main() {
ctx := context.TODO()
var (
maxWorkers = runtime.GOMAXPROCS(0) // worker pool 支持的最大 worker 數(shù)量,取當前機器 CPU 核心數(shù)
sem = semaphore.NewWeighted(int64(maxWorkers)) // 信號量,資源總數(shù)即為最大 worker 數(shù)量
out = make([]int, 32) // 總任務數(shù)量
)
// 一次最多啟動 maxWorkers 數(shù)量個 goroutine 計算輸出
for i := range out {
// 當最大工作數(shù) maxWorkers 個 goroutine 正在執(zhí)行時,Acquire 會阻塞直到其中一個 goroutine 完成
if err := sem.Acquire(ctx, 1); err != nil { // 請求資源
log.Printf("Failed to acquire semaphore: %v", err)
break
}
// 開啟新的 goroutine 執(zhí)行計算任務
go func(i int) {
defer sem.Release(1) // 任務執(zhí)行完成后釋放資源
out[i] = collatzSteps(i + 1) // 執(zhí)行 Collatz 步驟計算
}(i)
}
// 獲取所有的 tokens 以等待全部 goroutine 執(zhí)行完成
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}
fmt.Println(out)
}
// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
}
for ; n > 1; steps++ {
if steps < 0 {
panic("too many steps")
}
if n%2 == 0 {
n /= 2
continue
}
const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
panic("overflow")
}
n = 3*n + 1
}
return steps
}
這段代碼利用信號量(semaphore)實現(xiàn)一個工作池(worker pool),來限制最大并發(fā)協(xié)程(goroutine)數(shù)。
通過 runtime.GOMAXPROCS(0) 可以獲取當前機器 CPU 核心數(shù)(比如在我的 Apple M1 Pro 機器中這個值是 10),以此作為 worker pool 支持的最大 worker 數(shù)量,這也是信號量的資源總數(shù)。out 切片長度為 32,即總任務數(shù)為 32。使用 for 循環(huán)來開啟新的 goroutine 執(zhí)行計算任務 collatzSteps(i + 1),不過在開啟新的 goroutine 前,會調(diào)用 sem.Acquire(ctx, 1) 向 worker pool 申請一個資源,當最大工作數(shù) maxWorkers 個 goroutine 正在執(zhí)行時,Acquire 會阻塞直到其中一個 goroutine 完成,任務執(zhí)行完成后在 defer 語句中調(diào)用 sem.Release(1) 釋放資源。
NOTE:
關于計算任務 collatzSteps 我們其實不必深究,只需要知道這是一個耗時任務即可。簡單來說,Collatz Conjecture 是一個數(shù)學猜想,提出了一個看似簡單的整數(shù)序列規(guī)則,對于任意正整數(shù) n 執(zhí)行操作,如果 n 是偶數(shù),則將 n 除以 2,如果 n 是奇數(shù),則將 n 乘以 3 再加 1(即 3n + 1),反復執(zhí)行這些操作后,最終所有整數(shù)都將達到 1。collatzSteps 函數(shù)實現(xiàn)了計算在 Collatz 猜想下一個正整數(shù) n 需要多少步才能達到 1。
值得注意的是,在 for 循環(huán)提交完所有任務后,使用 sem.Acquire(ctx, int64(maxWorkers)) 獲取了信號量中全部資源,這樣就能夠確保所有任務執(zhí)行完成后才會退出,它的作用類似 sync.WaitGroup.Wait()。
那么接下來我們使用 sync.WaitGroup 來實現(xiàn)同樣的功能:
github.com/jianghushinian/blog-go-example/blob/main/x/sync/semaphore/waitgroup/main.go
func main() {
var (
maxWorkers = runtime.GOMAXPROCS(0) // 獲取系統(tǒng)可用的最大 CPU 核心數(shù)
out = make([]int, 32) // 存儲 Collatz 結果
wg sync.WaitGroup // 用于等待 goroutine 完成
sem = make(chan struct{}, maxWorkers) // 用于限制最大并發(fā)數(shù)
)
for i := range out {
// 通過 sem 管理并發(fā),確保最多只有 maxWorkers 個 goroutine 同時執(zhí)行
sem <- struct{}{} // 如果 sem 已滿,這里會阻塞,直到有空閑槽位
// 增加 WaitGroup 計數(shù)
wg.Add(1)
go func(i int) {
defer wg.Done() // goroutine 完成時,減少 WaitGroup 計數(shù)
defer func() { <-sem }() // goroutine 完成時,從 sem 中釋放一個槽位
// 執(zhí)行 Collatz 步驟計算
out[i] = collatzSteps(i + 1)
}(i)
}
// 等待所有 goroutine 完成
wg.Wait()
// 輸出結果
fmt.Println(out)
}
現(xiàn)在對比一下使用 sync.WaitGroup 和 semaphore 實現(xiàn)的代碼,是否能加深你對信號量這一功能的理解呢?
最后,我們再來留個作業(yè),請使用 errgroup 實現(xiàn)一遍這個示例程序。
NOTE:
如果你對 sync.WaitGroup 或 errgroup 不熟悉,可以參考我的文章「Go 并發(fā)控制:sync.WaitGroup 詳解」和「Go 并發(fā)控制:errgroup 詳解」。
總結
本文對 Go 中的擴展并發(fā)原語 semaphore 進行了講解,并帶你看了其源碼的實現(xiàn),以及介紹了如何使用。
不知道你有沒有發(fā)現(xiàn),在初始化 semaphore 時,傳遞的 n 如果為 1,那么這個信號量其實就相當于互斥鎖 sync.Mutex。
semaphore 可以用來實現(xiàn) worker pool 模式,并且使用套路或場景與 sync.WaitGroup 也比較相似,你可以對比學習。
以上就是Go語言并發(fā)控制之semaphore的原理與使用的詳細內(nèi)容,更多關于Go semaphore的資料請關注腳本之家其它相關文章!
相關文章
Golang動態(tài)數(shù)組的實現(xiàn)示例
動態(tài)數(shù)組能自動調(diào)整大小,與靜態(tài)數(shù)組不同,其大小不固定,可根據(jù)需求變化,實現(xiàn)通常依賴于數(shù)據(jù)結構如鏈表或數(shù)組加額外信息,本文就來介紹一下Golang動態(tài)數(shù)組的實現(xiàn)示例,感興趣的可以了解一下2024-10-10
Go?tablewriter庫提升命令行輸出專業(yè)度實例詳解
命令行工具大家都用過,如果是運維人員可能會編寫命令行工具來完成各種任務,命令行輸出的美觀和易讀性往往容易被忽視,很爛的輸出會讓人感覺不專業(yè),本文將介紹Go語言中牛逼的實戰(zhàn)工具tablewriter庫,使你在命令行輸出中展現(xiàn)出專業(yè)的一面2023-11-11

