欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

Redis如何實現(xiàn)延遲隊列

 更新時間:2023年04月28日 14:34:26   作者:ALPHA25  
這篇文章主要介紹了Redis如何實現(xiàn)延遲隊列問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

Redis實現(xiàn)延遲隊列

Redis延遲隊列

Redis 是通過有序集合(ZSet)的方式來實現(xiàn)延遲消息隊列的,ZSet 有一個 Score 屬性可以用來存儲延遲執(zhí)行的時間。

但需要無限循環(huán)檢查任務(wù),會消耗系統(tǒng)資源

class RedisDelayQueue(object):
    """Simple Queue with Redis Backend
    dq = RedisDelayQueue('delay:commtrans')
    dq.put( 5 ,{'info':'測試 5555','time': timestamp_to_datetime_str(t + 5)})
    print(dq.get())
    """
    def __init__(self, name, namespace='queue'):
        """The default connection parameters are: host='localhost', port=6379, db=0"""
        self.__db = get_redis_engine(database_name='spdb')
        self.key = '%s:%s' % (namespace, name)
    def qsize(self):
        """Return the approximate size of the queue."""
        return self.__db.zcard(self.key)
    def empty(self):
        """Return True if the queue is empty, False otherwise."""
        return self.qsize() == 0
    def rem(self, value):
        return self.__db.zrem(self.key, value)
    def get(self):
        # 獲取任務(wù),以0和當前時間為區(qū)間,返回一條在當前區(qū)間的記錄
        items = self.__db.zrangebyscore(self.key, 0, int(time.time()), 0, 1)
        if items:
            item = items[0]
            if self.rem(item):  # 解決并發(fā)問題  如能刪就讓誰取走
                return json.loads(item)
        return None
    def put(self, interval, item):
        """:param interval 延時秒數(shù)"""
        # 以時間作為score,對任務(wù)隊列按時間戳從小到大排序
        """Put item into the queue."""
        d = json.dumps(item)
        return self.__db.zadd(self.key, {d: int(time.time()) + int(interval)})

Redis實現(xiàn)延時隊列的優(yōu)化方案

延時隊列的應(yīng)用

近期在開發(fā)部門的新項目,其中有個關(guān)鍵功能就是智能推送,即根據(jù)用戶行為在特定的時間點向用戶推送相應(yīng)的提醒消息,比如以下業(yè)務(wù)場景:

  • 在用戶點擊充值項后,半小時內(nèi)未充值,向用戶推送充值未完成提醒。
  • 在用戶最近一次閱讀行為2小時后,向用戶推送繼續(xù)閱讀提醒。
  • 在用戶新注冊或退出應(yīng)用N分鐘后,向用戶推送合適的推薦消息。

上述場景的共同特征就是在某事件觸發(fā)后延遲一定時間后再執(zhí)行特定任務(wù),若事件觸發(fā)時間點可知,則上述邏輯也可等價于在指定時間點(事件觸發(fā)時間點+延遲時間長度)執(zhí)行特定任務(wù)。

實現(xiàn)這類需求一般采用延時隊列,其中創(chuàng)建的延時消息中需要包含任務(wù)延遲時間或任務(wù)執(zhí)行時間點等信息,當任務(wù)滿足時間條件需要執(zhí)行時,該消息便會被消費,也就是說可以指定隊列中的消息在哪個時間點被消費。

延時隊列的實現(xiàn)

在單機環(huán)境中,JDK已經(jīng)自帶了很多能夠?qū)崿F(xiàn)延時隊列功能的組件,比如DelayQueue, Timer, ScheduledExecutorService等組件,都可以較為簡便地創(chuàng)建延時任務(wù),但上述組件使用一般需要把任務(wù)存儲在內(nèi)存中,服務(wù)重啟存在任務(wù)丟失風險,且任務(wù)規(guī)模體量受內(nèi)存限制,同時也造成長時間內(nèi)存占用,并不靈活,通常適用于單進程客服端程序中或?qū)θ蝿?wù)要求不高的項目中。

