欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang分布式應(yīng)用之Redis示例詳解

 更新時(shí)間:2022年07月29日 17:21:06   作者:qingwave  
這篇文章主要為大家介紹了Golang分布式應(yīng)用之Redis示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

正文

Redis作是一個(gè)高性能的內(nèi)存數(shù)據(jù)庫(kù),常被應(yīng)用于分布式系統(tǒng)中,除了作為分布式緩存或簡(jiǎn)單的內(nèi)存數(shù)據(jù)庫(kù)還有一些特殊的應(yīng)用場(chǎng)景,本文結(jié)合Golang來(lái)編寫對(duì)應(yīng)的中間件。

本文所有代碼見(jiàn)github.com/qingwave/go…

分布式鎖

單機(jī)系統(tǒng)中我們可以使用sync.Mutex來(lái)保護(hù)臨界資源,在分布式系統(tǒng)中同樣有這樣的需求,當(dāng)多個(gè)主機(jī)搶占同一個(gè)資源,需要加對(duì)應(yīng)的“分布式鎖”。

在Redis中我們可以通過(guò)setnx命令來(lái)實(shí)現(xiàn)

  • 如果key不存在可以設(shè)置對(duì)應(yīng)的值,設(shè)置成功則加鎖成功,key不存在返回失敗
  • 釋放鎖可以通過(guò)del實(shí)現(xiàn)。

主要邏輯如下:

type RedisLock struct {
	client     *redis.Client
	key        string
	expiration time.Duration // 過(guò)期時(shí)間,防止宕機(jī)或者異常
}
func NewLock(client *redis.Client, key string, expiration time.Duration) *RedisLock {
	return &RedisLock{
		client:     client,
		key:        key,
		expiration: expiration,
	}
}
// 加鎖將成功會(huì)將調(diào)用者id保存到redis中
func (l *RedisLock) Lock(id string) (bool, error) {
	return l.client.SetNX(context.TODO(), l.key, id, l.expiration).Result()
}
const unLockScript = `
if (redis.call("get", KEYS[1]) == KEYS[2]) then
	redis.call("del", KEYS[1])
	return true
end
return false
`
// 解鎖通過(guò)lua腳本來(lái)保證原子性,只能解鎖當(dāng)前調(diào)用者加的鎖
func (l *RedisLock) UnLock(id string) error {
	_, err := l.client.Eval(context.TODO(), unLockScript, []string{l.key, id}).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}

需要加一個(gè)額外的超時(shí)時(shí)間來(lái)防止系統(tǒng)宕機(jī)或者異常請(qǐng)求造成的死鎖,通過(guò)超時(shí)時(shí)間為最大預(yù)估運(yùn)行時(shí)間的2倍。

解鎖時(shí)通過(guò)lua腳本來(lái)保證原子性,調(diào)用者只會(huì)解自己加的鎖。避免由于超時(shí)造成的混亂,例如:進(jìn)程A在時(shí)間t1獲取了鎖,但由于執(zhí)行緩慢,在時(shí)間t2鎖超時(shí)失效,進(jìn)程B在t3獲取了鎖,這是如果進(jìn)程A執(zhí)行完去解鎖會(huì)取消進(jìn)程B的鎖。

運(yùn)行測(cè)試

func main() {
    client := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	lock := NewLock(client, "counter", 30*time.Second)
    counter := 0
	worker := func(i int) {
		for {
			id := fmt.Sprintf("worker%d", i)
			ok, err := lock.Lock(id)
			log.Printf("worker %d attempt to obtain lock, ok: %v, err: %v", i, ok, err)
			if !ok {
				time.Sleep(100 * time.Millisecond)
				continue
			}
			defer lock.UnLock(id)
			counter++
			log.Printf("worker %d, add counter %d", i, counter)
			break
		}
	}
	wg := sync.WaitGroup{}
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		id := i
		go func() {
			defer wg.Done()
			worker(id)
		}()
	}
	wg.Wait()
}

運(yùn)行結(jié)果,可以看到與sync.Mutex使用效果類似

2022/07/22 09:58:09 worker 5 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:09 worker 5, add counter 1
2022/07/22 09:58:09 worker 4 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:09 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 4 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 4, add counter 2
2022/07/22 09:58:10 worker 1 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 1, add counter 3
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 2 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 2, add counter 4
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: false, err: <nil>
2022/07/22 09:58:10 worker 3 attempt to obtain lock, ok: true, err: <nil>
2022/07/22 09:58:10 worker 3, add counter 5

特別注意的是,在分布式Redis集群中,如果發(fā)生異常時(shí)(主節(jié)點(diǎn)宕機(jī)),可能會(huì)降低分布式鎖的可用性,可以通過(guò)強(qiáng)一致性的組件etcd、ZooKeeper等實(shí)現(xiàn)。

