欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

一文帶你深入了解Golang中的Mutex

 更新時間:2023年03月31日 08:22:04   作者:eleven26  
這篇文章主要為大家詳細(xì)介紹了Golang中Mutex的相關(guān)知識,知其然,更要知其所以然。文中的示例代碼講解詳細(xì),感興趣的小伙伴可以了解一下

在我們的日常開發(fā)中,總會有時候需要對一些變量做并發(fā)讀寫,比如 web 應(yīng)用在同時接到多個請求之后, 需要對一些資源做初始化,而這些資源可能是只需要初始化一次的,而不是每一個 http 請求都初始化, 在這種情況下,我們需要限制只能一個協(xié)程來做初始化的操作,比如初始化數(shù)據(jù)庫連接等, 這個時候,我們就需要有一種機(jī)制,可以限制只有一個協(xié)程來執(zhí)行這些初始化的代碼。 在 go 語言中,我們可以使用互斥鎖(Mutex)來實(shí)現(xiàn)這種功能。

互斥鎖的定義

這里引用一下維基百科的定義:

互斥鎖(Mutual exclusion,縮寫 Mutex)是一種用于多線程編程中,防止兩個線程同時對同一公共資源 (比如全局變量)進(jìn)行讀寫的機(jī)制。該目的通過將代碼切片成一個一個的臨界區(qū)域(critical section)達(dá)成。 臨街區(qū)域指的是一塊對公共資源進(jìn)行訪問的代碼,并非一種機(jī)制或是算法。

互斥,顧名思義,也就是只有一個線程能持有鎖。當(dāng)然,在 go 中,是只有一個協(xié)程能持有鎖。

下面是一個簡單的例子:

var sum int // 和
var mu sync.Mutex // 互斥鎖

// add 將 sum 加 1
func add() {
    // 獲取鎖,只能有一個協(xié)程獲取到鎖,
    // 其他協(xié)程需要阻塞等待鎖釋放才能獲取到鎖。
   mu.Lock()
   // 臨界區(qū)域
   sum++
   mu.Unlock()
}

func TestMutex(t *testing.T) {
   // 啟動 1000 個協(xié)程
   var wg sync.WaitGroup
   wg.Add(1000)

   for i := 0; i < 1000; i++ {
      go func() {
         // 每個協(xié)程里面調(diào)用 add()
         add()
         wg.Done()
      }()
   }

   // 等待所有協(xié)程執(zhí)行完畢
   wg.Wait()
   // 最終 sum 的值應(yīng)該是 1000
   assert.Equal(t, 1000, sum)
}

上面的例子中,我們定義了一個全局變量 sum,用于存儲和,然后定義了一個互斥鎖 mu, 在 add() 函數(shù)中,我們使用 mu.Lock() 來加鎖,然后對 sum 進(jìn)行加 1 操作, 最后使用 mu.Unlock() 來解鎖,這樣就保證了在任意時刻,只有一個協(xié)程能夠?qū)?sum 進(jìn)行加 1 操作, 從而保證了在并發(fā)執(zhí)行 add() 操作的時候 sum 的值是正確的。

上面這個例子,在我之前的文章中已經(jīng)作為例子出現(xiàn)過很多次了,這里不再贅述了。

go Mutex 的基本用法

Mutex 我們一般只會用到它的兩個方法:

  • Lock:獲取互斥鎖。(只會有一個協(xié)程可以獲取到鎖,通常用在臨界區(qū)開始的地方。)
  • Unlock: 釋放互斥鎖。(釋放獲取到的鎖,通常用在臨界區(qū)結(jié)束的地方。)

Mutex 的模型可以用下圖表示:

說明:

  • 同一時刻只能有一個協(xié)程獲取到 Mutex 的使用權(quán),其他協(xié)程需要排隊(duì)等待(也就是上圖的 G1->G2->Gn)。
  • 擁有鎖的協(xié)程從臨界區(qū)退出的時候需要使用 Unlock 來釋放鎖,這個時候等待隊(duì)列的下一個協(xié)程可以獲取到鎖(實(shí)際實(shí)現(xiàn)比這里說的復(fù)雜很多,后面會細(xì)說),從而進(jìn)入臨界區(qū)。
  • 等待的協(xié)程會在 Lock 調(diào)用處阻塞,Unlock 的時候會使得一個等待的協(xié)程解除阻塞的狀態(tài),得以繼續(xù)執(zhí)行。

上面提到的這幾點(diǎn)也是 Mutex 的基本原理。

互斥鎖使用的兩個例子

了解了 go Mutex 基本原理之后,讓我們再來看看 Mutex 的一些使用的例子。

gin Context 中的 Set 方法

一個很常見的場景就是,并發(fā)對 map 進(jìn)行讀寫,熟悉 go 的朋友應(yīng)該知道,go 中的 map 是不支持并發(fā)讀寫的, 如果我們對 map 進(jìn)行并發(fā)讀寫會導(dǎo)致 panic

而在 ginContext 結(jié)構(gòu)體中,也有一個 map 類型的字段 Keys,用來在上下文間傳遞鍵值對數(shù)據(jù), 所以在通過 Set 來設(shè)置鍵值對的時候需要使用 c.mu.Lock() 來先獲取互斥鎖,然后再對 Keys 做設(shè)置。

// Set is used to store a new key/value pair exclusively for this context.
// It also lazy initializes  c.Keys if it was not used previously.
func (c *Context) Set(key string, value any) {
    // 獲取鎖
   c.mu.Lock()
    // 如果 Keys 還沒初始化,則進(jìn)行初始化
   if c.Keys == nil {
      c.Keys = make(map[string]any)
   }

    // 設(shè)置鍵值對
   c.Keys[key] = value
    // 釋放鎖
   c.mu.Unlock()
}