在分布式環(huán)境下,僅使用JDK自帶組件并不能可靠高效地實現(xiàn)延時隊列,通常需要引入第三方中間件或框架。

比如常見的經(jīng)典任務(wù)調(diào)度框架Quartz或基于此框架的xxl-job等其它框架,這些框架的主要功能是實現(xiàn)定時任務(wù)或周期性任務(wù),在Redis、RabbitMQ還未廣泛應(yīng)用時,譬如常見的超時未支付取消訂單等功能都是由定時任務(wù)實現(xiàn)的,通過定時輪詢來判斷是否已到達觸發(fā)執(zhí)行的時間點。

但由于定時任務(wù)需要一定的周期性,周期掃描的間隔時間不好控制,太短會造成很多無意義的掃描,且增大系統(tǒng)壓力,太長又會造成執(zhí)行時間誤差太大,且可能造成單次掃描所處理的堆積記錄數(shù)量過大。

此外,利用MQ做延時隊列也是一種常見的方式,比如通過RabbitMQTTL和死信隊列實現(xiàn)消息的延遲投遞,考慮到投遞出去的MQ消息無法方便地實現(xiàn)刪除或修改,即無法實現(xiàn)任務(wù)的取消或任務(wù)執(zhí)行時間點的更改,同時也不能方便地對消息進行去重,因此在項目中并未選擇使用MQ實現(xiàn)延時隊列。

Redis的數(shù)據(jù)結(jié)構(gòu)zset,同樣可以實現(xiàn)延遲隊列的效果,且更加靈活,可以實現(xiàn)MQ無法做到的一些特性,因此項目最終采用Redis實現(xiàn)延時隊列,并對其進行優(yōu)化與封裝。

實現(xiàn)原理是利用zsetscore屬性,redis會將zset集合中的元素按照score進行從小到大排序,通過zadd命令向zset中添加元素,如下述命令所示,其中value值為延時任務(wù)消息,可根據(jù)業(yè)務(wù)定義消息格式,score值為任務(wù)執(zhí)行的時間點,比如13位毫秒時間戳。

zadd delayqueue 1614608094000 taskinfo

任務(wù)添加后,獲取任務(wù)的邏輯只需從zset中篩選score值小于當前時間戳的元素,所得結(jié)果便是當前時間節(jié)點下需要執(zhí)行的任務(wù),通過zrangebyscore命令來獲取,如下述命令所示,其中timestamp為當前時間戳,可用limit限制每次拉取的記錄數(shù),防止單次獲取記錄數(shù)過大。

zrangebyscore delayqueue 0 timestamp limit 0 1000

在實際實現(xiàn)過程中,從zset中獲取到當前需要執(zhí)行的任務(wù)后,需要先確保將任務(wù)對應(yīng)的元素從zset中刪除,刪除成功后才允許執(zhí)行任務(wù)邏輯,這樣是為了在分布式環(huán)境下,當存在多個線程獲取到同一任務(wù)后,利用redis刪除操作的原子性,確保只有一個線程能夠刪除成功并執(zhí)行任務(wù),防止重復(fù)執(zhí)行。

實際任務(wù)的執(zhí)行通常會再將其發(fā)送至MQ異步處理,將“獲取任務(wù)”與“執(zhí)行任務(wù)”兩者分離解耦,更加靈活,“獲取任務(wù)”只負責拿到當前時間需要執(zhí)行的任務(wù),并不真正運行任務(wù)業(yè)務(wù)邏輯,因此只需相對少量的執(zhí)行線程即可,而實際的任務(wù)執(zhí)行邏輯則由MQ消費者承擔,方便調(diào)控負載能力。

整體過程如下圖所示。

zset延時隊列實現(xiàn)原理示意圖

