欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang信號量設(shè)計實現(xiàn)示例詳解

 更新時間:2022年08月02日 09:12:57   作者:ag9920  
這篇文章主要為大家介紹了Golang信號量設(shè)計實現(xiàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

開篇

在我們此前的文章 Golang Mutex 原理解析 中曾提到過,Mutex 的底層結(jié)構(gòu)包含了兩個字段,state 和 sema:

type Mutex struct {
    state int32 
    sema  uint32
}
  • state 代表互斥鎖的狀態(tài),比如是否被鎖定;
  • sema 表示信號量,協(xié)程阻塞會等待該信號量,解鎖的協(xié)程釋放信號量從而喚醒等待信號量的協(xié)程。

這個 sema 就是 semaphore 信號量的意思。Golang 協(xié)程之間的搶鎖,實際上爭搶給Locked賦值的權(quán)利,能給 Locked 置為1,就說明搶鎖成功。搶不到就阻塞等待 sema 信號量,一旦持有鎖的協(xié)程解鎖,那么等待的協(xié)程會依次被喚醒。

有意思的是,雖然 semaphore 在鎖的實現(xiàn)中起到了至關(guān)重要的作用,Golang 對信號量的實現(xiàn)卻是隱藏在 runtime 中,并沒有包含到標(biāo)準(zhǔn)庫里來,在 src 源碼中我們可以看到底層依賴的信號量相關(guān)函數(shù)。

// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
  • runtime_Semacquire:阻塞等待直到 s 大于 0,然后立刻將 s 減去 1【原子操作】;
  • runtime_Semrelease:將 s 增加 1,然后通知一個阻塞在 runtime_Semacquire 的 goroutine【原子操作】。

兩個原子操作,一個 acquire,一個 release,其實就代表了對資源的獲取和釋放。Mutex 作為 sync 包的核心,支撐了 RWMutex,channel,singleflight 等多個并發(fā)控制的能力,而對信號量的管理又是 Mutex 的基礎(chǔ)。

雖然源碼看不到,但 Golang 其實在擴(kuò)展庫 golang.org/x/sync/semaphore 也提供了一套信號量的實現(xiàn),我們可以由此來參考一下,理解 semaphore 的實現(xiàn)思路。

信號量

在看源碼之前,我們先理清楚【信號量】設(shè)計背后的場景和原理。

信號量的概念是荷蘭計算機(jī)科學(xué)家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應(yīng)用在不同的操作系統(tǒng)中。在系統(tǒng)中,會給每一個進(jìn)程一個信號量,代表每個進(jìn)程目前的狀態(tài)。未得到控制權(quán)的進(jìn)程,會在特定的地方被迫停下來,等待可以繼續(xù)進(jìn)行的信號到來。

在 Mutex 依賴的信號量機(jī)制中我們可以看到,這里本質(zhì)就是依賴 sema 一個 uint32 的變量 + 原子操作來實現(xiàn)并發(fā)控制能力。當(dāng) goroutine 完成對信號量等待時,該變量 -1,當(dāng) goroutine 完成對信號量的釋放時,該變量 +1。

如果一個新的 goroutine 發(fā)現(xiàn)信號量不大于 0,說明資源暫時沒有,就得阻塞等待。直到信號量 > 0,此時的語義是有新的資源,該goroutine就會結(jié)束等待,完成對信號量的 -1 并返回。注意我們上面有提到,runtime 支持的兩個方法都是原子性的,不用擔(dān)心兩個同時在等待的 goroutine 同時搶占同一份資源。

典型的信號量場景是【圖書館借書】。假設(shè)學(xué)校圖書館某熱門書籍現(xiàn)在只有 100 本存貨,但是上萬學(xué)生都想借閱,怎么辦?

直接買一萬本書是非常簡單粗暴的解法,但資源有限,這不是長久之計。

常見的解決方案很簡單:學(xué)生們先登記,一個一個來。我們先給 100 個同學(xué)發(fā)出,剩下的你們繼續(xù)等,等到什么時候借書的同學(xué)看完了,把書還回來了,就給排隊等待的同學(xué)們發(fā)放。同時,為了避免超發(fā),每發(fā)一個,都需要在維護(hù)的記錄里將【余量】減去 1,每還回來一個,就把【余量】加上 1。

runtime_Semacquire 就是排隊等待借書,runtime_Semrelease 就是看完了把書歸還給圖書館。

另外需要注意,雖然我們上面舉例的增加/減小的粒度都是 1,但這本質(zhì)上只是一種場景,事實上就算是圖書館借書,也完全有可能出現(xiàn)一個人同時借了兩本一模一樣的書。所以,信號量的設(shè)計需要支持 N 個資源的獲取和釋放。

所以,我們對于 acquire 和 release 兩種操作的語義如下:

  • release: 將信號量增加 n【保證原子性】;
  • acquire: 若信號量 < n,阻塞等待,直到信號量 >= n,此時將信號量的值減去 n【保證原子性】。

semaphore 擴(kuò)展庫實現(xiàn)

這里我們結(jié)合golang.org/x/sync/semaphore 源碼來看看怎樣設(shè)計出來我們上面提到的信號量結(jié)構(gòu)。

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
	size    int64       // 最大資源數(shù)
	cur     int64       // 當(dāng)前已被使用的資源
	mu      sync.Mutex
	waiters list.List   // 等待隊列
}

