redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列的示例代碼
延時(shí)隊(duì)列簡(jiǎn)介
延時(shí)隊(duì)列是一種特殊的消息隊(duì)列,它允許將消息在一定的延遲時(shí)間后再進(jìn)行消費(fèi)。延時(shí)隊(duì)列的主要特點(diǎn)是可以延遲消息的處理時(shí)間,以滿足定時(shí)任務(wù)或者定時(shí)事件的需求。
總之,延時(shí)隊(duì)列通過延遲消息的消費(fèi)時(shí)間,提供了一種方便、可靠的方式來處理定時(shí)任務(wù)和定時(shí)事件。它在分布式系統(tǒng)中具有重要的作用,能夠提高系統(tǒng)的可靠性和性能。
延時(shí)隊(duì)列的實(shí)現(xiàn)方式可以有多種,本文介紹一種redis實(shí)現(xiàn)的分布式延時(shí)隊(duì)列。
應(yīng)用場(chǎng)景
定時(shí)任務(wù):可以將需要在特定時(shí)間執(zhí)行的任務(wù)封裝為延時(shí)消息,通過延時(shí)隊(duì)列來觸發(fā)任務(wù)的執(zhí)行。
訂單超時(shí)處理:可以將訂單消息發(fā)送到延時(shí)隊(duì)列中,并設(shè)置訂單的超時(shí)時(shí)間,超過時(shí)間后,消費(fèi)者從隊(duì)列中獲取到超時(shí)的訂單消息,進(jìn)行相應(yīng)的處理。
消息重試機(jī)制:當(dāng)某個(gè)消息處理失敗時(shí),可以將該消息發(fā)送到延時(shí)隊(duì)列中,并設(shè)置一定的重試時(shí)間,超過時(shí)間后再次嘗試處理。
案例
12306火車票購(gòu)買,搶了訂單后,45分鐘沒有支付,自動(dòng)取消訂單
考慮
數(shù)據(jù)持久化:redis是支持的,可以使用rdb,也可以使用aof
有序存儲(chǔ):因?yàn)橹灰钚〉臎]過期,后面的肯定就沒過期,這樣的話檢查最小的節(jié)點(diǎn)就行了,考慮使用redis中的zset結(jié)構(gòu)
高可用:考慮哨兵或者cluster
高伸縮:因?yàn)?2306用戶量非常大,可能導(dǎo)致redis中存儲(chǔ)的任務(wù)空間非常大,所以考慮擴(kuò)展節(jié)點(diǎn),從這個(gè)角度來說,使用cluster集群模式,哨兵只有一個(gè)節(jié)點(diǎn)即主節(jié)點(diǎn)寫數(shù)據(jù)。
實(shí)現(xiàn)
整體思路
- 生產(chǎn)消費(fèi)者模型:因?yàn)?2306的用戶量非常大,所以考慮生產(chǎn)者和消費(fèi)者有多個(gè)節(jié)點(diǎn);
- 采用cluster模式實(shí)現(xiàn)高可用以及高伸縮性;
- 采用zset存儲(chǔ)延時(shí)任務(wù)(zadd key score member,score表示時(shí)間);
- 為了讓數(shù)據(jù)均勻分布在cluster集群中的多個(gè)主節(jié)點(diǎn)中:構(gòu)建多個(gè)zset,每個(gè)zset對(duì)應(yīng)一個(gè)消費(fèi)者,生產(chǎn)者隨機(jī)向某個(gè)zset中生產(chǎn)數(shù)據(jù)。
具體實(shí)現(xiàn)
生產(chǎn)者
需要安裝hiredis-cluster集群,安裝編譯如下:
git clone https://github.com/Nordix/hiredis-cluster.git
cd hiredis-cluster
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -
DENABLE_SSL=ON ..
make
sudo make install
sudo ldconfig
需要安裝libevent庫(kù),最后編譯時(shí)執(zhí)行gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl
編譯生產(chǎn)者可執(zhí)行程序
#include <hiredis_cluster/adapters/libevent.h> #include <hiredis_cluster/hircluster.h> #include <event.h> #include <event2/listener.h> #include <event2/bufferevent.h> #include <event2/buffer.h> #include <stdio.h> #include <stdlib.h> #include <stdint.h> #include <string.h> #include <sys/time.h> int64_t g_taskid = 0; #define MAX_KEY 10 static int64_t hi_msec_now() { int64_t msec; struct timeval now; int status; status = gettimeofday(&now, NULL); if (status < 0) { return -1; } msec = (int64_t)now.tv_sec * 1000LL + (int64_t)(now.tv_usec / 1000LL); return msec; } static int _vscnprintf(char *buf, size_t size, const char *fmt, va_list args) { int n; n = vsnprintf(buf, size, fmt, args); if (n <= 0) { return 0; } if (n <= (int)size) { return n; } return (int)(size-1); } static int _scnprintf(char *buf, size_t size, const char *fmt, ...) { va_list args; int n; va_start(args, fmt); n = _vscnprintf(buf, size, fmt, args); va_end(args); return n; } void connectCallback(const redisAsyncContext *ac, int status) { if (status != REDIS_OK) { printf("Error: %s\n", ac->errstr); return; } printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port); } void disconnectCallback(const redisAsyncContext *ac, int status) { if (status != REDIS_OK) { printf("Error: %s\n", ac->errstr); return; } printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port); } void addTaskCallback(redisClusterAsyncContext *cc, void *r, void *privdata) { redisReply *reply = (redisReply *)r; if (reply == NULL) { if (cc->errstr) { printf("errstr: %s\n", cc->errstr); } return; } int64_t now = hi_msec_now() / 10; printf("add task success reply: %lld now=%ld\n", reply->integer, now); } int addTask(redisClusterAsyncContext *cc, char *desc) { /* 轉(zhuǎn)化為厘米秒 */ int64_t now = hi_msec_now() / 10; g_taskid++; /* key */ char key[256] = {0}; // 為了讓數(shù)據(jù)均勻分布在cluster集群中的多個(gè)主節(jié)點(diǎn)中: ? // 構(gòu)建多個(gè)zset,每個(gè)zset對(duì)應(yīng)一個(gè)消費(fèi)者,生產(chǎn)者隨機(jī)向某個(gè)zset中生產(chǎn)數(shù)據(jù), // 生產(chǎn)者可以有很多個(gè),只需要保證向task_group:0-task_group:9中均勻的生產(chǎn)數(shù)據(jù)即可 int len = _scnprintf(key, 255, "task_group:%ld", g_taskid % MAX_KEY); key[len] = '\0'; /* member */ char mem[1024] = {0}; len = _scnprintf(mem, 1023, "task:%ld:%s", g_taskid, desc); mem[len] = '\0'; int status; // 為每一個(gè)任務(wù)延時(shí)5秒中去處理 status = redisClusterAsyncCommand(cc, addTaskCallback, "", "zadd %s %ld %s", key, now+500, mem); printf("redisClusterAsyncCommand:zadd %s %ld %s\n", key, now+500, mem); if (status != REDIS_OK) { printf("error: err=%d errstr=%s\n", cc->err, cc->errstr); } return 0; } void stdio_callback(struct bufferevent *bev, void *arg) { redisClusterAsyncContext *cc = (redisClusterAsyncContext *)arg; struct evbuffer *evbuf = bufferevent_get_input(bev); char *msg = evbuffer_readln(evbuf, NULL, EVBUFFER_EOL_LF); if (!msg) return; if (strcmp(msg, "quit") == 0) { printf("safe exit!!!\n"); exit(0); return; } if (strlen(msg) > 1024-5-13-1) { printf("[err]msg is too long, try again...\n"); return; } addTask(cc, msg); printf("stdio read the data: %s\n", msg); } int main(int argc, char **argv) { printf("Connecting...\n"); // 連接cluster集群,可以從cluster集群中任意一個(gè)節(jié)點(diǎn)出發(fā)連接集群 redisClusterAsyncContext *cc = redisClusterAsyncConnect("127.0.0.1:7006", HIRCLUSTER_FLAG_NULL); printf("redisClusterAsyncContext...\n"); if (cc && cc->err) { printf("Error: %s\n", cc->errstr); return 1; } struct event_base *base = event_base_new(); redisClusterLibeventAttach(cc, base); redisClusterAsyncSetConnectCallback(cc, connectCallback); redisClusterAsyncSetDisconnectCallback(cc, disconnectCallback); // nodeIterator ni; // initNodeIterator(&ni, cc->cc); // cluster_node *node; // while ((node = nodeNext(&ni)) != NULL) { // printf("node %s:%d role:%d pad:%d\n", node->host, node->port, node->role, node->pad); // } struct bufferevent *ioev = bufferevent_socket_new(base, 0, BEV_OPT_CLOSE_ON_FREE); bufferevent_setcb(ioev, stdio_callback, NULL, NULL, cc); bufferevent_enable(ioev, EV_READ | EV_PERSIST); printf("Dispatch..\n"); event_base_dispatch(base); printf("Done..\n"); redisClusterAsyncFree(cc); event_base_free(base); return 0; } // 需要安裝 hiredis-cluster libevent // gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl
說明:
這里構(gòu)建了10個(gè)zset,分別是task_group:0,task_group:1,…,task_group:9作為10個(gè)zset的key,zset的數(shù)據(jù)其實(shí)就代表著消費(fèi)者的數(shù)量,通常消費(fèi)者的功能是一摸一樣的,生產(chǎn)者就不管你有多少個(gè)了,只需要將任務(wù)均勻的打散在不同的zset中就行了(具體實(shí)現(xiàn)可以搞一個(gè)全局的id,每一次添加任務(wù)時(shí)id++,然后再對(duì)zset個(gè)數(shù)10取模,最終可以得到0-9之間的一個(gè)數(shù),然后再與task_group拼接,這樣就可以將任務(wù)均勻的打散在不同的zset中)。
消費(fèi)者
消費(fèi)者是采用skynet+lua腳本實(shí)現(xiàn)的,每個(gè)消費(fèi)者會(huì)不斷的去檢查redis中的任務(wù)有沒有過期,如果過期,就取出來刪除(這里只是demo,只是打印之后刪除任務(wù))
local skynet = require "skynet" local function table_dump( object ) if type(object) == 'table' then local s = '{ ' for k,v in pairs(object) do if type(k) ~= 'number' then k = string.format("%q", k) end s = s .. '['..k..'] = ' .. table_dump(v) .. ',' end return s .. '} ' elseif type(object) == 'function' then return tostring(object) elseif type(object) == 'string' then return string.format("%q", object) else return tostring(object) end end local mode, key = ... if mode == "slave" then local rediscluster = require "skynet.db.redis.cluster" local function onmessage(data,channel,pchannel) print("onmessage",data,channel,pchannel) end skynet.start(function () local db = rediscluster.new({ {host="127.0.0.1",port=7001}, }, {read_slave=true,auth=nil,db=0,}, onmessage ) assert(db, "redis-cluster startup error") skynet.fork(function () while true do local res = db:zrange(key, 0, 0, "withscores") if not next(res) then skynet.sleep(50) else local expire = tonumber(res[2]) local now = skynet.time()*100 if now >= expire then print(("%s is comsumed:expire_time:%d"):format(res[1], expire)) db:zrem(key, res[1]) else skynet.sleep(10) end end end end) end) else skynet.start(function () -- // 啟動(dòng)10個(gè)程序,并把"slave"傳入mode,task_group:i傳入到key中,即每個(gè)程序只消費(fèi)一個(gè) for i=0,9 do skynet.newservice(SERVICE_NAME, "slave", "task_group:"..i)
運(yùn)行結(jié)果
redis分布式延時(shí)隊(duì)列優(yōu)勢(shì)
1.Redis zset支持高性能的 score 排序。
2.Redis是在內(nèi)存上進(jìn)行操作的,速度非???。
3.Redis可以搭建集群,當(dāng)消息很多時(shí)候,我們可以用集群來提高消息處理的速度,提高可用性。
4.Redis具有持久化機(jī)制,當(dāng)出現(xiàn)故障的時(shí)候,可以通過AOF和RDB方式來對(duì)數(shù)據(jù)進(jìn)行恢復(fù),保證了數(shù)據(jù)的可靠性
redis分布式延時(shí)隊(duì)列劣勢(shì)
使用 Redis 實(shí)現(xiàn)的延時(shí)消息隊(duì)列也存在數(shù)據(jù)持久化, 消息可靠性的問題:
- 沒有重試機(jī)制 - 處理消息出現(xiàn)異常沒有重試機(jī)制, 這些需要自己去實(shí)現(xiàn), 包括重試次數(shù)的實(shí)現(xiàn)等;
- 沒有 ACK 機(jī)制 - 例如在獲取消息并已經(jīng)刪除了消息情況下, 正在處理消息的時(shí)候客戶端崩潰了, 這條正在處理的這些消息就會(huì)丟失, MQ 是需要明確的返回一個(gè)值給 MQ 才會(huì)認(rèn)為這個(gè)消息是被正確的消費(fèi)了。
總結(jié):如果對(duì)消息可靠性要求較高, 推薦使用 MQ 來實(shí)現(xiàn)
以上就是redis實(shí)現(xiàn)分布式延時(shí)隊(duì)列的示例代碼的詳細(xì)內(nèi)容,更多關(guān)于redis分布式延時(shí)隊(duì)列的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
- redis實(shí)現(xiàn)延時(shí)隊(duì)列的兩種方式(小結(jié))
- 基于Redis實(shí)現(xiàn)延時(shí)隊(duì)列的優(yōu)化方案小結(jié)
- 生產(chǎn)redisson延時(shí)隊(duì)列不消費(fèi)問題排查解決
- Redisson 分布式延時(shí)隊(duì)列 RedissonDelayedQueue 運(yùn)行流程
- redis使用zset實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
- Redis消息隊(duì)列、阻塞隊(duì)列、延時(shí)隊(duì)列的實(shí)現(xiàn)
- Redis簡(jiǎn)易延時(shí)隊(duì)列的實(shí)現(xiàn)示例
- Redisson延時(shí)隊(duì)列RedissonDelayed的具體使用
- redis和rabbitmq實(shí)現(xiàn)延時(shí)隊(duì)列的示例代碼
相關(guān)文章
redis集群實(shí)現(xiàn)清理前綴相同的key
這篇文章主要介紹了redis集群實(shí)現(xiàn)清理前綴相同的key,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10Redis高并發(fā)情況下并發(fā)扣減庫(kù)存項(xiàng)目實(shí)戰(zhàn)
本文主要介紹了Redis高并發(fā)情況下并發(fā)扣減庫(kù)存項(xiàng)目實(shí)戰(zhàn),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04hiredis從安裝到項(xiàng)目實(shí)戰(zhàn)操作
這篇文章主要介紹了hiredis從安裝到項(xiàng)目實(shí)戰(zhàn)操作,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-02-02Redis數(shù)據(jù)庫(kù)中實(shí)現(xiàn)分布式鎖的方法
這篇文章主要介紹了Redis數(shù)據(jù)庫(kù)中實(shí)現(xiàn)分布式鎖的方法,Redis是一個(gè)高性能的主存式數(shù)據(jù)庫(kù),需要的朋友可以參考下2015-06-06Windows系統(tǒng)一鍵啟動(dòng)Redis腳本
本文介紹了在Windows系統(tǒng)中創(chuàng)建一鍵啟動(dòng)Redis的腳本,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-12-12