分布式過(guò)濾器

假設(shè)要開發(fā)一個(gè)爬蟲服務(wù),爬取百萬(wàn)級(jí)的網(wǎng)頁(yè),怎么判斷某一個(gè)網(wǎng)頁(yè)是否爬取過(guò),除了借助數(shù)據(jù)庫(kù)和HashMap,我們可以借助布隆過(guò)濾器來(lái)做。相比其他方式布隆過(guò)濾器占用極低的空間,而且插入查詢時(shí)間非??臁?/p>

布隆過(guò)濾器用來(lái)判斷某個(gè)元素是否在集合中,利用BitSet

  • 插入數(shù)據(jù)時(shí)將值進(jìn)行多次Hash,將BitSet對(duì)應(yīng)位置1
  • 查詢時(shí)同樣進(jìn)行多次Hash對(duì)比所有位上是否為1,如是則存在。

布隆過(guò)濾器有一定的誤判率,不適合精確查詢的場(chǎng)景。另外也不支持刪除元素。通常適用于URL去重、垃圾郵件過(guò)濾、防止緩存擊穿等場(chǎng)景中。

在Redis中,我們可以使用自帶的BitSet實(shí)現(xiàn),同樣也借助lua腳本的原子性來(lái)避免多次查詢數(shù)據(jù)不一致。

const (
	// 插入數(shù)據(jù),調(diào)用setbit設(shè)置對(duì)應(yīng)位
	setScript = `
for _, offset in ipairs(ARGV) do
	redis.call("setbit", KEYS[1], offset, 1)
end
`
	// 查詢數(shù)據(jù),如果所有位都為1返回true
	getScript = `
for _, offset in ipairs(ARGV) do
	if tonumber(redis.call("getbit", KEYS[1], offset)) == 0 then
		return false
	end
end
return true
`
)
type BloomFilter struct {
	client *redis.Client
	key    string // 存在redis中的key
	bits   uint // BitSet的大小
	maps   uint // Hash的次數(shù)
}
func NewBloomFilter(client *redis.Client, key string, bits, maps uint) *BloomFilter {
	client.Del(context.TODO(), key)
	if maps == 0 {
		maps = 14
	}
	return &BloomFilter{
		key:    key,
		client: client,
		bits:   bits,
		maps:   maps,
	}
}
// 進(jìn)行多次Hash, 得到位置列表
func (f *BloomFilter) getLocations(data []byte) []uint {
	locations := make([]uint, f.maps)
	for i := 0; i < int(f.maps); i++ {
		val := murmur3.Sum64(append(data, byte(i)))
		locations[i] = uint(val) % f.bits
	}
	return locations
}
func (f *BloomFilter) Add(data []byte) error {
	args := getArgs(f.getLocations(data))
	_, err := f.client.Eval(context.TODO(), setScript, []string{f.key}, args).Result()
	if err != nil && err != redis.Nil {
		return err
	}
	return nil
}
func (f *BloomFilter) Exists(data []byte) (bool, error) {
	args := getArgs(f.getLocations(data))
	resp, err := f.client.Eval(context.TODO(), getScript, []string{f.key}, args).Result()
	if err != nil {
		if err == redis.Nil {
			return false, nil
		}
		return false, err
	}
	exists, ok := resp.(int64)
	if !ok {
		return false, nil
	}
	return exists == 1, nil
}
func getArgs(locations []uint) []string {
	args := make([]string, 0)
	for _, l := range locations {
		args = append(args, strconv.FormatUint(uint64(l), 10))
	}
	return args
}

運(yùn)行測(cè)試

func main() {
	bf := NewBloomFilter(client,"bf-test", 2^16, 14)
	exists, err := bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	if err := bf.Add([]byte("test1")); err != nil {
		log.Printf("add err: %v", err)
	}
	exists, err = bf.Exists([]byte("test1"))
	log.Printf("exist %t, err %v", exists, err)
	exists, err = bf.Exists([]byte("test2"))
	log.Printf("exist %t, err %v", exists, err)
// output
// 2022/07/22 10:05:58 exist false, err <nil>
// 2022/07/22 10:05:58 exist true, err <nil>
// 2022/07/22 10:05:58 exist false, err <nil>
}

分布式限流器

golang.org/x/time/rate包中提供了基于令牌桶的限流器,如果要實(shí)現(xiàn)分布式環(huán)境的限流可以基于Redis Lua腳本實(shí)現(xiàn)。