采用zset做延時隊列的另一個好處是可以實現(xiàn)任務(wù)的取消和任務(wù)執(zhí)行時間點的更改,只需要將任務(wù)信息從zset中刪除,便可取消任務(wù),同時由于zset擁有集合去重的特性,只需再次寫入同一個任務(wù)信息,但是value值設(shè)置為不同的執(zhí)行時間點,便可更改任務(wù)執(zhí)行時間,實現(xiàn)單個任務(wù)執(zhí)行時間的動態(tài)調(diào)整。

了解實現(xiàn)原理后,再進行具體編程實現(xiàn)。創(chuàng)建延時任務(wù)較為簡便,準備好任務(wù)消息和執(zhí)行時間點,寫入zset即可。獲取延時任務(wù)最簡單的方案是通過定時任務(wù),周期性地執(zhí)行上述邏輯,如下代碼所示。

@XxlScheduled(cron = "0/5 * * * * ?", name = "scan business1 delayqueue")
public void scanBusiness1() {
	// 某業(yè)務(wù)邏輯的zset延遲隊列對應(yīng)的key
	String zsetKey = "delayqueue:business1";
	while (true) {
		// 篩選score值小于當前時間戳的元素,一次最多拉取1000條
		Set<String> tasks = stringRedisTemplate.opsForZSet().rangeByScore(zsetKey, 0, System.currentTimeMillis(), 0, 1000);
		if (CollectionUtils.isEmpty(tasks)) {
			// 當前時間下已沒有需要執(zhí)行的任務(wù),結(jié)束本次掃描
			return;
		}
		for (String task : tasks) {
			// 先刪除,再執(zhí)行,確保多線程環(huán)境下執(zhí)行的唯一性
			Boolean delete = stringRedisTemplate.delete(task);
			if (delete) {
				// 刪除成功后,將其再發(fā)送到指定MQ異步處理,將“獲取任務(wù)”與“執(zhí)行任務(wù)”分離解耦
				rabbitTemplate.convertAndSend("exchange_business1", "routekey_business1", task);
			}
		}
	}
}

上述方案使用xxl-job做分布式定時任務(wù),間隔5秒執(zhí)行一次,代碼借助spring提供的api來完成redisMQ的操作。

由于是分布式定時任務(wù),每次執(zhí)行只有一個線程在獲取任務(wù),機器利用率低,當數(shù)據(jù)規(guī)模較大時,單靠一個線程無法滿足吞吐量要求,因此這種方案只適用于小規(guī)模數(shù)據(jù)量級別。

此處間隔時間也可適當調(diào)整,例如縮短為1秒,調(diào)整所需考慮原則在上文已提到:間隔太短會造成很多無意義的掃描,且增大系統(tǒng)壓力,太長又會造成執(zhí)行時間誤差太大。

為了提升整體吞吐量,考慮不使用分布式定時任務(wù),對集群內(nèi)每臺機器(或?qū)嵗┚O(shè)置獨立的定時任務(wù),同時采用多個zset隊列,以數(shù)字后綴區(qū)分。

假設(shè)有Mzset隊列,創(chuàng)建延時消息時選取消息的某個ID字段,計算hash值再對M取余,根據(jù)余數(shù)決定發(fā)送到對應(yīng)數(shù)字后綴的zset隊列中(分散消息,此處ID字段選取需要考慮做到均勻分布,不要造成數(shù)據(jù)傾斜)。

隊列數(shù)量M的選取需要考慮機器數(shù)量N,理想情況下有多少臺機器就定義多少個隊列,保持MN基本相等即可。

因為隊列太少,會造成機器對隊列的競爭訪問處理,隊列太多又會導致任務(wù)得不到及時的處理。

最佳實踐是隊列數(shù)量可動態(tài)配置,如采用分布式配置中心,這樣當集群機器數(shù)量變化時,可以相應(yīng)調(diào)整隊列數(shù)量。

每臺機器在觸發(fā)定時任務(wù)時,需要通過適當?shù)呢撦d均衡來決定從哪個隊列拉取消息,負載均衡的好壞也會影響整個集群的效率,如果負載分布不均可能會導致多臺機器競爭處理同一隊列,降低效率。

