運用.NetCore實例講解RabbitMQ死信隊列,延時隊列
一、死信隊列
描述:Q1隊列綁定了x-dead-letter-exchange(死信交換機)為X2,x-dead-letter-routing-key(死信路由key)指向Q2(隊列2)
P(生產(chǎn)者)發(fā)送消息經(jīng)X1(交換機1)路由到Q1(隊列1),Q1的消息觸發(fā)特定情況,自動把消息經(jīng)X2(交換機2)路由到Q2(隊列2),C(消費者)直接消息Q2的消息。
特定情況有哪些呢:
- 1.消息被拒(basic.reject or basic.nack)并且沒有重新入隊(requeue=false);
- 2.當前隊列中的消息數(shù)量已經(jīng)超過最大長度(創(chuàng)建隊列時指定" x-max-length參數(shù)設(shè)置隊列最大消息數(shù)量)。
- 3.消息在隊列中過期,即當前消息在隊列中的存活時間已經(jīng)超過了預(yù)先設(shè)置的TTL(Time To Live)時間;
這里演示情況1:
假如場景:Q1中隊列數(shù)據(jù)不完整,就算從新處理也會報錯,那就可以不ack,把這個消息轉(zhuǎn)到死信隊列另外處理。
生產(chǎn)者:
public static void SendMessage() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; //消息交換機 string exchange = "direct-exchange"; //消息隊列 string queueName = "queue_a"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //創(chuàng)建死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建死信隊列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信隊列綁定死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 創(chuàng)建消息交換機 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建消息隊列,并指定死信隊列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //設(shè)置當前隊列的DLX(死信交換機) { "x-dead-letter-routing-key",dlxQueueName}, //設(shè)置DLX的路由key,DLX會根據(jù)該值去找到死信消息存放的隊列 }); //消息隊列綁定消息交換機 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //發(fā)布消息 channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"向隊列:{queueName}發(fā)送消息:{message}"); } } }
消費者:
public static void Consumer() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; //消息交換機 string exchange = "direct-exchange"; //消息隊列 string queueName = "queue_a"; var connection = RabbitMQHelper.GetConnection(); { //創(chuàng)建信道 var channel = connection.CreateModel(); { //創(chuàng)建死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建死信隊列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信隊列綁定死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 創(chuàng)建消息交換機 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建消息隊列,并指定死信隊列 channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //設(shè)置當前隊列的DLX { "x-dead-letter-routing-key",dlxQueueName}, //設(shè)置DLX的路由key,DLX會根據(jù)該值去找到死信消息存放的隊列 }); //消息隊列綁定消息交換機 channel.QueueBind(queueName, exchange, routingKey: queueName); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //處理業(yè)務(wù) var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"隊列{queueName}消費消息:{message},不做ack確認"); //channel.BasicAck(ea.DeliveryTag, false); //不ack(BasicNack),且不把消息放回隊列(requeue:false) channel.BasicNack(ea.DeliveryTag, false, requeue: false); }; channel.BasicConsume(queueName, autoAck: false, consumer); } } }
消費者加上channel.BasickNack()模擬消息處理不了,不ack確認。
執(zhí)行結(jié)果:
RabbitMQ管理界面:
看到消息隊列為queue_a,特性有DLX(死信交換機),DLK(死信路由)。因為消費端不nack,觸發(fā)了死信,被轉(zhuǎn)發(fā)到了死信隊列dlx.queue。
二、延時隊列
延時隊列其實也是配合死信隊列一起用,其實就是上面死信隊列的第二中情況。給隊列添加消息過時時間(TTL),變成延時隊列。
簡單的描述就是:P(生產(chǎn)者)發(fā)送消息到Q1(延時隊列),Q1的消息有過期時間,比如10s,那10s后消息過期就會觸發(fā)死信,從而把消息轉(zhuǎn)發(fā)到Q2(死信隊列)。
解決問題場景:像商城下單,未支付時取消訂單場景。下單時寫一條記錄入Q1,延時30分鐘后轉(zhuǎn)到Q2,消費Q2,檢查訂單,支付則不做操作,沒支付則取消訂單,恢復(fù)庫存。
生產(chǎn)者代碼:
public static void SendMessage() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; //消息交換機 string exchange = "direct-exchange"; //消息隊列 string queueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //創(chuàng)建死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建死信隊列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信隊列綁定死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 創(chuàng)建消息交換機 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建消息隊列,并指定死信隊列,和設(shè)置這個隊列的消息過期時間為10s channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //設(shè)置當前隊列的DLX(死信交換機) { "x-dead-letter-routing-key",dlxQueueName}, //設(shè)置DLX的路由key,DLX會根據(jù)該值去找到死信消息存放的隊列 { "x-message-ttl",10000} //設(shè)置隊列的消息過期時間 }); //消息隊列綁定消息交換機 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //發(fā)布消息 channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發(fā)送消息:{message}"); } } }
消費者代碼:
public static void Consumer() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; var connection = RabbitMQHelper.GetConnection(); { //創(chuàng)建信道 var channel = connection.CreateModel(); { //創(chuàng)建死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建死信隊列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信隊列綁定死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //處理業(yè)務(wù) var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{DateTime.Now},隊列{dlxQueueName}消費消息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(dlxQueueName, autoAck: false, consumer); } } }
執(zhí)行代碼:
向延時隊列發(fā)送消息,監(jiān)聽死信隊列,發(fā)送和收到消息時間剛好是設(shè)置的10s。
RabbitMQ管理界面:
三、延時消息設(shè)置不同過期時間
上面的延時隊列能解決消息過期時間都是相同的場景,能不能解決消息的過期時間是不一樣的呢?
例如場景:機器人客服,為了更像人為操作,收到消息后要隨機3-10秒回復(fù)客戶。
- 1)隊列不設(shè)置TTL(消息過期時間),把過期時間設(shè)置在消息上。
生產(chǎn)者代碼:
public static void SendMessage() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; //消息交換機 string exchange = "direct-exchange"; //消息隊列 string queueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //創(chuàng)建死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建死信隊列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信隊列綁定死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 創(chuàng)建消息交換機 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建消息隊列,并指定死信隊列,和設(shè)置這個隊列的消息過期時間為10s channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //設(shè)置當前隊列的DLX(死信交換機) { "x-dead-letter-routing-key",dlxQueueName}, //設(shè)置DLX的路由key,DLX會根據(jù)該值去找到死信消息存放的隊列 //{ "x-message-ttl",10000} //設(shè)置隊列的消息過期時間 }); //消息隊列綁定消息交換機 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message 10s后處理"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "10000";//消息的有效期10s //發(fā)布消息,延時10s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發(fā)送消息:{message},延時:10s"); string message2 = "hello rabbitmq message 5s后處理"; var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; properties2.Expiration = "5000";//消息有效期5s //發(fā)布消息,延時5s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties2, body: Encoding.UTF8.GetBytes(message2)); Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發(fā)送消息:{message2},延時:5s"); } } }
消費者代碼還是上面延時隊列的不變,先試下效果。
生產(chǎn)者向隊列中發(fā)送一條延時10s的消息再發(fā)一條延時5秒的消息,但消費者卻先拿到延時10s的,再拿到延時5秒的,我想要的結(jié)果是先拿到延時5s的再拿到延時10s的,是什么原因呢。
原因是:隊列是先進先出的,而RabbitMQ只會對首位第一條消息做檢測,第一條沒過期,那么后面的消息就會阻塞住等待前面的過期。
解決辦法:增加一個消費者對延時隊列消費,不ack,把第一條消息放到隊列尾部。一直讓消息在流動,這樣就能檢測到了。
- 2)新增消費者代碼:
public static void SendMessage() { //死信交換機 string dlxexChange = "dlx.exchange"; //死信隊列 string dlxQueueName = "dlx.queue"; //消息交換機 string exchange = "direct-exchange"; //消息隊列 string queueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { //創(chuàng)建死信交換機 channel.ExchangeDeclare(dlxexChange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建死信隊列 channel.QueueDeclare(dlxQueueName, durable: true, exclusive: false, autoDelete: false); //死信隊列綁定死信交換機 channel.QueueBind(dlxQueueName, dlxexChange, routingKey: dlxQueueName); // 創(chuàng)建消息交換機 channel.ExchangeDeclare(exchange, type: ExchangeType.Direct, durable: true, autoDelete: false); //創(chuàng)建消息隊列,并指定死信隊列,和設(shè)置這個隊列的消息過期時間為10s channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-dead-letter-exchange",dlxexChange}, //設(shè)置當前隊列的DLX(死信交換機) { "x-dead-letter-routing-key",dlxQueueName}, //設(shè)置DLX的路由key,DLX會根據(jù)該值去找到死信消息存放的隊列 //{ "x-message-ttl",10000} //設(shè)置隊列的消息過期時間 }); //消息隊列綁定消息交換機 channel.QueueBind(queueName, exchange, routingKey: queueName); string message = "hello rabbitmq message 10s后處理"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "10000";//消息的有效期10s //發(fā)布消息,延時10s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發(fā)送消息:{message},延時:10s"); string message2 = "hello rabbitmq message 5s后處理"; var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; properties2.Expiration = "5000";//消息有效期5s //發(fā)布消息,延時5s channel.BasicPublish(exchange: exchange, routingKey: queueName, basicProperties: properties2, body: Encoding.UTF8.GetBytes(message2)); Console.WriteLine($"{DateTime.Now},向隊列:{queueName}發(fā)送消息:{message2},延時:5s"); } } }
執(zhí)行效果:
這會得到了想要的效果。
RabbitMQ管理界面:
四、延時消息用延時插件的方式實現(xiàn)
相比上面第三的延時消息,這里的插件方式會顯的更加簡單,也推薦用這種。
因為這里只需要一個交換機和一個對隊列,生產(chǎn)者向隊列發(fā)送消息,會直接是延時才會到隊列。
安裝插件:
地址:https://www.rabbitmq.com/community-plugins.html
找到和自己RabbitMQ一樣的版本,下載下來上傳到Linux,或F12查看這個文件的地址,直接Linux上下載(這里用這種)
Linux下載插件:
#下載插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
已經(jīng)下載到Linux上
#把文件復(fù)制到rabbitmq docker容器下的plugins文件夾
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/plugins
#進入rabbitmq docker容器
docker exec -it rabbitmq bash
#開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
做完上面這些在RabbitMQ管理界面可以看到多了一個延時消息的交換機。
插件裝好了,生產(chǎn)者代碼:
public static void SendMessage() { //延時消息交換機 string delayExchange = "delay.exchange"; //延時消息隊列 string delayQueueName = "delay_queue"; using (var connection = RabbitMQHelper.GetConnection()) { using (var channel = connection.CreateModel()) { Dictionary<string, object> args = new Dictionary<string, object>(); args.Add("x-delayed-type", "direct"); //x-delayed-type必須加 //創(chuàng)建延時交換機,type類型為x-delayed-message channel.ExchangeDeclare(delayExchange, type: "x-delayed-message", durable: true, autoDelete: false,arguments: args); //創(chuàng)建延時消息隊列 channel.QueueDeclare(delayQueueName, durable: true, exclusive: false, autoDelete: false); //交換機綁定隊列 channel.QueueBind(delayQueueName, delayExchange, routingKey: delayQueueName); string message = "hello rabbitmq message 10s后處理"; var properties = channel.CreateBasicProperties(); properties.Persistent = true; //延時時間從header賦值 Dictionary<string, object> headers = new Dictionary<string, object>(); headers.Add("x-delay", 10000); properties.Headers = headers; //發(fā)布消息,按時10s channel.BasicPublish(exchange: delayExchange, routingKey: delayQueueName, basicProperties: properties, body: Encoding.UTF8.GetBytes(message)); Console.WriteLine($"{DateTime.Now},向隊列:{delayQueueName}發(fā)送消息:{message},延時:10s"); string message2 = "hello rabbitmq message 5s后處理"; var properties2 = channel.CreateBasicProperties(); properties2.Persistent = true; //延時時間從header賦值 Dictionary<string, object> headers2 = new Dictionary<string, object>(); headers2.Add("x-delay", 5000); properties2.Headers = headers2; //發(fā)布消息,延時5s channel.BasicPublish(exchange: delayExchange, routingKey: delayQueueName, basicProperties: properties2, body: Encoding.UTF8.GetBytes(message2)); Console.WriteLine($"{DateTime.Now},向隊列:{delayQueueName}發(fā)送消息:{message2},延時:5s"); } } }
消費者代碼:
public static void DelayMessageConsumer() { //延時隊列 string queueName = "delay_queue"; var connection = RabbitMQHelper.GetConnection(); { //創(chuàng)建信道 var channel = connection.CreateModel(); { var consumer = new EventingBasicConsumer(channel); channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true); consumer.Received += (model, ea) => { //處理業(yè)務(wù) var message = Encoding.UTF8.GetString(ea.Body.ToArray()); Console.WriteLine($"{DateTime.Now},接收到消息:{message}"); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(queueName, autoAck: false, consumer); } } }
執(zhí)行代碼:
RabbitMQ管理界面,只有一個隊列:
到此這篇關(guān)于運用.NetCore實例講解RabbitMQ死信隊列,延時隊列的文章就介紹到這了,更多相關(guān).NetCore RabbitMQ死信隊列,延時隊列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
ASP.NET2.0服務(wù)器控件之類型轉(zhuǎn)換器
ASP.NET2.0服務(wù)器控件之類型轉(zhuǎn)換器...2006-09-09asp.net 請求輸入到輸出的全過程及httpHandler和httpModuler詳細介紹
看了幾篇講述httpHandler和HttpModuler的文章,雖然說沒有完全了解底層操作,但是我也算明白了一個請求從進入IIS到最后輸出都經(jīng)歷了哪些過程,感興趣的朋友可以了解下2013-01-01