同樣的,對 Keys 做讀操作的時候也需要使用互斥鎖:

// Get returns the value for the given key, ie: (value, true).
// If the value does not exist it returns (nil, false)
func (c *Context) Get(key string) (value any, exists bool) {
    // 獲取鎖
   c.mu.RLock()
    // 讀取 key
   value, exists = c.Keys[key]
    // 釋放鎖
   c.mu.RUnlock()
   return
}

可能會有人覺得奇怪,為什么從 map 中讀也還需要鎖。這是因?yàn)?,如果讀的時候沒有鎖保護(hù), 那么就有可能在 Set 設(shè)置的過程中,同時也在進(jìn)行讀操作,這樣就會 panic 了。

這個例子想要說明的是,像 map 這種數(shù)據(jù)結(jié)構(gòu)本身就不支持并發(fā)讀寫,我們這種情況下只有使用 Mutex 了。

sync.Pool 中的 pinSlow 方法

sync.Pool 的實(shí)現(xiàn)中,有一個全局變量記錄了進(jìn)程內(nèi)所有的 sync.Pool 對象,那就是 allPools 變量, 另外有一個鎖 allPoolsMu 用來保護(hù)對 allPools 的讀寫操作:

var (
   // 保護(hù) allPools 和 oldPools 的互斥鎖。
   allPoolsMu Mutex

   // allPools is the set of pools that have non-empty primary
   // caches. Protected by either 1) allPoolsMu and pinning or 2)
   // STW.
   allPools []*Pool

   // oldPools is the set of pools that may have non-empty victim
   // caches. Protected by STW.
   oldPools []*Pool
)

pinSlow 方法中會在 allPoolsMu 的保護(hù)下對 allPools 做讀寫操作:

func (p *Pool) pinSlow() (*poolLocal, int) {
   // Retry under the mutex.
   // Can not lock the mutex while pinned.
   runtime_procUnpin()
   allPoolsMu.Lock() // 獲取鎖
   defer allPoolsMu.Unlock() // 函數(shù)返回的時候釋放鎖
   pid := runtime_procPin()
   // poolCleanup won't be called while we are pinned.
   s := p.localSize
   l := p.local
   if uintptr(pid) < s {
      return indexLocal(l, pid), pid
   }
   if p.local == nil {
      allPools = append(allPools, p) // 全局變量修改
   }
   // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
   size := runtime.GOMAXPROCS(0)
   local := make([]poolLocal, size)
   atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
   runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
   return &local[pid], pid
}

這個例子主要是為了說明使用 mu 的另外一種非常常見的場景:并發(fā)讀寫全局變量

互斥鎖使用的注意事項(xiàng)

互斥鎖如果使用不當(dāng),可能會導(dǎo)致死鎖或者出現(xiàn) panic 的情況,下面是一些常見的錯誤:

  • 忘記使用 Unlock 釋放鎖。
  • Lock 之后還沒 Unlock 之前又使用 Lock 獲取鎖。也就是重復(fù)上鎖,go 中的 Mutex 不可重入。
  • 死鎖:位于臨界區(qū)內(nèi)不同的兩個協(xié)程都想獲取對方持有的不同的鎖。
  • 還沒 Lock 之前就 Unlock。這會導(dǎo)致 panic,因?yàn)檫@是沒有任何意義的。
  • 復(fù)制 Mutex,比如將 Mutex 作為參數(shù)傳遞。

對于第 1 點(diǎn),我們往往可以使用 defer 關(guān)鍵字來做釋放鎖的操作。第 2 點(diǎn)不太好發(fā)現(xiàn),只能在開發(fā)的時候多加注意。 第 3 點(diǎn)我們在使用鎖的時候可以考慮盡量避免在臨界區(qū)內(nèi)再去使用別的鎖。 最后,Mutex 是不可以復(fù)制的,這個可以在編譯之前通過 go vet 來做檢查。

為什么 Mutex 不能被復(fù)制呢?因?yàn)?Mutex 中包含了鎖的狀態(tài),如果復(fù)制了,那么這個狀態(tài)也會被復(fù)制, 如果在復(fù)制前進(jìn)行 Lock,復(fù)制后進(jìn)行 Unlock,那就意味著 LockUnlock 操作的其實(shí)是兩個不同的狀態(tài), 這樣顯然是不行的,是釋放不了鎖的。

雖然不可以復(fù)制,但是我們可以通過傳遞指針類型的參數(shù)來傳遞 Mutex。

互斥鎖鎖定的是什么

在前一篇文章中,我們提到過,原子操作本質(zhì)上是變量級的互斥鎖。而互斥鎖本身鎖定的又是什么呢? 其實(shí)互斥鎖本質(zhì)上是一個信號量,它通過獲取釋放信號量,最終使得協(xié)程獲得某一個代碼塊的執(zhí)行權(quán)力。

也就是說,互斥鎖,鎖定的是一塊代碼塊。

我們以 go-zero 里面的 collection/fifo.go 為例子說明一下:

// Take takes the first element out of q if not empty.
func (q *Queue) Take() (any, bool) {
   // 獲取互斥鎖(只能有一個協(xié)程獲取到鎖)
   q.lock.Lock()
   // 函數(shù)返回的時候釋放互斥鎖(獲取到鎖的協(xié)程釋放鎖之后,其他協(xié)程才能進(jìn)行搶占鎖)
   defer q.lock.Unlock()

   // 下面的代碼只有搶占到(也就是互斥鎖鎖定的代碼塊)
   if q.count == 0 {
      return nil, false
   }

   element := q.elements[q.head]
   q.head = (q.head + 1) % len(q.elements)
   q.count--

   return element, true
}