一個簡單實用的做法是利用redis的自增操作再對隊列數(shù)量取余即可,只要保持隊列數(shù)量和機器數(shù)量基本相等,這種做法在很大程度上就可以保證不會有多臺機器競爭同一隊列。

至于每臺機器從對應(yīng)zset中的任務(wù)獲取邏輯,仍然和前面代碼一致。以上方式簡化實現(xiàn)代碼如下所示。

@Scheduled(cron = "0/5 * * * * ?")
public void scanBusiness1() {
	// 隊列數(shù)量M,考慮動態(tài)配置,保持和機器數(shù)量基本一致
	int M = 10;
	// redis自增key,用于負載均衡
	String incrKey = "incrkey:delayqueue:business1";
	// 每臺機器執(zhí)行時,從不同的zset中拉取消息,盡量確保不同機器訪問不同zset
	String zsetKey = "delayqueue:business1:" + (stringRedisTemplate.opsForValue().increment(incrKey) % M);
	while (true) {
		// 此處邏輯和前面代碼一致,省略。。。
	}
}

上述方案和第一種方案的主要的不同點在于zsetKey的獲取上,這里是根據(jù)負載均衡算法算出來的,確保每臺機器訪問不同zset并拉取消息,同時定時任務(wù)采用spring提供的進程內(nèi)注解@Scheduled,集群內(nèi)每臺機器都會間隔5秒執(zhí)行,因此相比之前的方案,能夠較為明顯地提升整個集群的吞吐量。

但是這種方案的步驟相對更為復(fù)雜,需要動態(tài)配置隊列數(shù)量,同時在創(chuàng)建延時任務(wù)時需要選擇合適的消息ID字段來決定發(fā)送的目標zset隊列,此處還要考慮均勻分布,整體實現(xiàn)要考慮的因素較多。

上面一種方案已經(jīng)能夠較好地滿足整體吞吐量要求,但其缺點是步驟相對復(fù)雜,因此項目中沒有采用這種方案,而是采用下面一種也能滿足吞吐量要求,步驟相對簡單,又方便通用化的方案。

該方案不使用定時任務(wù),而是單獨啟動后臺線程,在線程中執(zhí)行永久循環(huán),每次循環(huán)邏輯為:從目標zset中獲取score值小于當前時間戳的元素集合中的score最小的那個元素,相當于獲取當前時間點需要執(zhí)行且執(zhí)行時間點最早的那個任務(wù),如果獲取不到,表示當前時間點下暫無需要執(zhí)行的任務(wù),則線程休眠100ms(可視情況調(diào)整),否則,對獲取到的元素進行處理,在分布式多線程環(huán)境下,仍然需要先刪除成功才能進行處理。

此外,考慮到每個線程獲取元素后都需要再次訪問redis嘗試刪除操作,為了避免多線程爭搶浪費資源,降低效率,這里采用lua腳本將獲取和刪除操作原子化。lua腳本邏輯代碼如下所示。

local zsetKey = 'delayqueue'
local timestamp = 1614608094000
local items = redis.call('zrangebyscore',zsetKey,0,timestamp,'limit',0,1)
if #items == 0 then
    return ''
else
    redis.call('zremrangebyrank',zsetKey,0,0)
    return items[1]
end

其中timestamp為當前時間戳,通過在zrangebyscore命令中指定limit為1來獲取score最小的元素,若獲取不到,即結(jié)果集長度為0,則返回空字符串,否則,通過zremrangebyrank命令刪除頭部元素,即score最小的元素,也就是之前獲取到的那個元素,由于redis內(nèi)部保證lua腳本的原子性,上述獲取并刪除的操作能夠運行無誤。具體JAVA實現(xiàn)中還對其進行了多線程操作的封裝和通用化的抽象,使不同業(yè)務(wù)都能夠使用該組件實現(xiàn)延時隊列。具體實現(xiàn)代碼如下所示。

/**
 * 基于ZSET實現(xiàn)消息延遲處理,score存儲執(zhí)行時間點,到達時間點即會向指定隊列發(fā)送該消息;
 * 定義一個繼承本類的bean即可;
 */