有意思的是,雖然包名是 semaphore,但是擴(kuò)展庫里真正給【信號量結(jié)構(gòu)體】定義的名稱是 Weighted。從上面的定義我們可以看到,傳入初始資源個數(shù) n(對應(yīng) size),就可以生成一個 Weighted 信號量結(jié)構(gòu)。

Weighted 提供了三個方法來實現(xiàn)對信號量機(jī)制的支持:

  • Acquire

對應(yīng)上面我們提到的 acquire 語義,注意我們提到過,抽象的來講,acquire 成功與否其實不太看返回值,而是只要獲取不了就一直阻塞,如果返回了,就意味著獲取到了。

但在 Golang 實現(xiàn)當(dāng)中,我們肯定不希望,如果發(fā)生了異常 case,導(dǎo)致一直阻塞在這里。所以你可以看到 Acquire 的入?yún)⒗镉袀€ context.Context,借用 context 的上下文控制能力,你可以對此進(jìn)行 cancel, 可以設(shè)置 timeout 等待超時,就能對 acquire 行為進(jìn)行更多約束。

所以,acquire 之后我們?nèi)匀恍枰獧z查返回值 error,如果為 nil,代表正常獲取了資源。否則可能是 context 已經(jīng)不合法了。

  • Release

跟上面提到的 release 語義完全一致,傳入你要釋放的資源數(shù) n,保證原子性地增加信號量。

  • TryAcquire

這里其實跟 sync 包中的各類 TryXXX 函數(shù)定位很像。并發(fā)的機(jī)制中大都包含 fast path 和 slow path,比如首個 goroutine 先來 acquire,那么一定是能拿到的,后續(xù)再來請求的 goroutine 由于慢了一步,只能走 slow path 進(jìn)行等待,自旋等操作。sync 包中絕大部分精華,都在于 slow path 的處理。fast path 大多是一個基于 atomic 包的原子操作,比如 CAS 就可以解決。

TryAcquire 跟 Acquire 的區(qū)別在于,雖然也是要資源,但是不等待。有了我就獲取,就減信號量,返回 trye。但是如果目前還沒有,我不會阻塞在這里,而是直接返回 false。

下面我們逐個方法看看,Weighted 是怎樣實現(xiàn)的。

Acquire

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock()
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}
	if n > s.size {
		// Don't make other Acquire calls block on one that's doomed to fail.
		s.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w)
	s.mu.Unlock()
	select {
	case <-ctx.Done():
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after we were canceled.  Rather than trying to
			// fix up the queue, just pretend we didn't notice the cancelation.
			err = nil
		default:
			isFront := s.waiters.Front() == elem
			s.waiters.Remove(elem)
			// If we're at the front and there're extra tokens left, notify other waiters.
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err
	case <-ready:
		return nil
	}
}

在閱讀之前回憶一下上面 Weighted 結(jié)構(gòu)的定義,注意 Weighted 并沒有維護(hù)一個變量用來表示【當(dāng)前剩余的資源】,這一點是通過 size(初始化的時候設(shè)置,表示總資源數(shù))減去 cur(當(dāng)前已被使用的資源),二者作差得到的。

我們來拆解一下上面這段代碼:

第一步:這是常規(guī)意義上的 fast path

s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
}
  • 先上鎖,保證并發(fā)安全;
  • 校驗如果 size - cur >= n,代表剩余的資源是足夠,同時 waiters 這個等待隊列為空,代表沒有別的協(xié)程在等待;
  • 此時就沒什么多想的,直接 cur 加上 n 即可,代表又消耗了 n 個資源,然后解鎖返回,很直接。

第二步:針對特定場景做提前剪枝

if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
}

如果請求的資源數(shù)量,甚至都大于資源總數(shù)量了,說明這個協(xié)程心里沒數(shù)。。。。就算我現(xiàn)在把所有初始化的資源都拿回來,也喂不飽你呀?。。∧悄茉趺崔k,我就不煩勞后面流程處理了,直接等你的 context 什么時候 Done,給你返回 context 的錯誤就行了,同時先解個鎖,別耽誤別的 goroutine 拿資源。