除了鎖定代碼塊的這一個作用,有另外一個比較關(guān)鍵的地方也是我們不能忽視的, 那就是 互斥鎖并不保證臨界區(qū)內(nèi)操作的變量不能被其他協(xié)程訪問。 互斥鎖只能保證一段代碼只能一個協(xié)程執(zhí)行,但是對于臨界區(qū)內(nèi)涉及的共享資源, 你在臨界區(qū)外也依然是可以對其進(jìn)行讀寫的。

我們以上面的代碼說明一下:在上面的 Take 函數(shù)中,我們對 q.headq.count 都進(jìn)行了操作, 雖然這些操作代碼位于臨界區(qū)內(nèi),但是臨界區(qū)并不保證持有鎖期間其他協(xié)程不會在臨界區(qū)外去修改 q.headq.count

下面就是一個非常典型的錯誤的例子:

import (
   "fmt"
   "sync"
   "testing"
)

var mu sync.Mutex
var sum int

// 在鎖的保護(hù)下對 sum 做讀寫操作
func test() {
   mu.Lock()
   sum++
   mu.Unlock()
}

func TestMutex(t *testing.T) {
   var wg sync.WaitGroup
   wg.Add(1000)

   for i := 0; i < 500; i++ {
      go func() {
         test()
         wg.Done()
      }()

      // 位于臨界區(qū)外,也依然是可以對 sum 做讀寫操作的。
      sum++
   }

   wg.Wait()

   fmt.Println(sum)
}

靠譜的做法是,對于有共享資源的讀寫的操作都使用 Mutex 保護(hù)起來。

當(dāng)然,如果我們只有一個變量,那么可能使用原子操作就足夠了。

互斥鎖實(shí)現(xiàn)原理

互斥鎖的實(shí)現(xiàn)有以下幾個關(guān)鍵的地方:

  • 信號量:這是操作系統(tǒng)中的同步對象。
  • 等待隊(duì)列:獲取不到互斥鎖的協(xié)程,會放入到一個先入先出隊(duì)列的隊(duì)列尾部。這樣信號量釋放的時候,可以依次對它們喚醒。
  • 原子操作:互斥鎖的實(shí)現(xiàn)中,使用了一個字段來記錄了幾種不同的狀態(tài),使用原子操作可以保證幾種狀態(tài)可以一次性變更完成。

我們先來看看 Mutex結(jié)構(gòu)體定義:

type Mutex struct {
   state int32 // 狀態(tài)字段
   sema  uint32 // 信號量
}

其中 state 字段記錄了四種不同的信息:

這四種不同信息在源碼中定義了不同的常量:

const (
   mutexLocked      = 1 << iota // 表示有 goroutine 擁有鎖
   mutexWoken                   // 喚醒(就是第 2 位)
   mutexStarving                // 饑餓(第 3 位)
   mutexWaiterShift = iota      // 表示第 4 位開始,表示等待者的數(shù)量

   starvationThresholdNs = 1e6  // 1ms 進(jìn)入饑餓模式的等待時間閾值
)

sema 的含義比較簡單,就是一個用作不同 goroutine 同步的信號量。

信號量

go 的 Mutex 是基于信號量來實(shí)現(xiàn)的,那信號量又是什么呢?

維基百科:信號量是一個同步對象,用于保持在 0 至指定最大值之間的一個計(jì)數(shù)值。當(dāng)線程完成一次對該 semaphore 對象的等待(wait)時,該計(jì)數(shù)值減一;當(dāng)線程完成一次對 semaphore 對象的釋放(release)時,計(jì)數(shù)值加一。

上面這個解釋有點(diǎn)難懂,通俗地說,就是一個數(shù)字,調(diào)用 wait 的時候,這個數(shù)字減去 1,調(diào)用 release 的時候,這個數(shù)字加上 1。 (還有一個隱含的邏輯是,如果這個數(shù)小于 0,那么調(diào)用 wait 的時候會阻塞,直到它大于 0。)

對應(yīng)到 go 的 Mutex 中,有兩個操作信號量的函數(shù):

  • runtime_Semrelease: 自動遞增信號量并通知等待的 goroutine。
  • runtime_SemacquireMutex: 是一直等到信號量大于 0,然后自動遞減。

我們注意到了,其實(shí) runtime_SemacquireMutex 是有一個前提條件的,那就是等到信號量大于 0。 其實(shí)信號量的兩個操作 P/V 就是一個加 1 一個減 1,所以在實(shí)際使用的時候,也是需要一個獲取鎖的操作對應(yīng)一個釋放鎖的操作, 否則,其他協(xié)程都無法獲取到鎖,因?yàn)樾盘柫恳恢辈粷M足。

等待隊(duì)列

go 中如果已經(jīng)有 goroutine 持有互斥鎖,那么其他的協(xié)程會放入一個 FIFO 隊(duì)列中,如下圖:

說明:

  • G1 表示持有互斥鎖的 goroutine,G2...Gn 表示一個 goroutine 的等待隊(duì)列,這是一個先入先出的隊(duì)列。
  • G1 先持有鎖,得以進(jìn)入臨界區(qū),其他想搶占鎖的 goroutine 阻塞在 Lock 調(diào)用處。
  • G1 在使用完鎖后,會使用 Unlock 來釋放鎖,本質(zhì)上是釋放了信號量,然后會喚醒 FIFO 隊(duì)列頭部的 goroutine。
  • G2FIFO 隊(duì)列中移除,進(jìn)入臨界區(qū)。G2 使用完鎖之后也會使用 Unlock 來釋放鎖。

上面只是一個大概模型,在實(shí)際實(shí)現(xiàn)中,比這個復(fù)雜很多倍,下面會繼續(xù)深入講解。

原子操作

