淺談Go用于同步和并發(fā)控制的幾種常見鎖
1. 互斥鎖(Mutex)
sync.Mutex
:這是最基本的互斥鎖,用于保護共享資源防止同時訪問。
它有兩個主要的方法:
Lock()
:獲取鎖,如果鎖已經(jīng)被其他Goroutine獲取,則等待。Unlock()
:釋放鎖。
1.1 示例
創(chuàng)建多個goroutine來增加一個共享變量的值。為了防止并發(fā)訪問導致的數(shù)據(jù)競爭,我們將使用sync.Mutex
來確保每次只有一個goroutine可以修改變量。
package main import ( "fmt" "sync" "time" ) // 定義一個共享資源 var counter int = 0 // 創(chuàng)建一個互斥鎖 var lock sync.Mutex func main() { // 創(chuàng)建一個等待組,以便等待所有g(shù)oroutine完成 var wg sync.WaitGroup // 啟動多個goroutine來增加計數(shù)器 for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() for j := 0; j < 10; j++ { // 在修改共享資源前獲取鎖 lock.Lock() // 修改共享資源 counter++ fmt.Printf("Goroutine %d sees counter: %d\n", id, counter) // 釋放鎖 lock.Unlock() // 等待一段時間,模擬處理過程 time.Sleep(time.Millisecond * 10) } }(i) } // 等待所有g(shù)oroutine完成 wg.Wait() // 打印最終的計數(shù)器值 fmt.Println("Final counter:", counter) }
1.2 代碼解釋
- 共享資源:這里的共享資源是
counter
變量,所有g(shù)oroutine都會嘗試修改它。 - 互斥鎖:使用
sync.Mutex
來保護對counter
的訪問。在每次修改前,goroutine會調(diào)用lock.Lock()
來嘗試獲取鎖,完成修改后調(diào)用lock.Unlock()
釋放鎖。 - 等待組:使用
sync.WaitGroup
來等待所有g(shù)oroutine完成。每啟動一個goroutine,調(diào)用wg.Add(1)
,每個goroutine完成時調(diào)用wg.Done()
。 - 并發(fā)執(zhí)行:通過
go func(id int)
啟動goroutine,每個goroutine都嘗試多次修改counter
,并在控制臺輸出當前看到的counter
值。
2. 讀寫鎖(RWMutex)
sync.RWMutex
:這是一個讀寫互斥鎖,允許多個讀操作并發(fā),但寫操作是互斥的。
主要方法有:
RLock()
:獲取讀鎖,允許其他Goroutine同時獲取讀鎖。RUnlock()
:釋放讀鎖。Lock()
:獲取寫鎖,阻塞其他的讀鎖和寫鎖請求。Unlock()
:釋放寫鎖。
2.1 示例
創(chuàng)建多個goroutine,一些用于讀取共享數(shù)據(jù),而另一些用于寫入共享數(shù)據(jù)。sync.RWMutex
將允許多個讀操作并發(fā)執(zhí)行,但寫操作將是互斥的,確保了數(shù)據(jù)的一致性。
package main import ( "fmt" "sync" "time" ) // 定義一個共享資源 var data int = 0 // 創(chuàng)建一個讀寫互斥鎖 var rwMutex sync.RWMutex func main() { // 創(chuàng)建一個等待組,以便等待所有g(shù)oroutine完成 var wg sync.WaitGroup // 啟動多個讀goroutine for i := 0; i < 3; i++ { wg.Add(1) go func(id int) { defer wg.Done() readData(id) }(i) } // 啟動多個寫goroutine for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() writeData(id) }(i) } // 等待所有g(shù)oroutine完成 wg.Wait() } // 讀取數(shù)據(jù)的函數(shù) func readData(id int) { for j := 0; j < 5; j++ { // 獲取讀鎖 rwMutex.RLock() fmt.Printf("Goroutine %d reads data: %d\n", id, data) // 釋放讀鎖 rwMutex.RUnlock() // 等待一段時間,模擬讀取過程 time.Sleep(time.Millisecond * 100) } } // 寫入數(shù)據(jù)的函數(shù) func writeData(id int) { for j := 0; j < 5; j++ { // 獲取寫鎖 rwMutex.Lock() // 修改數(shù)據(jù) data += id fmt.Printf("Goroutine %d writes data: %d\n", id, data) // 釋放寫鎖 rwMutex.Unlock() // 等待一段時間,模擬寫入過程 time.Sleep(time.Millisecond * 100) } }
2.2 代碼解釋
- 共享資源:這里的共享資源是
data
變量,所有讀goroutine都會讀取它,而寫goroutine會修改它。 - 讀寫互斥鎖:使用
sync.RWMutex
來保護對data
的訪問。讀goroutine在讀取前調(diào)用rwMutex.RLock()
獲取讀鎖,并在讀取后調(diào)用rwMutex.RUnlock()
釋放讀鎖。寫goroutine在寫入前調(diào)用rwMutex.Lock()
獲取寫鎖,并在寫入后調(diào)用rwMutex.Unlock()
釋放寫鎖。 - 等待組:使用
sync.WaitGroup
來等待所有g(shù)oroutine完成。每啟動一個goroutine,調(diào)用wg.Add(1)
,每個goroutine完成時調(diào)用wg.Done()
。 - 并發(fā)執(zhí)行:通過
go func(id int)
啟動goroutine,讀goroutine和寫goroutine分別執(zhí)行讀取和寫入操作。
3. 條件變量(Cond)
sync.Cond
:條件變量通常與互斥鎖一起使用,用于實現(xiàn)更復雜的同步場景。
它提供了三種方法:
Wait()
:等待條件滿足。Signal()
:喚醒一個等待中的Goroutine。Broadcast()
:喚醒所有等待中的Goroutine。
3.1 示例
創(chuàng)建一個生產(chǎn)者-消費者模型,生產(chǎn)者將數(shù)據(jù)添加到緩沖區(qū)中,而消費者從緩沖區(qū)中獲取數(shù)據(jù)。我們使用sync.Cond
來實現(xiàn)生產(chǎn)者和消費者之間的同步。
package main import ( "fmt" "sync" "time" ) // 緩沖區(qū)容量 const bufferSize = 5 // 緩沖區(qū) var buffer = make([]int, 0, bufferSize) // 互斥鎖 var mutex sync.Mutex // 條件變量 var cond = sync.NewCond(&mutex) func main() { // 創(chuàng)建一個等待組,以便等待所有g(shù)oroutine完成 var wg sync.WaitGroup // 啟動生產(chǎn)者goroutine for i := 0; i < 2; i++ { wg.Add(1) go producer(&wg, i) } // 啟動消費者goroutine for i := 0; i < 3; i++ { wg.Add(1) go consumer(&wg, i) } // 等待所有g(shù)oroutine完成 wg.Wait() } // 生產(chǎn)者函數(shù) func producer(wg *sync.WaitGroup, id int) { defer wg.Done() for j := 0; j < 10; j++ { time.Sleep(time.Millisecond * 100) // 模擬生產(chǎn)過程 // 獲取鎖 mutex.Lock() // 等待緩沖區(qū)未滿 for len(buffer) == bufferSize { cond.Wait() } // 生產(chǎn)數(shù)據(jù) buffer = append(buffer, j) fmt.Printf("Producer %d produced: %d, buffer: %v\n", id, j, buffer) // 喚醒消費者 cond.Signal() // 釋放鎖 mutex.Unlock() } } // 消費者函數(shù) func consumer(wg *sync.WaitGroup, id int) { defer wg.Done() for { time.Sleep(time.Millisecond * 150) // 模擬消費過程 // 獲取鎖 mutex.Lock() // 等待緩沖區(qū)非空 for len(buffer) == 0 { cond.Wait() } // 消費數(shù)據(jù) data := buffer[0] buffer = buffer[1:] fmt.Printf("Consumer %d consumed: %d, buffer: %v\n", id, data, buffer) // 喚醒生產(chǎn)者 cond.Signal() // 釋放鎖 mutex.Unlock() } }
3.2 代碼解釋
- 緩沖區(qū):
buffer
是一個用于存放數(shù)據(jù)的切片,bufferSize
定義了緩沖區(qū)的容量。 - 互斥鎖:
mutex
用于保護緩沖區(qū)的并發(fā)訪問。 - 條件變量:
cond
是一個條件變量,配合互斥鎖使用,用于實現(xiàn)生產(chǎn)者和消費者之間的同步。 - 生產(chǎn)者函數(shù):
producer
函數(shù)模擬生產(chǎn)數(shù)據(jù)。當緩沖區(qū)滿時,生產(chǎn)者會等待條件變量。生產(chǎn)數(shù)據(jù)后,生產(chǎn)者會發(fā)出Signal
通知消費者。 - 消費者函數(shù):
consumer
函數(shù)模擬消費數(shù)據(jù)。當緩沖區(qū)空時,消費者會等待條件變量。消費數(shù)據(jù)后,消費者會發(fā)出Signal
通知生產(chǎn)者。 - 等待組:使用
sync.WaitGroup
來等待所有生產(chǎn)者和消費者goroutine完成。
4. Once
sync.Once
:保證某個操作只執(zhí)行一次,常用于初始化操作。
主要方法是:
Do(f func())
:只執(zhí)行一次傳入的函數(shù),即使從多個Goroutine調(diào)用也只會執(zhí)行一次。
4.1 示例
模擬一個只需初始化一次的資源。無論有多少個goroutine嘗試初始化這個資源,sync.Once
都確保它們中的某一個只會執(zhí)行一次初始化操作。
package main import ( "fmt" "sync" "time" ) // 定義一個全局變量用于存放初始化資源 var resource string // 定義一個sync.Once變量 var once sync.Once // 模擬資源初始化的函數(shù) func initialize() { fmt.Println("Initializing resource...") resource = "Resource Initialized" } func main() { // 創(chuàng)建一個等待組,以便等待所有g(shù)oroutine完成 var wg sync.WaitGroup // 啟動多個goroutine,嘗試初始化資源 for i := 0; i < 5; i++ { wg.Add(1) go func(id int) { defer wg.Done() useResource(id) }(i) } // 等待所有g(shù)oroutine完成 wg.Wait() // 最后打印資源的狀態(tài) fmt.Println("Final resource state:", resource) } // 使用資源的函數(shù),嘗試初始化資源 func useResource(id int) { // 使用sync.Once的Do方法確保initialize函數(shù)只執(zhí)行一次 once.Do(initialize) fmt.Printf("Goroutine %d using resource: %s\n", id, resource) // 模擬資源使用過程 time.Sleep(time.Millisecond * 100) }
4.2 代碼解釋
- 全局變量:
resource
是一個全局變量,用于存放初始化的資源。 - sync.Once:
once
是一個sync.Once
變量,用于確保初始化函數(shù)initialize
只執(zhí)行一次。 - 初始化函數(shù):
initialize
函數(shù)模擬初始化資源的操作,只會在第一次調(diào)用時執(zhí)行。 - 等待組:使用
sync.WaitGroup
等待所有g(shù)oroutine完成操作。 - 使用資源的函數(shù):
useResource
函數(shù)模擬使用資源的過程。它調(diào)用once.Do(initialize)
確保initialize
函數(shù)只執(zhí)行一次。然后,它打印出資源的狀態(tài),并模擬使用資源的過程。
5. 原子操作
sync/atomic
包提供了底層的原子操作,可以用于實現(xiàn)無鎖的并發(fā)算法。
這些操作包括:
AddInt32()
AddInt64()
LoadInt32()
LoadInt64()
StoreInt32()
StoreInt64()
CompareAndSwapInt32()
CompareAndSwapInt64()
5.1 示例
創(chuàng)建一個簡單的程序,該程序使用原子操作來增加、存儲和加載一個整數(shù)值,并使用CompareAndSwap
來實現(xiàn)條件更新。
package main import ( "fmt" "sync" "sync/atomic" ) func main() { var wg sync.WaitGroup var count int32 = 0 // 啟動多個goroutine來增加計數(shù)器 for i := 0; i < 10; i++ { wg.Add(1) go func() { defer wg.Done() for j := 0; j < 5; j++ { // 使用atomic.AddInt32來原子地增加計數(shù)器 atomic.AddInt32(&count, 1) } }() } // 等待所有g(shù)oroutine完成 wg.Wait() // 使用atomic.LoadInt32來原子地讀取計數(shù)器 finalCount := atomic.LoadInt32(&count) fmt.Println("Final count:", finalCount) // 嘗試使用atomic.CompareAndSwapInt32來條件性地更新計數(shù)器 if atomic.CompareAndSwapInt32(&count, finalCount, 100) { fmt.Println("Count was", finalCount, ", updated count to 100") } else { fmt.Println("Failed to update count") } // 再次讀取并打印計數(shù)器的值 updatedCount := atomic.LoadInt32(&count) fmt.Println("Updated count:", updatedCount) }
5.2 代碼解釋
- 變量定義:定義一個
int32
類型的變量count
用于計數(shù)。 - 增加計數(shù)器:啟動多個goroutine,每個goroutine使用
atomic.AddInt32
來原子地增加count
的值。這保證了在并發(fā)環(huán)境下,計數(shù)的增加操作是安全的。 - 讀取計數(shù)器:所有g(shù)oroutine完成后,使用
atomic.LoadInt32
原子地讀取count
的值。這是讀取共享變量的安全方式。 - 條件更新:使用
atomic.CompareAndSwapInt32
嘗試原子地更新count
的值。這個函數(shù)只有在當前值等于預期值時才會更新,并返回是否成功。 - 打印最終結(jié)果:打印最終的計數(shù)值和更新后的計數(shù)值。
6. Pool
sync.Pool
:用于臨時對象的緩存,減少垃圾回收的壓力。
主要方法包括:
Get()
:獲取一個對象。Put(x interface{})
:放回一個對象。
6.1 示例
如何使用對象池來緩存和重用對象,從而減少垃圾回收的壓力。
package main import ( "fmt" "sync" "time" ) // 定義一個結(jié)構(gòu)體類型,用于示例 type MyObject struct { ID int } // 創(chuàng)建一個全局的sync.Pool對象 var objectPool = sync.Pool{ New: func() interface{} { return &MyObject{} }, } func main() { var wg sync.WaitGroup // 啟動多個goroutine來獲取和放回對象 for i := 0; i < 10; i++ { wg.Add(1) go func(id int) { defer wg.Done() // 從對象池中獲取一個對象 obj := objectPool.Get().(*MyObject) // 模擬對象的使用 obj.ID = id fmt.Printf("Goroutine %d using object with ID: %d\n", id, obj.ID) // 模擬工作延遲 time.Sleep(time.Millisecond * 100) // 重置對象的狀態(tài)(可選) obj.ID = 0 // 將對象放回池中 objectPool.Put(obj) }(i) } // 等待所有g(shù)oroutine完成 wg.Wait() // 打印對象池的狀態(tài) fmt.Println("All goroutines finished, objects are back in the pool.") }
6.2 代碼解釋
- 定義結(jié)構(gòu)體:定義一個
MyObject
結(jié)構(gòu)體,用于示例。 - 創(chuàng)建對象池:使用
sync.Pool
創(chuàng)建一個全局的對象池objectPool
。通過設置New
字段指定當對象池為空時如何創(chuàng)建新對象。 - 啟動多個goroutine:在主函數(shù)中,啟動10個goroutine,每個goroutine從對象池中獲取一個對象,使用后將其放回池中。
- 獲取對象:使用
objectPool.Get()
從對象池中獲取一個對象,并類型斷言為*MyObject
。 - 使用對象:模擬對象的使用過程,設置對象的
ID
字段,并打印信息。 - 模擬延遲:使用
time.Sleep
模擬一些處理延遲。 - 重置對象狀態(tài):重置對象的狀態(tài)(這是可選的,但有助于避免狀態(tài)污染)。
- 放回對象:使用
objectPool.Put(obj)
將對象放回對象池中。 - 等待所有g(shù)oroutine完成:使用
sync.WaitGroup
等待所有g(shù)oroutine完成。 - 打印狀態(tài):最后打印消息,表示所有g(shù)oroutine已完成。
到此這篇關(guān)于淺談Go用于同步和并發(fā)控制的幾種常見鎖的文章就介紹到這了,更多相關(guān)Go 同步和并發(fā)控制鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!