public abstract class AbstractDelayedMsgScanTrigger implements Runnable, DisposableBean {
	private static final RedisScript<String> TRY_GET_AND_DEL_SCRIPT;
	static {
		// 獲取并刪除的lua腳本,使用spring提供的api
		String sb = "local items = redis.call('zrangebyscore',KEYS[1],0,ARGV[1],'limit',0,1)\n" +
				"if #items == 0 then\n" +
				"\treturn ''\n" +
				"else\n" +
				"\tredis.call('zremrangebyrank',KEYS[1],0,0)\n" +
				"\treturn items[1]\n" +
				"end";
		// 自有工具類,只要能創(chuàng)建出spring包下的 RedisScript 的實現(xiàn)類對象均可
		TRY_GET_AND_DEL_SCRIPT = RedisScriptHelper.createScript(sb, String.class);
	}
	private final ThreadPoolExecutor EXECUTOR = new ThreadPoolExecutor(getThreadNum(), getThreadNum(),
			0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new NamedThreadFactory(getThreadNamePrefix()));
	private volatile boolean quit = false;
	@Autowired
	private StringRedisTemplate stringRedisTemplate;
	@Autowired
	private RabbitTemplate rabbitTemplate;
	@PostConstruct
	public void startScan() {
		// bean構(gòu)建完成后,啟動若干執(zhí)行線程
		int threadNum = getThreadNum();
		for (int i = 0; i < threadNum; i++) {
			EXECUTOR.execute(this);
		}
	}
	@Override
	public void run() {
		while (!quit) {
			try {
				// 循環(huán),采用lua獲取當前需要執(zhí)行的任務(wù)并將其從redis中刪除
				String msg = stringRedisTemplate.execute(TRY_GET_AND_DEL_SCRIPT,
						Lists.newArrayList(getDelayedMsgSourceKey()), String.valueOf(System.currentTimeMillis()));
				if (StringUtils.isNotBlank(msg)) {
					// 消息不為空,表示獲取任務(wù)成功,將其再發(fā)送到指定MQ異步處理,將“獲取任務(wù)”與“執(zhí)行任務(wù)”分離解耦
					rabbitTemplate.convertAndSend(getSendExchange(), getSendRoutingKey(), msg);
				} else {
					// 獲取不到任務(wù),表示當前時間點下暫無需要執(zhí)行的任務(wù),則線程休眠1S(可視情況調(diào)整)
					SleepUtils.sleepSeconds(1);
				}
			} catch (Exception e) {
				Logs.MSG.error("delayed msg scan error, sourceKey:{}", getDelayedMsgSourceKey(), e);
			}
		}
	}
	@Override
	public void destroy() throws Exception {
		quit = true;
	}
	public void setQuit(boolean quit) {
		this.quit = quit;
	}
	/**
	 * 獲取消息的工作線程數(shù)量
	 */
	protected abstract int getThreadNum();
	/**
	 * 線程名稱前綴,方便問題定位
	 */
	protected abstract String getThreadNamePrefix();
	/**
	 * 存放延遲消息的ZSET隊列名
	 */
	protected abstract String getDelayedMsgSourceKey();
	/**
	 * 消息到達執(zhí)行時間點時將其通過指定 exchange 發(fā)送到實時消費隊列中
	 */
	protected abstract String getSendExchange();
	/**
	 * 消息到達執(zhí)行時間點時將其通過指定 routingKey 發(fā)送到實時消費隊列中
	 */
	protected abstract String getSendRoutingKey();
}

在具體業(yè)務(wù)應(yīng)用中,只需定義一個繼承上述類的bean即可,需要實現(xiàn)的方法主要是提供一些配置,比如該業(yè)務(wù)對應(yīng)的zset延時隊列名稱,同時工作拉取消息的線程數(shù)量,由于采用rabbitMq,因此這里需要提供exchangeroutingKey。

