欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

golang中單機鎖的具體實現(xiàn)詳解

 更新時間:2025年03月19日 08:49:08   作者:MelonTe  
這篇文章主要為大家詳細介紹了golang中單機鎖的具體實現(xiàn)的相關(guān)知識,文中的示例代碼講解詳細,感興趣的小伙伴可以跟隨小編一起學(xué)習一下

1、鎖的概念引入

首先,為什么需要鎖?

在并發(fā)編程中,多個線程或進程可能同時訪問和修改同一個共享資源(例如變量、數(shù)據(jù)結(jié)構(gòu)、文件)等,若不引入合適的同步機制,會引發(fā)以下問題:

  • 數(shù)據(jù)競爭:多個線程同時修改一個資源,最終的結(jié)果跟線程的執(zhí)行順序有關(guān),結(jié)果是不可預(yù)測的。
  • 數(shù)據(jù)不一致:一個線程在修改資源,而另一個線程讀取了未修改完的數(shù)據(jù),從而導(dǎo)致讀取了錯誤的數(shù)據(jù)。
  • 資源競爭:多線程競爭同一個資源,浪費系統(tǒng)的性能。

因此,我們需要一把鎖,來保證同一時間只有一個人能寫數(shù)據(jù),確保共享資源在并發(fā)訪問下的正確性和一致性。

在這里,引入兩種常見的并發(fā)控制處理機制,即樂觀鎖與悲觀鎖:

樂觀鎖:假定在并發(fā)操作中,資源的搶占并不是很激烈,數(shù)據(jù)被修改的可能性不是很大,那這時候就不需要對共享資源區(qū)進行加鎖再操作,而是先修改了數(shù)據(jù),最終來判斷數(shù)據(jù)有沒有被修改,沒有被修改則提交修改指,否則重試。

悲觀鎖:與樂觀鎖相反,它假設(shè)場景的資源競爭激烈,對共享資源區(qū)的訪問必須要求持有鎖。

針對不同的場景需要采取因地制宜的策略,比較樂觀鎖與悲觀所,它們的優(yōu)缺點顯而易見:

策略優(yōu)點缺點
樂觀鎖不需要實際上鎖,性能高若沖突時,需要重新進行操作,多次重試可能會導(dǎo)致性能下降明顯
悲觀鎖訪問數(shù)據(jù)一定需要持有鎖,保證并發(fā)場景下的數(shù)據(jù)正確性加鎖期間,其他等待鎖的線程需要被阻塞,性能低

2、Sync.Mutex

Go對單機鎖的實現(xiàn),考慮了實際環(huán)境中協(xié)程對資源競爭程度的變化,制定了一套鎖升級的過程。具體方案如下:

  • 首先采取樂觀的態(tài)度,Goroutine會保持自旋態(tài),通過CAS操作嘗試獲取鎖。
  • 當多次獲取失敗,將會由樂觀態(tài)度轉(zhuǎn)入悲觀態(tài)度,判定當前并發(fā)資源競爭程度劇烈,進入阻塞態(tài)等待被喚醒。

從樂觀轉(zhuǎn)向悲觀的判定規(guī)則如下,滿足其中之一即發(fā)生轉(zhuǎn)變:

  • Goroutine自旋嘗試次數(shù)超過4次
  • 當前P的執(zhí)行隊列中存在等待被執(zhí)行的G(避免自旋影響GMP調(diào)度性能)
  • CPU是單核的(其他Goroutine執(zhí)行不了,自旋無意義)

除此之外,為了防止被阻塞的協(xié)程等待過長時間也沒有獲取到鎖,導(dǎo)致用戶的整體體驗下降,引入了饑餓的概念:

  • 饑餓態(tài):若Goroutine被阻塞等待的時間>1ms,則這個協(xié)程被視為處于饑餓狀態(tài)
  • 饑餓模式:表示當前鎖是否處于特定的模式,在該模式下,鎖的交接是公平的,按順序交給等待最久的協(xié)程。

饑餓模式與正常模式的轉(zhuǎn)變規(guī)則如下:

普通模式->饑餓模式:存在阻塞的協(xié)程,阻塞時間超過1ms

饑餓模式->普通模式:阻塞隊列清空,亦或者獲得鎖的協(xié)程的等待時間小于1ms,則恢復(fù)

接下來步入源碼,觀看具體的實現(xiàn)。

2.1、數(shù)據(jù)結(jié)構(gòu)