令牌桶的主要原理如下:

  • 假設(shè)一個(gè)令牌桶容量為burst,每秒按照qps的速率往里面放置令牌
  • 初始時(shí)放滿令牌,令牌溢出則直接丟棄,請(qǐng)求令牌時(shí),如果桶中有足夠令牌則允許,否則拒絕
  • 當(dāng)burst==qps時(shí),嚴(yán)格按照qps限流;當(dāng)burst>qps時(shí),可以允許一定的突增流量

這里主要參考了官方rate包的實(shí)現(xiàn),將核心邏輯改為L(zhǎng)ua實(shí)現(xiàn)。

--- 相關(guān)Key
--- limit rate key值,對(duì)應(yīng)value為當(dāng)前令牌數(shù)
local limit_key = KEYS[1]
--- 輸入?yún)?shù)
--[[
qps: 每秒請(qǐng)求數(shù);
burst: 令牌桶容量;
now: 當(dāng)前Timestamp;
cost: 請(qǐng)求令牌數(shù);
max_wait: 最大等待時(shí)間
--]]
local qps = tonumber(ARGV[1])
local burst = tonumber(ARGV[2])
local now = ARGV[3]
local cost = tonumber(ARGV[4])
local max_wait = tonumber(ARGV[5])
--- 獲取redis中的令牌數(shù)
local tokens = redis.call("hget", limit_key, "token")
if not tokens then
	tokens = burst
end
--- 上次修改時(shí)間
local last_time = redis.call("hget", limit_key, "last_time")
if not last_time then
	last_time = 0
end
--- 最新等待時(shí)間
local last_event = redis.call("hget", limit_key, "last_event")
if not last_event then
	last_event = 0
end
--- 通過(guò)當(dāng)前時(shí)間與上次修改時(shí)間的差值,qps計(jì)算出當(dāng)前時(shí)間得令牌數(shù)
local delta = math.max(0, now-last_time)
local new_tokens = math.min(burst, delta * qps + tokens)
new_tokens = new_tokens - cost --- 最新令牌數(shù),減少請(qǐng)求令牌
--- 如果最新令牌數(shù)小于0,計(jì)算需要等待的時(shí)間
local wait_period = 0
if new_tokens < 0 and qps > 0 then
	wait_period = wait_period - new_tokens / qps
end
wait_period = math.ceil(wait_period)
local time_act = now + wait_period --- 滿足等待間隔的時(shí)間戳
--- 允許請(qǐng)求有兩種情況
--- 當(dāng)請(qǐng)求令牌數(shù)小于burst, 等待時(shí)間不超過(guò)最大等待時(shí)間,可以通過(guò)補(bǔ)充令牌滿足請(qǐng)求
--- qps為0時(shí),只要最新令牌數(shù)不小于0即可
local ok = (cost <= burst and wait_period <= max_wait and qps > 0) or (qps == 0 and new_tokens >= 0)
--- 設(shè)置對(duì)應(yīng)值
if ok then
	redis.call("set", limit_key, new_tokens)
	redis.call("set", last_time_key, now)
	redis.call("set", last_event_key, time_act)
end
--- 返回列表,{是否允許, 等待時(shí)間}
return {ok, wait_period}

在Golang中的相關(guān)接口Allow、AllowN、Wait等都是通過(guò)調(diào)用reserveN實(shí)現(xiàn)

// 調(diào)用lua腳本
func (lim *RedisLimiter) reserveN(now time.Time, n int, maxFutureReserveSecond int) (*Reservation, error) {
	// ...
	res, err := lim.rdb.Eval(context.TODO(), reserveNScript, []string{lim.limitKey}, lim.qps, lim.burst, now.Unix(), n, maxFutureReserveSecond).Result()
	if err != nil && err != redis.Nil {
		return nil, err
	}
	//...
	return &Reservation{
		ok:        allow == 1,
		lim:       lim,
		tokens:    n,
		timeToAct: now.Add(time.Duration(wait) * time.Second),
	}, nil
}

運(yùn)行測(cè)試

func main() {
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: "123456",
		DB:       0, // use default DB
	})
	r, err := NewRedisLimiter(rdb, 1, 2, "testrate")
	if err != nil {
		log.Fatal(err)
	}
	r.Reset()
	for i := 0; i < 5; i++ {
		err := r.Wait(context.TODO())
		log.Printf("worker %d allowed: %v", i, err)
	}
}
// output
// 2022/07/22 12:50:31 worker 0 allowed: <nil>
// 2022/07/22 12:50:31 worker 1 allowed: <nil>
// 2022/07/22 12:50:32 worker 2 allowed: <nil>
// 2022/07/22 12:50:33 worker 3 allowed: <nil>
// 2022/07/22 12:50:34 worker 4 allowed: <nil>

前兩個(gè)請(qǐng)求在burst內(nèi),直接可以獲得,后面的請(qǐng)求按照qps的速率生成。

