Go語言實(shí)現(xiàn)分布式鎖
1. go實(shí)現(xiàn)分布式鎖
通過 golang 實(shí)現(xiàn)一個(gè)簡單的分布式鎖,包括鎖續(xù)約、重試機(jī)制、singleflght機(jī)制的使用
1.1 redis_lock.go
package redis_lock import ( "context" _ "embed" "errors" "github.com/go-redis/redis/v9" "github.com/google/uuid" "golang.org/x/sync/singleflight" "time" ) // go:embed 可以直接解析出文件中的字符串 var ( //go:embed lua_unlock.lua luaUnlock string //go:embed refresh.lua luaRefresh string //go:embed lock.lua luaLock string //定義好兩個(gè)異常信息 ErrLockNotHold = errors.New("未持有鎖") ErrFailedToPreemptLock = errors.New("加鎖失敗") ) type Client struct { //采用公共的接口,后續(xù)實(shí)例通過傳入的方式 client redis.Cmdable // singleflight 用于在一個(gè)實(shí)例的多個(gè)攜程中只需要競爭出一個(gè)攜程 s singleflight.Group } func NewClient(c redis.Cmdable) *Client { return &Client{ client: c, } } func (c *Client) SingleflightLock(ctx context.Context, key string, expire time.Duration, retry RetryStrategy, timeout time.Duration) (*Lock, error) { for { flag := false resCh := c.s.DoChan(key, func() (interface{}, error) { flag = true return c.Lock(ctx, key, expire, retry, timeout) }) select { case res := <-resCh: if flag { if res.Err != nil { return nil, res.Err } //返回鎖對象 return res.Val.(*Lock), nil } case <-ctx.Done(): return nil, ctx.Err() } } } //Lock 加鎖方法,根據(jù)重試機(jī)制進(jìn)行重新獲取 func (c *Client) Lock(ctx context.Context, key string, expire time.Duration, retry RetryStrategy, timeout time.Duration) (*Lock, error) { var timer *time.Timer defer func() { if timer != nil { timer.Stop() } }() for { //設(shè)置超時(shí) lct, cancel := context.WithTimeout(ctx, timeout) //獲取到uuid value := uuid.New().String() //執(zhí)行l(wèi)ua腳本進(jìn)行加鎖 result, err := c.client.Eval(lct, luaLock, []string{key}, value, expire).Bool() //用于主動釋放資源 cancel() if err != nil && !errors.Is(err, context.DeadlineExceeded) { return nil, err } if result { return newLock(c.client, key, value), nil } //可以不傳重試機(jī)制 if retry != nil { //通過重試機(jī)制獲取重試的策略 interval, ok := retry.Next() if !ok { //不用重試 return nil, ErrFailedToPreemptLock } if timer == nil { timer = time.NewTimer(interval) } timer.Reset(interval) select { case <-timer.C: //睡眠時(shí)間超時(shí)了 return nil, ctx.Err() case <-ctx.Done(): //整個(gè)調(diào)用的超時(shí) return nil, ctx.Err() } } } } // TryLock 嘗試加鎖 func (c *Client) TryLock(ctx context.Context, key string, expire time.Duration) (*Lock, error) { return c.Lock(ctx, key, expire, nil, 0) } // NewLock 創(chuàng)建一個(gè)鎖結(jié)構(gòu)體 func newLock(client redis.Cmdable, key string, value string) *Lock { return &Lock{ client: client, key: key, value: value, unLockChan: make(chan struct{}, 1), //設(shè)置1個(gè)緩存數(shù)據(jù),用于解鎖的信號量 } } // Lock 結(jié)構(gòu)體對象 type Lock struct { client redis.Cmdable key string value string expire time.Duration //在解鎖成功之后發(fā)送信號來取消續(xù)約 unLockChan chan struct{} } // AutoRefresh 自動續(xù)約 func (l *Lock) AutoRefresh(interval time.Duration, timeout time.Duration) error { //設(shè)計(jì)一個(gè)管道,如果失敗了,就發(fā)送數(shù)據(jù)到管道之中,通知進(jìn)行重試 retry := make(chan struct{}, 1) //方法返回時(shí)關(guān)閉close defer close(retry) ticker := time.NewTicker(interval) for { select { //接收到結(jié)束的信號時(shí),直接return case <-l.unLockChan: return nil //監(jiān)聽重試的管道 case <-retry: ctx, cancel := context.WithTimeout(context.Background(), timeout) err := l.Refresh(ctx) //主動調(diào)用釋放資源 cancel() if err == context.DeadlineExceeded { // 執(zhí)行重試往管道中發(fā)送一個(gè)信號 retry <- struct{}{} continue } if err != nil { return err } case <-ticker.C: ctx, cancel := context.WithTimeout(context.Background(), timeout) err := l.Refresh(ctx) //主動調(diào)用釋放資源 cancel() if err == context.DeadlineExceeded { // 執(zhí)行重試往管道中發(fā)送一個(gè)信號 retry <- struct{}{} continue } if err != nil { return err } } } } // Refresh 續(xù)約 func (l *Lock) Refresh(ctx context.Context) error { //執(zhí)行l(wèi)ua腳本,對鎖進(jìn)行續(xù)約 i, err := l.client.Eval(ctx, luaRefresh, []string{l.key}, l.value, l.expire.Milliseconds()).Int64() if err == redis.Nil { return ErrLockNotHold } if err != nil { return err } if i == 0 { return ErrLockNotHold } return nil } // Unlock 解鎖 func (l *Lock) Unlock(ctx context.Context) error { //解鎖時(shí),退出方法需要發(fā)送一個(gè)信號讓自動續(xù)約的goroutine停止 defer func() { l.unLockChan <- struct{}{} close(l.unLockChan) }() //判斷返回的結(jié)果 result, err := l.client.Eval(ctx, luaUnlock, []string{l.key}, l.value).Int64() if err == redis.Nil { return ErrLockNotHold } if err != nil { return err } //lua腳本返回的結(jié)果如果為0,也是代表當(dāng)前鎖不是自己的 if result == 0 { return ErrLockNotHold } return nil }
1.2 retry.go
package redis_lock import "time" // RetryStrategy 重試策略 type RetryStrategy interface { // Next 下一次重試的時(shí)間是多久,返回兩個(gè)參數(shù) time 時(shí)間,bool 是否直接重試 Next() (time.Duration, bool) }
1.3 lock.lua
lua腳本原子化加鎖
--[[ 獲取到對應(yīng)的value是否跟當(dāng)前的一樣 ]] if redis.call("get", KEYS[1]) == ARGV[1] then -- 如果一樣直接對其時(shí)間進(jìn)行續(xù)約 return redis.call("pexpire", KEYS[1], ARGV[2]) else -- 如果不一樣調(diào)用setnx命令對其進(jìn)行設(shè)置值 return redis.call("set", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
1.4 lua_unlock.lua
lua腳本原子化解鎖
if redis.call("get", KEYS[1]) == ARGV[1] then -- 返回0,代表key不在 return redis.call("del", KEYS[1]) else -- key在,但是值不對 return 0 end
1.5 refresh.lua
lua腳本續(xù)約
if redis.call("get", KEYS[1]) == ARGV[1] then -- 返回0,代表key不在 return redis.call("pexpire", KEYS[1], ARGV[2]) else -- key在,但是值不對 return 0 end
1.6 單元測試
使用go-mock工具生成本地的單元測試,不需要再單獨(dú)的搭建一個(gè) redis 的服務(wù)端
項(xiàng)目根目錄下安裝mockgen工具
go install github.com/golang/mock/mockgen@latest
添加依賴
go get github.com/golang/mock/mockgen/model
生成redis客戶端接口
mockgen -package=mocks -destination=mocks/redis_cmdable.mock.go github.com/go-redis/redis/v9 Cmdable
- package:指定包
- destination:生成路徑名稱
- 剩下的是指定使用redis包下面的 Cmdable接口生成代碼
測試類
func TestClient_TryLock(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() testCase := []struct { //測試的場景 name string //輸入 key string expiration time.Duration //返回一個(gè)mock數(shù)據(jù) mock func() redis.Cmdable //期望的返回的錯(cuò)誤值 wantError error //期望返回的鎖 wantLock *Lock }{ { name: "locked", key: "locked-key", expiration: time.Minute, mock: func() redis.Cmdable { rdb := mocks.NewMockCmdable(ctrl) res := redis.NewBoolResult(true, nil) i := []interface{}{gomock.Any(), time.Minute} rdb.EXPECT().Eval(gomock.Any(), luaLock, []string{"locked-key"}, i...).Return(res) return rdb }, wantLock: &Lock{ key: "locked-key", }, }, } for _, tc := range testCase { t.Run(tc.name, func(t *testing.T) { var c = NewClient(tc.mock()) l, err := c.TryLock(context.Background(), tc.key, tc.expiration) assert.Equal(t, tc.wantError, err) if err != nil { return } //判斷返回的key是否跟期望的一樣 assert.Equal(t, tc.key, l.key) assert.Equal(t, tc.wantLock.key, l.key) assert.NotEmpty(t, l.value) }) } }
到此這篇關(guān)于Go語言實(shí)現(xiàn)分布式鎖的文章就介紹到這了,更多相關(guān)Go分布式鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Go語言特點(diǎn)及基本數(shù)據(jù)類型使用詳解
這篇文章主要為大家介紹了Go語言特點(diǎn)及基本數(shù)據(jù)類型使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-03-03