位于包sync/mutex.go中,對鎖的定義如下:

type Mutex struct {
    state int32
    sema  uint32
}
  • state:標識目前鎖的狀態(tài)信息,包括了是否處于饑餓模式、是否存在喚醒的阻塞協(xié)程、是否上鎖、以及處于等待鎖的協(xié)程個數(shù)有多少。
  • seme:用于阻塞和喚醒協(xié)程的信號量。

將state看作一個二進制字符串,它存儲信息的規(guī)則如下:

  • 第一位標識是否處于上鎖,0表示否,1表示上鎖(mutexLocked)
  • 第二位標識是否存在喚醒的阻塞協(xié)程(mutexWoken)
  • 第三位標識是否處于饑餓模式(mutexStarving)
  • 從第四位開始,記錄了處于阻塞態(tài)的協(xié)程個數(shù)
const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
    mutexWaiterShift = iota
    starvationThresholdNs = 1e6 //饑餓閾值
)

2.2、獲取鎖Lock()

func (m *Mutex) Lock() {
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        return
    }
    m.lockSlow()
}

嘗試直接通過CAS操作直接獲取鎖,若成功則返回,否則說明鎖被獲取,步入LockSlow。

2.3、LockSlow()

源碼較長,進行拆分講解:

var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state

(1)定義了基本的常量,含義如下:

  • waitStartTime:記錄當前協(xié)程等待的時間,只有被阻塞才會使用
  • awoke:標識當前協(xié)程是否被Unlock喚醒
  • iter:記錄當前協(xié)程自旋嘗試次數(shù)
  • old:記錄舊的鎖的狀態(tài)信息
for {
        //處于上鎖狀態(tài),并且不處于饑餓狀態(tài)中,并且當前的協(xié)程允許繼續(xù)自旋下去
        if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
 
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            runtime_doSpin()
            iter++
            old = m.state
            continue
        }
        //...
    }

(2)進入嘗試獲取鎖的循環(huán)中,兩個if表示:

  • 若鎖處于上鎖狀態(tài),并且不處于饑餓狀態(tài)中,并且當前的協(xié)程允許繼續(xù)自旋下去(非單核CPU、自旋次數(shù)<=4、調(diào)度器P的本地隊列不存在等待執(zhí)行的G),則步入:
    • 若當前協(xié)程并非從等待隊列喚醒、并且不存在被喚醒的等待協(xié)程、并且存在位于阻塞的協(xié)程、則嘗試設(shè)置mutexWoken標識為1,若成功:
      • 標識當前的協(xié)程為被喚醒的協(xié)程。(雖然并非實際從阻塞中喚醒)
    • 告訴P,當前的協(xié)程處于自旋態(tài)
    • 更新iter計數(shù)器,與old記錄的當前鎖的狀態(tài)信息,進行下一次重試循環(huán)

這里存在的唯一疑惑為,為什么要將awoke標識為true?

首先,因為當前鎖并非處于饑餓模式,因此當前的搶占鎖的模式是不公平的,若當前鎖的阻塞隊列還沒有被喚醒的協(xié)程,那就要求不要喚醒了,嘗試讓當前正在嘗試的協(xié)程獲取到鎖,避免喚醒協(xié)程進行資源競爭。

for {
        //...
        new := old
        if old&mutexStarving == 0 {
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            new |= mutexStarving
        }
        if awoke {
            new &^= mutexWoken
        }
        //...
}        

(3)進行狀態(tài)更新:

當協(xié)程從步驟2走出來時,只能說明它位于以下兩個狀態(tài)之一:

  • 旋不動了,或者鎖進入饑餓模式了,鎖要讓給別人了,總之是獲取不到鎖了(悲觀)。
  • 鎖被釋放了。

不論如何,都需要進行一些狀態(tài)的更新,為接下來的打算做準備。

用new存儲一個鎖即將要進入的新狀態(tài)信息,更新規(guī)則:

  • 若鎖不處于饑餓模式:說明鎖可能被釋放了,也可能是自旋次數(shù)過多,不管接下來是否能拿到鎖,鎖都會被某一個協(xié)程獲取,因此置mutexLocked為1。
  • 若鎖可能處于饑餓狀態(tài),或者鎖沒有被釋放:那說明自己是搶不到鎖了,即將進入阻塞態(tài),阻塞協(xié)程計數(shù)器+1。
  • 若當前的協(xié)程是被喚醒的,并且已經(jīng)處在饑餓態(tài)中而且鎖仍然鎖著:鎖進入絕對公平的饑餓模式
  • 若當前協(xié)程是被喚醒的:清除mutexWoken標識位,因為接下來可能需要有協(xié)程被喚醒(饑餓模式)。

