欧美bbbwbbbw肥妇,免费乱码人妻系列日韩,一级黄片

.Net實(shí)現(xiàn)延遲隊(duì)列

 更新時(shí)間:2022年07月05日 10:23:56   作者:AZRNG  
這篇文章介紹了.Net實(shí)現(xiàn)延遲隊(duì)列的方法,文中通過(guò)示例代碼介紹的非常詳細(xì)。對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下

介紹

具有隊(duì)列的特性,再給它附加一個(gè)延遲消費(fèi)隊(duì)列消息的功能,也就是說(shuō)可以指定隊(duì)列中的消息在哪個(gè)時(shí)間點(diǎn)被消費(fèi)。

使用場(chǎng)景

延時(shí)隊(duì)列在項(xiàng)目中的應(yīng)用還是比較多的,尤其像電商類平臺(tái):

  • 訂單成功后,在30分鐘內(nèi)沒(méi)有支付,自動(dòng)取消訂單
  • 外賣平臺(tái)發(fā)送訂餐通知,下單成功后60s給用戶推送短信。
  • 如果訂單一直處于某一個(gè)未完結(jié)狀態(tài)時(shí),及時(shí)處理關(guān)單,并退還庫(kù)存
  • 淘寶新建商戶一個(gè)月內(nèi)還沒(méi)上傳商品信息,將凍結(jié)商鋪等

該介紹來(lái)自其他文章

方案

下面的例子沒(méi)有進(jìn)行封裝,所以代碼僅供參考

Redis過(guò)期事件

注意:

不保證在設(shè)定的過(guò)期時(shí)間立即刪除并發(fā)送通知,數(shù)據(jù)量大的時(shí)候會(huì)延遲比較大

不保證一定送達(dá)

發(fā)送即忘策略,不包含持久化

但是比如有些場(chǎng)景,對(duì)這個(gè)時(shí)間不是那么看重,并且有其他措施雙層保障,該實(shí)現(xiàn)方案是比較簡(jiǎn)單。

redis自2.8.0之后版本提供Keyspace Notifications功能,允許客戶訂閱Pub / Sub頻道,以便以某種方式接收影響Redis數(shù)據(jù)集事件。

配置

需要修改配置啟用過(guò)期事件,比如在windows客戶端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改內(nèi)容是:

-- 取消注釋
notify-keyspace-events Ex

-- 注釋
#notify-keyspace-events ""

然后重新啟動(dòng)服務(wù)器,比如windows

 .\redis-server.exe  .\redis.windows.conf

或者linux中使用docker-compose重新部署redis

  redis:
    container_name: redis
    image: redis
    hostname: redis
    restart: always
    ports: 
      - "6379:6379"
    volumes: 
      - $PWD/redis/redis.conf:/etc/redis.conf
      - /root/common-docker-compose/redis/data:/data
    command: 
      /bin/bash -c "redis-server /etc/redis.conf" #啟動(dòng)執(zhí)行指定的redis.conf文件

然后使用客戶端訂閱事件

-- windows
.\redis-cli
 
-- linux
docker exec -it 容器標(biāo)識(shí) redis-cli
 
psubscribe __keyevent@0__:expired

控制臺(tái)訂閱

使用StackExchange.Redis組件訂閱過(guò)期事件

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);

db.StringSet("orderno:123456", "訂單創(chuàng)建", TimeSpan.FromSeconds(10));
Console.WriteLine("開(kāi)始訂閱");

var subscriber = connectionMultiplexer.GetSubscriber();

//訂閱庫(kù)0的過(guò)期通知事件
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
    Console.WriteLine($"key過(guò)期 channel:{channel} key:{key}");
});

Console.ReadLine();

輸出結(jié)果:

key過(guò)期 channel:keyevent@0:expired key:orderno:123456

如果啟動(dòng)多個(gè)客戶端監(jiān)聽(tīng),那么多個(gè)客戶端都可以收到過(guò)期事件。

WebApi中訂閱

創(chuàng)建RedisListenService繼承自:BackgroundService

public class RedisListenService : BackgroundService
{
    private readonly ISubscriber _subscriber;

    public RedisListenService(IServiceScopeFactory serviceScopeFactory)
    {
        using var scope = serviceScopeFactory.CreateScope();
        var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();

        var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
        var db = connectionMultiplexer.GetDatabase(0);
        _subscriber = connectionMultiplexer.GetSubscriber();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        //訂閱庫(kù)0的過(guò)期通知事件
        _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
        {
            Console.WriteLine($"key過(guò)期 channel:{channel} key:{key}");
        });

        return Task.CompletedTask;
    }
}

注冊(cè)該后臺(tái)服務(wù)

