欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

百行代碼實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列

 更新時(shí)間:2022年06月23日 08:28:34   作者:Finley  
本文主要介紹了百行代碼實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧

在之前探討延時(shí)隊(duì)列的文章中我們提到了 redisson delayqueue 使用 redis 的有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列,遺憾的是 go 語(yǔ)言社區(qū)中并無(wú)類似的庫(kù)。不過(guò)問題不大,沒有輪子我們自己造??。

本文的完整代碼實(shí)現(xiàn)在hdt3213/delayqueue,可以直接 go get 安裝使用。

使用有序集合結(jié)構(gòu)實(shí)現(xiàn)延時(shí)隊(duì)列的方法已經(jīng)廣為人知,無(wú)非是將消息作為有序集合的 member, 投遞時(shí)間戳作為 score 使用 zrangebyscore 命令搜索已到投遞時(shí)間的消息然后將其發(fā)給消費(fèi)者。

然而消息隊(duì)列不是將消息發(fā)給消費(fèi)者就萬(wàn)事大吉,它們還有一個(gè)重要職責(zé)是確保送達(dá)和消費(fèi)。通常的實(shí)現(xiàn)方式是當(dāng)消費(fèi)者收到消息后向消息隊(duì)列返回確認(rèn)(ack),若消費(fèi)者返回否定確認(rèn)(nack)或超時(shí)未返回,消息隊(duì)列則會(huì)按照預(yù)定規(guī)則重新發(fā)送,直到到達(dá)最大重試次數(shù)后停止。如何實(shí)現(xiàn) ack 和重試機(jī)制是我們要重點(diǎn)考慮的問題。

我們的消息隊(duì)列允許分布式地部署多個(gè)生產(chǎn)者和消費(fèi)者,消費(fèi)者實(shí)例定時(shí)執(zhí)行 lua 腳本驅(qū)動(dòng)消息在隊(duì)列中的流轉(zhuǎn)無(wú)需部署額外組件。由于 Redis 保證了 lua 腳本執(zhí)行的原子性,整個(gè)流程無(wú)需加鎖。

消費(fèi)者采用拉模式獲得消息,保證每條消息至少投遞一次,消息隊(duì)列會(huì)重試超時(shí)或者被否定確認(rèn)的消息(nack) 直至到達(dá)最大重試次數(shù)。一條消息最多有一個(gè)消費(fèi)者正在處理,減少了要考慮的并發(fā)問題。

請(qǐng)注意:若消費(fèi)時(shí)間超過(guò)了 MaxConsumeDuration 消息隊(duì)列會(huì)認(rèn)為消費(fèi)超時(shí)并重新投遞,此時(shí)可能有多個(gè)消費(fèi)者同時(shí)消費(fèi)。

具體使用也非常簡(jiǎn)單,只需要注冊(cè)處理消息的回調(diào)函數(shù)并調(diào)用 start() 即可:

package main

import (
	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
	"strconv"
	"time"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
	})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// 注冊(cè)處理消息的回調(diào)函數(shù)
        // 返回 true 表示已成功消費(fèi),返回 false 消息隊(duì)列會(huì)重新投遞次消息
		return true
	})
	// 發(fā)送延時(shí)消息
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}

	// start consume
	done := queue.StartConsume()
	<-done
}

由于數(shù)據(jù)存儲(chǔ)在 redis 中所以我們最多能保證在 redis 無(wú)故障且消息隊(duì)列相關(guān) key 未被外部篡改的情況下不會(huì)丟失消息。

原理詳解

消息隊(duì)列涉及幾個(gè)關(guān)鍵的 redis 數(shù)據(jù)結(jié)構(gòu):

  • msgKey: 為了避免兩條內(nèi)容完全相同的消息造成意外的影響,我們將每條消息放到一個(gè)字符串類型的鍵中,并分配一個(gè) UUID 作為它的唯一標(biāo)識(shí)。其它數(shù)據(jù)結(jié)構(gòu)中只存儲(chǔ) UUID 而不存儲(chǔ)完整的消息內(nèi)容。每個(gè) msg 擁有一個(gè)獨(dú)立的 key 而不是將所有消息放到一個(gè)哈希表是為了利用 TTL 機(jī)制避免
  • pendingKey: 有序集合類型,member 為消息 ID, score 為投遞時(shí)間的 unix 時(shí)間戳。
  • readyKey: 列表類型,需要投遞的消息 ID。
  • unAckKey: 有序集合類型,member 為消息 ID, score 為重試時(shí)間的 unix 時(shí)間戳。
  • retryKey: 列表類型,已到重試時(shí)間的消息 ID
  • garbageKey: 集合類型,用于暫存已達(dá)重試上線的消息 ID
  • retryCountKey: 哈希表類型,鍵為消息 ID, 值為剩余的重試次數(shù)