其他

除此之外,Redis還可以用作全局計(jì)數(shù)、去重(set)、發(fā)布訂閱等場(chǎng)景。Redis官方也提供了一些通用模塊,通過(guò)加載這些模塊也可以實(shí)現(xiàn)過(guò)濾、限流等特性,參考modules

參考

https://go-zero.dev/

https://github.com/qingwave/gocorex

以上就是Golang分布式應(yīng)用之Redis示例詳解的詳細(xì)內(nèi)容,更多關(guān)于Go分布式Redis的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • 淺析Golang中Gin框架存在的必要性

    淺析Golang中Gin框架存在的必要性

    在Go語(yǔ)言中,net/http?包提供了一個(gè)強(qiáng)大且靈活的標(biāo)準(zhǔn)HTTP庫(kù),那為什么還出現(xiàn)了像?Gin?這樣的,方便我們構(gòu)建Web應(yīng)用程序的第三方庫(kù),下面就來(lái)和大家簡(jiǎn)單分析一下
    2023-08-08
  • Goland編輯器設(shè)置選擇范圍背景色的操作

    Goland編輯器設(shè)置選擇范圍背景色的操作

    這篇文章主要介紹了Goland編輯器設(shè)置選擇范圍背景色的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2020-12-12
  • Go語(yǔ)言map用法實(shí)例分析

    Go語(yǔ)言map用法實(shí)例分析

    這篇文章主要介紹了Go語(yǔ)言map用法,實(shí)例分析了map的功能及使用技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下
    2015-02-02
  • 解析Golang中的GoPath和GoModule

    解析Golang中的GoPath和GoModule

    在Golang中,有兩個(gè)概念非常容易弄錯(cuò),第一個(gè)就是GoPath,第二個(gè)則是GoModule,很多初學(xué)者不清楚這兩者之間的關(guān)系,也就難以清晰地了解項(xiàng)目的整體結(jié)構(gòu),今天通過(guò)本文給大家介紹下Golang中的GoPath和GoModule相關(guān)知識(shí),感興趣的朋友一起看看吧
    2022-02-02
  • 在Golang中讀寫CSV文件的操作指南

    在Golang中讀寫CSV文件的操作指南

    CSV(逗號(hào)分隔值)文件是一種常見(jiàn)的數(shù)據(jù)存儲(chǔ)格式,廣泛應(yīng)用于數(shù)據(jù)導(dǎo)入、導(dǎo)出、分析和交換等場(chǎng)景,在Golang中,有許多庫(kù)和工具可以幫助我們讀取和寫入CSV文件,使數(shù)據(jù)處理變得簡(jiǎn)單而高效,本文將深入探討如何在Golang中使用標(biāo)準(zhǔn)庫(kù)以及第三方庫(kù)來(lái)讀寫CSV文件
    2023-11-11
  • Golang單元測(cè)試中的技巧分享

    Golang單元測(cè)試中的技巧分享

    這篇文章主要為大家詳細(xì)介紹了Golang進(jìn)行單元測(cè)試時(shí)的一些技巧和科技,文中的示例代碼講解詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴可以了解一下
    2023-03-03
  • 如何避免go的map競(jìng)態(tài)問(wèn)題的方法

    如何避免go的map競(jìng)態(tài)問(wèn)題的方法

    本文主要介紹了如何避免go的map競(jìng)態(tài)問(wèn)題的方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧
    2023-02-02
  • Golang動(dòng)態(tài)調(diào)用方法小結(jié)

    Golang動(dòng)態(tài)調(diào)用方法小結(jié)

    本文主要介紹了Golang動(dòng)態(tài)調(diào)用方法小結(jié),文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • GoLang中Strconv庫(kù)有哪些常用方法

    GoLang中Strconv庫(kù)有哪些常用方法

    這篇文章主要介紹了GoLang中Strconv庫(kù)有哪些常用方法,strconv庫(kù)實(shí)現(xiàn)了基本數(shù)據(jù)類型與其字符串表示的轉(zhuǎn)換,主要有以下常用函數(shù):?Atoi()、Itia()、parse系列、format系列、append系列
    2023-01-01
  • 深入解析Sync.Pool如何提升Go程序性能

    深入解析Sync.Pool如何提升Go程序性能

    在并發(fā)編程中,資源的分配和回收是一個(gè)很重要的問(wèn)題。Go?語(yǔ)言的?Sync.Pool?是一個(gè)可以幫助我們優(yōu)化這個(gè)問(wèn)題的工具。本篇文章將會(huì)介紹?Sync.Pool?的用法、原理以及如何在項(xiàng)目中正確使用它,希望對(duì)大家有所幫助
    2023-05-05

最新評(píng)論