golang基于Mutex實(shí)現(xiàn)可重入鎖
golang基于Mutex實(shí)現(xiàn)可重入鎖
為什么需要可重入鎖
我們平時說的分布式鎖,一般指的是在不同服務(wù)器上的多個線程中,只有一個線程能搶到一個鎖,從而執(zhí)行一個任務(wù)。而我們使用鎖就是保證一個任務(wù)只能由一個線程來完成。所以我們一般是使用這樣的三段式邏輯:
Lock(); DoJob(); Unlock();
但是由于我們的系統(tǒng)都是分布式的,這個鎖一般不會只放在某個進(jìn)程中,我們會借用第三方存儲,比如 Redis 來做這種分布式鎖。但是一旦借助了第三方存儲,我們就必須面對這個問題:Unlock是否能保證一定運(yùn)行呢?
這個問題,我們面對的除了程序的bug之外,還有網(wǎng)絡(luò)的不穩(wěn)定,進(jìn)程被殺死,服務(wù)器被down機(jī)等。我們是無法保證Unlock一定被運(yùn)行的。
那么我們就一般在Lock的時候?yàn)檫@個鎖加一個超時時間作為兜底。
LockByExpire(duration); DoJob(); Unlock();
這個超時時間是為了一旦出現(xiàn)異常情況導(dǎo)致Unlock沒有被運(yùn)行,這個鎖在duration時間內(nèi)也會被自動釋放。這個在redis中我們一般就是使用set ex 來進(jìn)行鎖超時的設(shè)定。
但是有這個超時時間我們又遇上了問題,超時時間設(shè)置多久合適呢?當(dāng)然要設(shè)置的比 DoJob 消耗的時間更長,否則的話,在任務(wù)還沒結(jié)束的時候,鎖就被釋放了,還是有可能導(dǎo)致并發(fā)任務(wù)的存在。
但是實(shí)際上,同樣由于網(wǎng)絡(luò)超時問題,系統(tǒng)運(yùn)行狀況問題等,我們是無法準(zhǔn)確知道DoJob這個函數(shù)要執(zhí)行多久的。那么這時候怎么辦呢?
有兩個辦法:
第一個方法,我們可以對DoJob做一個超時設(shè)置。讓DoJob最多只能執(zhí)行n秒,那么我的分布式鎖的超時時長設(shè)置比n秒長就可以了。為一個任務(wù)設(shè)置超時時間在很多語言是可以做到的。比如golang 中的 TimeoutContext。
而第二種方法,就是我們先為鎖設(shè)置一個比較小的超時時長,然后不斷續(xù)期這個鎖。對一個鎖的不斷需求,也可以理解為重新開始加鎖,這種可以不斷續(xù)期的鎖,就叫做可重入鎖。
除了主線程之外,可重入鎖必然有一個另外的線程(或者攜程)可以對這個鎖進(jìn)行續(xù)期,我們叫這個額外的程序叫做watchDog(看門狗)。
鎖重入的定義
鎖可重入也就是當(dāng)前已經(jīng)獲取到鎖的goroutine繼續(xù)調(diào)用Lock方法獲取鎖,Go標(biāo)準(zhǔn)庫中提供了sync.Mutex實(shí)現(xiàn)了排他鎖,但并不是可重入的,如果在代碼中重入鎖,也就是Lock之后再次進(jìn)行Lock獲取鎖,則會被阻塞到第二次Lock上,鎖沒有辦法得到釋放從而影響其它goroutine執(zhí)行
// 例如 package main; import "sync" func ReentryExample() { var c int64 var mu sync.Mutex mu.Lock() // 第一次加鎖 // TODO // mu.Lock() // 第二次加鎖,阻塞 c++; // TODO ... }
重入鎖的簡單實(shí)現(xiàn)思路
- 拿到能夠識別到當(dāng)前協(xié)程的id,(通過堆棧信息獲取到goroutine的id)
- 寫一個結(jié)構(gòu)體,實(shí)現(xiàn)Locker接口
首先獲取到goroutine的id
func GoID() int { var buf [32]byte n := runtime.Stack(buf[:],false) // 獲取堆棧的信息 // string(buf[:n] /** goroutine 6 [running]: main.XXX */ // 拿到goroutine的id goIdStr := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine"))[0] goId, err := strconv.Atoi(fieldId)// 轉(zhuǎn)換為int return goId }
然后開始編寫可重入鎖的結(jié)構(gòu)體
// ReentrantMutex 可重入的互斥鎖 type ReentrantMutex struct { sync.Mutex // 互斥鎖 goId int64 // 用于保存goroutine的id recursion int64 // 鎖重入的次數(shù) } // Lock 實(shí)現(xiàn)Locker接口,用于加鎖 func (r *ReentrantMutex) Lock() { gid := GoID() if atomic.LoadInt64(&r.goId) == gid { // 看看是否已經(jīng)加過鎖了? atomic.AddInt64(&r.recursion, 1) // 如果之前加過鎖,則重入的次數(shù)+1 return } r.Mutex.Lock() // 使用互斥鎖上鎖 atomic.StoreInt64(&r.goId, gid) // 使用原子操作保存goroutine的id atomic.StoreInt64(&r.recursion, 1) // 第一次加鎖,因此重入的次數(shù)為一 } // Unlock 實(shí)現(xiàn)了Locker的接口,用于解鎖 func (r *ReentrantMutex) Unlock() { gid := GoID() if atomic.LoadInt64(&r.goId) != gid { // 看是否加過鎖 panic("未加鎖") // 沒有加過鎖,不存在解鎖,直接panic } recursion := atomic.AddInt64(&r.recursion, -1) // 重入次數(shù)-1 if recursion != 0 { // 如果重入次數(shù)沒有等于0(意味著還有鎖沒有釋放) return } atomic.StoreInt64(&r.goId, -1) // 重入次數(shù)為0,則不存在鎖沒有釋放,解鎖 r.Mutex.Unlock() // 互斥鎖解鎖 }
測試用例
package main; func main() { var m ReentrantMutex m.Lock() m.Lock() // 不會阻塞 fmt.Println("1") // 正常打印1 m.Unlock() m.Unlock()// 解鎖 }
其他方法實(shí)現(xiàn)Golang可重入鎖:
具體實(shí)現(xiàn)
在Golang中,語言級別天生支持協(xié)程,所以這種可重入鎖就非常容易實(shí)現(xiàn):
// DistributeLockRedis 基于redis的分布式可重入鎖,自動續(xù)租 type DistributeLockRedis struct { key string // 鎖的key expire int64 // 鎖超時時間 status bool // 上鎖成功標(biāo)識 cancelFun context.CancelFunc // 用于取消自動續(xù)租攜程 redis redis.Client // redis句柄 } // 創(chuàng)建可 func NewDistributeLockRedis(key string, expire int64) *DistributeLockRedis { return &DistributeLockRedis{ key : key, expire : expire, } } // TryLock 上鎖 func (dl *DistributeLockRedis) TryLock() (err error) { if err = dl.lock(); err != nil { return err } ctx, cancelFun := context.WithCancel(context.Background()) dl.cancelFun = cancelFun dl.startWatchDog(ctx) // 創(chuàng)建守護(hù)協(xié)程,自動對鎖進(jìn)行續(xù)期 dl.status = true return nil } // competition 競爭鎖 func (dl *DistributeLockRedis) lock() error { if res, err := redis.String(dl.redis.Do(context.Background(), "SET", dl.key, 1, "NX", "EX", dl.expire)); err != nil { return err } return nil } // guard 創(chuàng)建守護(hù)協(xié)程,自動續(xù)期 func (dl *DistributeLockRedis) startWatchDog(ctx context.Context) { safeGo(func() error { for { select { // Unlock通知結(jié)束 case <-ctx.Done(): return nil default: // 否則只要開始了,就自動重入(續(xù)租鎖) if dl.status { if res, err := redis.Int(dl.redis.Do(context.Background(), "EXPIRE", dl.key, dl.expire)); err != nil { return nil } // 續(xù)租時間為 expire/2 秒 time.Sleep(time.Duration(dl.expire/2) * time.Second) } } } }) } // Unlock 釋放鎖 func (dl *DistributeLockRedis) Unlock() (err error) { // 這個重入鎖必須取消,放在第一個地方執(zhí)行 if dl.cancelFun != nil { dl.cancelFun() // 釋放成功,取消重入鎖 } var res int if dl.status { if res, err = redis.Int(dl.redis.Do(context.Background(), "Del", dl.key)); err != nil { return fmt.Errorf("釋放鎖失敗") } if res == 1 { dl.status = false return nil } } return fmt.Errorf("釋放鎖失敗") }
這段代碼的邏輯基本上都以注釋的形式來寫了。其中主要就在startWatchDog,對鎖進(jìn)行重新續(xù)期
ctx, cancelFun := context.WithCancel(context.Background()) dl.cancelFun = cancelFun dl.startWatchDog(ctx) // 創(chuàng)建守護(hù)協(xié)程,自動對鎖進(jìn)行續(xù)期 dl.status = true
首先創(chuàng)建一個cancelContext,它的context函數(shù)cancelFunc是給Unlock進(jìn)行調(diào)用的。然后啟動一個goroutine進(jìn)程來循環(huán)續(xù)期。
這個新啟動的goroutine在主goroutine處理結(jié)束,調(diào)用Unlock的時候,才會結(jié)束,否則會在 過期時間/2 的時候,調(diào)用一次redis的expire命令來進(jìn)行續(xù)期。
至于外部,在使用的時候如下
func Foo() error { key := foo // 創(chuàng)建可重入的分布式鎖 dl := NewDistributeLockRedis(key, 10) // 爭搶鎖 err := dl.TryLock() if err != nil { // 沒有搶到鎖 return err } // 搶到鎖的記得釋放鎖 defer func() { dl.Unlock() } // 做真正的任務(wù) DoJob() }
到此這篇關(guān)于golang基于Mutex實(shí)現(xiàn)可重入鎖的文章就介紹到這了,更多相關(guān)golang Mutex可重入鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang框架gin的日志處理和zap lumberjack日志使用方式
這篇文章主要介紹了golang框架gin的日志處理和zap lumberjack日志使用方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-01-01Golang多線程下載器實(shí)現(xiàn)高效快速地下載大文件
Golang多線程下載器是一種高效、快速地下載大文件的方法。Golang語言天生支持并發(fā)和多線程,可以輕松實(shí)現(xiàn)多線程下載器的開發(fā)。通過使用Golang的協(xié)程和通道,可以將下載任務(wù)分配到多個線程中并行處理,提高了下載的效率和速度2023-05-05基于Go語言實(shí)現(xiàn)插入排序算法及優(yōu)化
插入排序是一種簡單的排序算法。這篇文章將利用Go語言實(shí)現(xiàn)冒泡排序算法,文中的示例代碼講解詳細(xì),對學(xué)習(xí)Go語言有一定的幫助,需要的可以參考一下2022-12-12