go 的 Mutex 實(shí)現(xiàn)中,state 字段是一個 32 位的整數(shù),不同的位記錄了四種不同信息,在這種情況下, 只需要通過原子操作就可以保證一次性實(shí)現(xiàn)對四種不同狀態(tài)信息的更改,而不需要更多額外的同步機(jī)制。

但是毋庸置疑,這種實(shí)現(xiàn)會大大降低代碼的可讀性,因?yàn)橥ㄟ^一個整數(shù)來記錄不同的信息, 就意味著,需要通過各種位運(yùn)算來實(shí)現(xiàn)對這個整數(shù)不同位的修改,比如將上鎖的操作:

new |= mutexLocked

當(dāng)然,這只是 Mutex 實(shí)現(xiàn)中最簡單的一種位運(yùn)算了。下面以 state 記錄的四種不同信息為維度來具體講解一下:

1.mutexLocked:這是 state 的最低位,1 表示鎖被占用,0 表示鎖沒有被占用。

new := mutexLocked 新狀態(tài)為上鎖狀態(tài)

2.mutexWoken: 這是表示是否有協(xié)程被喚醒了的狀態(tài)

  • new = (old - 1<<mutexWaiterShift) | mutexWoken 等待者數(shù)量減去 1 的同時,設(shè)置喚醒標(biāo)識
  • new &^= mutexWoken 清除喚醒標(biāo)識

3.mutexStarving:饑餓模式的標(biāo)識

new |= mutexStarving 設(shè)置饑餓標(biāo)識

4.等待者數(shù)量:state >> mutexWaiterShift 就是等待者的數(shù)量,也就是上面提到的 FIFO 隊(duì)列中 goroutine 的數(shù)量

  • new += 1 << mutexWaiterShift 等待者數(shù)量加 1
  • delta := int32(mutexLocked - 1<<mutexWaiterShift) 上鎖的同時,將等待者數(shù)量減 1

這里并沒有涵蓋 Mutex 中所有的位運(yùn)算,其他操作在下文講解源碼實(shí)現(xiàn)的時候會提到。

在上面做了這一系列的位運(yùn)算之后,我們會得到一個新的 state 狀態(tài),假設(shè)名為 new,那么我們就可以通過 CAS 操作來將 Mutexstate 字段更新:

atomic.CompareAndSwapInt32(&m.state, old, new)

通過上面這個原子操作,我們就可以一次性地更新 Mutexstate 字段,也就是一次性更新了四種狀態(tài)信息。

這種通過一個整數(shù)記錄不同狀態(tài)的寫法在 sync 包其他的一些地方也有用到,比如 WaitGroup 中的 state 字段。

最后,對于這種操作,我們需要注意的是,因?yàn)槲覀冊趫?zhí)行 CAS 前后是沒有其他什么鎖或者其他的保護(hù)機(jī)制的, 這也就意味著上面的這個 CAS 操作是有可能會失敗的,那如果失敗了怎么辦呢?

如果失敗了,也就意味著肯定有另外一個 goroutine 率先執(zhí)行了 CAS 操作并且成功了,將 state 修改為了一個新的值。 這個時候,其實(shí)我們前面做的一系列位運(yùn)算得到的結(jié)果實(shí)際上已經(jīng)不對了,在這種情況下,我們需要獲取最新的 state,然后再次計(jì)算得到一個新的 state。

所以我們會在源碼里面看到 CAS 操作是寫在 for 循環(huán)里面的。

Mutex 的公平性

在前面,我們提到 goroutien 獲取不到鎖的時候,會進(jìn)入一個 FIFO 隊(duì)列的隊(duì)列尾,在實(shí)際實(shí)現(xiàn)中,其實(shí)沒有那么簡單, 為了獲得更好的性能,在實(shí)現(xiàn)的時候會盡量先讓運(yùn)行狀態(tài)的 goroutine 獲得鎖,當(dāng)然如果隊(duì)列中的 goroutine 等待太久(大于 1ms), 那么就會先讓隊(duì)列中的 goroutine 獲得鎖。

下面是文檔中的說明:

Mutex 可以處于兩種操作模式:正常模式和饑餓模式。在正常模式下,等待者按照FIFO(先進(jìn)先出)的順序排隊(duì),但是被喚醒的等待者不擁有互斥鎖,會與新到達(dá)的 Goroutine 競爭所有權(quán)。新到達(dá)的 Goroutine 有優(yōu)勢——它們已經(jīng)在 CPU 上運(yùn)行,數(shù)量可能很多,因此被喚醒的等待者有很大的機(jī)會失去鎖。在這種情況下,它將排在等待隊(duì)列的前面。如果等待者未能在1毫秒內(nèi)獲取到互斥鎖,則將互斥鎖切換到饑餓模式。 在饑餓模式下,互斥鎖的所有權(quán)直接從解鎖 Goroutine 移交給隊(duì)列前面的等待者。新到達(dá)的 Goroutine 即使看起來未被鎖定,也不會嘗試獲取互斥鎖,也不會嘗試自旋。相反,它們會將自己排隊(duì)在等待隊(duì)列的末尾。如果等待者獲得互斥鎖的所有權(quán)并發(fā)現(xiàn)(1)它是隊(duì)列中的最后一個等待者,或者(2)它等待時間少于1毫秒,則將互斥鎖切換回正常模式。 正常模式的性能要優(yōu)于饑餓模式,因?yàn)?Goroutine 可以連續(xù)多次獲取互斥鎖,即使有被阻塞的等待者。饑餓模式很重要,可以防止尾部延遲的病態(tài)情況。

簡單總結(jié):

1.Mutex 有兩種模式:正常模式、饑餓模式。

2.正常模式下:

