Golang實現(xiàn)基于Redis的可靠延遲隊列
前言
在之前探討延時隊列的文章中我們提到了 redisson delayqueue 使用 redis 的有序集合結(jié)構(gòu)實現(xiàn)延時隊列,遺憾的是 go 語言社區(qū)中并無類似的庫。不過問題不大,沒有輪子我們自己造。
本文的完整代碼實現(xiàn)在hdt3213/delayqueue,可以直接 go get 安裝使用。
使用有序集合結(jié)構(gòu)實現(xiàn)延時隊列的方法已經(jīng)廣為人知,無非是將消息作為有序集合的 member, 投遞時間戳作為 score 使用 zrangebyscore 命令搜索已到投遞時間的消息然后將其發(fā)給消費者。
然而消息隊列不是將消息發(fā)給消費者就萬事大吉,它們還有一個重要職責(zé)是確保送達和消費。通常的實現(xiàn)方式是當(dāng)消費者收到消息后向消息隊列返回確認(ack),若消費者返回否定確認(nack)或超時未返回,消息隊列則會按照預(yù)定規(guī)則重新發(fā)送,直到到達最大重試次數(shù)后停止。如何實現(xiàn) ack 和重試機制是我們要重點考慮的問題。
我們的消息隊列允許分布式地部署多個生產(chǎn)者和消費者,消費者實例定時執(zhí)行 lua 腳本驅(qū)動消息在隊列中的流轉(zhuǎn)無需部署額外組件。由于 Redis 保證了 lua 腳本執(zhí)行的原子性,整個流程無需加鎖。
消費者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊列會重試超時或者被否定確認的消息(nack) 直至到達最大重試次數(shù)。一條消息最多有一個消費者正在處理,減少了要考慮的并發(fā)問題。
請注意:若消費時間超過了 MaxConsumeDuration 消息隊列會認為消費超時并重新投遞,此時可能有多個消費者同時消費。
具體使用也非常簡單,只需要注冊處理消息的回調(diào)函數(shù)并調(diào)用 start() 即可:
package main import ( "github.com/go-redis/redis/v8" "github.com/hdt3213/delayqueue" "strconv" "time" ) func main() { redisCli := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool { // 注冊處理消息的回調(diào)函數(shù) // 返回 true 表示已成功消費,返回 false 消息隊列會重新投遞次消息 return true }) // 發(fā)送延時消息 for i := 0; i < 10; i++ { err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3)) if err != nil { panic(err) } } // start consume done := queue.StartConsume() <-done }
由于數(shù)據(jù)存儲在 redis 中所以我們最多能保證在 redis 無故障且消息隊列相關(guān) key 未被外部篡改的情況下不會丟失消息。
原理詳解
消息隊列涉及幾個關(guān)鍵的 redis 數(shù)據(jù)結(jié)構(gòu):
- msgKey: 為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個字符串類型的鍵中,并分配一個 UUID 作為它的唯一標(biāo)識。其它數(shù)據(jù)結(jié)構(gòu)中只存儲 UUID 而不存儲完整的消息內(nèi)容。每個 msg 擁有一個獨立的 key 而不是將所有消息放到一個哈希表是為了利用 TTL 機制避免
- pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時間的 unix 時間戳。
- readyKey: 列表類型,需要投遞的消息 ID。
- unAckKey: 有序集合類型,member 為消息 ID, score 為重試時間的 unix 時間戳。
- retryKey: 列表類型,已到重試時間的消息 ID
- garbageKey: 集合類型,用于暫存已達重試上線的消息 ID
- retryCountKey: 哈希表類型,鍵為消息 ID, 值為剩余的重試次數(shù)
流程如下圖所示:
由于我們允許分布式地部署多個消費者,每個消費者都在定時執(zhí)行 lua 腳本,所以多個消費者可能處于上述流程中不同狀態(tài),我們無法預(yù)知(或控制)上圖中五個操作發(fā)生的先后順序,也無法控制有多少實例正在執(zhí)行同一個操作。
因此我們需要保證上圖中五個操作滿足三個條件:
- 都是原子性的
- 不會重復(fù)處理同一條消息
- 操作前后消息隊列始終處于正確的狀態(tài)
只要滿足這三個條件,我們就可以部署多個實例且不需要使用分布式鎖等技術(shù)來進行狀態(tài)同步。
是不是聽起來有點嚇人?其實簡單的很,讓我們一起來詳細看看吧~
pending2ReadyScript
pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時間的消息ID并把它們移動到 ready 中:
-- keys: pendingKey, readyKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中找出已到投遞時間的消息 if (#msgs == 0) then return end local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中 for _,v in ipairs(msgs) do table.insert(args2, v) end redis.call(unpack(args2)) redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中刪除已投遞的消息
ready2UnackScript
ready2UnackScript 從 ready 或者 retry 中取出一條消息發(fā)送給消費者并放入 unack 中,類似于 RPopLPush:
-- keys: readyKey/retryKey, unackKey -- argv: retryTime local msg = redis.call('RPop', KEYS[1]) if (not msg) then return end redis.call('ZAdd', KEYS[2], ARGV[1], msg) return msg
unack2RetryScript
unack2RetryScript 從 retry 中找出所有已到重試時間的消息并把它們移動到 unack 中:
-- keys: unackKey, retryCountKey, retryKey, garbageKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重試時間的消息 if (#msgs == 0) then return end local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩余重試次數(shù) for i,v in ipairs(retryCounts) do local k = msgs[i] if tonumber(v) > 0 then -- 剩余次數(shù)大于 0 redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩余重試次數(shù) redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中 else -- 剩余重試次數(shù)為 0 redis.call("HDel", KEYS[2], k) -- 刪除重試次數(shù)記錄 redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后續(xù)刪除 end end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 將已處理的消息從 unack key 中刪除
因為 redis 要求 lua 腳本必須在執(zhí)行前在 KEYS 參數(shù)中聲明自己要訪問的 key, 而我們將每個 msg 有一個獨立的 key,我們在執(zhí)行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要刪除的消息記在 garbage key 中,腳本執(zhí)行完后再通過 del 命令將他們刪除:
func (q *DelayQueue) garbageCollect() error { ctx := context.Background() msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result() if err != nil { return fmt.Errorf("smembers failed: %v", err) } if len(msgIds) == 0 { return nil } // allow concurrent clean msgKeys := make([]string, 0, len(msgIds)) for _, idStr := range msgIds { msgKeys = append(msgKeys, q.genMsgKey(idStr)) } err = q.redisCli.Del(ctx, msgKeys...).Err() if err != nil && err != redis.Nil { return fmt.Errorf("del msgs failed: %v", err) } err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err() if err != nil && err != redis.Nil { return fmt.Errorf("remove from garbage key failed: %v", err) } return nil }
之前提到的 lua 腳本都是原子性執(zhí)行的,不會有其它命令插入其中。 gc 函數(shù)由 3 條 redis 命令組成,在執(zhí)行過程中可能會有其它命令插入執(zhí)行過程中,不過考慮到一條消息進入垃圾回收流程之后不會復(fù)活所以不需要保證 3 條命令原子性。
ack
ack 只需要將消息徹底刪除即可:
func (q *DelayQueue) ack(idStr string) error { ctx := context.Background() err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err() if err != nil { return fmt.Errorf("remove from unack failed: %v", err) } // msg key has ttl, ignore result of delete _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err() q.redisCli.HDel(ctx, q.retryCountKey, idStr) return nil }
否定確認只需要將 unack key 中消息的重試時間改為現(xiàn)在,隨后執(zhí)行的 unack2RetryScript 會立即將它移動到 retry key
func (q *DelayQueue) nack(idStr string) error { ctx := context.Background() // update retry time as now, unack2Retry will move it to retry immediately err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{ Member: idStr, Score: float64(time.Now().Unix()), }).Err() if err != nil { return fmt.Errorf("negative ack failed: %v", err) } return nil }
consume
消息隊列的核心邏輯是每秒執(zhí)行一次的 consume 函數(shù),它負責(zé)調(diào)用上述腳本將消息轉(zhuǎn)移到正確的集合中并回調(diào) consumer 來消費消息:
func (q *DelayQueue) consume() error { // 執(zhí)行 pending2ready,將已到時間的消息轉(zhuǎn)移到 ready err := q.pending2Ready() if err != nil { return err } // 循環(huán)調(diào)用 ready2Unack 拉取消息進行消費 var fetchCount uint for { idStr, err := q.ready2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } // 將 nack 或超時的消息放入重試隊列 err = q.unack2Retry() if err != nil { return err } // 清理已達到最大重試次數(shù)的消息 err = q.garbageCollect() if err != nil { return err } // 消費重試隊列 fetchCount = 0 for { idStr, err := q.retry2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } return nil }
至此一個簡單可靠的延時隊列就做好了,何不趕緊開始試用呢?
以上就是Golang實現(xiàn)基于Redis的可靠延遲隊列的詳細內(nèi)容,更多關(guān)于Golang Redis可靠延遲隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Golang中interface轉(zhuǎn)string輸出打印方法
這篇文章主要給大家介紹了關(guān)于Golang中interface轉(zhuǎn)string輸出打印的相關(guān)資料,在go語言中interface轉(zhuǎn)string可以直接使用fmt提供的fmt函數(shù),文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-02-02Golang內(nèi)存對齊的規(guī)則及實現(xiàn)
本文介紹了Golang內(nèi)存對齊的規(guī)則及實現(xiàn),通過合理的內(nèi)存對齊,可以提高程序的執(zhí)行效率和性能,通過對本文的閱讀,讀者可以更好地理解Golang內(nèi)存對齊的原理和技巧,并應(yīng)用于實際編程中2023-08-08一文詳解如何在Golang中實現(xiàn)JWT認證與授權(quán)
在現(xiàn)代Web應(yīng)用中,安全性是一個非常重要的課題,JWT作為一種常用的認證與授權(quán)機制,已被廣泛應(yīng)用于各種系統(tǒng)中,下面我們就來看看如何在Golang中實現(xiàn)JWT認證與授權(quán)吧2025-03-03go語言實現(xiàn)http服務(wù)端與客戶端的例子
今天小編就為大家分享一篇go語言實現(xiàn)http服務(wù)端與客戶端的例子,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-08-08