.Net實現(xiàn)延遲隊列
介紹
具有隊列的特性,再給它附加一個延遲消費隊列消息的功能,也就是說可以指定隊列中的消息在哪個時間點被消費。
使用場景
延時隊列在項目中的應用還是比較多的,尤其像電商類平臺:
- 訂單成功后,在30分鐘內(nèi)沒有支付,自動取消訂單
- 外賣平臺發(fā)送訂餐通知,下單成功后60s給用戶推送短信。
- 如果訂單一直處于某一個未完結狀態(tài)時,及時處理關單,并退還庫存
- 淘寶新建商戶一個月內(nèi)還沒上傳商品信息,將凍結商鋪等
該介紹來自其他文章
方案
下面的例子沒有進行封裝,所以代碼僅供參考
Redis過期事件
注意:
不保證在設定的過期時間立即刪除并發(fā)送通知,數(shù)據(jù)量大的時候會延遲比較大
不保證一定送達
發(fā)送即忘策略,不包含持久化
但是比如有些場景,對這個時間不是那么看重,并且有其他措施雙層保障,該實現(xiàn)方案是比較簡單。
redis自2.8.0之后版本提供Keyspace Notifications功能,允許客戶訂閱Pub / Sub頻道,以便以某種方式接收影響Redis數(shù)據(jù)集事件。
配置
需要修改配置啟用過期事件,比如在windows客戶端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改內(nèi)容是:
-- 取消注釋 notify-keyspace-events Ex -- 注釋 #notify-keyspace-events ""
然后重新啟動服務器,比如windows
.\redis-server.exe .\redis.windows.conf
或者linux中使用docker-compose重新部署redis
redis:
container_name: redis
image: redis
hostname: redis
restart: always
ports:
- "6379:6379"
volumes:
- $PWD/redis/redis.conf:/etc/redis.conf
- /root/common-docker-compose/redis/data:/data
command:
/bin/bash -c "redis-server /etc/redis.conf" #啟動執(zhí)行指定的redis.conf文件然后使用客戶端訂閱事件
-- windows .\redis-cli -- linux docker exec -it 容器標識 redis-cli psubscribe __keyevent@0__:expired
控制臺訂閱
使用StackExchange.Redis組件訂閱過期事件
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
db.StringSet("orderno:123456", "訂單創(chuàng)建", TimeSpan.FromSeconds(10));
Console.WriteLine("開始訂閱");
var subscriber = connectionMultiplexer.GetSubscriber();
//訂閱庫0的過期通知事件
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
Console.WriteLine($"key過期 channel:{channel} key:{key}");
});
Console.ReadLine();輸出結果:
key過期 channel:keyevent@0:expired key:orderno:123456
如果啟動多個客戶端監(jiān)聽,那么多個客戶端都可以收到過期事件。
WebApi中訂閱
創(chuàng)建RedisListenService繼承自:BackgroundService
public class RedisListenService : BackgroundService
{
private readonly ISubscriber _subscriber;
public RedisListenService(IServiceScopeFactory serviceScopeFactory)
{
using var scope = serviceScopeFactory.CreateScope();
var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
var db = connectionMultiplexer.GetDatabase(0);
_subscriber = connectionMultiplexer.GetSubscriber();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
//訂閱庫0的過期通知事件
_subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
Console.WriteLine($"key過期 channel:{channel} key:{key}");
});
return Task.CompletedTask;
}
}注冊該后臺服務
services.AddHostedService<RedisListenService>();
啟用項目,給redis指定庫設置值,等過期后會接收到過期通知事件。
RabbitMq延遲隊列
版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2
要使用rabbitmq做延遲是需要安裝插件(rabbitmq_delayed_message_exchange)的
插件介紹:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
將下載好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目錄下:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
進入容器
docker exec -it 容器名稱/標識 bash
啟用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看是否啟用
rabbitmq-plugins list
[E]和[e]表示啟用,然后重啟服務
rabbitmq-server restart
然后在管理界面添加交換機可以看到
生產(chǎn)消息
發(fā)送的消息類型是:x-delayed-message
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP地址
Port = 5672,//端口號
UserName = "admin",//用戶賬號
Password = "123456",//用戶密碼
VirtualHost = "customer"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
//設置Exchange隊列類型
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
//設置當前消息為延時隊列
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey);
var time = 1000 * 5;
var message = $"發(fā)送時間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延時時間為:{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
//設置消息的過期時間
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", time }
};
channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
Console.WriteLine("成功發(fā)送消息:" + message);
return "success";
}消費消息
消費消息我是弄了一個后臺任務(RabbitmqDelayedHostService)在處理
public class RabbitmqDelayedHostService : BackgroundService
{
private readonly IModel _channel;
private readonly IConnection _connection;
public RabbitmqDelayedHostService()
{
var connFactory = new ConnectionFactory//創(chuàng)建連接工廠對象
{
HostName = "localhost",//IP地址
Port = 5672,//端口號
UserName = "admin",//用戶賬號
Password = "123456",//用戶密碼
VirtualHost = "customer"
};
_connection = connFactory.CreateConnection();
_channel = _connection.CreateModel();
//交換機名稱
var exchangeName = "exchangeDelayed";
var queueName = "delay_queueName";
var routingkey = "delay.delay";
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
_channel.QueueDeclare(queueName, true, false, false, argMaps);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
//聲明為手動確認
_channel.BasicQos(0, 1, false);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = "delay_queueName";
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine($"接受到消息的時間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
//手動確認
_channel.BasicAck(ea.DeliveryTag, true);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
_connection.Dispose();
_channel.Dispose();
base.Dispose();
}
}注冊該后臺任務
services.AddHostedService<RabbitmqDelayedHostService>();
輸出結果
成功發(fā)送消息:發(fā)送時間為 2022-07-02 18:54:22 延時時間為:5000
成功發(fā)送消息:發(fā)送時間為 2022-07-02 18:54:22 延時時間為:5000
成功發(fā)送消息:發(fā)送時間為 2022-07-02 18:54:22 延時時間為:5000
成功發(fā)送消息:發(fā)送時間為 2022-07-02 18:54:23 延時時間為:5000
成功發(fā)送消息:發(fā)送時間為 2022-07-02 18:54:23 延時時間為:5000
成功發(fā)送消息:發(fā)送時間為 2022-07-02 18:54:23 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時間為 2022-07-02 18:54:22 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時間為 2022-07-02 18:54:22 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時間為 2022-07-02 18:54:22 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時間為 2022-07-02 18:54:23 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時間為 2022-07-02 18:54:23 延時時間為:5000
接受到消息的時間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時間為 2022-07-02 18:54:23 延時時間為:5000
其他方案
- Hangfire延遲隊列
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));- 時間輪
- Redisson DelayQueue
- 計時管理器
到此這篇關于.Net實現(xiàn)延遲隊列的文章就介紹到這了。希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
在Apache環(huán)境下成功的運行ASP.NET的注意事項
在Apache環(huán)境下成功的運行ASP.NET的注意事項...2007-08-08
一文帶你了解.Net基于Threading.Mutex實現(xiàn)互斥鎖
互斥鎖是一個互斥的同步對象,意味著同一時間有且僅有一個線程可以獲取它。這篇文章主要介紹了一文帶你了解.Net基于Threading.Mutex實現(xiàn)互斥鎖,感興趣的可以了解一下2021-06-06
利用Service Fabric承載eShop On Containers的實現(xiàn)方法
下面小編就為大家分享一篇利用Service Fabric承載eShop On Containers的實現(xiàn)方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-01-01
Asp.Net中的Action和Func委托實現(xiàn)
這篇文章主要介紹了Asp.Net中的Action和Func委托的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-12-12