被喚醒的 goroutine 和正在運(yùn)行的 goroutine 競爭鎖。這樣可以運(yùn)行中的協(xié)程有機(jī)會先獲取到鎖,從而避免了協(xié)程切換的開銷。性能更好。

3.饑餓模式下:

優(yōu)先讓隊(duì)列中的 goroutine 獲得鎖,并且直接放棄時間片,讓給隊(duì)列中的 goroutine,運(yùn)行中的 goroutine 想獲取鎖要到隊(duì)尾排隊(duì)。更加公平。

Mutex 源碼剖析

Mutex 本身的源碼其實(shí)很少,但是復(fù)雜程度是非常高的,所以第一次看的時候可能會非常懵逼,但是不妨礙我們?nèi)チ私馑拇蟾艑?shí)現(xiàn)原理。

Mutex 中主要有兩個方法,LockUnlock,使用起來非常的簡單,但是它的實(shí)現(xiàn)可不簡單。下面我們就來深入了解一下它的實(shí)現(xiàn)。

Lock

Lock 方法的實(shí)現(xiàn)如下:

// Lock 獲取鎖。
// 如果鎖已在使用中,則調(diào)用 goroutine 將阻塞,直到互斥量可用。
func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   // 上鎖成功則直接返回
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      return
   }

   // Slow path (outlined so that the fast path can be inlined)
   // 沒有上鎖成功,這個時候需要做的事情就有點(diǎn)多了。
   m.lockSlow()
}

Lock 方法中,第一次獲取鎖的時候是非常簡單的,一個簡單的原子操作設(shè)置一下 mutexLocked 標(biāo)識就完成了。 但是如果這個原子操作失敗了,表示有其他 goroutine 先獲取到了鎖,這個時候就需要調(diào)用 lockSlow 來做一些額外的操作了:

// 獲取 mutex 鎖
func (m *Mutex) lockSlow() {
   var waitStartTime int64 // 當(dāng)前協(xié)程開始等待的時間
   starving := false       // 當(dāng)前協(xié)程是否是饑餓模式
   awoke := false          // 喚醒標(biāo)志(是否當(dāng)前協(xié)程就是被喚醒的協(xié)程)
   iter := 0               // 自旋次數(shù)(超過一定次數(shù)如果還沒能獲得鎖,就進(jìn)入等待)
   old := m.state          // 舊的狀態(tài),每次 for 循環(huán)會重新獲取當(dāng)前的狀態(tài)字段

   for {
      // 自旋:目的是讓正在運(yùn)行中的 goroutine 盡快獲取到鎖。
      // 兩種情況不會自旋:
      // 1. 饑餓模式:在饑餓模式下,鎖會直接交給等待隊(duì)列中的 goroutine,所以不會自旋。
      // 2. 鎖被釋放了:另外如果運(yùn)行到這里的時候,發(fā)現(xiàn)鎖已經(jīng)被釋放了,也就不需要自旋了。
      if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
         // 設(shè)置 mutexWoken 標(biāo)識
         // 如果自旋是有意義的,則會進(jìn)入到這里,嘗試設(shè)置 mutexWoken 標(biāo)識。
         // 設(shè)置成功在持有鎖的 goroutine 獲取鎖的時候不會喚醒等待隊(duì)列中的 goroutine,下一個獲取鎖的就是當(dāng)前 goroutine。
         if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
            atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            // 各個判斷的含義:
            // !awoke 已經(jīng)被喚醒過一次了,說明當(dāng)前協(xié)程是被從等待隊(duì)列中喚醒的協(xié)程/又或者已經(jīng)成功設(shè)置 mutexWoken 標(biāo)識了,不需要再喚醒了。
            // old&mutexWoken == 0 如果不等于 0 說明有 goroutine 被喚醒了,不會嘗試設(shè)置 mutexWoken 標(biāo)識
            // old>>mutexWaiterShift != 0 如果等待隊(duì)列為空,當(dāng)前 goroutine 就是下一個搶占鎖的 goroutine
            // 前面的判斷都通過了,才會進(jìn)行 CAS 操作嘗試設(shè)置 mutexWoken 標(biāo)識
            awoke = true
         }
         runtime_doSpin() // 自旋
         iter++           // 自旋次數(shù) +1(超過一定次數(shù)會停止自旋)
         old = m.state    // 再次獲取鎖的最新狀態(tài),之后會檢查是否鎖被釋放了
         continue         // 繼續(xù)下一次檢查
      }

      new := old
      // 饑餓模式下,新到達(dá)的 goroutines 必須排隊(duì)。
      // 不是饑餓狀態(tài),直接競爭鎖。
      if old&mutexStarving == 0 {
         new |= mutexLocked
      }
      // 進(jìn)入等待隊(duì)列的兩種情況:
      // 1. 鎖依然被占用。
      // 2. 進(jìn)入了饑餓模式。
      if old&(mutexLocked|mutexStarving) != 0 {
         new += 1 << mutexWaiterShift // 等待者數(shù)量 +1
      }
       // 已經(jīng)等待超過了 1ms,且鎖被其他協(xié)程占用,則進(jìn)入饑餓模式
      if starving && old&mutexLocked != 0 {
         new |= mutexStarving
      }
      // 喚醒之后,需要重置喚醒標(biāo)志。
      // 不管有沒有獲取到鎖,都是要清除這個標(biāo)識的:
      // 獲取到鎖肯定要清除,如果獲取到鎖,需要讓其他運(yùn)行中的 goroutine 來搶占鎖;
      // 如果沒有獲取到鎖,goroutine 會阻塞,這個時候是需要持有鎖的 goroutine 來喚醒的,如果有 mutexWoken 標(biāo)識,持有鎖的 goroutine 喚醒不了。
      if awoke {
         if new&mutexWoken == 0 {
            throw("sync: inconsistent mutex state")
         }
         new &^= mutexWoken // 重置喚醒標(biāo)志
      }

      // 成功設(shè)置新狀態(tài)
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
         // 原來鎖的狀態(tài)已釋放,并且不是饑餓狀態(tài),正常請求到了鎖,返回
         if old&(mutexLocked|mutexStarving) == 0 { // 這意味著當(dāng)前的 goroutine 成功獲取了鎖
            break
         }

         // 如果已經(jīng)被喚醒過,會被加入到等待隊(duì)列頭。
         queueLifo := waitStartTime != 0
         if waitStartTime == 0 {
            waitStartTime = runtime_nanotime()
         }
         // 阻塞等待
         // queueLifo 為 true,表示加入到隊(duì)列頭。否則,加入到隊(duì)列尾。
         // (首次加入隊(duì)列加入到隊(duì)尾,不是首次加入則加入隊(duì)頭,這樣等待最久的 goroutine 優(yōu)先能夠獲取到鎖。)
         runtime_SemacquireMutex(&m.sema, queueLifo, 1)
         // 從等待隊(duì)列中喚醒,檢查鎖是否應(yīng)該進(jìn)入饑餓模式。
         starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs

         // 獲取當(dāng)前的鎖最新狀態(tài)
         old = m.state
         // 如果鎖已經(jīng)處于饑餓狀態(tài),直接搶到鎖,返回。
         // 饑餓模式下,被喚醒的協(xié)程可以直接獲取到鎖。
         // 新來的 goroutine 都需要進(jìn)入隊(duì)列等待。
         if old&mutexStarving != 0 {
            // 如果這個 goroutine 被喚醒并且 Mutex 處于饑餓模式,P 的所有權(quán)已經(jīng)移交給我們,
            // 但 Mutex 處于不一致的狀態(tài):mutexLocked 未設(shè)置,我們?nèi)匀槐灰暈榈却?。修?fù)這個問題。
            if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
               throw("sync: inconsistent mutex state")
            }
            // 加鎖,并且減少等待者數(shù)量。
            // 實(shí)際上是兩步操作合成了一步:
            // 1. m.state = m.state + 1 (獲取鎖)
            // 2. m.state = m.state - 1<<mutexWaiterShift(waiter - 1)
            delta := int32(mutexLocked - 1<<mutexWaiterShift)
            // 清除饑餓狀態(tài)的兩種情況:
            // 1. 如果不需要進(jìn)入饑餓模式(當(dāng)前被喚醒的 goroutine 的等待時間小于 1ms)
            // 2. 原來的等待者數(shù)量為 1,說明是最后一個被喚醒的 goroutine。
            if !starving || old>>mutexWaiterShift == 1 {
               // 退出饑餓模式
               delta -= mutexStarving
            }
            // 原子操作,設(shè)置新狀態(tài)。
            atomic.AddInt32(&m.state, delta)
            break
         }
         // 設(shè)置喚醒標(biāo)記,重新?lián)屨兼i(會與那些運(yùn)行中的 goroutine 一起競爭鎖)
         awoke = true
         iter = 0
      } else {
         // CAS 更新狀態(tài)失敗,獲取最新狀態(tài),然后重試
         old = m.state
      }
   }
}