實際使用中只需向該zset隊列中添加消息,并將score設(shè)為該任務(wù)需要執(zhí)行的時間點(此處為13位毫秒時間戳),則到該時間點后,上述組件便會將該消息從zset中取出并刪除,再將其通過指定的路由發(fā)送到實時MQ消費隊列中,由消費者負責執(zhí)行任務(wù)業(yè)務(wù)邏輯。目前該組件在項目中正常平穩(wěn)運行。

注意:

本文結(jié)合項目中的實際需求介紹了延時隊列的應(yīng)用場景,分析了延時隊列的多種實現(xiàn),重點講述了利用redis實現(xiàn)延時隊列的原理,對其實現(xiàn)方案進行比較與優(yōu)化,并將最終方案實際運用于項目需求中。

總結(jié)

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。

相關(guān)文章

  • 聊聊redis-dump工具安裝問題

    聊聊redis-dump工具安裝問題

    這篇文章主要介紹了redis-dump工具安裝問題,由于安裝redis-dump工具需要使用rvm?和gem工具所以要提前安裝,詳細的安裝過程本文給大家提到過,需要的朋友可以參考下
    2022-01-01
  • 配置redis的序列化,注入RedisTemplate方式

    配置redis的序列化,注入RedisTemplate方式

    這篇文章主要介紹了配置redis的序列化,注入RedisTemplate方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2023-12-12
  • redis淘汰策略的幾種實現(xiàn)

    redis淘汰策略的幾種實現(xiàn)

    redis內(nèi)存數(shù)據(jù)數(shù)據(jù)集大小升到一定大的時候,就會實行數(shù)據(jù)淘汰策略,本文主要介紹了redis淘汰策略的幾種實現(xiàn),具有一定的參考價值,感興趣的可以了解一下
    2024-05-05
  • redis專屬鏈表ziplist的使用

    redis專屬鏈表ziplist的使用

    本文主要介紹了redis專屬鏈表ziplist的使用,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-12-12
  • Redis基于Bitmap實現(xiàn)用戶簽到功能

    Redis基于Bitmap實現(xiàn)用戶簽到功能

    很多應(yīng)用上都有用戶簽到的功能,尤其是配合積分系統(tǒng)一起使用。本文主要介紹了Redis基于Bitmap實現(xiàn)用戶簽到功能,具有一定的參考價值,感興趣的小伙伴們可以參考一下
    2021-06-06
  • Redis事務(wù)為什么不支持回滾

    Redis事務(wù)為什么不支持回滾

    事務(wù)是關(guān)系型數(shù)據(jù)庫的特征之一,那么作為 Nosql 的代表 Redis 中有事務(wù)嗎?如果有,那么 Redis 當中的事務(wù)又是否具備關(guān)系型數(shù)據(jù)庫的 ACID 四大特性,本文就來詳細介紹一下
    2021-08-08
  • Redis 哨兵高模式搭建及Java代碼配置

    Redis 哨兵高模式搭建及Java代碼配置

    這篇文章主要介紹了Redis 哨兵高模式搭建及Java代碼配置,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2020-12-12
  • redis通過位圖法記錄在線用戶的狀態(tài)詳解

    redis通過位圖法記錄在線用戶的狀態(tài)詳解

    這篇文章主要給大家介紹了關(guān)于redis如何通過位圖法記錄在線用戶的狀態(tài)的相關(guān)資料,文中先對位圖進行了一個簡單的介紹,而后通過示例代碼將實現(xiàn)的方法介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧
    2018-11-11
  • redis主從連接不成功錯誤問題及解決

    redis主從連接不成功錯誤問題及解決

    這篇文章主要介紹了redis主從連接不成功錯誤問題及解決方案,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教<BR>
    2024-01-01
  • Redis中的配置文件,數(shù)據(jù)持久化,事務(wù)

    Redis中的配置文件,數(shù)據(jù)持久化,事務(wù)

    這篇文章主要介紹了Redis中的配置文件,數(shù)據(jù)持久化,事務(wù)問題,具有很好的參考價值,希望對大家有所幫助。
    2022-12-12

最新評論