Golang信號量設(shè)計實現(xiàn)示例詳解
開篇
在我們此前的文章 Golang Mutex 原理解析 中曾提到過,Mutex 的底層結(jié)構(gòu)包含了兩個字段,state 和 sema:
type Mutex struct { state int32 sema uint32 }
- state 代表互斥鎖的狀態(tài),比如是否被鎖定;
- sema 表示信號量,協(xié)程阻塞會等待該信號量,解鎖的協(xié)程釋放信號量從而喚醒等待信號量的協(xié)程。
這個 sema 就是 semaphore 信號量的意思。Golang 協(xié)程之間的搶鎖,實際上爭搶給Locked
賦值的權(quán)利,能給 Locked
置為1,就說明搶鎖成功。搶不到就阻塞等待 sema
信號量,一旦持有鎖的協(xié)程解鎖,那么等待的協(xié)程會依次被喚醒。
有意思的是,雖然 semaphore 在鎖的實現(xiàn)中起到了至關(guān)重要的作用,Golang 對信號量的實現(xiàn)卻是隱藏在 runtime 中,并沒有包含到標(biāo)準(zhǔn)庫里來,在 src 源碼中我們可以看到底層依賴的信號量相關(guān)函數(shù)。
// defined in package runtime // Semacquire waits until *s > 0 and then atomically decrements it. // It is intended as a simple sleep primitive for use by the synchronization // library and should not be used directly. func runtime_Semacquire(s *uint32) // Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. // If handoff is true, pass count directly to the first waiter. // skipframes is the number of frames to omit during tracing, counting from // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
- runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻將 s 減去 1【原子操作】;
- runtime_Semrelease:將 s 增加 1,然后通知一個阻塞在 runtime_Semacquire 的 goroutine【原子操作】。
兩個原子操作,一個 acquire,一個 release,其實就代表了對資源的獲取和釋放。Mutex 作為 sync 包的核心,支撐了 RWMutex,channel,singleflight 等多個并發(fā)控制的能力,而對信號量的管理又是 Mutex 的基礎(chǔ)。
雖然源碼看不到,但 Golang 其實在擴(kuò)展庫 golang.org/x/sync/semaphore
也提供了一套信號量的實現(xiàn),我們可以由此來參考一下,理解 semaphore 的實現(xiàn)思路。
信號量
在看源碼之前,我們先理清楚【信號量】設(shè)計背后的場景和原理。
信號量的概念是荷蘭計算機(jī)科學(xué)家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應(yīng)用在不同的操作系統(tǒng)中。在系統(tǒng)中,會給每一個進(jìn)程一個信號量,代表每個進(jìn)程目前的狀態(tài)。未得到控制權(quán)的進(jìn)程,會在特定的地方被迫停下來,等待可以繼續(xù)進(jìn)行的信號到來。
在 Mutex 依賴的信號量機(jī)制中我們可以看到,這里本質(zhì)就是依賴 sema 一個 uint32 的變量 + 原子操作來實現(xiàn)并發(fā)控制能力。當(dāng) goroutine 完成對信號量等待時,該變量 -1,當(dāng) goroutine 完成對信號量的釋放時,該變量 +1。
如果一個新的 goroutine 發(fā)現(xiàn)信號量不大于 0,說明資源暫時沒有,就得阻塞等待。直到信號量 > 0,此時的語義是有新的資源,該goroutine就會結(jié)束等待,完成對信號量的 -1 并返回。注意我們上面有提到,runtime 支持的兩個方法都是原子性的,不用擔(dān)心兩個同時在等待的 goroutine 同時搶占同一份資源。
典型的信號量場景是【圖書館借書】。假設(shè)學(xué)校圖書館某熱門書籍現(xiàn)在只有 100 本存貨,但是上萬學(xué)生都想借閱,怎么辦?
直接買一萬本書是非常簡單粗暴的解法,但資源有限,這不是長久之計。
常見的解決方案很簡單:學(xué)生們先登記,一個一個來。我們先給 100 個同學(xué)發(fā)出,剩下的你們繼續(xù)等,等到什么時候借書的同學(xué)看完了,把書還回來了,就給排隊等待的同學(xué)們發(fā)放。同時,為了避免超發(fā),每發(fā)一個,都需要在維護(hù)的記錄里將【余量】減去 1,每還回來一個,就把【余量】加上 1。
runtime_Semacquire 就是排隊等待借書,runtime_Semrelease 就是看完了把書歸還給圖書館。
另外需要注意,雖然我們上面舉例的增加/減小的粒度都是 1,但這本質(zhì)上只是一種場景,事實上就算是圖書館借書,也完全有可能出現(xiàn)一個人同時借了兩本一模一樣的書。所以,信號量的設(shè)計需要支持 N 個資源的獲取和釋放。
所以,我們對于 acquire 和 release 兩種操作的語義如下:
- release: 將信號量增加 n【保證原子性】;
- acquire: 若信號量 < n,阻塞等待,直到信號量 >= n,此時將信號量的值減去 n【保證原子性】。
semaphore 擴(kuò)展庫實現(xiàn)
這里我們結(jié)合golang.org/x/sync/semaphore
源碼來看看怎樣設(shè)計出來我們上面提到的信號量結(jié)構(gòu)。
// NewWeighted creates a new weighted semaphore with the given // maximum combined weight for concurrent access. func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w } // Weighted provides a way to bound concurrent access to a resource. // The callers can request access with a given weight. type Weighted struct { size int64 // 最大資源數(shù) cur int64 // 當(dāng)前已被使用的資源 mu sync.Mutex waiters list.List // 等待隊列 }
有意思的是,雖然包名是 semaphore,但是擴(kuò)展庫里真正給【信號量結(jié)構(gòu)體】定義的名稱是 Weighted。從上面的定義我們可以看到,傳入初始資源個數(shù) n(對應(yīng) size),就可以生成一個 Weighted 信號量結(jié)構(gòu)。
Weighted 提供了三個方法來實現(xiàn)對信號量機(jī)制的支持:
- Acquire
對應(yīng)上面我們提到的 acquire 語義,注意我們提到過,抽象的來講,acquire 成功與否其實不太看返回值,而是只要獲取不了就一直阻塞,如果返回了,就意味著獲取到了。
但在 Golang 實現(xiàn)當(dāng)中,我們肯定不希望,如果發(fā)生了異常 case,導(dǎo)致一直阻塞在這里。所以你可以看到 Acquire 的入?yún)⒗镉袀€ context.Context,借用 context 的上下文控制能力,你可以對此進(jìn)行 cancel, 可以設(shè)置 timeout 等待超時,就能對 acquire 行為進(jìn)行更多約束。
所以,acquire 之后我們?nèi)匀恍枰獧z查返回值 error,如果為 nil,代表正常獲取了資源。否則可能是 context 已經(jīng)不合法了。
- Release
跟上面提到的 release 語義完全一致,傳入你要釋放的資源數(shù) n,保證原子性地增加信號量。
- TryAcquire
這里其實跟 sync 包中的各類 TryXXX 函數(shù)定位很像。并發(fā)的機(jī)制中大都包含 fast path 和 slow path,比如首個 goroutine 先來 acquire,那么一定是能拿到的,后續(xù)再來請求的 goroutine 由于慢了一步,只能走 slow path 進(jìn)行等待,自旋等操作。sync 包中絕大部分精華,都在于 slow path 的處理。fast path 大多是一個基于 atomic 包的原子操作,比如 CAS 就可以解決。
TryAcquire 跟 Acquire 的區(qū)別在于,雖然也是要資源,但是不等待。有了我就獲取,就減信號量,返回 trye。但是如果目前還沒有,我不會阻塞在這里,而是直接返回 false。
下面我們逐個方法看看,Weighted 是怎樣實現(xiàn)的。
Acquire
// Acquire acquires the semaphore with a weight of n, blocking until resources // are available or ctx is done. On success, returns nil. On failure, returns // ctx.Err() and leaves the semaphore unchanged. // // If ctx is already done, Acquire may still succeed without blocking. func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-ctx.Done() return ctx.Err() } ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock() select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // If we're at the front and there're extra tokens left, notify other waiters. if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: return nil } }
在閱讀之前回憶一下上面 Weighted 結(jié)構(gòu)的定義,注意 Weighted 并沒有維護(hù)一個變量用來表示【當(dāng)前剩余的資源】,這一點是通過 size(初始化的時候設(shè)置,表示總資源數(shù))減去 cur(當(dāng)前已被使用的資源),二者作差得到的。
我們來拆解一下上面這段代碼:
第一步:這是常規(guī)意義上的 fast path
s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil }
- 先上鎖,保證并發(fā)安全;
- 校驗如果 size - cur >= n,代表剩余的資源是足夠,同時 waiters 這個等待隊列為空,代表沒有別的協(xié)程在等待;
- 此時就沒什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 個資源,然后解鎖返回,很直接。
第二步:針對特定場景做提前剪枝
if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-ctx.Done() return ctx.Err() }
如果請求的資源數(shù)量,甚至都大于資源總數(shù)量了,說明這個協(xié)程心里沒數(shù)。。。。就算我現(xiàn)在把所有初始化的資源都拿回來,也喂不飽你呀?。。∧悄茉趺崔k,我就不煩勞后面流程處理了,直接等你的 context 什么時候 Done,給你返回 context 的錯誤就行了,同時先解個鎖,別耽誤別的 goroutine 拿資源。
第三步:資源是夠的,只是現(xiàn)在沒有,那就把當(dāng)前goroutine加到排隊的隊伍里
ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock()
這里 ready 結(jié)構(gòu)是個空結(jié)構(gòu)體的 channel,僅僅是為了實現(xiàn)協(xié)程間通信,通知什么時候資源 ready,建立一個屬于這個 goroutine 的 waiter,然后塞到 Weighted 結(jié)構(gòu)的等待隊列 waiters 里。
搞定了以后直接解鎖,因為你已經(jīng)來排隊了,手續(xù)處理完成,以后的路有別的通知機(jī)制保證,就沒必要在這兒拿著鎖阻塞新來的 goroutine 了,人家也得排隊。
第四步:排隊等待
select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // If we're at the front and there're extra tokens left, notify other waiters. if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: return nil }
一個 select 語句,只看兩種情況:1. 這個 goroutine 的 context 超時了;2. 拿到了資源,皆大歡喜。
重點在于 ctx.Done 分支里 default 的處理。我們可以看到,如果是超時了,此時還沒拿到資源,首先會把當(dāng)前 goroutine 從 waiters 等待隊列里移除(合情合理,你既然因為自己的原因做不了主,沒法繼續(xù)等待了,就別耽誤別人事了)。
然后接著判斷,若這個 goroutine 同時也是排在最前的 goroutine,而且恰好現(xiàn)在有資源了,就趕緊通知隊里的 goroutine 們,伙計們,現(xiàn)在有資源了,趕緊來拿。我們來看看這個 notifyWaiters 干了什么:
func (s *Weighted) notifyWaiters() { for { next := s.waiters.Front() if next == nil { break // No more waiters blocked. } w := next.Value.(waiter) if s.size-s.cur < w.n { // Not enough tokens for the next waiter. We could keep going (to try to // find a waiter with a smaller request), but under load that could cause // starvation for large requests; instead, we leave all remaining waiters // blocked. // // Consider a semaphore used as a read-write lock, with N tokens, N // readers, and one writer. Each reader can Acquire(1) to obtain a read // lock. The writer can Acquire(N) to obtain a write lock, excluding all // of the readers. If we allow the readers to jump ahead in the queue, // the writer will starve — there is always one token available for every // reader. break } s.cur += w.n s.waiters.Remove(next) close(w.ready) } }
其實很簡單,遍歷 waiters 這個等待隊列,拿到排隊最前的 waiter,判斷資源夠不夠,如果夠了,增加 cur 變量,資源給你,然后把你從等待隊列里移出去,再 close ready 那個goroutine 就行,算是通知一下。
重點部分在于,如果資源不夠怎么辦?
想象一下現(xiàn)在的處境,Weighted 這個 semaphore 的確有資源,而目前要處理的這個 goroutine 的的確確就是排隊最靠前的,而且人家也沒獅子大開口,要比你總 size 還大的資源。但是,但是,好巧不巧,現(xiàn)在你要的數(shù)量,比我手上有的少。。。。
很無奈,那怎么辦呢?
無非兩種解法:
- 我先不管你,反正你要的不夠,我先看看你后面那個 goroutine 人家夠不夠,雖然你現(xiàn)在是排位第一個,但是也得繼續(xù)等著;
- 沒辦法,你排第一,需求我就得滿足,所以我們都繼續(xù)等,等啥時候資源夠了就給你。
擴(kuò)展庫實際選用的是第 2 種策略,即一定要滿足排在最前面的 goroutine,這里的考慮在注釋里有提到,如果直接繼續(xù)看后面的 goroutine 夠不夠,優(yōu)先滿足后面的,在一些情況下會餓死有大資源要求的 goroutine,設(shè)計上不希望這樣的情況發(fā)生。
簡單說:要的多不是錯,既然你排第一,目前貨不多,那就大家一起阻塞等待,保障你的權(quán)利。
Release
// Release releases the semaphore with a weight of n. func (s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } s.notifyWaiters() s.mu.Unlock() }
Release 這里的實現(xiàn)非常簡單,一把鎖保障不出現(xiàn)并發(fā),然后將 cur 減去 n 即可,說明此時又有 n 個資源回到了貨倉。然后和上面 Acquire 一樣,調(diào)用 notifyWaiters,叫排隊第一的哥們(哦不,是 goroutine)來領(lǐng)東西了。
TryAcquire
// TryAcquire acquires the semaphore with a weight of n without blocking. // On success, returns true. On failure, returns false and leaves the semaphore unchanged. func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
其實就是 Acquire 方法的 fast path,只是返回了個 bool,標(biāo)識是否獲取成功。
總結(jié)
今天我們了解了擴(kuò)展庫 semaphore 對于信號量的封裝實現(xiàn),整體代碼加上注釋也才 100 多行,是非常好的學(xué)習(xí)材料,建議大家有空了對著源碼再過一遍。Acquire 和 Release 的實現(xiàn)都很符合直覺。
其實,我們使用 buffered channel 其實也可以模擬出來 n 個信號量的效果,但就不具備 semaphore Weighted 這套實現(xiàn)里面,一次獲取多個資源的能力了。
以上就是Golang信號量設(shè)計實現(xiàn)示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Go信號量設(shè)計的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
剖析Go編寫的Socket服務(wù)器模塊解耦及基礎(chǔ)模塊的設(shè)計
這篇文章主要介紹了Go的Socket服務(wù)器模塊解耦及日志和定時任務(wù)的模塊設(shè)計,舉了一些Go語言編寫的服務(wù)器模塊的例子,需要的朋友可以參考下2016-03-03golang使用OpenTelemetry實現(xiàn)跨服務(wù)全鏈路追蹤詳解
這篇文章主要為大家介紹了golang使用OpenTelemetry實現(xiàn)跨服務(wù)全鏈路追蹤詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-09-09golang?實現(xiàn)?pdf?轉(zhuǎn)高清晰度?jpeg的處理方法
這篇文章主要介紹了golang?實現(xiàn)?pdf?轉(zhuǎn)高清晰度?jpeg,下面主要介紹Golang 代碼使用方法及Golang PDF轉(zhuǎn)JPEG的詳細(xì)代碼,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2022-10-10構(gòu)建Golang應(yīng)用最小Docker鏡像的實現(xiàn)
這篇文章主要介紹了構(gòu)建Golang應(yīng)用最小Docker鏡像的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-05-05Golang基礎(chǔ)學(xué)習(xí)之map的示例詳解
哈希表是常見的數(shù)據(jù)結(jié)構(gòu),有的語言會將哈希稱作字典或者映射,在Go中,哈希就是常見的數(shù)據(jù)類型map,本文就來聊聊Golang中map的相關(guān)知識吧2023-03-03