我們可以看到,lockSlow 的處理非常的復(fù)雜,又要考慮讓運(yùn)行中的 goroutine 盡快獲取到鎖,又要考慮不能讓等待隊(duì)列中的 goroutine 等待太久。

代碼中注釋很多,再簡單總結(jié)一下其中的流程:

1.為了讓循環(huán)中的 goroutine 可以先獲取到鎖,會先讓 goroutine 自旋等待鎖的釋放,這是因?yàn)檫\(yùn)行中的 goroutine 正在占用 CPU,讓它先獲取到鎖可以避免一些不必要的協(xié)程切換,從而獲得更好的性能。

3.自旋完畢之后,會嘗試獲取鎖,同時也要根據(jù)舊的鎖狀態(tài)來更新鎖的不同狀態(tài)信息,比如是否進(jìn)入饑餓模式等。

3.計(jì)算得到一個新的 state 后,會進(jìn)行 CAS 操作嘗試更新 state 狀態(tài)。

4.CAS 失敗會重試上面的流程。

5.CAS 成功之后會做如下操作:

  • 判斷當(dāng)前是否已經(jīng)獲取到鎖,如果是,則返回,Lock 成功了。
  • 會判斷當(dāng)前的 goroutine 是否是已經(jīng)被喚醒過,如果是,會將當(dāng)前 goroutine 加入到等待隊(duì)列頭部。
  • 調(diào)用 runtime_SemacquireMutex,進(jìn)入阻塞狀態(tài),等待下一次喚醒。
  • 喚醒之后,判斷是否需要進(jìn)入饑餓模式。
  • 最后,如果已經(jīng)是饑餓模式,當(dāng)前 goroutine 直接獲取到鎖,退出循環(huán),否則,再進(jìn)行下一次搶占鎖的循環(huán)中。

具體流程我們可以參考一下下面的流程圖:

圖中有一些矩形方框描述了 unlockSlow 的關(guān)鍵流程。

Unlock

Unlock 方法的實(shí)現(xiàn)如下:

// Unlock 釋放互斥鎖。
// 如果 m 在進(jìn)入 Unlock 時未被鎖定,則會出現(xiàn)運(yùn)行時錯誤。
func (m *Mutex) Unlock() {
   // Fast path: drop lock bit.
   // unlock 成功
   // unLock 操作實(shí)際上是將 state 減去 1。
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 { // 等待隊(duì)列為空的時候直接返回了
      // 喚醒一個等待鎖的 goroutine
      m.unlockSlow(new)
   }
}

