欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Golang實現(xiàn)基于Redis的可靠延遲隊列

 更新時間:2022年06月22日 14:29:18   作者:Finley  
redisson?delayqueue可以使用redis的有序集合結(jié)構(gòu)實現(xiàn)延時隊列,遺憾的是go語言社區(qū)中并無類似的庫。不過問題不大,本文將用Go語言實現(xiàn)這一功能,需要的可以參考一下

前言

在之前探討延時隊列的文章中我們提到了 redisson delayqueue 使用 redis 的有序集合結(jié)構(gòu)實現(xiàn)延時隊列,遺憾的是 go 語言社區(qū)中并無類似的庫。不過問題不大,沒有輪子我們自己造。

本文的完整代碼實現(xiàn)在hdt3213/delayqueue,可以直接 go get 安裝使用。

使用有序集合結(jié)構(gòu)實現(xiàn)延時隊列的方法已經(jīng)廣為人知,無非是將消息作為有序集合的 member, 投遞時間戳作為 score 使用 zrangebyscore 命令搜索已到投遞時間的消息然后將其發(fā)給消費者。

然而消息隊列不是將消息發(fā)給消費者就萬事大吉,它們還有一個重要職責(zé)是確保送達和消費。通常的實現(xiàn)方式是當(dāng)消費者收到消息后向消息隊列返回確認(ack),若消費者返回否定確認(nack)或超時未返回,消息隊列則會按照預(yù)定規(guī)則重新發(fā)送,直到到達最大重試次數(shù)后停止。如何實現(xiàn) ack 和重試機制是我們要重點考慮的問題。

我們的消息隊列允許分布式地部署多個生產(chǎn)者和消費者,消費者實例定時執(zhí)行 lua 腳本驅(qū)動消息在隊列中的流轉(zhuǎn)無需部署額外組件。由于 Redis 保證了 lua 腳本執(zhí)行的原子性,整個流程無需加鎖。

消費者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊列會重試超時或者被否定確認的消息(nack) 直至到達最大重試次數(shù)。一條消息最多有一個消費者正在處理,減少了要考慮的并發(fā)問題。

請注意:若消費時間超過了 MaxConsumeDuration 消息隊列會認為消費超時并重新投遞,此時可能有多個消費者同時消費。

具體使用也非常簡單,只需要注冊處理消息的回調(diào)函數(shù)并調(diào)用 start() 即可:

package main

import (
	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// 注冊處理消息的回調(diào)函數(shù)
        // 返回 true 表示已成功消費,返回 false 消息隊列會重新投遞次消息
		return true
	})
	// 發(fā)送延時消息
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}

	// start consume
	done := queue.StartConsume()
	<-done
}

由于數(shù)據(jù)存儲在 redis 中所以我們最多能保證在 redis 無故障且消息隊列相關(guān) key 未被外部篡改的情況下不會丟失消息。

原理詳解

消息隊列涉及幾個關(guān)鍵的 redis 數(shù)據(jù)結(jié)構(gòu):

  • msgKey: 為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個字符串類型的鍵中,并分配一個 UUID 作為它的唯一標(biāo)識。其它數(shù)據(jù)結(jié)構(gòu)中只存儲 UUID 而不存儲完整的消息內(nèi)容。每個 msg 擁有一個獨立的 key 而不是將所有消息放到一個哈希表是為了利用 TTL 機制避免
  • pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時間的 unix 時間戳。
  • readyKey: 列表類型,需要投遞的消息 ID。
  • unAckKey: 有序集合類型,member 為消息 ID, score 為重試時間的 unix 時間戳。
  • retryKey: 列表類型,已到重試時間的消息 ID
  • garbageKey: 集合類型,用于暫存已達重試上線的消息 ID
  • retryCountKey: 哈希表類型,鍵為消息 ID, 值為剩余的重試次數(shù)

流程如下圖所示:

由于我們允許分布式地部署多個消費者,每個消費者都在定時執(zhí)行 lua 腳本,所以多個消費者可能處于上述流程中不同狀態(tài),我們無法預(yù)知(或控制)上圖中五個操作發(fā)生的先后順序,也無法控制有多少實例正在執(zhí)行同一個操作。

因此我們需要保證上圖中五個操作滿足三個條件:

  • 都是原子性的
  • 不會重復(fù)處理同一條消息
  • 操作前后消息隊列始終處于正確的狀態(tài)

只要滿足這三個條件,我們就可以部署多個實例且不需要使用分布式鎖等技術(shù)來進行狀態(tài)同步。

是不是聽起來有點嚇人?其實簡單的很,讓我們一起來詳細看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時間的消息ID并把它們移動到 ready 中:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中找出已到投遞時間的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中
for _,v in ipairs(msgs) do
	table.insert(args2, v) 
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中刪除已投遞的消息

ready2UnackScript

ready2UnackScript 從 ready 或者 retry 中取出一條消息發(fā)送給消費者并放入 unack 中,類似于 RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript

unack2RetryScript 從 retry 中找出所有已到重試時間的消息并把它們移動到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 找到已到重試時間的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩余重試次數(shù)
for i,v in ipairs(retryCounts) do
	local k = msgs[i]
	if tonumber(v) > 0 then -- 剩余次數(shù)大于 0
		redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩余重試次數(shù)
		redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
	else -- 剩余重試次數(shù)為 0
		redis.call("HDel", KEYS[2], k) -- 刪除重試次數(shù)記錄
		redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后續(xù)刪除
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 將已處理的消息從 unack key 中刪除