第三步:資源是夠的,只是現(xiàn)在沒有,那就把當(dāng)前goroutine加到排隊的隊伍里

ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()

這里 ready 結(jié)構(gòu)是個空結(jié)構(gòu)體的 channel,僅僅是為了實現(xiàn)協(xié)程間通信,通知什么時候資源 ready,建立一個屬于這個 goroutine 的 waiter,然后塞到 Weighted 結(jié)構(gòu)的等待隊列 waiters 里。

搞定了以后直接解鎖,因為你已經(jīng)來排隊了,手續(xù)處理完成,以后的路有別的通知機(jī)制保證,就沒必要在這兒拿著鎖阻塞新來的 goroutine 了,人家也得排隊。

第四步:排隊等待

select {
    case <-ctx.Done():
            err := ctx.Err()
            s.mu.Lock()
            select {
            case <-ready:
                    // Acquired the semaphore after we were canceled.  Rather than trying to
                    // fix up the queue, just pretend we didn't notice the cancelation.
                    err = nil
            default:
                    isFront := s.waiters.Front() == elem
                    s.waiters.Remove(elem)
                    // If we're at the front and there're extra tokens left, notify other waiters.
                    if isFront && s.size > s.cur {
                            s.notifyWaiters()
                    }
            }
            s.mu.Unlock()
            return err
    case <-ready:
            return nil
    }

一個 select 語句,只看兩種情況:1. 這個 goroutine 的 context 超時了;2. 拿到了資源,皆大歡喜。

重點在于 ctx.Done 分支里 default 的處理。我們可以看到,如果是超時了,此時還沒拿到資源,首先會把當(dāng)前 goroutine 從 waiters 等待隊列里移除(合情合理,你既然因為自己的原因做不了主,沒法繼續(xù)等待了,就別耽誤別人事了)。

然后接著判斷,若這個 goroutine 同時也是排在最前的 goroutine,而且恰好現(xiàn)在有資源了,就趕緊通知隊里的 goroutine 們,伙計們,現(xiàn)在有資源了,趕緊來拿。我們來看看這個 notifyWaiters 干了什么:

func (s *Weighted) notifyWaiters() {
	for {
		next := s.waiters.Front()
		if next == nil {
			break // No more waiters blocked.
		}
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough tokens for the next waiter.  We could keep going (to try to
			// find a waiter with a smaller request), but under load that could cause
			// starvation for large requests; instead, we leave all remaining waiters
			// blocked.
			//
			// Consider a semaphore used as a read-write lock, with N tokens, N
			// readers, and one writer.  Each reader can Acquire(1) to obtain a read
			// lock.  The writer can Acquire(N) to obtain a write lock, excluding all
			// of the readers.  If we allow the readers to jump ahead in the queue,
			// the writer will starve — there is always one token available for every
			// reader.
			break
		}
		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}

其實很簡單,遍歷 waiters 這個等待隊列,拿到排隊最前的 waiter,判斷資源夠不夠,如果夠了,增加 cur 變量,資源給你,然后把你從等待隊列里移出去,再 close ready 那個goroutine 就行,算是通知一下。

重點部分在于,如果資源不夠怎么辦?

想象一下現(xiàn)在的處境,Weighted 這個 semaphore 的確有資源,而目前要處理的這個 goroutine 的的確確就是排隊最靠前的,而且人家也沒獅子大開口,要比你總 size 還大的資源。但是,但是,好巧不巧,現(xiàn)在你要的數(shù)量,比我手上有的少。。。。

很無奈,那怎么辦呢?

無非兩種解法:

  • 我先不管你,反正你要的不夠,我先看看你后面那個 goroutine 人家夠不夠,雖然你現(xiàn)在是排位第一個,但是也得繼續(xù)等著;
  • 沒辦法,你排第一,需求我就得滿足,所以我們都繼續(xù)等,等啥時候資源夠了就給你。

擴(kuò)展庫實際選用的是第 2 種策略,即一定要滿足排在最前面的 goroutine,這里的考慮在注釋里有提到,如果直接繼續(xù)看后面的 goroutine 夠不夠,優(yōu)先滿足后面的,在一些情況下會餓死有大資源要求的 goroutine,設(shè)計上不希望這樣的情況發(fā)生。

簡單說:要的多不是錯,既然你排第一,目前貨不多,那就大家一起阻塞等待,保障你的權(quán)利。

Release

// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	if s.cur &lt; 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	s.notifyWaiters()
	s.mu.Unlock()
}