Unlock 做了兩件事:

  • 釋放當(dāng)前 goroutine 持有的互斥鎖:也就是將 state 減去 1
  • 喚醒等待隊(duì)列中的下一個 goroutine

如果只有一個 goroutine 在使用鎖,只需要簡單地釋放鎖就可以了。 但是如果有其他的 goroutine 在阻塞等待,那么持有互斥鎖的 goroutine 就有義務(wù)去喚醒下一個 goroutine。

喚醒的流程相對復(fù)雜一些:

// unlockSlow 喚醒下一個等待鎖的協(xié)程。
func (m *Mutex) unlockSlow(new int32) {
   // 如果未加鎖,則會拋出錯誤。
   if (new+mutexLocked)&mutexLocked == 0 {
      fatal("sync: unlock of unlocked mutex")
   }

   // 下面的操作是喚醒一個在等待鎖的協(xié)程。
   // 存在兩種情況:
   // 1. 正常模式:
   //  a. 不需要喚醒:沒有等待者、鎖已經(jīng)被搶占、有其他運(yùn)行中的協(xié)程在嘗試獲取鎖、已經(jīng)進(jìn)入了饑餓模式
   //   b. 需要喚醒:其他情況
   // 2. 饑餓模式:喚醒等待隊(duì)列頭部的那個協(xié)程
   if new&mutexStarving == 0 {
      // 不是饑餓模式
      old := new
      // 自旋
      for {
         // 下面幾種情況不需要喚醒:
         // 1. 沒有等待者了(沒得喚醒)
         // 2. 鎖已經(jīng)被占用(只能有一個 goroutine 持有鎖)
         // 3. 有其他運(yùn)行中的協(xié)程已經(jīng)被喚醒(運(yùn)行中的 goroutine 通過自旋先搶占到了鎖)
         // 4. 饑餓模式(饑餓模式下,所有新的 goroutine 都要排隊(duì),饑餓模式會直接喚醒等待隊(duì)列頭部的 gorutine)
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 獲取到喚醒等待者的權(quán)力,開始喚醒一個等待者。
         // 下面這一行實(shí)際上是兩個操作:
         // 1. waiter 數(shù)量 - 1
         // 2. 設(shè)置 mutexWoken 標(biāo)志
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
            // 正常模式下喚醒了一個 goroutine
            //(第二個參數(shù)為 false,表示當(dāng)前的 goroutine 在釋放信號量后還會繼續(xù)執(zhí)行直到用完時間片)
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         // 喚醒失敗,進(jìn)行下一次嘗試。
         old = m.state
      }
   } else {
      // 饑餓模式:將互斥鎖的所有權(quán)移交給下一個等待者,并放棄我們的時間片,以便下一個等待者可以立即開始運(yùn)行。
      // 注意:如果“mutexLocked”未設(shè)置,等待者在喚醒后會將其設(shè)置。
      // 但是,如果設(shè)置了“mutexStarving”,則仍然認(rèn)為互斥鎖已被鎖定,因此新到來的goroutine不會獲取它。
      //
      // 當(dāng)前的 goroutine 放棄 CPU 時間片,讓給阻塞在 sema 的 goroutine。
      runtime_Semrelease(&m.sema, true, 1)
   }
}

unlockSlow 邏輯相比 lockSlow 要簡單許多,我們可以再結(jié)合下面的流程圖來閱讀上面的源碼:

runtime_Semrelease 第二個參數(shù)的含義

細(xì)心的朋友可能注意到了,在 unlockSlow 的實(shí)現(xiàn)中,有兩處地方調(diào)用了 runtime_Semrelease 這個方法, 這個方法的作用是釋放一個信號量,這樣可以讓阻塞在信號量上的 goroutine 得以繼續(xù)執(zhí)行。 它的第一個參數(shù)我們都知道,是信號量,而第二個參數(shù) truefalse 分別傳遞了一次, 那么 truefalse 分別有什么作用呢?

答案是,設(shè)置為 true 的時候,當(dāng)前的 goroutine 會直接放棄自己的時間片, 將 P 的使用權(quán)交給 Mutex 等待隊(duì)列中的第一個 goroutine, 這樣的目的是,讓 Mutex 等待隊(duì)列中的 goroutine 可以盡快地獲取到鎖。

總結(jié)

互斥鎖在并發(fā)編程中也算是非常常見的一種操作了,使用互斥鎖可以限制只有一個 goroutine 可以進(jìn)入臨界區(qū), 這對于并發(fā)修改全局變量、初始化等情況非常好用。最后,再總結(jié)一下本文所講述的內(nèi)容:

1.互斥鎖是一種用于多線程編程中,防止兩個線程同時對同一公共資源進(jìn)行讀寫的機(jī)制。go 中的互斥鎖實(shí)現(xiàn)是 sync.Mutex。

2.Mutex 的操作只有兩個:

  • Lock 獲取鎖,同一時刻只能有一個 goroutine 可以獲取到鎖,其他 goroutine 會先通過自旋搶占鎖,搶不到則阻塞等待。
  • Unlock 釋放鎖,釋放鎖之前必須有 goroutine 持有鎖。釋放鎖之后,會喚醒等待隊(duì)列中的下一個 goroutine。

3.Mutex 常見的使用場景有兩個:

  • 并發(fā)讀寫 map:如 ginContextKeys 屬性的讀寫。
  • 并發(fā)讀寫全局變量:如 sync.Pool 中對 allPools 的讀寫。