services.AddHostedService<RedisListenService>();

啟用項(xiàng)目,給redis指定庫(kù)設(shè)置值,等過(guò)期后會(huì)接收到過(guò)期通知事件。

RabbitMq延遲隊(duì)列

版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2

要使用rabbitmq做延遲是需要安裝插件(rabbitmq_delayed_message_exchange)的

插件介紹:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

將下載好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目錄下:

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine

進(jìn)入容器

docker exec -it 容器名稱/標(biāo)識(shí) bash

啟用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

查看是否啟用

rabbitmq-plugins list

[E]和[e]表示啟用,然后重啟服務(wù)

rabbitmq-server restart

然后在管理界面添加交換機(jī)可以看到

生產(chǎn)消息

發(fā)送的消息類型是:x-delayed-message

[HttpGet("send/delay")]
public string SendDelayedMessage()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",//IP地址
        Port = 5672,//端口號(hào)
        UserName = "admin",//用戶賬號(hào)
        Password = "123456",//用戶密碼
        VirtualHost = "customer"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    var exchangeName = "delay-exchange";
    var routingkey = "delay.delay";
    var queueName = "delay_queueName";

    //設(shè)置Exchange隊(duì)列類型
    var argMaps = new Dictionary<string, object>()
    {
        {"x-delayed-type", "topic"}
    };
    //設(shè)置當(dāng)前消息為延時(shí)隊(duì)列
    channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
    channel.QueueDeclare(queueName, true, false, false, argMaps);
    channel.QueueBind(queueName, exchangeName, routingkey);

    var time = 1000 * 5;
    var message = $"發(fā)送時(shí)間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延時(shí)時(shí)間為:{time}";
    var body = Encoding.UTF8.GetBytes(message);
    var props = channel.CreateBasicProperties();
    //設(shè)置消息的過(guò)期時(shí)間
    props.Headers = new Dictionary<string, object>()
            {
                {  "x-delay", time }
            };
    channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
    Console.WriteLine("成功發(fā)送消息:" + message);

    return "success";
}

消費(fèi)消息

消費(fèi)消息我是弄了一個(gè)后臺(tái)任務(wù)(RabbitmqDelayedHostService)在處理

public class RabbitmqDelayedHostService : BackgroundService
{
    private readonly IModel _channel;
    private readonly IConnection _connection;

    public RabbitmqDelayedHostService()
    {
        var connFactory = new ConnectionFactory//創(chuàng)建連接工廠對(duì)象
        {
            HostName = "localhost",//IP地址
            Port = 5672,//端口號(hào)
            UserName = "admin",//用戶賬號(hào)
            Password = "123456",//用戶密碼
            VirtualHost = "customer"
        };
        _connection = connFactory.CreateConnection();
        _channel = _connection.CreateModel();

        //交換機(jī)名稱
        var exchangeName = "exchangeDelayed";
        var queueName = "delay_queueName";
        var routingkey = "delay.delay";
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        _channel.QueueDeclare(queueName, true, false, false, argMaps);
        _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
        //聲明為手動(dòng)確認(rèn)
        _channel.BasicQos(0, 1, false);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = "delay_queueName";

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var routingKey = ea.RoutingKey;
            Console.WriteLine($"接受到消息的時(shí)間為 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");

            //手動(dòng)確認(rèn)
            _channel.BasicAck(ea.DeliveryTag, true);
        };
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _connection.Dispose();
        _channel.Dispose();
        base.Dispose();
    }
}

注冊(cè)該后臺(tái)任務(wù)

services.AddHostedService<RabbitmqDelayedHostService>();

輸出結(jié)果

成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000

成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000

成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000

成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000

成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000

成功發(fā)送消息:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000

接受到消息的時(shí)間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000

接受到消息的時(shí)間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000

接受到消息的時(shí)間為 2022-07-02 18:54:27,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:22 延時(shí)時(shí)間為:5000

接受到消息的時(shí)間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000

接受到消息的時(shí)間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000

接受到消息的時(shí)間為 2022-07-02 18:54:28,routingKey:delay.delay message:發(fā)送時(shí)間為 2022-07-02 18:54:23 延時(shí)時(shí)間為:5000

其他方案

  • Hangfire延遲隊(duì)列
BackgroundJob.Schedule(
  () => Console.WriteLine("Delayed!"),
   TimeSpan.FromDays(7));
  • 時(shí)間輪
  • Redisson DelayQueue
  • 計(jì)時(shí)管理器

到此這篇關(guān)于.Net實(shí)現(xiàn)延遲隊(duì)列的文章就介紹到這了。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。

相關(guān)文章

最新評(píng)論