流程如下圖所示:

由于我們?cè)试S分布式地部署多個(gè)消費(fèi)者,每個(gè)消費(fèi)者都在定時(shí)執(zhí)行 lua 腳本,所以多個(gè)消費(fèi)者可能處于上述流程中不同狀態(tài),我們無(wú)法預(yù)知(或控制)上圖中五個(gè)操作發(fā)生的先后順序,也無(wú)法控制有多少實(shí)例正在執(zhí)行同一個(gè)操作。

因此我們需要保證上圖中五個(gè)操作滿足三個(gè)條件:

  • 都是原子性的
  • 不會(huì)重復(fù)處理同一條消息
  • 操作前后消息隊(duì)列始終處于正確的狀態(tài)

只要滿足這三個(gè)條件,我們就可以部署多個(gè)實(shí)例且不需要使用分布式鎖等技術(shù)來(lái)進(jìn)行狀態(tài)同步。

是不是聽起來(lái)有點(diǎn)嚇人??? 其實(shí)簡(jiǎn)單的很,讓我們一起來(lái)詳細(xì)看看吧~

pending2ReadyScript

pending2ReadyScript 使用 zrangebyscore 掃描已到投遞時(shí)間的消息ID并把它們移動(dòng)到 ready 中:

-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中找出已到投遞時(shí)間的消息
if (#msgs == 0) then return end
local args2 = {'LPush', KEYS[2]} -- 將他們放入 ready key 中
for _,v in ipairs(msgs) do
	table.insert(args2, v) 
end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 從 pending key 中刪除已投遞的消息

ready2UnackScript

ready2UnackScript 從 ready 或者 retry 中取出一條消息發(fā)送給消費(fèi)者并放入 unack 中,類似于 RPopLPush:

-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript

unack2RetryScript 從 retry 中找出所有已到重試時(shí)間的消息并把它們移動(dòng)到 unack 中:

-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])  -- 找到已到重試時(shí)間的消息
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs)) -- 查詢剩余重試次數(shù)
for i,v in ipairs(retryCounts) do
	local k = msgs[i]
	if tonumber(v) > 0 then -- 剩余次數(shù)大于 0
		redis.call("HIncrBy", KEYS[2], k, -1) -- 減少剩余重試次數(shù)
		redis.call("LPush", KEYS[3], k) -- 添加到 retry key 中
	else -- 剩余重試次數(shù)為 0
		redis.call("HDel", KEYS[2], k) -- 刪除重試次數(shù)記錄
		redis.call("SAdd", KEYS[4], k) -- 添加到垃圾桶,等待后續(xù)刪除
	end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])  -- 將已處理的消息從 unack key 中刪除

因?yàn)?redis 要求 lua 腳本必須在執(zhí)行前在 KEYS 參數(shù)中聲明自己要訪問的 key, 而我們將每個(gè) msg 有一個(gè)獨(dú)立的 key,我們?cè)趫?zhí)行 unack2RetryScript 之前是不知道哪些 msg key 需要被刪除。所以 lua 腳本只將需要?jiǎng)h除的消息記在 garbage key 中,腳本執(zhí)行完后再通過(guò) del 命令將他們刪除:

func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil {
		return fmt.Errorf("smembers failed: %v", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	// allow concurrent clean
	msgKeys := make([]string, 0, len(msgIds))
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
	}
	err = q.redisCli.Del(ctx, msgKeys...).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("del msgs failed: %v", err)
	}
	err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("remove from garbage key failed: %v", err)
	}
	return nil
}

之前提到的 lua 腳本都是原子性執(zhí)行的,不會(huì)有其它命令插入其中。 gc 函數(shù)由 3 條 redis 命令組成,在執(zhí)行過(guò)程中可能會(huì)有其它命令插入執(zhí)行過(guò)程中,不過(guò)考慮到一條消息進(jìn)入垃圾回收流程之后不會(huì)復(fù)活所以不需要保證 3 條命令原子性。

ack

ack 只需要將消息徹底刪除即可:

func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// msg key has ttl, ignore result of delete
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	q.redisCli.HDel(ctx, q.retryCountKey, idStr)
	return nil
}

否定確認(rèn)只需要將 unack key 中消息的重試時(shí)間改為現(xiàn)在,隨后執(zhí)行的 unack2RetryScript 會(huì)立即將它移動(dòng)到 retry key

func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// update retry time as now, unack2Retry will move it to retry immediately
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}

consume

消息隊(duì)列的核心邏輯是每秒執(zhí)行一次的 consume 函數(shù),它負(fù)責(zé)調(diào)用上述腳本將消息轉(zhuǎn)移到正確的集合中并回調(diào) consumer 來(lái)消費(fèi)消息:

func (q *DelayQueue) consume() error {
	// 執(zhí)行 pending2ready,將已到時(shí)間的消息轉(zhuǎn)移到 ready
	err := q.pending2Ready()
	if err != nil {
		return err
	}
	// 循環(huán)調(diào)用 ready2Unack 拉取消息進(jìn)行消費(fèi)
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// 將 nack 或超時(shí)的消息放入重試隊(duì)列
	err = q.unack2Retry()
	if err != nil {
		return err
	}
    // 清理已達(dá)到最大重試次數(shù)的消息
	err = q.garbageCollect()
	if err != nil {
		return err
	}
	// 消費(fèi)重試隊(duì)列
	fetchCount = 0
	for {
		idStr, err := q.retry2Unack()
		if err == redis.Nil { // consumed all
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	return nil
}

至此一個(gè)簡(jiǎn)單可靠的延時(shí)隊(duì)列就做好了,何不趕緊開始試用呢?????

到此這篇關(guān)于百行代碼實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列的文章就介紹到這了,更多相關(guān)百行代碼實(shí)現(xiàn)基于Redis的可靠延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Redis主從集群切換數(shù)據(jù)丟失的解決方案

    Redis主從集群切換數(shù)據(jù)丟失的解決方案

    這篇文章主要介紹了Redis主從集群切換數(shù)據(jù)丟失的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧
    2021-04-04
  • Redis高并發(fā)問題的解決方法

    Redis高并發(fā)問題的解決方法

    這篇文章主要介紹了Redis高并發(fā)問題的解決辦法,具有很好的參考價(jià)值,感興趣的小伙伴們可以參考一下,具體如下:
    2018-05-05
  • Redis緩存空間優(yōu)化實(shí)踐詳解

    Redis緩存空間優(yōu)化實(shí)踐詳解

    緩存Redis,是我們最常用的服務(wù),其適用場(chǎng)景廣泛,被大量應(yīng)用到各業(yè)務(wù)場(chǎng)景中。也正因如此,緩存成為了重要的硬件成本來(lái)源,我們有必要從空間上做一些優(yōu)化,降低成本的同時(shí)也會(huì)提高性能,本文通過(guò)代碼示例介紹了redis如何優(yōu)化緩存空間,需要的朋友可以參考一下
    2023-04-04
  • Redis鎖完美解決高并發(fā)秒殺問題

    Redis鎖完美解決高并發(fā)秒殺問題

    本文主要介紹了Redis鎖完美解決高并發(fā)秒殺問題,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2021-09-09
  • Redisson實(shí)現(xiàn)Redis分布式鎖的幾種方式

    Redisson實(shí)現(xiàn)Redis分布式鎖的幾種方式

    本文在講解如何使用Redisson實(shí)現(xiàn)Redis普通分布式鎖,以及Redlock算法分布式鎖的幾種方式的同時(shí),也附帶解答這些同學(xué)的一些疑問,感興趣的可以了解一下
    2021-08-08
  • Redis異步隊(duì)列的實(shí)現(xiàn)及應(yīng)用場(chǎng)景

    Redis異步隊(duì)列的實(shí)現(xiàn)及應(yīng)用場(chǎng)景

    異步隊(duì)列是一種底層基于異步 I/O 模型的消息隊(duì)列,用于在分布式系統(tǒng)中進(jìn)行同步和異步的通訊和協(xié)作,本文主要介紹了Redis異步隊(duì)列的實(shí)現(xiàn)及應(yīng)用場(chǎng)景,感興趣的可以了解一下
    2023-12-12
  • redis由于目標(biāo)計(jì)算機(jī)積極拒絕,無(wú)法連接的解決

    redis由于目標(biāo)計(jì)算機(jī)積極拒絕,無(wú)法連接的解決

    這篇文章主要介紹了redis由于目標(biāo)計(jì)算機(jī)積極拒絕,無(wú)法連接的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • Redis GEO地理信息定位功能

    Redis GEO地理信息定位功能

    Redis 提供了GEO地理信息定位功能,地理空間項(xiàng)(經(jīng)度、緯度、名稱),實(shí)現(xiàn)查找附近的人、上班打卡、自行車租賃、搖一搖等相關(guān)與地理位置信息的功能,這篇文章主要介紹了Redis GEO地理信息定位功能,需要的朋友可以參考下
    2023-12-12
  • redis適合場(chǎng)景八點(diǎn)總結(jié)

    redis適合場(chǎng)景八點(diǎn)總結(jié)

    在本篇文章中我們給大家整理了關(guān)于redis適合什么場(chǎng)景的8點(diǎn)知識(shí)點(diǎn)內(nèi)容,需要的朋友們參考下。
    2019-06-06
  • windows 64位下redis安裝教程

    windows 64位下redis安裝教程

    這篇文章主要為大家詳細(xì)介紹了windows 64位下redis安裝教程,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-10-10

最新評(píng)論