4.使用 Mutex 需要注意以下幾點(diǎn):

  • 不要忘記使用 Unlock 釋放鎖
  • Lock 之后,沒有釋放鎖之前,不能再次使用 Lock
  • 注意不同 goroutine 競爭不同鎖的情況,需要考慮一下是否有可能會死鎖
  • Unlock 之前,必須已經(jīng)調(diào)用了 Lock,否則會 panic
  • 在第一次使用 Mutex 之后,不能復(fù)制,因?yàn)檫@樣一來 Mutex 的狀態(tài)也會被復(fù)制。這個可以使用 go vet 來檢查。

5.互斥鎖可以保護(hù)一塊代碼塊只能有一個 goroutine 執(zhí)行,但是不保證臨界區(qū)內(nèi)操作的變量不被其他 goroutine 做并發(fā)讀寫操作。

6.go 的 Mutex 基于以下技術(shù)實(shí)現(xiàn):

  • 信號量:這是操作系統(tǒng)層面的同步機(jī)制
  • 隊(duì)列:在 goroutine 獲取不到鎖的時候,會將這些 goroutine 放入一個 FIFO 隊(duì)列中,下次喚醒會喚醒隊(duì)列頭的 goroutine
  • 原子操作:state 字段記錄了四種不同的信息,通過原子操作就可以保證數(shù)據(jù)的完整性

7.go Mutex 的公平性:

  • 正在運(yùn)行的 goroutine 如果需要鎖的話,盡量讓它先獲取到鎖,可以避免不必要的協(xié)程上下文切換。會和被喚醒的 goroutine 一起競爭鎖。
  • 但是如果等待隊(duì)列中的 goroutine 超過了 1ms 還沒有獲取到鎖,那么會進(jìn)入饑餓模式

8.go Mutex 的兩種模式:

  • 正常模式:運(yùn)行中的 goroutine 有一定機(jī)會比等待隊(duì)列中的 goroutine 先獲取到鎖,這種模式有更好的性能。
  • 饑餓模式:所有后來的 goroutine 都直接進(jìn)入等待隊(duì)列,會依次從等待隊(duì)列頭喚醒 goroutine。可以有效避免尾延遲。

9.饑餓模式下,Unlock 的時候會直接將當(dāng)前 goroutine 所在 P 的使用權(quán)交給等待隊(duì)列頭部的 goroutine,放棄原本屬于自己的時間片。

以上就是一文帶你深入了解Golang中的Mutex的詳細(xì)內(nèi)容,更多關(guān)于Golang Mutex的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • go mod 使用私有g(shù)itlab群組的解決方案

    go mod 使用私有g(shù)itlab群組的解決方案

    這篇文章主要介紹了go mod 使用私有g(shù)itlab群組的解決方案,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-05-05
  • Golang實(shí)現(xiàn)將中文轉(zhuǎn)化為拼音

    Golang實(shí)現(xiàn)將中文轉(zhuǎn)化為拼音

    這篇文章主要為大家詳細(xì)介紹了如何通過Golang實(shí)現(xiàn)將中文轉(zhuǎn)化為拼音功能,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下
    2023-02-02
  • 使用Go語言編寫簡潔代碼的最佳實(shí)踐

    使用Go語言編寫簡潔代碼的最佳實(shí)踐

    簡潔的代碼對于創(chuàng)建可維護(hù)、可閱讀和高效的軟件至關(guān)重要,Go 是一種強(qiáng)調(diào)簡單和代碼整潔的語言,在本文中,我們將結(jié)合代碼示例,探討編寫簡潔 Go 代碼的最佳實(shí)踐,需要的朋友可以參考下
    2023-09-09
  • Go語言的數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)JSON

    Go語言的數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)JSON

    本文主要介紹了Go語言的數(shù)據(jù)結(jié)構(gòu)轉(zhuǎn)JSON,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2022-01-01
  • 淺談golang fasthttp踩坑經(jīng)驗(yàn)

    淺談golang fasthttp踩坑經(jīng)驗(yàn)

    本文主要介紹了golang fasthttp踩坑經(jīng)驗(yàn),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-11-11
  • golang內(nèi)存對齊的概念及案例詳解

    golang內(nèi)存對齊的概念及案例詳解

    為保證程序順利高效的運(yùn)行,編譯器會把各種類型的數(shù)據(jù)安排到合適的地址,并占用合適的長度,這就是內(nèi)存對齊。本文重點(diǎn)給大家介紹golang內(nèi)存對齊的概念及案例詳解,感興趣的朋友一起看看吧
    2022-02-02
  • 自定義Go?Json的序列化方法譯文

    自定義Go?Json的序列化方法譯文

    這篇文章主要為大家介紹了自定義Go?Json序列化方法譯文,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪
    2022-06-06
  • Golang開發(fā)之字符串與切片問題踩坑記錄

    Golang開發(fā)之字符串與切片問題踩坑記錄

    字符串和切片,都是golang常用的兩種內(nèi)置數(shù)據(jù)類型,最近在日常工作中,遇到了一個字符串切片導(dǎo)致的問題,記錄一下排查問題的過程,避免后續(xù)在這種場景上踩坑
    2023-07-07
  • 使用GO語言實(shí)現(xiàn)Mysql數(shù)據(jù)庫CURD的簡單示例

    使用GO語言實(shí)現(xiàn)Mysql數(shù)據(jù)庫CURD的簡單示例

    本文主要介紹了使用GO語言實(shí)現(xiàn)Mysql數(shù)據(jù)庫CURD的簡單示例,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-08-08
  • go語言開發(fā)環(huán)境配置(sublime text3+gosublime)

    go語言開發(fā)環(huán)境配置(sublime text3+gosublime)

    網(wǎng)上google了下go的開發(fā)工具,大都推薦sublime text3+gosublime,本文就介紹了go語言開發(fā)環(huán)境配置(sublime text3+gosublime),具有一定的參考價值,感興趣的可以了解一下
    2022-01-01

最新評論