Golang實現(xiàn)Redis分布式鎖(Lua腳本+可重入+自動續(xù)期)
1 概念
應(yīng)用場景
Golang自帶的Lock鎖單機(jī)版OK(存儲在程序的內(nèi)存中),分布式不行
分布式鎖:
- 簡單版:redis setnx=》加鎖設(shè)置過期時間需要保證原子性=》lua腳本
- 完整版:redis Lua腳本+實現(xiàn)可重入+自動續(xù)期=》hset結(jié)構(gòu)
應(yīng)用場景:
- 防止用戶重復(fù)下單,鎖住用戶id
- 防止商品超賣問題
- 鎖住賬戶,防止并發(fā)操作
例如:我本地啟兩個端口跑兩個相同服務(wù),然后通過Nginx反向代理分別將請求均衡打到兩個服務(wù)(模擬分布式微服務(wù)),最后通過Jmeter模擬高并發(fā)場景。同時我在代碼里添加上lock鎖。
可以看到還是有消費(fèi)到相同數(shù)據(jù),出現(xiàn)超賣現(xiàn)象,這是因為lock鎖是在go程序的內(nèi)存,只能鎖住當(dāng)前程序。如果是分布式的話,就需要涉及分布式鎖。
注意??:
本地通過Mac+Jmeter+Iris+Nginx模擬分布式場景詳情可見:https://blog.csdn.net/weixin_45565886/article/details/136635997
package main import ( "context" "github.com/go-redis/redis/v8" "github.com/kataras/iris/v12" context2 "github.com/kataras/iris/v12/context" "myTest/demo_home/redis_demo/distributed_lock/constant" service2 "myTest/demo_home/redis_demo/distributed_lock/other_svc/service" "sync" ) func main() { constant.RedisCli = redis.NewClient(&redis.Options{ Addr: "localhost:6379", DB: 0, }) _, err := constant.RedisCli.Set(context.TODO(), constant.AppleKey, 500, -1).Result() if err != nil && err != redis.Nil { panic(err) } app := iris.New() xLock2 := new(sync.Mutex) app.Get("/consume", func(c *context2.Context) { xLock2.Lock() defer xLock2.Unlock() service2.GoodsService2.Consume() c.JSON("ok port:9999") }) app.Listen(":9999", nil) }
分布式鎖必備特性
分布式鎖需要具備的特性:
獨占性(排他性):任何時刻有且僅有一個線程持有
高可用:redis集群情況下,不能因為某個節(jié)點掛了而出現(xiàn)獲取鎖失敗和釋放鎖失敗的情況
防死鎖:杜絕死鎖,必須有超時控制機(jī)制或撤銷操作 Expire key
不亂搶:防止亂搶。(自己只能unlock自己的鎖)lua腳本保證原子性,且只刪除自己的鎖
重入性:同一個節(jié)點的同一個線程如果獲得鎖之后,它也可以再次獲取這個鎖
- setnx只能解決有無分布式鎖
- hset 解決可重入問題,記錄加鎖次數(shù): hset zyRedisLock uuid:threadID 3
2 思路分析
宕機(jī)與過期
如果加鎖成功之后,某個Redis節(jié)點宕機(jī),該鎖一直得不到釋放,就會導(dǎo)致其他Redis節(jié)點加鎖失敗。
- 加鎖時需要設(shè)置過期時間
//通過lua腳本保證加鎖與設(shè)置過期時間的原子性 func (r *RedisLock) TryLock() bool { //通過lua腳本加鎖[hincrby如果key不存在,則會主動創(chuàng)建,如果存在則會給count數(shù)加1,表示又重入一次] lockCmd := "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " redis.call('hincrby', KEYS[1], ARGV[1], 1) " + " redis.call('expire', KEYS[1], ARGV[2]) " + " return 1 " + "else " + " return 0 " + "end" result, err := r.redisCli.Eval(context.TODO(), lockCmd, []string{r.key}, r.Id, r.expire).Result() if err != nil { log.Errorf("tryLock %s %v", r.key, err) return false } i := result.(int64) if i == 1 { //獲取鎖成功&自動續(xù)期 go r.reNewExpire() return true } return false }
防止誤刪key
鎖過期時間設(shè)置30s,業(yè)務(wù)邏輯假如要跑40s。30s后鎖自動過期釋放了,其他線程加鎖了。再過10s后業(yè)務(wù)邏輯走完了,去釋放鎖,就會出現(xiàn)把其他人的鎖刪除。【張冠李戴】
- 設(shè)置key時,可帶上線程id和uuid(我這里以uuid演示)。刪除key之前,要判斷是否是自己的鎖。如果是則unlock釋放,不是就return走。
func (r *RedisLock) Unlock() { //通過lua腳本刪除鎖 //1. 查看鎖是否存在,如果不存在,直接返回 //2. 如果存在,對鎖進(jìn)行hincrby -1操作,當(dāng)減到0時,表明已經(jīng)unlock完成,可以刪除key delCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " + "then " + " return nil " + "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " + "then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end" resp, err := r.redisCli.Eval(context.TODO(), delCmd, []string{r.key}, r.Id).Result() if err != nil && err != redis.Nil { log.Errorf("unlock %s %v", r.key, err) } if resp == nil { fmt.Println("delKey=", resp) return } }
Lua保證原子性
加鎖與設(shè)置過期時間需要保證原子性。否則如果加鎖成功后,還沒來得及設(shè)置過期時間,Redis節(jié)點掛掉了,就又會出現(xiàn)其他節(jié)點一直獲取不到鎖的問題。
- Lua腳本保證原子性
//lock 加鎖&設(shè)置過期時間 "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " redis.call('hincrby', KEYS[1], ARGV[1], 1) " + " redis.call('expire', KEYS[1], ARGV[2]) " + " return 1 " + "else " + " return 0 " + "end" //unlock解鎖 delCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " + "then " + " return nil " + "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " + "then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end" //自動續(xù)期 renewCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " return redis.call('expire', KEYS[1], ARGV[2]) " + "else " + " return 0 " + "end"
可重入鎖
存在一部分業(yè)務(wù),方法里還需要繼續(xù)加鎖。需要實現(xiàn)鎖的可重入,記錄加鎖的次數(shù)。Lock幾次,就unLock幾次。
- map[string]map[string]int =>可通過Redis hset結(jié)構(gòu)實現(xiàn)
# yiRedisLock :redis的key # fas421424safsfa:1 :uuid+線程號 # 5 :加鎖次數(shù)(重入次數(shù)) hset yiRedisLock fas421424safsfa:1 5
//通過hset&hincrby 保證可重入(記錄加鎖次數(shù)) lockCmd := "if redis.call('exists', KEYS[1]) == 0 or redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " redis.call('hincrby', KEYS[1], ARGV[1], 1) " + " redis.call('expire', KEYS[1], ARGV[2]) " + " return 1 " + "else " + " return 0 " + "end" delCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 0 " + "then " + " return nil " + "elseif redis.call('hincrby', KEYS[1], ARGV[1], -1) == 0 " + "then " + " return redis.call('del', KEYS[1]) " + "else " + " return 0 " + "end"
自動續(xù)期
相同業(yè)務(wù)耗時可能因為網(wǎng)絡(luò)等問題而有所變化。例如:我們設(shè)置分布式鎖超時時間為20s,但是業(yè)務(wù)因為網(wǎng)絡(luò)問題某次耗時達(dá)到了30s,這時鎖就會被超時釋放,其他線程就能獲取到鎖。存在業(yè)務(wù)風(fēng)險。
- 加鎖成功之后設(shè)置自動續(xù)期,啟一個timer定時任務(wù),比如每10s檢測一下鎖有沒有被釋放,如果沒有,就自動續(xù)期。
// 判斷鎖是否存在,如果存在(表明業(yè)務(wù)還未完成),重新設(shè)置過期時間(自動續(xù)期) renewCmd := "if redis.call('hexists', KEYS[1], ARGV[1]) == 1 " + "then " + " return redis.call('expire', KEYS[1], ARGV[2]) " + "else " + " return 0 " + "end"
3 代碼
3.1 項目結(jié)構(gòu)解析
- constant模塊:定義分布式鎖名稱、業(yè)務(wù)Key(用于模擬扣減數(shù)據(jù)庫)
- lock模塊:核心模塊,實現(xiàn)分布式鎖
- Lock
- TryLock
- UnLock
- NewRedisLock
- other_svc:在其他端口啟另外一個服務(wù),用于本地模擬分布式
- service:業(yè)務(wù)類,扣減商品數(shù)量(其中的扣減操作涉及分布式鎖)
- main:提供iris web服務(wù)
3.2 全部代碼
注:
:other_svc這里不提供,與分布式鎖實現(xiàn)無太大關(guān)系。同時為了快速演示效果,部分項目結(jié)構(gòu)與代碼不規(guī)范。
感興趣的朋友,可以上Github查看全部代碼。
Github:https://github.com/ziyifast/ziyifast-code_instruction/tree/main/redis_demo/distributed_lock
現(xiàn)象:
constant/const.go
package constant import "github.com/go-redis/redis/v8" var ( BizKey = "XXOO" AppleKey = "apple" RedisCli *redis.Client )
lock/redis_lock.go
package service import ( "context" "github.com/go-redis/redis/v8" "github.com/ziyifast/log" "myTest/demo_home/redis_demo/distributed_lock/constant" "myTest/demo_home/redis_demo/distributed_lock/lock" "strconv" ) type goodsService struct { } var GoodsService = new(goodsService) func (g *goodsService) Consume() { redisLock := lock.NewRedisLock(constant.RedisCli, constant.BizKey) redisLock.Lock() defer redisLock.Unlock() //consume goods result, err := constant.RedisCli.Get(context.TODO(), constant.AppleKey).Result() if err != nil && err != redis.Nil { panic(err) } i, err := strconv.ParseInt(result, 10, 64) if err != nil { panic(err) } if i < 0 { log.Infof("no more apple...") return } _, err = constant.RedisCli.Set(context.TODO(), constant.AppleKey, i-1, -1).Result() if err != nil && err != redis.Nil { panic(err) } log.Infof("consume success...appleID:%d", i) }
service/goods_service.go
package service import ( "context" "github.com/go-redis/redis/v8" "github.com/ziyifast/log" "myTest/demo_home/redis_demo/distributed_lock/constant" "myTest/demo_home/redis_demo/distributed_lock/lock" "strconv" ) type goodsService struct { } var GoodsService = new(goodsService) func (g *goodsService) Consume() { redisLock := lock.NewRedisLock(constant.RedisCli, constant.BizKey) redisLock.Lock() defer redisLock.Unlock() //consume goods result, err := constant.RedisCli.Get(context.TODO(), constant.AppleKey).Result() if err != nil && err != redis.Nil { panic(err) } i, err := strconv.ParseInt(result, 10, 64) if err != nil { panic(err) } if i < 0 { log.Infof("no more apple...") return } _, err = constant.RedisCli.Set(context.TODO(), constant.AppleKey, i-1, -1).Result() if err != nil && err != redis.Nil { panic(err) } log.Infof("consume success...appleID:%d", i) }
main.go
package main import ( "context" "github.com/go-redis/redis/v8" "github.com/kataras/iris/v12" context2 "github.com/kataras/iris/v12/context" "myTest/demo_home/redis_demo/distributed_lock/constant" "myTest/demo_home/redis_demo/distributed_lock/service" ) func main() { constant.RedisCli = redis.NewClient(&redis.Options{ Addr: "localhost:6379", DB: 0, }) _, err := constant.RedisCli.Set(context.TODO(), constant.AppleKey, 500, -1).Result() if err != nil && err != redis.Nil { panic(err) } app := iris.New() //xLock := new(sync.Mutex) app.Get("/consume", func(c *context2.Context) { //xLock.Lock() //defer xLock.Unlock() service.GoodsService.Consume() c.JSON("ok port:8888") }) app.Listen(":8888", nil) }
到此這篇關(guān)于Golang實現(xiàn)Redis分布式鎖(Lua腳本+可重入+自動續(xù)期)的文章就介紹到這了,更多相關(guān)Golang Redis分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言開發(fā)環(huán)境搭建與初探(Windows平臺下)
Go是Google開發(fā)的一種編譯型,並發(fā)型,并具有垃圾回收功能的編程語言,可能很多人想學(xué)習(xí)go語言,那么首先就要了解go語言的環(huán)境配置方法2014-10-10golang實現(xiàn)微信小程序商城后臺系統(tǒng)(moshopserver)
這篇文章主要介紹了golang實現(xiàn)微信小程序商城后臺系統(tǒng)(moshopserver),本文通過截圖實例代碼的形式給大家介紹的非常詳細(xì),具有一定的參考借鑒價值,需要的朋友可以參考下2020-02-02淺析Go中fasthttp與net/http的性能對比及應(yīng)用
這篇文章主要為大家詳細(xì)介紹了Golang中fasthttp的底層實現(xiàn)以及與net/http的區(qū)別,下面就跟隨小編一起來看看fasthttp到底是如何做到性能如此之快的吧2024-03-03詳解如何使用Go語言進(jìn)行文件監(jiān)控和通知
在Go語言中,文件監(jiān)控通常涉及到文件系統(tǒng)事件的監(jiān)聽,文件或目錄的狀態(tài)發(fā)生變化(如創(chuàng)建、刪除、修改等)時,你的程序需要得到通知,所以本文給大家介紹了如何使用Go語言進(jìn)行文件監(jiān)控和通知,需要的朋友可以參考下2024-06-06