Golang分布式應(yīng)用之Redis示例詳解
正文
Redis作是一個(gè)高性能的內(nèi)存數(shù)據(jù)庫,常被應(yīng)用于分布式系統(tǒng)中,除了作為分布式緩存或簡單的內(nèi)存數(shù)據(jù)庫還有一些特殊的應(yīng)用場景,本文結(jié)合Golang來編寫對(duì)應(yīng)的中間件。
本文所有代碼見github.com/qingwave/go…
分布式鎖
單機(jī)系統(tǒng)中我們可以使用sync.Mutex來保護(hù)臨界資源,在分布式系統(tǒng)中同樣有這樣的需求,當(dāng)多個(gè)主機(jī)搶占同一個(gè)資源,需要加對(duì)應(yīng)的“分布式鎖”。
在Redis中我們可以通過setnx命令來實(shí)現(xiàn)
- 如果key不存在可以設(shè)置對(duì)應(yīng)的值,設(shè)置成功則加鎖成功,key不存在返回失敗
- 釋放鎖可以通過
del實(shí)現(xiàn)。
主要邏輯如下:
type RedisLock struct {
client *redis.Client
key string
expiration time.Duration // 過期時(shí)間,防止宕機(jī)或者異常
}
func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
return &RedisLock{
client: client,
key: key,
expiration: expiration,
}
}
// 加鎖將成功會(huì)將調(diào)用者id保存到redis中
func (l *RedisLock) Lock(id string) (bool, error) {
return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
}
const unLockScript = `
if (redis.call("get", KEYS[1]) == KEYS[2]) then
redis.call("del", KEYS[1])
return true
end
return false
`
// 解鎖通過lua腳本來保證原子性,只能解鎖當(dāng)前調(diào)用者加的鎖
func (l *RedisLock) UnLock(id string) error {
_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
if err != nil && err != redis.Nil {
return err
}
return nil
}
需要加一個(gè)額外的超時(shí)時(shí)間來防止系統(tǒng)宕機(jī)或者異常請(qǐng)求造成的死鎖,通過超時(shí)時(shí)間為最大預(yù)估運(yùn)行時(shí)間的2倍。
解鎖時(shí)通過lua腳本來保證原子性,調(diào)用者只會(huì)解自己加的鎖。避免由于超時(shí)造成的混亂,例如:進(jìn)程A在時(shí)間t1獲取了鎖,但由于執(zhí)行緩慢,在時(shí)間t2鎖超時(shí)失效,進(jìn)程B在t3獲取了鎖,這是如果進(jìn)程A執(zhí)行完去解鎖會(huì)取消進(jìn)程B的鎖。
運(yùn)行測試
func main() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "123456",
DB: 0, // use default DB
})
lock := NewLock(client, "counter", 30*time.Second)
counter := 0
worker := func(i int) {
for {
id := fmt.Sprintf("worker%d", i)
ok, err := lock.Lock(id)
log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
if !ok {
time.Sleep(100 * time.Millisecond)
continue
}
defer lock.UnLock(id)
counter++
log.Printf("worker %d, add counter %d", i, counter)
break
}
}
wg := sync.WaitGroup{}
for i := 1; i <= 5; i++ {
wg.Add(1)
id := i
go func() {
defer wg.Done()
worker(id)
}()
}
wg.Wait()
}
運(yùn)行結(jié)果,可以看到與sync.Mutex使用效果類似
2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5
特別注意的是,在分布式Redis集群中,如果發(fā)生異常時(shí)(主節(jié)點(diǎn)宕機(jī)),可能會(huì)降低分布式鎖的可用性,可以通過強(qiáng)一致性的組件etcd、ZooKeeper等實(shí)現(xiàn)。
分布式過濾器
假設(shè)要開發(fā)一個(gè)爬蟲服務(wù),爬取百萬級(jí)的網(wǎng)頁,怎么判斷某一個(gè)網(wǎng)頁是否爬取過,除了借助數(shù)據(jù)庫和HashMap,我們可以借助布隆過濾器來做。相比其他方式布隆過濾器占用極低的空間,而且插入查詢時(shí)間非???。
布隆過濾器用來判斷某個(gè)元素是否在集合中,利用BitSet
- 插入數(shù)據(jù)時(shí)將值進(jìn)行多次Hash,將BitSet對(duì)應(yīng)位置1
- 查詢時(shí)同樣進(jìn)行多次Hash對(duì)比所有位上是否為1,如是則存在。
布隆過濾器有一定的誤判率,不適合精確查詢的場景。另外也不支持刪除元素。通常適用于URL去重、垃圾郵件過濾、防止緩存擊穿等場景中。
在Redis中,我們可以使用自帶的BitSet實(shí)現(xiàn),同樣也借助lua腳本的原子性來避免多次查詢數(shù)據(jù)不一致。
const (
// 插入數(shù)據(jù),調(diào)用setbit設(shè)置對(duì)應(yīng)位
setScript = `
for _, offset in ipairs(ARGV) do
redis.call("setbit", KEYS[1], offset, 1)
end
`
// 查詢數(shù)據(jù),如果所有位都為1返回true
getScript = `
for _, offset in ipairs(ARGV) do
if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
return false
end
end
return true
`
)
type BloomFilter struct {
client *redis.Client
key string // 存在redis中的key
bits uint // BitSet的大小
maps uint // Hash的次數(shù)
}
func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
client.Del(context.TODO(), key)
if maps == 0 {
maps = 14
}
return &BloomFilter{
key: key,
client: client,
bits: bits,
maps: maps,
}
}
// 進(jìn)行多次Hash, 得到位置列表
func (f *BloomFilter) getLocations(data []byte) []uint {
locations := make([]uint, f.maps)
for i := 0; i < int(f.maps); i++ {
val := murmur3.Sum64(append(data, byte(i)))
locations[i] = uint(val) % f.bits
}
return locations
}
func (f *BloomFilter) Add(data []byte) error {
args := getArgs(f.getLocations(data))
_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
if err != nil && err != redis.Nil {
return err
}
return nil
}
func (f *BloomFilter) Exists(data []byte) (bool, error) {
args := getArgs(f.getLocations(data))
resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
if err != nil {
if err == redis.Nil {
return false, nil
}
return false, err
}
exists, ok := resp.(int64)
if !ok {
return false, nil
}
return exists == 1, nil
}
func getArgs(locations []uint) []string {
args := make([]string, 0)
for _, l := range locations {
args = append(args, strconv.FormatUint(uint64(l), 10))
}
return args
}
運(yùn)行測試
func main() {
bf := NewBloomFilter(client,"bf-test", 2^16, 14)
exists, err := bf.Exists([]byte("test1"))
log.Printf("exist %t, err %v", exists, err)
if err := bf.Add([]byte("test1")); err != nil {
log.Printf("add err: %v", err)
}
exists, err = bf.Exists([]byte("test1"))
log.Printf("exist %t, err %v", exists, err)
exists, err = bf.Exists([]byte("test2"))
log.Printf("exist %t, err %v", exists, err)
// output
// 2022/07/22 10:05:58 exist false, err <nil>
// 2022/07/22 10:05:58 exist true, err <nil>
// 2022/07/22 10:05:58 exist false, err <nil>
}
分布式限流器
在golang.org/x/time/rate包中提供了基于令牌桶的限流器,如果要實(shí)現(xiàn)分布式環(huán)境的限流可以基于Redis Lua腳本實(shí)現(xiàn)。
令牌桶的主要原理如下:
- 假設(shè)一個(gè)令牌桶容量為burst,每秒按照qps的速率往里面放置令牌
- 初始時(shí)放滿令牌,令牌溢出則直接丟棄,請(qǐng)求令牌時(shí),如果桶中有足夠令牌則允許,否則拒絕
- 當(dāng)burst==qps時(shí),嚴(yán)格按照qps限流;當(dāng)burst>qps時(shí),可以允許一定的突增流量
這里主要參考了官方rate包的實(shí)現(xiàn),將核心邏輯改為Lua實(shí)現(xiàn)。
--- 相關(guān)Key
--- limit rate key值,對(duì)應(yīng)value為當(dāng)前令牌數(shù)
local limit_key = KEYS[1]
--- 輸入?yún)?shù)
--[[
qps: 每秒請(qǐng)求數(shù);
burst: 令牌桶容量;
now: 當(dāng)前Timestamp;
cost: 請(qǐng)求令牌數(shù);
max_wait: 最大等待時(shí)間
--]]
local qps = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = ARGV[3]
local cost = tonumber(ARGV[4])
local max_wait = tonumber(ARGV[5])
--- 獲取redis中的令牌數(shù)
local tokens = redis.call("hget", limit_key, "token")
if not tokens then
tokens = burst
end
--- 上次修改時(shí)間
local last_time = redis.call("hget", limit_key, "last_time")
if not last_time then
last_time = 0
end
--- 最新等待時(shí)間
local last_event = redis.call("hget", limit_key, "last_event")
if not last_event then
last_event = 0
end
--- 通過當(dāng)前時(shí)間與上次修改時(shí)間的差值,qps計(jì)算出當(dāng)前時(shí)間得令牌數(shù)
local delta = math.max(0, now-last_time)
local new_tokens = math.min(burst, delta * qps + tokens)
new_tokens = new_tokens - cost --- 最新令牌數(shù),減少請(qǐng)求令牌
--- 如果最新令牌數(shù)小于0,計(jì)算需要等待的時(shí)間
local wait_period = 0
if new_tokens < 0 and qps > 0 then
wait_period = wait_period - new_tokens / qps
end
wait_period = math.ceil(wait_period)
local time_act = now + wait_period --- 滿足等待間隔的時(shí)間戳
--- 允許請(qǐng)求有兩種情況
--- 當(dāng)請(qǐng)求令牌數(shù)小于burst, 等待時(shí)間不超過最大等待時(shí)間,可以通過補(bǔ)充令牌滿足請(qǐng)求
--- qps為0時(shí),只要最新令牌數(shù)不小于0即可
local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
--- 設(shè)置對(duì)應(yīng)值
if ok then
redis.call("set", limit_key, new_tokens)
redis.call("set", last_time_key, now)
redis.call("set", last_event_key, time_act)
end
--- 返回列表,{是否允許, 等待時(shí)間}
return {ok, wait_period}
在Golang中的相關(guān)接口Allow、AllowN、Wait等都是通過調(diào)用reserveN實(shí)現(xiàn)
// 調(diào)用lua腳本
func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
// ...
res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
if err != nil && err != redis.Nil {
return nil, err
}
//...
return &Reservation{
ok: allow == 1,
lim: lim,
tokens: n,
timeToAct: now.Add(time.Duration(wait) * time.Second),
}, nil
}
運(yùn)行測試
func main() {
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "123456",
DB: 0, // use default DB
})
r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
if err != nil {
log.Fatal(err)
}
r.Reset()
for i := 0; i < 5; i++ {
err := r.Wait(context.TODO())
log.Printf("worker %d allowed: %v", i, err)
}
}
// output
// 2022/07/22 12:50:31 worker 0 allowed: <nil>
// 2022/07/22 12:50:31 worker 1 allowed: <nil>
// 2022/07/22 12:50:32 worker 2 allowed: <nil>
// 2022/07/22 12:50:33 worker 3 allowed: <nil>
// 2022/07/22 12:50:34 worker 4 allowed: <nil>
前兩個(gè)請(qǐng)求在burst內(nèi),直接可以獲得,后面的請(qǐng)求按照qps的速率生成。
其他
除此之外,Redis還可以用作全局計(jì)數(shù)、去重(set)、發(fā)布訂閱等場景。Redis官方也提供了一些通用模塊,通過加載這些模塊也可以實(shí)現(xiàn)過濾、限流等特性,參考modules。
參考
https://github.com/qingwave/gocorex
以上就是Golang分布式應(yīng)用之Redis示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Go分布式Redis的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang動(dòng)態(tài)調(diào)用方法小結(jié)
本文主要介紹了Golang動(dòng)態(tài)調(diào)用方法小結(jié),文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-12-12