雖然更新的有點多,但是可以歸納為

  • 若鎖釋放了,那就標識一下接下來鎖要被獲取即可。
  • 若鎖沒有釋放,并給當前協(xié)程等待了很久,那鎖就進入饑餓狀態(tài),接下來需要有阻塞協(xié)程被喚醒。

(4)嘗試更新信息:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            //...
        } else {
            old = m.state
        }

接下來嘗試將new更新進state,若更新失敗,說明當前有另一個協(xié)程介入了,為了防止數(shù)據(jù)的一致性丟失,要全部重來一次。

(5)狀態(tài)更新成功,具體判斷是要沉睡還是獲取鎖成功:

步入步驟4的if主支中,此時有兩個狀態(tài):

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
    //...
        } else {
            //...
        }

因為當前狀態(tài),可能是鎖釋放了,檢查鎖更新前是否已經(jīng)被釋放了并且不是饑餓模式,若是那說明獲取鎖成功了,函數(shù)結(jié)束了。

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 2)
    //....
        } else {
            //...
        }

否則,說明當前協(xié)程要進入阻塞態(tài)了,記錄一下開始阻塞的時間,用于醒來是判斷是否饑餓。然后進入阻塞沉睡中。

(6)若步驟5進入阻塞,則被喚醒后:

if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                break // locked the mutex with CAS
            }
            // If we were already waiting before, queue at the front of the queue.
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            runtime_SemacquireMutex(&m.sema, queueLifo, 2)
                //喚醒
              starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            old = m.state
    //若鎖處于饑餓模式
            if old&mutexStarving != 0 {
                //鎖的異常處理
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                //將要更新的信號量
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    delta -= mutexStarving
                }
                atomic.AddInt32(&m.state, delta)
                break
            }
            awoke = true
            iter = 0
    
    //....
        } else {
            //...
        }

從阻塞中喚醒,首先計算一些協(xié)程的阻塞時間,以及當前的最新鎖狀態(tài)。

若鎖處于饑餓模式:那么當前協(xié)程將直接獲取鎖,當前協(xié)程是因為饑餓模式被喚醒的,不存在其他協(xié)程搶占鎖。于是更新信號量,將記錄阻塞協(xié)程數(shù)-1,將鎖的上鎖態(tài)置1。若當前從饑餓模式喚醒的協(xié)程,等待時間已經(jīng)不到1ms了或者是最后一個等待的協(xié)程,那么將將鎖從饑餓模式轉(zhuǎn)化為正常模式。至此,獲取成功,退出函數(shù)。

否則,只是普通的隨機喚醒,于是開始嘗試進行搶占,回到步驟1。

2.4、釋放鎖Unlock()

func (m *Mutex) Unlock() {
    //直接釋放鎖
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if new != 0 {
        m.unlockSlow(new)
    }
}

通過原子操作,直接將鎖的mutexLocked標識置為0。若置0后,鎖的狀態(tài)不為0,那就說明存在需要獲取鎖的協(xié)程,步入unlockSlow。

2.5、unlockSlow()

func (m *Mutex) unlockSlow(new int32) {
    if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        old := new
        for {
            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                return
            }
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                runtime_Semrelease(&m.sema, false, 2)
                return
            }
            old = m.state
        }
    } else {
        runtime_Semrelease(&m.sema, true, 2)
    }
}

(1)首先進行了異常狀態(tài)處理,若釋放了一個已經(jīng)釋放了到鎖,那么直接fatal,程序終止。

if (new+mutexLocked)&mutexLocked == 0 {
        fatal("sync: unlock of unlocked mutex")
    }

(2)若鎖不處于饑餓狀態(tài):

  • 若此時的等待協(xié)程數(shù)量為0,或者鎖被上鎖了、含有被喚醒的協(xié)程、鎖處于饑餓模式:都說明有新的協(xié)程介入了流程,已經(jīng)完成了交接,可以直接退出
  • 喚醒一個處于阻塞態(tài)的協(xié)程。

否則,處于饑餓狀態(tài),喚醒等待最久的協(xié)程。

3、Sync.RWMutex