Release 這里的實現(xiàn)非常簡單,一把鎖保障不出現(xiàn)并發(fā),然后將 cur 減去 n 即可,說明此時又有 n 個資源回到了貨倉。然后和上面 Acquire 一樣,調(diào)用 notifyWaiters,叫排隊第一的哥們(哦不,是 goroutine)來領(lǐng)東西了。

TryAcquire

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

其實就是 Acquire 方法的 fast path,只是返回了個 bool,標(biāo)識是否獲取成功。

總結(jié)

今天我們了解了擴(kuò)展庫 semaphore 對于信號量的封裝實現(xiàn),整體代碼加上注釋也才 100 多行,是非常好的學(xué)習(xí)材料,建議大家有空了對著源碼再過一遍。Acquire 和 Release 的實現(xiàn)都很符合直覺。

其實,我們使用 buffered channel 其實也可以模擬出來 n 個信號量的效果,但就不具備 semaphore Weighted 這套實現(xiàn)里面,一次獲取多個資源的能力了。

以上就是Golang信號量設(shè)計實現(xiàn)示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Go信號量設(shè)計的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Go Java算法之同構(gòu)字符串示例詳解

    Go Java算法之同構(gòu)字符串示例詳解

    這篇文章主要為大家介紹了Go Java算法之同構(gòu)字符串示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-08-08
  • Golang WaitGroup實現(xiàn)原理解析

    Golang WaitGroup實現(xiàn)原理解析

    WaitGroup是Golang并發(fā)的兩種方式之一,一個是Channel,另一個是WaitGroup,下面這篇文章主要給大家介紹了關(guān)于golang基礎(chǔ)之waitgroup用法以及使用要點的相關(guān)資料,需要的朋友可以參考下
    2023-02-02
  • 剖析Go編寫的Socket服務(wù)器模塊解耦及基礎(chǔ)模塊的設(shè)計

    剖析Go編寫的Socket服務(wù)器模塊解耦及基礎(chǔ)模塊的設(shè)計

    這篇文章主要介紹了Go的Socket服務(wù)器模塊解耦及日志和定時任務(wù)的模塊設(shè)計,舉了一些Go語言編寫的服務(wù)器模塊的例子,需要的朋友可以參考下
    2016-03-03
  • golang使用OpenTelemetry實現(xiàn)跨服務(wù)全鏈路追蹤詳解

    golang使用OpenTelemetry實現(xiàn)跨服務(wù)全鏈路追蹤詳解

    這篇文章主要為大家介紹了golang使用OpenTelemetry實現(xiàn)跨服務(wù)全鏈路追蹤詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2023-09-09
  • golang?實現(xiàn)?pdf?轉(zhuǎn)高清晰度?jpeg的處理方法

    golang?實現(xiàn)?pdf?轉(zhuǎn)高清晰度?jpeg的處理方法

    這篇文章主要介紹了golang?實現(xiàn)?pdf?轉(zhuǎn)高清晰度?jpeg,下面主要介紹Golang 代碼使用方法及Golang PDF轉(zhuǎn)JPEG的詳細(xì)代碼,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-10-10
  • 構(gòu)建Golang應(yīng)用最小Docker鏡像的實現(xiàn)

    構(gòu)建Golang應(yīng)用最小Docker鏡像的實現(xiàn)

    這篇文章主要介紹了構(gòu)建Golang應(yīng)用最小Docker鏡像的實現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-05-05
  • 超越傳統(tǒng):Go語言并發(fā)編程的新境界

    超越傳統(tǒng):Go語言并發(fā)編程的新境界

    Go語言是一種開源的編程語言,以其強大的并發(fā)編程能力而聞名,本文將介紹Go語言并發(fā)編程的新境界,探討如何利用Go語言的特性來實現(xiàn)高效的并發(fā)編程,需要的朋友可以參考下
    2023-10-10
  • Golang基礎(chǔ)學(xué)習(xí)之map的示例詳解

    Golang基礎(chǔ)學(xué)習(xí)之map的示例詳解

    哈希表是常見的數(shù)據(jù)結(jié)構(gòu),有的語言會將哈希稱作字典或者映射,在Go中,哈希就是常見的數(shù)據(jù)類型map,本文就來聊聊Golang中map的相關(guān)知識吧
    2023-03-03
  • Go每日一庫之quicktemplate的使用

    Go每日一庫之quicktemplate的使用

    quicktemplate快速、功能強大、易于使用的Go模板引擎。比html/模板快20倍,本文我們就詳細(xì)的介紹一下quicktemplate的具體使用,感興趣的可以了解一下
    2021-07-07
  • golang中使用匿名結(jié)構(gòu)體的方法

    golang中使用匿名結(jié)構(gòu)體的方法

    這篇文章主要介紹了golang中使用匿名結(jié)構(gòu)體,本文通過實例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-08-08

最新評論