.Net實(shí)現(xiàn)延遲隊(duì)列
介紹
具有隊(duì)列的特性,再給它附加一個(gè)延遲消費(fèi)隊(duì)列消息的功能,也就是說(shuō)可以指定隊(duì)列中的消息在哪個(gè)時(shí)間點(diǎn)被消費(fèi)。
使用場(chǎng)景
延時(shí)隊(duì)列在項(xiàng)目中的應(yīng)用還是比較多的,尤其像電商類平臺(tái):
- 訂單成功后,在30分鐘內(nèi)沒(méi)有支付,自動(dòng)取消訂單
- 外賣平臺(tái)發(fā)送訂餐通知,下單成功后60s給用戶推送短信。
- 如果訂單一直處于某一個(gè)未完結(jié)狀態(tài)時(shí),及時(shí)處理關(guān)單,并退還庫(kù)存
- 淘寶新建商戶一個(gè)月內(nèi)還沒(méi)上傳商品信息,將凍結(jié)商鋪等
該介紹來(lái)自其他文章
方案
下面的例子沒(méi)有進(jìn)行封裝,所以代碼僅供參考
Redis過(guò)期事件
注意:
不保證在設(shè)定的過(guò)期時(shí)間立即刪除并發(fā)送通知,數(shù)據(jù)量大的時(shí)候會(huì)延遲比較大
不保證一定送達(dá)
發(fā)送即忘策略,不包含持久化
但是比如有些場(chǎng)景,對(duì)這個(gè)時(shí)間不是那么看重,并且有其他措施雙層保障,該實(shí)現(xiàn)方案是比較簡(jiǎn)單。
redis自2.8.0之后版本提供Keyspace Notifications功能,允許客戶訂閱Pub / Sub頻道,以便以某種方式接收影響Redis數(shù)據(jù)集事件。
配置
需要修改配置啟用過(guò)期事件,比如在windows客戶端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改內(nèi)容是:
-- 取消注釋 notify-keyspace-events Ex -- 注釋 #notify-keyspace-events ""
然后重新啟動(dòng)服務(wù)器,比如windows
.\redis-server.exe .\redis.windows.conf
或者linux中使用docker-compose重新部署redis
redis: container_name: redis image: redis hostname: redis restart: always ports: - "6379:6379" volumes: - $PWD/redis/redis.conf:/etc/redis.conf - /root/common-docker-compose/redis/data:/data command: /bin/bash -c "redis-server /etc/redis.conf" #啟動(dòng)執(zhí)行指定的redis.conf文件
然后使用客戶端訂閱事件
-- windows .\redis-cli -- linux docker exec -it 容器標(biāo)識(shí) redis-cli psubscribe __keyevent@0__:expired
控制臺(tái)訂閱
使用StackExchange.Redis組件訂閱過(guò)期事件
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection); var db = connectionMultiplexer.GetDatabase(0); db.StringSet("orderno:123456", "訂單創(chuàng)建", TimeSpan.FromSeconds(10)); Console.WriteLine("開(kāi)始訂閱"); var subscriber = connectionMultiplexer.GetSubscriber(); //訂閱庫(kù)0的過(guò)期通知事件 subscriber.Subscribe("__keyevent@0__:expired", (channel, key) => { Console.WriteLine($"key過(guò)期 channel:{channel} key:{key}"); }); Console.ReadLine();
輸出結(jié)果:
key過(guò)期 channel:keyevent@0:expired key:orderno:123456
如果啟動(dòng)多個(gè)客戶端監(jiān)聽(tīng),那么多個(gè)客戶端都可以收到過(guò)期事件。
WebApi中訂閱
創(chuàng)建RedisListenService繼承自:BackgroundService
public class RedisListenService : BackgroundService { private readonly ISubscriber _subscriber; public RedisListenService(IServiceScopeFactory serviceScopeFactory) { using var scope = serviceScopeFactory.CreateScope(); var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>(); var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]); var db = connectionMultiplexer.GetDatabase(0); _subscriber = connectionMultiplexer.GetSubscriber(); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { //訂閱庫(kù)0的過(guò)期通知事件 _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) => { Console.WriteLine($"key過(guò)期 channel:{channel} key:{key}"); }); return Task.CompletedTask; } }
注冊(cè)該后臺(tái)服務(wù)
services.AddHostedService<RedisListenService>();
啟用項(xiàng)目,給redis指定庫(kù)設(shè)置值,等過(guò)期后會(huì)接收到過(guò)期通知事件。
RabbitMq延遲隊(duì)列
版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2
要使用rabbitmq做延遲是需要安裝插件(rabbitmq_delayed_message_exchange)的
插件介紹:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
將下載好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目錄下:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
進(jìn)入容器
docker exec -it 容器名稱/標(biāo)識(shí) bash
啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看是否啟用
rabbitmq-plugins list
[E]和[e]表示啟用,然后重啟服務(wù)
rabbitmq-server restart
然后在管理界面添加交換機(jī)可以看到
生產(chǎn)消息
發(fā)送的消息類型是:x-delayed-message
[HttpGet("send/delay")] public string SendDelayedMessage() { var factory = new ConnectionFactory() { HostName = "localhost",//IP地址 Port = 5672,//端口號(hào) UserName = "admin",//用戶賬號(hào) Password = "123456",//用戶密碼 VirtualHost = "customer" }; using var connection = factory.CreateConnection(); using var channel = connection.CreateModel(); var exchangeName = "delay-exchange"; var routingkey = "delay.delay"; var queueName = "delay_queueName"; //設(shè)置Exchange隊(duì)列類型 var argMaps = new Dictionary<string, object>() { {"x-delayed-type", "topic"} }; //設(shè)置當(dāng)前消息為延時(shí)隊(duì)列 channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps); channel.QueueDeclare(queueName, true, false, false, argMaps); channel.QueueBind(queueName, exchangeName, routingkey); var time = 1000 * 5; var message = $"發(fā)送時(shí)間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延時(shí)時(shí)間為:{time}"; var body = Encoding.UTF8.GetBytes(message); var props = channel.CreateBasicProperties(); //設(shè)置消息的過(guò)期時(shí)間 props.Headers = new Dictionary<string, object>() { { "x-delay", time } }; channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body); Console.WriteLine("成功發(fā)送消息:" + message); return "success"; }
消費(fèi)消息
消費(fèi)消息我是弄了一個(gè)后臺(tái)任務(wù)(RabbitmqDelayedHostService)在處理
public class RabbitmqDelayedHostService : BackgroundService { private readonly IModel _channel; private readonly IConnection _connection; public RabbitmqDelayedHostService() { var connFactory = new ConnectionFactory//創(chuàng)建連接工廠對(duì)象 { HostName = "localhost",//IP地址 Port = 5672,//端口號(hào) UserName = "admin",//用戶賬號(hào) Password = "123456",//用戶密碼 VirtualHost = "customer" }; _connection = connFactory.CreateConnection(); _channel = _connection.CreateModel(); //交換機(jī)名稱 var exchangeName = "exchangeDelayed"; var queueName = "delay_queueName"; var routingkey = "delay.delay"; var argMaps = new Dictionary<string, object>() { {"x-delayed-type", "topic"} }; _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps); _channel.QueueDeclare(queueName, true, false, false, argMaps); _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey); //聲明為手動(dòng)確認(rèn) _channel.BasicQos(0, 1, false); } protected override Task ExecuteAsync(CancellationToken stoppingToken) { var queueName = "delay_queueName"; var consumer = new EventingBasicConsumer(_channel); consumer.Received += (model, ea) => { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var routingKey = ea.RoutingKey; Console.WriteLine($"接受到消息的時(shí)間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} "); //手動(dòng)確認(rèn) _channel.BasicAck(ea.DeliveryTag, true); }; _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); return Task.CompletedTask; } public override void Dispose() { _connection.Dispose(); _channel.Dispose(); base.Dispose(); } }
注冊(cè)該后臺(tái)任務(wù)
services.AddHostedService<RabbitmqDelayedHostService>();
輸出結(jié)果
成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000
成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000
成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000
成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000
成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000
成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000
接受到消息的時(shí)間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000
接受到消息的時(shí)間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000
接受到消息的時(shí)間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000
接受到消息的時(shí)間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000
接受到消息的時(shí)間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000
接受到消息的時(shí)間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000
其他方案
- Hangfire延遲隊(duì)列
BackgroundJob.Schedule( () => Console.WriteLine("Delayed!"), TimeSpan.FromDays(7));
- 時(shí)間輪
- Redisson DelayQueue
- 計(jì)時(shí)管理器
到此這篇關(guān)于.Net實(shí)現(xiàn)延遲隊(duì)列的文章就介紹到這了。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
在Apache環(huán)境下成功的運(yùn)行ASP.NET的注意事項(xiàng)
在Apache環(huán)境下成功的運(yùn)行ASP.NET的注意事項(xiàng)...2007-08-08一文帶你了解.Net基于Threading.Mutex實(shí)現(xiàn)互斥鎖
互斥鎖是一個(gè)互斥的同步對(duì)象,意味著同一時(shí)間有且僅有一個(gè)線程可以獲取它。這篇文章主要介紹了一文帶你了解.Net基于Threading.Mutex實(shí)現(xiàn)互斥鎖,感興趣的可以了解一下2021-06-06ASP.NET Core中使用xUnit進(jìn)行單元測(cè)試
這篇文章主要介紹了ASP.NET Core中使用xUnit進(jìn)行單元測(cè)試,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-11-11利用Service Fabric承載eShop On Containers的實(shí)現(xiàn)方法
下面小編就為大家分享一篇利用Service Fabric承載eShop On Containers的實(shí)現(xiàn)方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-01-01Asp.Net中的Action和Func委托實(shí)現(xiàn)
這篇文章主要介紹了Asp.Net中的Action和Func委托的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-12-12