因為 redis 要求 lua 腳本必須在執(zhí)行前在 KEYS 參數(shù)中聲明自己要訪問的 key, 而我們將每個 msg 有一個獨立的 key,我們在執(zhí)行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要刪除的消息記在 garbage key 中,腳本執(zhí)行完后再通過 del 命令將他們刪除:

func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}

之前提到的 lua 腳本都是原子性執(zhí)行的,不會有其它命令插入其中。 gc 函數(shù)由 3 條 redis 命令組成,在執(zhí)行過程中可能會有其它命令插入執(zhí)行過程中,不過考慮到一條消息進入垃圾回收流程之后不會復(fù)活所以不需要保證 3 條命令原子性。

ack

ack 只需要將消息徹底刪除即可:

func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}

否定確認只需要將 unack key 中消息的重試時間改為現(xiàn)在,隨后執(zhí)行的 unack2RetryScript 會立即將它移動到 retry key

func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}

consume

消息隊列的核心邏輯是每秒執(zhí)行一次的 consume 函數(shù),它負責(zé)調(diào)用上述腳本將消息轉(zhuǎn)移到正確的集合中并回調(diào) consumer 來消費消息:

func (q *DelayQueue) consume() error {
	// 執(zhí)行 pending2ready,將已到時間的消息轉(zhuǎn)移到 ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// 循環(huán)調(diào)用 ready2Unack 拉取消息進行消費
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// 將 nack 或超時的消息放入重試隊列
	err = q.unack2Retry()
	if err != nil {
		return err
	}
    // 清理已達到最大重試次數(shù)的消息
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// 消費重試隊列
	fetchCount = 0
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	return nil
}

至此一個簡單可靠的延時隊列就做好了,何不趕緊開始試用呢?

以上就是Golang實現(xiàn)基于Redis的可靠延遲隊列的詳細內(nèi)容,更多關(guān)于Golang Redis可靠延遲隊列的資料請關(guān)注腳本之家其它相關(guān)文章!

相關(guān)文章

  • Golang易錯知識點匯總

    Golang易錯知識點匯總

    這篇文章匯總了在開發(fā)和刷面試題過程中遇到的Golang容易搞錯的知識點,關(guān)鍵部分也都為大家寫了代碼示例,感興趣的小伙伴可以了解一下
    2022-09-09
  • Golang中interface轉(zhuǎn)string輸出打印方法

    Golang中interface轉(zhuǎn)string輸出打印方法

    這篇文章主要給大家介紹了關(guān)于Golang中interface轉(zhuǎn)string輸出打印的相關(guān)資料,在go語言中interface轉(zhuǎn)string可以直接使用fmt提供的fmt函數(shù),文中通過代碼介紹的非常詳細,需要的朋友可以參考下
    2024-02-02
  • 一文理解Go 中的可尋址和不可尋址

    一文理解Go 中的可尋址和不可尋址

    如果字典的元素不存在,則返回零值,而零值是不可變對象,如果能尋址問題就大了。而如果字典的元素存在,考慮到 Go 中 map 實現(xiàn)中元素的地址是變化的,這意味著尋址的結(jié)果也是無意義的。下面我們就圍繞這個話題寫一篇文章吧,需要的朋友可以參考一下
    2021-10-10
  • Golang內(nèi)存對齊的規(guī)則及實現(xiàn)

    Golang內(nèi)存對齊的規(guī)則及實現(xiàn)

    本文介紹了Golang內(nèi)存對齊的規(guī)則及實現(xiàn),通過合理的內(nèi)存對齊,可以提高程序的執(zhí)行效率和性能,通過對本文的閱讀,讀者可以更好地理解Golang內(nèi)存對齊的原理和技巧,并應(yīng)用于實際編程中
    2023-08-08
  • go語言處理JSON和XML數(shù)據(jù)示例解析

    go語言處理JSON和XML數(shù)據(jù)示例解析

    這篇文章主要介紹了go語言處理JSON和XML數(shù)據(jù)的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2023-08-08
  • 自定義Go?Json的序列化方法譯文

    自定義Go?Json的序列化方法譯文

    這篇文章主要為大家介紹了自定義Go?Json序列化方法譯文,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪
    2022-06-06
  • 一文詳解如何在Golang中實現(xiàn)JWT認證與授權(quán)

    一文詳解如何在Golang中實現(xiàn)JWT認證與授權(quán)

    在現(xiàn)代Web應(yīng)用中,安全性是一個非常重要的課題,JWT作為一種常用的認證與授權(quán)機制,已被廣泛應(yīng)用于各種系統(tǒng)中,下面我們就來看看如何在Golang中實現(xiàn)JWT認證與授權(quán)吧
    2025-03-03
  • go web 預(yù)防跨站腳本的實現(xiàn)方式

    go web 預(yù)防跨站腳本的實現(xiàn)方式

    這篇文章主要介紹了go web 預(yù)防跨站腳本的實現(xiàn)方式,文中給大家介紹XSS最佳的防護應(yīng)該注意哪些問題,本文通過實例代碼講解的非常詳細,需要的朋友可以參考下
    2021-06-06
  • go語言實現(xiàn)http服務(wù)端與客戶端的例子

    go語言實現(xiàn)http服務(wù)端與客戶端的例子

    今天小編就為大家分享一篇go語言實現(xiàn)http服務(wù)端與客戶端的例子,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2019-08-08
  • go-kit組件使用hystrix中間件的操作

    go-kit組件使用hystrix中間件的操作

    這篇文章主要介紹了go-kit組件使用hystrix中間件的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2021-04-04

最新評論