對于共享資源區(qū)的操作,可以劃分為讀與寫兩大類。假設(shè)在一個場景中,對共享資源區(qū)繼續(xù)讀的操作遠大于寫的操作,如果每個協(xié)程的讀操作都需要獲取互斥鎖,這帶來的性能損耗是非常大的。

RWMutex是一個可以運用在讀操作>寫操作中的提高性能的鎖,可以將它視為由一個讀鎖與一個寫鎖構(gòu)成。其運作規(guī)則具體如下:

  • 讀鎖允許多個讀協(xié)程同時讀取共享資源區(qū),若有協(xié)程需要修改資源區(qū)的數(shù)據(jù),那么它需要被阻塞。
  • 寫鎖具有嚴格的排他性,當共享資源區(qū)被上了寫鎖時,任何其他goroutine都不得訪問。
  • 可見在最壞的情況下,所有的協(xié)程都是需要寫操作時,讀寫鎖會退化成普通的Mutex。

3.1、數(shù)據(jù)結(jié)構(gòu)

type RWMutex struct {
    w           Mutex        // held if there are pending writers
    writerSem   uint32       // semaphore for writers to wait for completing readers
    readerSem   uint32       // semaphore for readers to wait for completing writers
    readerCount atomic.Int32 // number of pending readers
    readerWait  atomic.Int32 // number of departing readers
}
const rwmutexMaxReaders = 1 << 30 //最大的讀協(xié)程數(shù)量
  • w:一個互斥的寫鎖
  • writerSem:關(guān)聯(lián)被阻塞的寫協(xié)程的信號量
  • readerSem:關(guān)聯(lián)被阻塞的讀協(xié)程的信號量
  • readerCount:正常情況下,記錄正在讀取的協(xié)程數(shù)量;但若當前是寫協(xié)程正在持有鎖,那么實際記錄讀協(xié)程的數(shù)量為readerCount - rwmutexMaxReader
  • readerWait:記錄釋放下一個寫協(xié)程,還需要等待讀協(xié)程完成的數(shù)量

3.2、讀鎖流程RLock()

func (rw *RWMutex) RLock() {
    if rw.readerCount.Add(1) < 0 {
        // A writer is pending, wait for it.
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
}

對readerCount+1,表示新加入一個讀協(xié)程。若結(jié)果<0,說明當前鎖正在被寫協(xié)程占據(jù),令當前的讀協(xié)程阻塞。

3.3、讀釋放鎖流程RUnlock()

func (rw *RWMutex) RUnlock() {
    if r := rw.readerCount.Add(-1); r < 0 {
        // Outlined slow-path to allow the fast-path to be inlined
        rw.rUnlockSlow(r)
    }
}

對readerCount-1,表示減少一個讀協(xié)程。若結(jié)果<0,說明當前鎖正在被寫協(xié)程占據(jù),步入runlockslow。

3.4、rUnlockSlow()

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        fatal("sync: RUnlock of unlocked RWMutex")
    }
    if rw.readerWait.Add(-1) == 0 {
        // The last reader unblocks the writer.
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

首先進行錯誤處理,若發(fā)現(xiàn)當前協(xié)程為占用過讀鎖,或者讀流程的協(xié)程數(shù)量上限,系統(tǒng)出現(xiàn)異常,fatal。

否則,對readerWait-1,若結(jié)果為0,說明當前協(xié)程是最后一個介入讀鎖流程的協(xié)程,此時需要釋放一個寫鎖。

3.5、寫鎖流程Lock()

func (rw *RWMutex) Lock() {
    // First, resolve competition with other writers.
    rw.w.Lock()
    // Announce to readers there is a pending writer.
    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
    // Wait for active readers.
    if r != 0 && rw.readerWait.Add(r) != 0 {
        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    }
}

首先嘗試獲取寫鎖,若獲取成功,需要將readerCount-最大讀協(xié)程數(shù),表示現(xiàn)在鎖被讀協(xié)程占據(jù)。

r表示處于讀流程的協(xié)程數(shù)量,若r不為0,那么就將readerWait加上r,等這些讀協(xié)程都讀取完畢,再去寫。將這個寫協(xié)程阻塞。(讀寫鎖并非讀、寫公平,讀協(xié)程優(yōu)先。)

3.6、寫釋放鎖流程Unlock()

func (rw *RWMutex) Unlock() {
    // Announce to readers there is no active writer.
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        fatal("sync: Unlock of unlocked RWMutex")
    }
    // Unblock blocked readers, if any.
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    // Allow other writers to proceed.
    rw.w.Unlock()
}

