Go語言如何使用分布式鎖解決并發(fā)問題
在分布式系統(tǒng)中,協(xié)調(diào)多個服務(wù)實(shí)例之間的共享資源訪問是一個經(jīng)典的挑戰(zhàn)。傳統(tǒng)的單機(jī)鎖(如 sync.Mutex
)無法實(shí)現(xiàn)跨進(jìn)程工作,此時就需要用到分布式鎖了。本文將介紹 Go 語言生態(tài)中基于 Redis 實(shí)現(xiàn)的分布式鎖庫 redsync
,并探討其使用方法和實(shí)現(xiàn)原理。
分布式鎖
首先我們來探討下為什么需要分布式鎖?當(dāng)我們編寫的程序出現(xiàn)資源競爭的時候,就需要使用互斥鎖來保證并發(fā)安全。而我們的服務(wù)很有可能不會單機(jī)部署,而是采用多副本的集群部署方案。無論哪種方案運(yùn)行程序,我們都需要合適的工具來解決并發(fā)問題。在解決單個進(jìn)程間多個協(xié)程之間的并發(fā)資源搶占問題時,我們往往采用 sync.Mutex
。而在解決多個進(jìn)程間的并發(fā)資源搶占問題時,就需要采用分布式鎖了,這就引出了我們今天要講解的 redsync
。
為什么是 redsync
在 Go 中分布式鎖的開源實(shí)現(xiàn)有很多,為什么選擇介紹和使用 redsync
呢?簡單一句話:redsync
是 Redis 官方 唯一推薦的 Go Redis 分布式鎖解決方案,遵循 Redlock 算法。它允許在多個獨(dú)立 Redis 節(jié)點(diǎn)上創(chuàng)建高可用的鎖,適用于需要強(qiáng)一致性的分布式場景。
我們可以對比下 sync.Mutex
和 redsync
之間的區(qū)別,讓你有個感性的認(rèn)識。
特性 | sync.Mutex | redsync |
---|---|---|
適用范圍 | 單個進(jìn)程內(nèi)的多個 goroutine | 多個進(jìn)程(允許跨機(jī)器) |
依賴 | 無 | Redis |
性能 | 高(無網(wǎng)絡(luò)開銷) | 較低(涉及網(wǎng)絡(luò)通信) |
實(shí)現(xiàn)復(fù)雜度 | 簡單 | 較復(fù)雜(需處理網(wǎng)絡(luò)、超時等問題) |
典型場景 | 內(nèi)存共享資源保護(hù) | 分布式系統(tǒng)共享資源保護(hù) |
二者分別適用于不同的并發(fā)場景,選擇時需要根據(jù)實(shí)際需求(單機(jī)還是分布式)來決定。
redsync 快速上手
redsync
雖然內(nèi)部實(shí)現(xiàn)上比較復(fù)雜,但別被嚇到,它的用法超級簡單。
示例代碼如下:
package main import ( "context" "github.com/go-redsync/redsync/v4" // 引入 redsync 庫,用于實(shí)現(xiàn)基于 Redis 的分布式鎖 "github.com/go-redsync/redsync/v4/redis/goredis/v9" // 引入 redsync 的 goredis 連接池 goredislib "github.com/redis/go-redis/v9" // 引入 go-redis 庫,用于與 Redis 服務(wù)器通信 ) func main() { // 創(chuàng)建一個 Redis 客戶端 client := goredislib.NewClient(&goredislib.Options{ Addr: "localhost:36379", // Redis 服務(wù)器地址 Password: "nightwatch", }) // 使用 go-redis 客戶端創(chuàng)建一個 redsync 連接池 pool := goredis.NewPool(client) // 創(chuàng)建一個 redsync 實(shí)例,用于管理分布式鎖 rs := redsync.New(pool) // 創(chuàng)建一個名為 "test-redsync" 的互斥鎖(Mutex) mutex := rs.NewMutex("test-redsync") // 創(chuàng)建一個上下文(context),一般用于控制鎖的超時和取消 ctx := context.Background() // 獲取鎖,如果獲取失?。ɡ珂i已被其他進(jìn)程持有),會返回錯誤 if err := mutex.LockContext(ctx); err != nil { panic(err) // 如果獲取鎖失敗,程序會 panic } // TODO 執(zhí)行業(yè)務(wù)邏輯 // ... // 釋放鎖,如果釋放失?。ɡ珂i已過期或不屬于當(dāng)前進(jìn)程),會返回錯誤 if _, err := mutex.UnlockContext(ctx); err != nil { panic(err) // 如果釋放鎖失敗,程序會 panic } }
因?yàn)?redsync
依賴 Redis,所以我們首先需要創(chuàng)建一個 Redis 客戶端對象 client
,調(diào)用 goredis.NewPool(client)
會基于這個 client
創(chuàng)建一個 redsync
的連接池,有了這個連接池 pool
就可以調(diào)用 redsync.New(pool)
創(chuàng)建一個 redsync
實(shí)例來申請分布式鎖了。
redsync
提供了 NewMutex
方法可以創(chuàng)建一個分布式鎖,它接收一個 name
參數(shù)作為鎖的名字,這個名字會作為 Redis 中的 key
。
拿到鎖對象 mutex
以后,調(diào)用 mutex.LockContext(ctx)
就可以加鎖,加鎖后便可以訪問競態(tài)資源了,資源訪問完成后,調(diào)用 mutex.UnlockContext(ctx)
便可以釋放鎖。
可以發(fā)現(xiàn),redsync
用法和 sync.Mutex
非常相似,核心就是 Lock/Unlock
兩個操作。redsync 的使用無非多了一步連接 Redis 的過程。
配置選項(xiàng)
不知道你有沒有想過一個問題,我們在使用 sync.Mutex
時,如果某個 gorutine 加鎖后不釋放掉,那么其他 gorutine 就無法獲取鎖,而在分布式場景中,如果一個進(jìn)程獲取了 Redis 分布式鎖,然后在未釋放鎖之前進(jìn)程掛掉了,其他進(jìn)程要如何獲取鎖呢,難道要一直等待下去嗎?
這里就要引出一個使用分布式鎖很重要的問題,那就是一定要設(shè)置一個過期時間,這樣才能保證即使拿到鎖的進(jìn)程掛掉了,只要鎖的過期時間已到,鎖也一定會被自動釋放掉,只有這樣,其他進(jìn)程才有機(jī)會獲取鎖。
而我們上面的示例中,之所以可以不設(shè)置鎖的過期時間,原因是 redsync
內(nèi)部設(shè)置了默認(rèn)值。以下是 redsync
中 NewMutex
方法的源碼:
// NewMutex returns a new distributed mutex with given name. func (r *Redsync) NewMutex(name string, options ...Option) *Mutex { m := &Mutex{ name: name, expiry: 8 * time.Second, tries: 32, delayFunc: func(tries int) time.Duration { return time.Duration(rand.Intn(maxRetryDelayMilliSec-minRetryDelayMilliSec)+minRetryDelayMilliSec) * time.Millisecond }, genValueFunc: genValue, driftFactor: 0.01, timeoutFactor: 0.05, quorum: len(r.pools)/2 + 1, pools: r.pools, } for _, o := range options { o.Apply(m) } if m.shuffle { randomPools(m.pools) } return m }
這里 Mutex
對象的第二個字段 expiry
就是分布式鎖的過期時間,這里默認(rèn)為設(shè)為 8 秒。tries
字段是獲取鎖的重試次數(shù),即嘗試獲取鎖失敗 32 次以后,才會返回加鎖失敗,因?yàn)榉植际綀鼍跋率∈呛苷5那闆r,所以 32 次并不是一個很夸張的值。delayFunc
字段是每次失敗后重試的間隔時間。其他字段我就不一一講解了,絕大多數(shù)我們都用不到。
根據(jù)代碼我們很容易想到這幾個字段是通過選項(xiàng)模式來設(shè)置的。
WithExpiry(time.Duration)
:設(shè)置鎖的自動過期時間(建議大于業(yè)務(wù)執(zhí)行時間)。WithTries(int)
:設(shè)置最大重試次數(shù)。WithRetryDelay(time.Duration)
:設(shè)置重試間隔。
使用示例:
mutex := rs.NewMutex("test-redsync", redsync.WithExpiry(30*time.Second), redsync.WithTries(3), redsync.WithRetryDelay(500*time.Millisecond), )
看門狗
我們現(xiàn)在知道使用分布式鎖一定要設(shè)置一個過期時間了,但是這會帶來另外一個問題:如果我們的業(yè)務(wù)代碼還沒執(zhí)行完,鎖就過期自動釋放了,那么此時另外一個進(jìn)程成功拿到這把鎖,也來訪問競態(tài)資源,那分布式鎖不就失去意義了嗎?
這就引出了使用分布式鎖的另一個重要問題,鎖自動續(xù)期。我舉一個代碼示例,你就懂了:
package main import ( "context" "log/slog" "time" "github.com/go-redsync/redsync/v4" // 引入 redsync 庫,用于實(shí)現(xiàn)基于 Redis 的分布式鎖 "github.com/go-redsync/redsync/v4/redis/goredis/v9" // 引入 redsync 的 goredis 連接池 goredislib "github.com/redis/go-redis/v9" // 引入 go-redis 庫,用于與 Redis 服務(wù)器通信 ) func main() { // 創(chuàng)建一個 Redis 客戶端 client := goredislib.NewClient(&goredislib.Options{ Addr: "localhost:36379", // Redis 服務(wù)器地址 Password: "nightwatch", }) // 使用 go-redis 客戶端創(chuàng)建一個 redsync 連接池 pool := goredis.NewPool(client) // 創(chuàng)建一個 redsync 實(shí)例,用于管理分布式鎖 rs := redsync.New(pool) // 創(chuàng)建一個名為 "test-redsync" 的互斥鎖(Mutex) mutex := rs.NewMutex("test-redsync", redsync.WithExpiry(5*time.Second)) // 創(chuàng)建一個上下文(context),一般用于控制鎖的超時和取消 ctx, cancel := context.WithCancel(context.Background()) defer cancel() // 獲取鎖,如果獲取失?。ɡ珂i已被其他進(jìn)程持有),會返回錯誤 if err := mutex.LockContext(ctx); err != nil { panic(err) // 如果獲取鎖失敗,程序會 panic } // 看門狗,實(shí)現(xiàn)鎖自動續(xù)約 stopCh := make(chan struct{}) ticker := time.NewTicker(2 * time.Second) // 每隔 2s 續(xù)約一次 defer ticker.Stop() go func() { for { select { case <-ticker.C: // 續(xù)約,延長鎖的過期時間 if ok, err := mutex.ExtendContext(ctx); !ok || err != nil { slog.Error("Failed to extend mutex", "err", err, "status", ok) } else { slog.Info("Successfully extend mutex") } case <-stopCh: slog.Info("Exiting mutex watchdog") return } } }() // 執(zhí)行業(yè)務(wù)邏輯 time.Sleep(6 * time.Second) // 通知看門狗停止自動續(xù)期 stopCh <- struct{}{} // 釋放鎖,如果釋放失?。ɡ珂i已過期或不屬于當(dāng)前進(jìn)程),會返回錯誤 if _, err := mutex.UnlockContext(ctx); err != nil { panic(err) // 如果釋放鎖失敗,程序會 panic } }
這個示例延續(xù)了前文中的示例代碼,你需要重點(diǎn)關(guān)注的是如下這部分邏輯:
// 看門狗,實(shí)現(xiàn)鎖自動續(xù)約 stopCh := make(chan struct{}) ticker := time.NewTicker(2 * time.Second) // 每隔 2s 續(xù)約一次 defer ticker.Stop() go func() { for { select { case <-ticker.C: // 續(xù)約,延長鎖的過期時間 if ok, err := mutex.ExtendContext(ctx); !ok || err != nil { slog.Error("Failed to extend mutex", "err", err, "status", ok) } else { slog.Info("Successfully extend mutex") } case <-stopCh: slog.Info("Exiting mutex watchdog") return } } }()
redsync
提供了 mutex.ExtendContext(ctx)
方法可以延長鎖的過期時間。假設(shè)我們申請的分布式鎖過期時間是 5 秒,而業(yè)務(wù)代碼執(zhí)行時間是未知的,那么我們在拿到鎖以后,可以單獨(dú)開啟一個 goroutine 來定時延長鎖的過期時間,當(dāng)業(yè)務(wù)代碼執(zhí)行完成以后,主 goroutine 通過 stopCh <- struct{}{}
向子 goroutine 發(fā)送停止信號,那么子 goroutine 中的 <-stopCh
case 就會收到通知,子 goroutine 便會退出,也就停止了鎖自動續(xù)期。
通過為分布式鎖設(shè)置過期時間,再配合子 goroutine 自動續(xù)期的功能,我們就能保證,持有鎖的進(jìn)程掛掉時不會影響其他進(jìn)程獲取鎖,并且還能實(shí)現(xiàn)業(yè)務(wù)執(zhí)行完成后才釋放鎖。而這個實(shí)現(xiàn)分布式鎖自動續(xù)期的程序,我們通常把它叫做“看門狗”。
我再額外啰嗦一句,關(guān)于分布式鎖的續(xù)期時常和間隔周期的問題,一般來說,續(xù)期的時間可以設(shè)置為等于過期時間,即鎖的過期時間設(shè)為 5 秒,那么每次也只續(xù)期 5 秒,redsync
內(nèi)部也是這么做的,至于間隔多久續(xù)期一次,這個時間肯定是要小于過期時間 5 秒的,通常設(shè)為鎖過期時間的 1/3 或 1/2 都可以。
redsync 原理
我上面講解的 redsync
用法基本上能覆蓋業(yè)務(wù)開發(fā)中的大部分場景了,對于 redsync
更多的功能我就不過多介紹了,有了現(xiàn)有的知識,你遇到了問題也可以自己查閱文檔學(xué)習(xí)。 下面我想講點(diǎn)更有價值的東西,我們自己來實(shí)現(xiàn)一個微型的 Redis 分布式鎖,以此來加深你對 redsync
的理解。
如何實(shí)現(xiàn)一個 Redis 分布式鎖
要基于 Redis 實(shí)現(xiàn)一個最小化的分布式鎖,我們可以定義一個結(jié)構(gòu)體 MiniRedisMutex
作為鎖對象:
type MiniRedisMutex struct { name string // 會作為分布式鎖在 Redis 中的 key expiry time.Duration // 鎖過期時間 conn redis.Cmdable // Redis Client }
它僅包含必要的字段,name
是鎖的名稱,expiry
是分布式鎖必須要有的過期時間,conn
用來存儲 Redis 客戶端連接。
我們可以定義一個構(gòu)造函數(shù) NewMutex
來創(chuàng)建分布式鎖對象:
func NewMutex(name string, expiry time.Duration, conn redis.Cmdable) *MiniRedisMutex { return &MiniRedisMutex{name, expiry, conn} }
接下來就要實(shí)現(xiàn)加鎖和解鎖這兩個功能。
加鎖方法 Lock
實(shí)現(xiàn)如下:
func (m *MiniRedisMutex) Lock(ctx context.Context, value string) (bool, error) { reply, err := m.conn.SetNX(ctx, m.name, value, m.expiry).Result() if err != nil { return false, err } return reply, nil }
Lock
方法接收兩個參數(shù),ctx
用來控制取消,value
則會作為鎖的值。
Lock
方法內(nèi)部邏輯非常簡單,直接調(diào)用 Redis 的 SetNX
命令來排他的設(shè)置一個鍵值對,鎖名稱 name
作為 Redis 的 key
,鎖的值 value
作為 Redis 的 value
,并指定過期時間為 expiry
,這就是分布式鎖的加鎖原理。
這里有兩個關(guān)鍵點(diǎn)需要你注意:
- 使用
SetNX
命令:這里之所以使用SetNX
命令而不是普通的Set
命令,是因?yàn)榧渔i操作需要排他性。我們知道,SetNX
命令的全稱是SET if Not eXists
,即通過SetNX
命令設(shè)置鍵值對時,如果key
不存在,設(shè)置其value
,若key
已存在,則不執(zhí)行任何操作。這剛好符合互斥性,是實(shí)現(xiàn)分布式互斥鎖的關(guān)鍵所在。 value
唯一性:雖然SetNX
命令能夠?qū)崿F(xiàn)互斥,但是 Redis 的value
還是要保證唯一性。這一點(diǎn)我們接著往下看你就明白了。
釋放鎖方法 Unlock
實(shí)現(xiàn)如下:
// 釋放鎖的 lua 腳本,保證并發(fā)安全 var deleteScript = ` local val = redis.call("GET", KEYS[1]) if val == ARGV[1] then return redis.call("DEL", KEYS[1]) elseif val == false then return -1 else return 0 end ` // Unlock 釋放鎖 func (m *MiniRedisMutex) Unlock(ctx context.Context, value string) (bool, error) { // 執(zhí)行 lua 腳本,Redis 會保證其并發(fā)安全 status, err := m.conn.Eval(ctx, deleteScript, []string{m.name}, value).Result() if err != nil { return false, err } if status == int64(-1) { return false, ErrLockAlreadyExpired } return status != int64(0), nil }
在釋放鎖的邏輯中,我們不是簡單的將指定的 Redis 鍵值對刪除即可,而是調(diào)用 m.conn.Eval
方法執(zhí)行了一段 lua 腳本的方式來釋放鎖。
在這段 lua 腳本中,我們先是從 Redis 中獲取指定 key
為 m.name
的鍵值對,然后判斷其 value
是否等于 Unlock
方法傳入的 value
參數(shù)值,如果相等,則從 Redis 中刪除指定的鍵值對,表示釋放鎖,否則什么也不做。
之所以要對 value
進(jìn)行判斷,是因?yàn)槲覀円WC這把鎖是當(dāng)前進(jìn)程所持有的鎖,而不是其他進(jìn)程持有的鎖。那么以什么為依據(jù)來說明這把鎖是當(dāng)前進(jìn)程持有的呢?這就是我們要保證 value
唯一的原因,每個進(jìn)程在加鎖的時候,需要生成一個隨機(jī)的 value
作為自己的鎖的標(biāo)識,那么釋放時,就可以通過這個 value
來判斷是否是自己持有的鎖。而這樣做的目的,是為了避免一個進(jìn)程搶到鎖后,還在執(zhí)行業(yè)務(wù)邏輯時,鎖被另外一個進(jìn)程給釋放了。
遺憾的是,這段釋放鎖的邏輯,Redis 沒有提供像 SetNX
一樣的快捷命令,所以我們只能將其放在 lua 腳本中執(zhí)行,才能保證并發(fā)安全。
至此,一個微型的 Redis 分布式鎖的核心功能咱們就講解完成了。
以下是 MiniRedisMutex
分布式鎖完整的代碼實(shí)現(xiàn):
package miniredislock import ( "context" "errors" "time" "github.com/redis/go-redis/v9" ) var ErrLockAlreadyExpired = errors.New("miniredislock: failed to unlock, lock was already expired") // MiniRedisMutex 一個微型的 Redis 分布式鎖 type MiniRedisMutex struct { name string // 會作為分布式鎖在 Redis 中的 key expiry time.Duration // 鎖過期時間 conn redis.Cmdable // Redis Client } // NewMutex 創(chuàng)建 Redis 分布式鎖 func NewMutex(name string, expiry time.Duration, conn redis.Cmdable) *MiniRedisMutex { return &MiniRedisMutex{name, expiry, conn} } // Lock 加鎖 func (m *MiniRedisMutex) Lock(ctx context.Context, value string) (bool, error) { reply, err := m.conn.SetNX(ctx, m.name, value, m.expiry).Result() if err != nil { return false, err } return reply, nil } // 釋放鎖的 lua 腳本,保證并發(fā)安全 var deleteScript = ` local val = redis.call("GET", KEYS[1]) if val == ARGV[1] then return redis.call("DEL", KEYS[1]) elseif val == false then return -1 else return 0 end ` // Unlock 釋放鎖 func (m *MiniRedisMutex) Unlock(ctx context.Context, value string) (bool, error) { // 執(zhí)行 lua 腳本,Redis 會保證其并發(fā)安全 status, err := m.conn.Eval(ctx, deleteScript, []string{m.name}, value).Result() if err != nil { return false, err } if status == int64(-1) { return false, ErrLockAlreadyExpired } return status != int64(0), nil }
其實(shí),這段代碼的主要邏輯,都是我從 redsync
源碼中提取出來。所以 redsync
其實(shí)也是這樣實(shí)現(xiàn)的,只不過它內(nèi)部增加了很多可靠性和邊緣場景等邏輯代碼,最核心的加鎖和解鎖邏輯是一樣的。
微型分布式鎖使用
下面我們來寫一個示例程序,演示下如何使用這個微型的分布式鎖:
package main import ( "fmt" "time" goredislib "github.com/redis/go-redis/v9" "golang.org/x/net/context" "github.com/jianghushinian/blog-go-example/redsync/miniredislock" ) func main() { // 創(chuàng)建一個 Redis 客戶端 client := goredislib.NewClient(&goredislib.Options{ Addr: "localhost:36379", // Redis 服務(wù)器地址 Password: "nightwatch", }) defer client.Close() // 創(chuàng)建一個名為 "test-miniredislock" 的互斥鎖 mutex := miniredislock.NewMutex("test-miniredislock", 5*time.Second, client) ctx := context.Background() // 互斥鎖的值應(yīng)該是一個隨機(jī)值 value := "random-string" // 獲取鎖 _, err := mutex.Lock(ctx, value) if err != nil { panic(err) } // 執(zhí)行業(yè)務(wù)邏輯 fmt.Println("do something...") time.Sleep(3 * time.Second) // 釋放自己持有的鎖 _, err = mutex.Unlock(ctx, value) if err != nil { panic(err) } }
這個示例的具體邏輯我就不逐行講解了,相信你一看便懂。也希望你能夠自己在本機(jī)上跑起來這段代碼,真正用一下分布式鎖,以此加深理解。
最后我再留一個作業(yè),你可以嘗試一下實(shí)現(xiàn)鎖的續(xù)期方法 Extend
。
總結(jié)
分布式鎖可以確保分布式系統(tǒng)中并發(fā)安全的訪問競態(tài)資源,redsync
作為 Go 中最流行的 Redis 分布式鎖方案,非常值得我們學(xué)習(xí)和使用。
redsync
的用法非常簡單,加鎖和解鎖操作與 sync.Mutex
也非常類似,沒有太多的學(xué)習(xí)成本。不過,為了避免持有鎖的進(jìn)程掛掉時,其他進(jìn)程還有機(jī)會獲取鎖,我們需要實(shí)現(xiàn)看門狗的功能。
到此這篇關(guān)于Go語言如何使用分布式鎖解決并發(fā)問題的文章就介紹到這了,更多相關(guān)Go分布式鎖解決并發(fā)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang中urlencode與urldecode編碼解碼詳解
這篇文章主要給大家介紹了關(guān)于Golang中urlencode與urldecode編碼解碼的相關(guān)資料,在Go語言中轉(zhuǎn)碼操作非常方便,可以使用內(nèi)置的encoding包來快速完成轉(zhuǎn)碼操作,Go語言中的encoding包提供了許多常用的編碼解碼方式,需要的朋友可以參考下2023-09-09golang中import cycle not allowed解決的一種思路
這篇文章主要給大家介紹了關(guān)于golang中import cycle not allowed解決的一種思路,文中通過示例代碼介紹的非常詳細(xì),需要的朋友可以參考借鑒,下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2018-08-08