百行代碼實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列
在之前探討延時(shí)隊(duì)列的文章中我們提到了 redisson delayqueue 使用 redis 的有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列,遺憾的是 go 語(yǔ)言社區(qū)中并無(wú)類似的庫(kù)。不過(guò)問題不大,沒有輪子我們自己造??。
本文的完整代碼實(shí)現(xiàn)在hdt3213/delayqueue,可以直接 go get 安裝使用。
使用有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列的方法已經(jīng)廣為人知,無(wú)非是將消息作為有序集合的 member, 投遞時(shí)間戳作為 score 使用 zrangebyscore 命令搜索已到投遞時(shí)間的消息然后將其發(fā)給消費(fèi)者。
然而消息隊(duì)列不是將消息發(fā)給消費(fèi)者就萬(wàn)事大吉,它們還有一個(gè)重要職責(zé)是確保送達(dá)和消費(fèi)。通常的實(shí)現(xiàn)方式是當(dāng)消費(fèi)者收到消息后向消息隊(duì)列返回確認(rèn)(ack),若消費(fèi)者返回否定確認(rèn)(nack)或超時(shí)未返回,消息隊(duì)列則會(huì)按照預(yù)定規(guī)則重新發(fā)送,直到到達(dá)最大重試次數(shù)后停止。如何實(shí)現(xiàn) ack 和重試機(jī)制是我們要重點(diǎn)考慮的問題。
我們的消息隊(duì)列允許分布式地部署多個(gè)生產(chǎn)者和消費(fèi)者,消費(fèi)者實(shí)例定時(shí)執(zhí)行 lua 腳本驅(qū)動(dòng)消息在隊(duì)列中的流轉(zhuǎn)無(wú)需部署額外組件。由于 Redis 保證了 lua 腳本執(zhí)行的原子性,整個(gè)流程無(wú)需加鎖。
消費(fèi)者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊(duì)列會(huì)重試超時(shí)或者被否定確認(rèn)的消息(nack) 直至到達(dá)最大重試次數(shù)。一條消息最多有一個(gè)消費(fèi)者正在處理,減少了要考慮的并發(fā)問題。
請(qǐng)注意:若消費(fèi)時(shí)間超過(guò)了 MaxConsumeDuration 消息隊(duì)列會(huì)認(rèn)為消費(fèi)超時(shí)并重新投遞,此時(shí)可能有多個(gè)消費(fèi)者同時(shí)消費(fèi)。
具體使用也非常簡(jiǎn)單,只需要注冊(cè)處理消息的回調(diào)函數(shù)并調(diào)用 start() 即可:
package main import ( "github.com/go-redis/redis/v8" "github.com/hdt3213/delayqueue" "strconv" "time" ) func main() { redisCli := redis.NewClient(&redis.Options{ Addr: "127.0.0.1:6379", }) queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool { // 注冊(cè)處理消息的回調(diào)函數(shù) // 返回 true 表示已成功消費(fèi),返回 false 消息隊(duì)列會(huì)重新投遞次消息 return true }) // 發(fā)送延時(shí)消息 for i := 0; i < 10; i++ { err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3)) if err != nil { panic(err) } } // start consume done := queue.StartConsume() <-done }
由于數(shù)據(jù)存儲(chǔ)在 redis 中所以我們最多能保證在 redis 無(wú)故障且消息隊(duì)列相關(guān) key 未被外部篡改的情況下不會(huì)丟失消息。
原理詳解
消息隊(duì)列涉及幾個(gè)關(guān)鍵的 redis 數(shù)據(jù)結(jié)構(gòu):
- msgKey: 為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個(gè)字符串類型的鍵中,并分配一個(gè) UUID 作為它的唯一標(biāo)識(shí)。其它數(shù)據(jù)結(jié)構(gòu)中只存儲(chǔ) UUID 而不存儲(chǔ)完整的消息內(nèi)容。每個(gè) msg 擁有一個(gè)獨(dú)立的 key 而不是將所有消息放到一個(gè)哈希表是為了利用 TTL 機(jī)制避免
- pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時(shí)間的 unix 時(shí)間戳。
- readyKey: 列表類型,需要投遞的消息 ID。
- unAckKey: 有序集合類型,member 為消息 ID, score 為重試時(shí)間的 unix 時(shí)間戳。
- retryKey: 列表類型,已到重試時(shí)間的消息 ID
- garbageKey: 集合類型,用于暫存已達(dá)重試上線的消息 ID
- retryCountKey: 哈希表類型,鍵為消息 ID, 值為剩余的重試次數(shù)
流程如下圖所示:
由于我們?cè)试S分布式地部署多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都在定時(shí)執(zhí)行 lua 腳本,所以多個(gè)消費(fèi)者可能處于上述流程中不同狀態(tài),我們無(wú)法預(yù)知(或控制)上圖中五個(gè)操作發(fā)生的先后順序,也無(wú)法控制有多少實(shí)例正在執(zhí)行同一個(gè)操作。
因此我們需要保證上圖中五個(gè)操作滿足三個(gè)條件:
- 都是原子性的
- 不會(huì)重復(fù)處理同一條消息
- 操作前后消息隊(duì)列始終處于正確的狀態(tài)
只要滿足這三個(gè)條件,我們就可以部署多個(gè)實(shí)例且不需要使用分布式鎖等技術(shù)來(lái)進(jìn)行狀態(tài)同步。
是不是聽起來(lái)有點(diǎn)嚇人??? 其實(shí)簡(jiǎn)單的很,讓我們一起來(lái)詳細(xì)看看吧~
pending2ReadyScript
pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時(shí)間的消息ID并把它們移動(dòng)到 ready 中:
-- keys: pendingKey, readyKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中找出已到投遞時(shí)間的消息 if (#msgs == 0) then return end local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中 for _,v in ipairs(msgs) do table.insert(args2, v) end redis.call(unpack(args2)) redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 從 pending key 中刪除已投遞的消息
ready2UnackScript
ready2UnackScript 從 ready 或者 retry 中取出一條消息發(fā)送給消費(fèi)者并放入 unack 中,類似于 RPopLPush:
-- keys: readyKey/retryKey, unackKey -- argv: retryTime local msg = redis.call('RPop', KEYS[1]) if (not msg) then return end redis.call('ZAdd', KEYS[2], ARGV[1], msg) return msg
unack2RetryScript
unack2RetryScript 從 retry 中找出所有已到重試時(shí)間的消息并把它們移動(dòng)到 unack 中:
-- keys: unackKey, retryCountKey, retryKey, garbageKey -- argv: currentTime local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1]) -- 找到已到重試時(shí)間的消息 if (#msgs == 0) then return end local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩余重試次數(shù) for i,v in ipairs(retryCounts) do local k = msgs[i] if tonumber(v) > 0 then -- 剩余次數(shù)大于 0 redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩余重試次數(shù) redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中 else -- 剩余重試次數(shù)為 0 redis.call("HDel", KEYS[2], k) -- 刪除重試次數(shù)記錄 redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后續(xù)刪除 end end redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- 將已處理的消息從 unack key 中刪除
因?yàn)?redis 要求 lua 腳本必須在執(zhí)行前在 KEYS 參數(shù)中聲明自己要訪問的 key, 而我們將每個(gè) msg 有一個(gè)獨(dú)立的 key,我們?cè)趫?zhí)行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要?jiǎng)h除的消息記在 garbage key 中,腳本執(zhí)行完后再通過(guò) del 命令將他們刪除:
func (q *DelayQueue) garbageCollect() error { ctx := context.Background() msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result() if err != nil { return fmt.Errorf("smembers failed: %v", err) } if len(msgIds) == 0 { return nil } // allow concurrent clean msgKeys := make([]string, 0, len(msgIds)) for _, idStr := range msgIds { msgKeys = append(msgKeys, q.genMsgKey(idStr)) } err = q.redisCli.Del(ctx, msgKeys...).Err() if err != nil && err != redis.Nil { return fmt.Errorf("del msgs failed: %v", err) } err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err() if err != nil && err != redis.Nil { return fmt.Errorf("remove from garbage key failed: %v", err) } return nil }
之前提到的 lua 腳本都是原子性執(zhí)行的,不會(huì)有其它命令插入其中。 gc 函數(shù)由 3 條 redis 命令組成,在執(zhí)行過(guò)程中可能會(huì)有其它命令插入執(zhí)行過(guò)程中,不過(guò)考慮到一條消息進(jìn)入垃圾回收流程之后不會(huì)復(fù)活所以不需要保證 3 條命令原子性。
ack
ack 只需要將消息徹底刪除即可:
func (q *DelayQueue) ack(idStr string) error { ctx := context.Background() err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err() if err != nil { return fmt.Errorf("remove from unack failed: %v", err) } // msg key has ttl, ignore result of delete _ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err() q.redisCli.HDel(ctx, q.retryCountKey, idStr) return nil }
否定確認(rèn)只需要將 unack key 中消息的重試時(shí)間改為現(xiàn)在,隨后執(zhí)行的 unack2RetryScript 會(huì)立即將它移動(dòng)到 retry key
func (q *DelayQueue) nack(idStr string) error { ctx := context.Background() // update retry time as now, unack2Retry will move it to retry immediately err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{ Member: idStr, Score: float64(time.Now().Unix()), }).Err() if err != nil { return fmt.Errorf("negative ack failed: %v", err) } return nil }
consume
消息隊(duì)列的核心邏輯是每秒執(zhí)行一次的 consume 函數(shù),它負(fù)責(zé)調(diào)用上述腳本將消息轉(zhuǎn)移到正確的集合中并回調(diào) consumer 來(lái)消費(fèi)消息:
func (q *DelayQueue) consume() error { // 執(zhí)行 pending2ready,將已到時(shí)間的消息轉(zhuǎn)移到 ready err := q.pending2Ready() if err != nil { return err } // 循環(huán)調(diào)用 ready2Unack 拉取消息進(jìn)行消費(fèi) var fetchCount uint for { idStr, err := q.ready2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } // 將 nack 或超時(shí)的消息放入重試隊(duì)列 err = q.unack2Retry() if err != nil { return err } // 清理已達(dá)到最大重試次數(shù)的消息 err = q.garbageCollect() if err != nil { return err } // 消費(fèi)重試隊(duì)列 fetchCount = 0 for { idStr, err := q.retry2Unack() if err == redis.Nil { // consumed all break } if err != nil { return err } fetchCount++ ack, err := q.callback(idStr) if err != nil { return err } if ack { err = q.ack(idStr) } else { err = q.nack(idStr) } if err != nil { return err } if fetchCount >= q.fetchLimit { break } } return nil }
至此一個(gè)簡(jiǎn)單可靠的延時(shí)隊(duì)列就做好了,何不趕緊開始試用呢?????
到此這篇關(guān)于百行代碼實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列的文章就介紹到這了,更多相關(guān)百行代碼實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Redisson實(shí)現(xiàn)Redis分布式鎖的幾種方式
本文在講解如何使用Redisson實(shí)現(xiàn)Redis普通分布式鎖,以及Redlock算法分布式鎖的幾種方式的同時(shí),也附帶解答這些同學(xué)的一些疑問,感興趣的可以了解一下2021-08-08Redis異步隊(duì)列的實(shí)現(xiàn)及應(yīng)用場(chǎng)景
異步隊(duì)列是一種底層基于異步 I/O 模型的消息隊(duì)列,用于在分布式系統(tǒng)中進(jìn)行同步和異步的通訊和協(xié)作,本文主要介紹了Redis異步隊(duì)列的實(shí)現(xiàn)及應(yīng)用場(chǎng)景,感興趣的可以了解一下2023-12-12redis由于目標(biāo)計(jì)算機(jī)積極拒絕,無(wú)法連接的解決
這篇文章主要介紹了redis由于目標(biāo)計(jì)算機(jī)積極拒絕,無(wú)法連接的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07redis適合場(chǎng)景八點(diǎn)總結(jié)
在本篇文章中我們給大家整理了關(guān)于redis適合什么場(chǎng)景的8點(diǎn)知識(shí)點(diǎn)內(nèi)容,需要的朋友們參考下。2019-06-06