重新將readerCount置為正常指,表示釋放了寫鎖。若讀協(xié)程超過最大上限,則異常。

然后喚醒所有阻塞的讀協(xié)程。(讀協(xié)程優(yōu)先)

解鎖。

以上就是golang中單機鎖的具體實現(xiàn)詳解的詳細內(nèi)容,更多關(guān)于go單機鎖的資料請關(guān)注腳本之家其

相關(guān)文章

  • GO 切片刪除元素的三種方法

    GO 切片刪除元素的三種方法

    本文主要介紹了GO 切片刪除元素,根據(jù)要刪除元素的位置有三種情況,分別是從開頭位置刪除、從中間位置刪除和從尾部刪除,具有一定的參考價值,感興趣的可以了解一下
    2024-08-08
  • 實時通信的服務(wù)器推送機制 EventSource(SSE) 簡介附go實現(xiàn)示例代碼

    實時通信的服務(wù)器推送機制 EventSource(SSE) 簡介附go實現(xiàn)示例代碼

    EventSource是一種非常有用的 API,適用于許多實時應(yīng)用場景,它提供了一種簡單而可靠的方式來建立服務(wù)器推送連接,并實現(xiàn)實時更新和通知,這篇文章主要介紹了實時通信的服務(wù)器推送機制 EventSource(SSE)簡介附go實現(xiàn)示例,需要的朋友可以參考下
    2024-03-03
  • Go語言等待組sync.WaitGrou的使用示例

    Go語言等待組sync.WaitGrou的使用示例

    本文主要介紹了Go語言等待組sync.WaitGrou的使用示例,sync.WaitGroup只有3個方法,Add(),Done(),Wait(),下面就來具體的介紹一下如何使用,感興趣的可以了解一下
    2024-08-08
  • Golang中常見加密算法的總結(jié)

    Golang中常見加密算法的總結(jié)

    這篇文章主要為大家詳細介紹了Golang中常見的一些加密算法的實現(xiàn),文中的示例代碼講解詳細,具有一定的學(xué)習價值,感興趣的小伙伴可以了解一下
    2023-03-03
  • Go語言入門exec的基本使用示例

    Go語言入門exec的基本使用示例

    這篇文章主要為大家介紹了Go語言入門exec在go語言中的基本使用示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-05-05
  • golang中使用proto3協(xié)議導(dǎo)致的空值字段不顯示的問題處理方案

    golang中使用proto3協(xié)議導(dǎo)致的空值字段不顯示的問題處理方案

    這篇文章主要介紹了golang中使用proto3協(xié)議導(dǎo)致的空值字段不顯示的問題處理方案,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友們下面隨著小編來一起學(xué)習學(xué)習吧
    2019-10-10
  • Go路由注冊方法詳解

    Go路由注冊方法詳解

    Go語言中,http.NewServeMux()和http.HandleFunc()是兩種不同的路由注冊方式,前者創(chuàng)建獨立的ServeMux實例,適合模塊化和分層路由,靈活性高,但啟動服務(wù)器時需要顯式指定,后者使用全局默認的http.DefaultServeMux,適合簡單場景,感興趣的朋友跟隨小編一起看看吧
    2025-02-02
  • 詳解 Go 語言中 Map 類型和 Slice 類型的傳遞

    詳解 Go 語言中 Map 類型和 Slice 類型的傳遞

    這篇文章主要介紹了詳解 Go 語言中 Map 類型和 Slice 類型的傳遞的相關(guān)資料,需要的朋友可以參考下
    2017-09-09
  • Go 語言中的 http.FileSystem詳細解析

    Go 語言中的 http.FileSystem詳細解析

    在本文中,我們深入探討了 Go 語言中的 http.FileSystem 接口,并介紹了它的基本原理、使用方法以及實際應(yīng)用場景,感興趣的朋友跟隨小編一起看看吧
    2024-03-03
  • go語言中值類型和指針類型的深入理解

    go語言中值類型和指針類型的深入理解

    這篇文章主要給大家介紹了關(guān)于go語言中值類型和指針類型的相關(guān)資料,文中通過實例代碼介紹的非常詳細,對大家的學(xué)習或者工作具有一定的參考學(xué)習價值,需要的朋友可以參考下
    2022-03-03

最新評論